You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/12 01:29:51 UTC
kafka git commit: KAFKA-5713; Shutdown brokers in tests
Repository: kafka
Updated Branches:
refs/heads/trunk b6effcbba -> 12aa70d55
KAFKA-5713; Shutdown brokers in tests
Add broker shutdown for `LeaderEpochIntegrationTest`.
Move broker shutdown in other tests to `tearDown` to
ensure brokers are shutdown even if tests fail.
Also added assertion to `ZooKeeperTestHarness` to
verify that controller event thread is not running
since this thread may load JAAS configuration if ZK
ports are reused.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3026 from rajinisivaram/KAFKA-5173
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/12aa70d5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/12aa70d5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/12aa70d5
Branch: refs/heads/trunk
Commit: 12aa70d55bc422226255ab18e69e4bc6f24be2d9
Parents: b6effcb
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Fri May 12 00:43:35 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri May 12 02:29:16 2017 +0100
----------------------------------------------------------------------
.../kafka/api/ProducerCompressionTest.scala | 5 +-
...tenersWithSameSecurityProtocolBaseTest.scala | 7 +-
.../ReplicaFetcherThreadFatalErrorTest.scala | 2 +-
.../other/kafka/ReplicationQuotasTestRig.scala | 5 +-
.../unit/kafka/admin/AddPartitionsTest.scala | 5 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 209 +++++++++----------
.../unit/kafka/admin/DeleteTopicTest.scala | 43 ++--
.../admin/ReassignPartitionsClusterTest.scala | 5 +-
.../controller/ControllerIntegrationTest.scala | 5 +-
.../integration/KafkaServerTestHarness.scala | 3 +-
.../kafka/integration/TopicMetadataTest.scala | 20 +-
.../unit/kafka/producer/ProducerTest.scala | 5 +-
.../unit/kafka/server/AdvertiseBrokerTest.scala | 7 +-
.../unit/kafka/server/LeaderElectionTest.scala | 5 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 3 +-
.../unit/kafka/server/LogRecoveryTest.scala | 6 +-
.../unit/kafka/server/OffsetCommitTest.scala | 3 +-
.../unit/kafka/server/ReplicaFetchTest.scala | 2 +-
.../kafka/server/ReplicationQuotasTest.scala | 9 +-
.../server/ServerGenerateBrokerIdTest.scala | 78 +++----
.../server/ServerGenerateClusterIdTest.scala | 44 ++--
.../unit/kafka/server/ServerStartupTest.scala | 39 ++--
...rivenReplicationProtocolAcceptanceTest.scala | 2 +-
.../epoch/LeaderEpochIntegrationTest.scala | 15 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 17 +-
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 20 ++
26 files changed, 265 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index b165918..2001095 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -30,7 +30,7 @@ import kafka.server.{KafkaConfig, KafkaServer}
import kafka.consumer.SimpleConsumer
import kafka.message.Message
import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
@RunWith(value = classOf[Parameterized])
@@ -53,8 +53,7 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
@After
override def tearDown() {
- server.shutdown
- CoreUtils.delete(server.config.logDirs)
+ TestUtils.shutdownServers(Seq(server))
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 88b314f..7db9d7c 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -26,7 +26,7 @@ import kafka.api.SaslSetup
import kafka.common.Topic
import kafka.coordinator.group.OffsetConfig
import kafka.utils.JaasTestUtils.JaasSection
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -133,10 +133,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
override def tearDown() {
producers.values.foreach(_.close())
consumers.values.foreach(_.close())
- servers.foreach { s =>
- s.shutdown()
- CoreUtils.delete(s.config.logDirs)
- }
+ TestUtils.shutdownServers(servers)
super.tearDown()
closeSasl()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index ae76eb6..9a04e67 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -45,7 +45,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
Exit.resetExitProcedure()
- brokers.foreach(_.shutdown())
+ TestUtils.shutdownServers(brokers)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 38d07ba..d8bc65e 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
import kafka.utils.TestUtils._
import kafka.utils.ZkUtils._
-import kafka.utils.{CoreUtils, Exit, Logging, TestUtils, ZkUtils}
+import kafka.utils.{Exit, Logging, TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.clients.producer.ProducerRecord
import org.jfree.chart.plot.PlotOrientation
@@ -108,8 +108,7 @@ object ReplicationQuotasTestRig {
}
override def tearDown() {
- servers.par.foreach(_.shutdown())
- servers.par.foreach(server => CoreUtils.delete(server.config.logDirs))
+ TestUtils.shutdownServers(servers)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index d95d90d..e9c5ac5 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -22,7 +22,7 @@ import org.junit.Assert._
import org.apache.kafka.common.protocol.SecurityProtocol
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
import kafka.cluster.Broker
import kafka.client.ClientUtils
import kafka.server.{KafkaConfig, KafkaServer}
@@ -59,8 +59,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
- servers.foreach(_.shutdown())
- servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+ TestUtils.shutdownServers(servers)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index ec54608..f8c65eb 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, Invali
import org.apache.kafka.common.metrics.Quota
import org.easymock.EasyMock
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Test}
import java.util.Properties
import kafka.utils._
@@ -47,6 +47,14 @@ import scala.util.Try
class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
+ var servers: Seq[KafkaServer] = Seq()
+
+ @After
+ override def tearDown() {
+ TestUtils.shutdownServers(servers)
+ super.tearDown()
+ }
+
@Test
def testReplicaAssignment() {
val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
@@ -188,7 +196,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topic = "test"
// create brokers
- val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+ servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// reassign partition 0
@@ -211,7 +219,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
- servers.foreach(_.shutdown())
}
@Test
@@ -219,7 +226,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topic = "test"
// create brokers
- val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+ servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// reassign partition 0
@@ -241,8 +248,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
-
- servers.foreach(_.shutdown())
}
@Test
@@ -250,7 +255,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
val expectedReplicaAssignment = Map(0 -> List(0, 1))
val topic = "test"
// create brokers
- val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+ servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// reassign partition 0
@@ -272,14 +277,13 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
- servers.foreach(_.shutdown())
}
@Test
def testReassigningNonExistingPartition() {
val topic = "test"
// create brokers
- val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+ servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// reassign partition 0
val newReplicas = Seq(2, 3)
val partitionToBeReassigned = 0
@@ -288,7 +292,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
assertFalse("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
val reassignedPartitions = zkUtils.getPartitionsBeingReassigned()
assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
- servers.foreach(_.shutdown())
}
@Test
@@ -305,7 +308,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
reassignPartitionsCommand.reassignPartitions()
// create brokers
- val servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+ servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// wait until reassignment completes
TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkUtils),
@@ -317,7 +320,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
- servers.foreach(_.shutdown())
}
@Test
@@ -344,7 +346,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
- val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
+ servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
// broker 2 should be the leader since it was started first
val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None).get
// trigger preferred replica election
@@ -352,7 +354,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
preferredReplicaElection.moveLeaderToPreferredReplica()
val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = Some(currentLeader)).get
assertEquals("Preferred replica election failed", preferredReplica, newLeader)
- servers.foreach(_.shutdown())
}
@Test
@@ -362,7 +363,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
val partition = 1
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
- val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
+ servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
// create the topic
TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
@@ -373,36 +374,31 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
controller.shutdownBroker(2, controlledShutdownCallback)
var partitionsRemaining = resultQueue.take().get
var activeServers = servers.filter(s => s.config.brokerId != 2)
- try {
- // wait for the update metadata request to trickle to the brokers
- TestUtils.waitUntilTrue(() =>
- activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
- "Topic test not created after timeout")
- assertEquals(0, partitionsRemaining.size)
- var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
- var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
- assertEquals(0, leaderAfterShutdown)
- assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
- assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr)
-
- controller.shutdownBroker(1, controlledShutdownCallback)
- partitionsRemaining = resultQueue.take().get
- assertEquals(0, partitionsRemaining.size)
- activeServers = servers.filter(s => s.config.brokerId == 0)
- partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
- leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
- assertEquals(0, leaderAfterShutdown)
-
- assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
- controller.shutdownBroker(0, controlledShutdownCallback)
- partitionsRemaining = resultQueue.take().get
- assertEquals(1, partitionsRemaining.size)
- // leader doesn't change since all the replicas are shut down
- assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
- }
- finally {
- servers.foreach(_.shutdown())
- }
+ // wait for the update metadata request to trickle to the brokers
+ TestUtils.waitUntilTrue(() =>
+ activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
+ "Topic test not created after timeout")
+ assertEquals(0, partitionsRemaining.size)
+ var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+ var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+ assertEquals(0, leaderAfterShutdown)
+ assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
+ assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr)
+
+ controller.shutdownBroker(1, controlledShutdownCallback)
+ partitionsRemaining = resultQueue.take().get
+ assertEquals(0, partitionsRemaining.size)
+ activeServers = servers.filter(s => s.config.brokerId == 0)
+ partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+ leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+ assertEquals(0, leaderAfterShutdown)
+
+ assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+ controller.shutdownBroker(0, controlledShutdownCallback)
+ partitionsRemaining = resultQueue.take().get
+ assertEquals(1, partitionsRemaining.size)
+ // leader doesn't change since all the replicas are shut down
+ assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
}
/**
@@ -414,6 +410,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
val partitions = 3
val topic = "my-topic"
val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+ servers = Seq(server)
def makeConfig(messageSize: Int, retentionMs: Long, throttledLeaders: String, throttledFollowers: String) = {
val props = new Properties()
@@ -446,51 +443,45 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
}
}
- try {
- // create a topic with a few config overrides and check that they are applied
- val maxMessageSize = 1024
- val retentionMs = 1000 * 1000
- AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
-
- //Standard topic configs will be propagated at topic creation time, but the quota manager will not have been updated.
- checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", false)
+ // create a topic with a few config overrides and check that they are applied
+ val maxMessageSize = 1024
+ val retentionMs = 1000 * 1000
+ AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
- //Update dynamically and all properties should be applied
- AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+ //Standard topic configs will be propagated at topic creation time, but the quota manager will not have been updated.
+ checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", false)
- checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", true)
+ //Update dynamically and all properties should be applied
+ AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
- // now double the config values for the topic and check that it is applied
- val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")
- AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*"))
- checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*", quotaManagerIsThrottled = true)
+ checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", true)
- // Verify that the same config can be read from ZK
- val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Topic, topic)
- assertEquals(newConfig, configInZk)
+ // now double the config values for the topic and check that it is applied
+ val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")
+ AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*"))
+ checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*", quotaManagerIsThrottled = true)
- //Now delete the config
- AdminUtils.changeTopicConfig(server.zkUtils, topic, new Properties)
- checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false)
+ // Verify that the same config can be read from ZK
+ val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Topic, topic)
+ assertEquals(newConfig, configInZk)
- //Add config back
- AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
- checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", quotaManagerIsThrottled = true)
+ //Now delete the config
+ AdminUtils.changeTopicConfig(server.zkUtils, topic, new Properties)
+ checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false)
- //Now ensure updating to "" removes the throttled replica list also
- AdminUtils.changeTopicConfig(server.zkUtils, topic, propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), (LogConfig.LeaderReplicationThrottledReplicasProp, "")))
- checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false)
+ //Add config back
+ AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+ checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", quotaManagerIsThrottled = true)
- } finally {
- server.shutdown()
- CoreUtils.delete(server.config.logDirs)
- }
+ //Now ensure updating to "" removes the throttled replica list also
+ AdminUtils.changeTopicConfig(server.zkUtils, topic, propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), (LogConfig.LeaderReplicationThrottledReplicasProp, "")))
+ checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false)
}
@Test
def shouldPropagateDynamicBrokerConfigs() {
val brokerIds = Seq(0, 1, 2)
- val servers = createBrokerConfigs(3, zkConnect).map(fromProps).map(createServer(_))
+ servers = createBrokerConfigs(3, zkConnect).map(fromProps).map(createServer(_))
def checkConfig(limit: Long) {
retry(10000) {
@@ -501,37 +492,31 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
}
}
- try {
- val limit: Long = 1000000
-
- // Set the limit & check it is applied to the log
- changeBrokerConfig(zkUtils, brokerIds, propsWith(
- (LeaderReplicationThrottledRateProp, limit.toString),
- (FollowerReplicationThrottledRateProp, limit.toString)))
- checkConfig(limit)
-
- // Now double the config values for the topic and check that it is applied
- val newLimit = 2 * limit
- changeBrokerConfig(zkUtils, brokerIds, propsWith(
- (LeaderReplicationThrottledRateProp, newLimit.toString),
- (FollowerReplicationThrottledRateProp, newLimit.toString)))
- checkConfig(newLimit)
-
- // Verify that the same config can be read from ZK
- for (brokerId <- brokerIds) {
- val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils, ConfigType.Broker, brokerId.toString)
- assertEquals(newLimit, configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt)
- assertEquals(newLimit, configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt)
- }
-
- //Now delete the config
- changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties)
- checkConfig(DefaultReplicationThrottledRate)
-
- } finally {
- servers.foreach(_.shutdown())
- servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+ val limit: Long = 1000000
+
+ // Set the limit & check it is applied to the log
+ changeBrokerConfig(zkUtils, brokerIds, propsWith(
+ (LeaderReplicationThrottledRateProp, limit.toString),
+ (FollowerReplicationThrottledRateProp, limit.toString)))
+ checkConfig(limit)
+
+ // Now double the config values for the topic and check that it is applied
+ val newLimit = 2 * limit
+ changeBrokerConfig(zkUtils, brokerIds, propsWith(
+ (LeaderReplicationThrottledRateProp, newLimit.toString),
+ (FollowerReplicationThrottledRateProp, newLimit.toString)))
+ checkConfig(newLimit)
+
+ // Verify that the same config can be read from ZK
+ for (brokerId <- brokerIds) {
+ val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils, ConfigType.Broker, brokerId.toString)
+ assertEquals(newLimit, configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt)
+ assertEquals(newLimit, configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt)
}
+
+ //Now delete the config
+ changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties)
+ checkConfig(DefaultReplicationThrottledRate)
}
/**
@@ -556,13 +541,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
// Test that the existing clientId overrides are read
val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
- try {
- assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
- assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
- } finally {
- server.shutdown()
- CoreUtils.delete(server.config.logDirs)
- }
+ servers = Seq(server)
+ assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
+ assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 2085d2d..d9ab85e 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.TestUtils
import kafka.utils.ZkUtils._
import kafka.server.{KafkaConfig, KafkaServer}
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Test}
import java.util.Properties
import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
@@ -31,22 +31,29 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
class DeleteTopicTest extends ZooKeeperTestHarness {
+ var servers: Seq[KafkaServer] = Seq()
+
+ @After
+ override def tearDown() {
+ TestUtils.shutdownServers(servers)
+ super.tearDown()
+ }
+
@Test
def testDeleteTopicWithAllAliveReplicas() {
val topicPartition = new TopicPartition("test", 0)
val topic = topicPartition.topic
- val servers = createTestTopicAndCluster(topic)
+ servers = createTestTopicAndCluster(topic)
// start topic deletion
AdminUtils.deleteTopic(zkUtils, topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
- servers.foreach(_.shutdown())
}
@Test
def testResumeDeleteTopicWithRecoveredFollower() {
val topicPartition = new TopicPartition("test", 0)
val topic = topicPartition.topic
- val servers = createTestTopicAndCluster(topic)
+ servers = createTestTopicAndCluster(topic)
// shut down one follower replica
val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
@@ -64,14 +71,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// restart follower replica
follower.startup()
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
- servers.foreach(_.shutdown())
}
@Test
def testResumeDeleteTopicOnControllerFailover() {
val topicPartition = new TopicPartition("test", 0)
val topic = topicPartition.topic
- val servers = createTestTopicAndCluster(topic)
+ servers = createTestTopicAndCluster(topic)
val controllerId = zkUtils.getController()
val controller = servers.filter(s => s.config.brokerId == controllerId).head
val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
@@ -91,7 +97,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
follower.startup()
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
- servers.foreach(_.shutdown())
}
@Test
@@ -103,6 +108,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
// create brokers
val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+ this.servers = allServers
val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
@@ -136,13 +142,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
follower.startup()
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
- allServers.foreach(_.shutdown())
}
@Test
def testDeleteTopicDuringAddPartition() {
val topic = "test"
- val servers = createTestTopicAndCluster(topic)
+ servers = createTestTopicAndCluster(topic)
val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
@@ -159,13 +164,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
TestUtils.waitUntilTrue(() =>
servers.forall(_.getLogManager().getLog(newPartition).isEmpty),
"Replica logs not for new partition [test,1] not deleted after delete topic is complete.")
- servers.foreach(_.shutdown())
}
@Test
def testAddPartitionDuringDeleteTopic() {
val topic = "test"
- val servers = createTestTopicAndCluster(topic)
+ servers = createTestTopicAndCluster(topic)
// start topic deletion
AdminUtils.deleteTopic(zkUtils, topic)
// add partitions to topic
@@ -175,7 +179,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// verify that new partition doesn't exist on any broker either
assertTrue("Replica logs not deleted after delete topic is complete",
servers.forall(_.getLogManager().getLog(newPartition).isEmpty))
- servers.foreach(_.shutdown())
}
@Test
@@ -183,7 +186,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topic = "test"
val topicPartition = new TopicPartition(topic, 0)
- val servers = createTestTopicAndCluster(topic)
+ servers = createTestTopicAndCluster(topic)
// start topic deletion
AdminUtils.deleteTopic(zkUtils, topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
@@ -195,14 +198,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// check if all replica logs are created
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
"Replicas for topic test not created.")
- servers.foreach(_.shutdown())
}
@Test
def testDeleteNonExistingTopic() {
val topicPartition = new TopicPartition("test", 0)
val topic = topicPartition.topic
- val servers = createTestTopicAndCluster(topic)
+ servers = createTestTopicAndCluster(topic)
// start topic deletion
try {
AdminUtils.deleteTopic(zkUtils, "test2")
@@ -220,7 +222,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// topic test should have a leader
val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
- servers.foreach(_.shutdown())
}
@Test
@@ -236,7 +237,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
brokerConfigs.head.setProperty("log.segment.bytes","100")
brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577")
- val servers = createTestTopicAndCluster(topic,brokerConfigs)
+ servers = createTestTopicAndCluster(topic,brokerConfigs)
// for simplicity, we are validating cleaner offsets on a single broker
val server = servers.head
@@ -251,15 +252,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// delete topic
AdminUtils.deleteTopic(zkUtils, "test")
TestUtils.verifyTopicDeletion(zkUtils, "test", 1, servers)
-
- servers.foreach(_.shutdown())
}
@Test
def testDeleteTopicAlreadyMarkedAsDeleted() {
val topicPartition = new TopicPartition("test", 0)
val topic = topicPartition.topic
- val servers = createTestTopicAndCluster(topic)
+ servers = createTestTopicAndCluster(topic)
try {
// start topic deletion
@@ -273,7 +272,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
}
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
- servers.foreach(_.shutdown())
}
private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true): Seq[KafkaServer] = {
@@ -311,7 +309,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
def testDisableDeleteTopic() {
val topicPartition = new TopicPartition("test", 0)
val topic = topicPartition.topic
- val servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false)
+ servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false)
// mark the topic for deletion
AdminUtils.deleteTopic(zkUtils, "test")
TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getDeleteTopicPath(topic)),
@@ -323,6 +321,5 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// topic test should have a leader
val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
- servers.foreach(_.shutdown())
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 05a3f83..e3b0aa8 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -17,7 +17,7 @@ import kafka.common.{AdminCommandFailedException, TopicAndPartition}
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestUtils._
import kafka.utils.ZkUtils._
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import kafka.utils.{Logging, TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{After, Before, Test}
@@ -44,8 +44,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
@After
override def tearDown() {
- servers.par.foreach(_.shutdown())
- servers.par.foreach(server => CoreUtils.delete(server.config.logDirs))
+ TestUtils.shutdownServers(servers)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 5e608d1..cbb98e8 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -20,7 +20,7 @@ package kafka.controller
import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.utils.{TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.junit.{After, Before, Test}
@@ -35,8 +35,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
- servers.foreach(_.shutdown())
- servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+ TestUtils.shutdownServers(servers)
super.tearDown
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index af3133a..da25d5c 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -97,8 +97,7 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
@After
override def tearDown() {
if (servers != null) {
- servers.foreach(_.shutdown())
- servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+ TestUtils.shutdownServers(servers)
}
super.tearDown
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index d63d5b2..07af590 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -24,7 +24,7 @@ import kafka.api.TopicMetadataResponse
import kafka.client.ClientUtils
import kafka.cluster.BrokerEndPoint
import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils}
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.Errors
@@ -33,6 +33,7 @@ import org.junit.{Test, After, Before}
class TopicMetadataTest extends ZooKeeperTestHarness {
private var server1: KafkaServer = null
+ private var adHocServers: Seq[KafkaServer] = Seq()
var brokerEndPoints: Seq[BrokerEndPoint] = null
var adHocConfigs: Seq[KafkaConfig] = null
val numConfigs: Int = 4
@@ -53,7 +54,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
- server1.shutdown()
+ TestUtils.shutdownServers(adHocServers :+ server1)
super.tearDown()
}
@@ -134,6 +135,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
// start adHoc brokers with replication factor too high
val adHocServer = createServer(new KafkaConfig(adHocProps))
+ adHocServers = Seq(adHocServer)
// We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
// `securityProtocol` instead of PLAINTEXT below
val adHocEndpoint = new BrokerEndPoint(adHocServer.config.brokerId, adHocServer.config.hostName,
@@ -147,8 +149,6 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
-
- adHocServer.shutdown()
}
@Test
@@ -216,7 +216,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
val numBrokers = 2 //just 2 brokers are enough for the test
// start adHoc brokers
- val adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p))
+ adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p))
val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
// create topic
@@ -232,9 +232,6 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
// check metadata is still correct and updated at all brokers
checkIsr(allServers)
-
- // shutdown adHoc brokers
- adHocServers.map(p => p.shutdown())
}
private def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
@@ -269,7 +266,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
@Test
def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
- var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
+ adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
checkMetadata(adHocServers, numConfigs - 1)
@@ -277,13 +274,12 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head))
checkMetadata(adHocServers, numConfigs)
- adHocServers.map(p => p.shutdown())
}
@Test
def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
- val adHocServers = adHocConfigs.map(p => createServer(p))
+ adHocServers = adHocConfigs.map(p => createServer(p))
checkMetadata(adHocServers, numConfigs)
@@ -292,7 +288,5 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
adHocServers.last.awaitShutdown()
checkMetadata(adHocServers, numConfigs - 1)
-
- adHocServers.map(p => p.shutdown())
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 63ec83e..1d3f77f 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -91,10 +91,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
if (consumer2 != null)
consumer2.close()
- server1.shutdown
- server2.shutdown
- CoreUtils.delete(server1.config.logDirs)
- CoreUtils.delete(server2.config.logDirs)
+ TestUtils.shutdownServers(Seq(server1, server2))
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index 8d4899b..6d9ab72 100755
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -18,7 +18,7 @@
package kafka.server
import org.junit.Assert._
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.{After, Test}
@@ -32,10 +32,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
- servers.foreach { s =>
- s.shutdown()
- CoreUtils.delete(s.config.logDirs)
- }
+ TestUtils.shutdownServers(servers)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 6ffe314..aa243be 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import kafka.api.LeaderAndIsr
import org.apache.kafka.common.requests._
import org.junit.Assert._
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
import kafka.cluster.Broker
import kafka.controller.{ControllerChannelManager, ControllerContext}
import kafka.utils.TestUtils._
@@ -60,8 +60,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
- servers.foreach(_.shutdown())
- servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+ TestUtils.shutdownServers(servers)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 415027c..9383355 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -60,8 +60,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
simpleConsumer.close
- server.shutdown
- Utils.delete(logDir)
+ TestUtils.shutdownServers(Seq(server))
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 54cee6b..0ecc3c7 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -27,7 +27,6 @@ import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
-import org.apache.kafka.common.utils.Utils
import org.junit.{After, Before, Test}
import org.junit.Assert._
@@ -95,10 +94,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
producer.close()
- for (server <- servers) {
- server.shutdown()
- Utils.delete(new File(server.config.logDirs.head))
- }
+ TestUtils.shutdownServers(servers)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index ebfbe89..244ef78 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -68,8 +68,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
simpleConsumer.close
- server.shutdown
- Utils.delete(logDir)
+ TestUtils.shutdownServers(Seq(server))
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 19c386f..dd683e1 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -42,7 +42,7 @@ class ReplicaFetchTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
- brokers.foreach(_.shutdown())
+ TestUtils.shutdownServers(brokers)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 15e77a0..5fc4c0f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -50,15 +50,10 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
val topic = "topic1"
var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
- @Before
- override def setUp() {
- super.setUp()
- }
-
@After
override def tearDown() {
- brokers.par.foreach(_.shutdown())
producer.close()
+ shutdownServers(brokers)
super.tearDown()
}
@@ -242,4 +237,4 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
val metricName = broker.metrics.metricName("byte-rate", repType.toString)
broker.metrics.metrics.asScala(metricName).value
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index f21f2de..0ba133f 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -19,8 +19,8 @@ package kafka.server
import java.util.Properties
import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestUtils, CoreUtils}
-import org.junit.{Before, Test}
+import kafka.utils.TestUtils
+import org.junit.{After, Before, Test}
import org.junit.Assert._
import java.io.File
@@ -30,6 +30,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
var props2: Properties = null
var config2: KafkaConfig = null
val brokerMetaPropsFile = "meta.properties"
+ var servers: Seq[KafkaServer] = Seq()
@Before
override def setUp() {
@@ -40,6 +41,12 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
config2 = KafkaConfig.fromProps(props2)
}
+ @After
+ override def tearDown() {
+ TestUtils.shutdownServers(servers)
+ super.tearDown()
+ }
+
@Test
def testAutoGenerateBrokerId() {
var server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
@@ -47,11 +54,10 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
server1.shutdown()
assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
// restart the server check to see if it uses the brokerId generated previously
- server1 = new KafkaServer(config1)
- server1.startup()
+ server1 = TestUtils.createServer(config1)
+ servers = Seq(server1)
assertEquals(server1.config.brokerId, 1001)
server1.shutdown()
- CoreUtils.delete(server1.config.logDirs)
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
@@ -61,23 +67,18 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
val server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
val server2 = new KafkaServer(config2, threadNamePrefix = Option(this.getClass.getName))
val props3 = TestUtils.createBrokerConfig(-1, zkConnect)
- val config3 = KafkaConfig.fromProps(props3)
- val server3 = new KafkaServer(config3)
+ val server3 = new KafkaServer(KafkaConfig.fromProps(props3))
server1.startup()
- assertEquals(server1.config.brokerId,1001)
+ assertEquals(server1.config.brokerId, 1001)
server2.startup()
- assertEquals(server2.config.brokerId,0)
+ assertEquals(server2.config.brokerId, 0)
server3.startup()
- assertEquals(server3.config.brokerId,1002)
- server1.shutdown()
- server2.shutdown()
- server3.shutdown()
- assertTrue(verifyBrokerMetadata(server1.config.logDirs,1001))
- assertTrue(verifyBrokerMetadata(server2.config.logDirs,0))
- assertTrue(verifyBrokerMetadata(server3.config.logDirs,1002))
- CoreUtils.delete(server1.config.logDirs)
- CoreUtils.delete(server2.config.logDirs)
- CoreUtils.delete(server3.config.logDirs)
+ assertEquals(server3.config.brokerId, 1002)
+ servers = Seq(server1, server2, server3)
+ servers.foreach(_.shutdown())
+ assertTrue(verifyBrokerMetadata(server1.config.logDirs, 1001))
+ assertTrue(verifyBrokerMetadata(server2.config.logDirs, 0))
+ assertTrue(verifyBrokerMetadata(server3.config.logDirs, 1002))
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
@@ -88,12 +89,11 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
// Set reserve broker ids to cause collision and ensure disabling broker id generation ignores the setting
props3.put(KafkaConfig.MaxReservedBrokerIdProp, "0")
val config3 = KafkaConfig.fromProps(props3)
- val server3 = new KafkaServer(config3)
- server3.startup()
- assertEquals(server3.config.brokerId,3)
+ val server3 = TestUtils.createServer(config3)
+ servers = Seq(server3)
+ assertEquals(server3.config.brokerId, 3)
server3.shutdown()
- assertTrue(verifyBrokerMetadata(server3.config.logDirs,3))
- CoreUtils.delete(server3.config.logDirs)
+ assertTrue(verifyBrokerMetadata(server3.config.logDirs, 3))
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
@@ -102,21 +102,22 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
// add multiple logDirs and check if the generate brokerId is stored in all of them
val logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath +
"," + TestUtils.tempDir().getAbsolutePath
- props1.setProperty("log.dir",logDirs)
+ props1.setProperty("log.dir", logDirs)
config1 = KafkaConfig.fromProps(props1)
var server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
server1.startup()
+ servers = Seq(server1)
server1.shutdown()
assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
// addition to log.dirs after generation of a broker.id from zk should be copied over
val newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath
- props1.setProperty("log.dir",newLogDirs)
+ props1.setProperty("log.dir", newLogDirs)
config1 = KafkaConfig.fromProps(props1)
server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
server1.startup()
+ servers = Seq(server1)
server1.shutdown()
assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
- CoreUtils.delete(server1.config.logDirs)
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
@@ -125,6 +126,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
// check if configured brokerId and stored brokerId are equal or throw InconsistentBrokerException
var server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName)) //auto generate broker Id
server1.startup()
+ servers = Seq(server1)
server1.shutdown()
server1 = new KafkaServer(config2, threadNamePrefix = Option(this.getClass.getName)) // user specified broker id
try {
@@ -133,7 +135,6 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
case _: kafka.common.InconsistentBrokerIdException => //success
}
server1.shutdown()
- CoreUtils.delete(server1.config.logDirs)
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
@@ -142,8 +143,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
// Start a good server
val propsA = TestUtils.createBrokerConfig(1, zkConnect)
val configA = KafkaConfig.fromProps(propsA)
- val serverA = new KafkaServer(configA)
- serverA.startup()
+ val serverA = TestUtils.createServer(configA)
// Start a server that collides on the broker id
val propsB = TestUtils.createBrokerConfig(1, zkConnect)
@@ -152,6 +152,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
intercept[RuntimeException] {
serverB.startup()
}
+ servers = Seq(serverA)
// verify no broker metadata was written
serverB.config.logDirs.foreach { logDir =>
@@ -162,26 +163,25 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
// adjust the broker config and start again
propsB.setProperty(KafkaConfig.BrokerIdProp, "2")
val newConfigB = KafkaConfig.fromProps(propsB)
- val newServerB = new KafkaServer(newConfigB)
- newServerB.startup()
+ val newServerB = TestUtils.createServer(newConfigB)
+ servers = Seq(serverA, newServerB)
serverA.shutdown()
newServerB.shutdown()
+
// verify correct broker metadata was written
- assertTrue(verifyBrokerMetadata(serverA.config.logDirs,1))
- assertTrue(verifyBrokerMetadata(newServerB.config.logDirs,2))
- CoreUtils.delete(serverA.config.logDirs)
- CoreUtils.delete(newServerB.config.logDirs)
+ assertTrue(verifyBrokerMetadata(serverA.config.logDirs, 1))
+ assertTrue(verifyBrokerMetadata(newServerB.config.logDirs, 2))
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {
- for(logDir <- logDirs) {
+ for (logDir <- logDirs) {
val brokerMetadataOpt = new BrokerMetadataCheckpoint(
new File(logDir + File.separator + brokerMetaPropsFile)).read()
brokerMetadataOpt match {
- case Some(brokerMetadata: BrokerMetadata) =>
- if (brokerMetadata.brokerId != brokerId) return false
+ case Some(brokerMetadata) =>
+ if (brokerMetadata.brokerId != brokerId) return false
case _ => return false
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
index 325889f..1ec80fa 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
@@ -19,16 +19,17 @@ package kafka.server
import scala.concurrent._
import ExecutionContext.Implicits._
import scala.concurrent.duration._
-import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.utils.{TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.junit.Assert._
-import org.junit.{Before, Test}
+import org.junit.{Before, After, Test}
import org.apache.kafka.test.TestUtils.isValidClusterId
class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
var config1: KafkaConfig = null
var config2: KafkaConfig = null
var config3: KafkaConfig = null
+ var servers: Seq[KafkaServer] = Seq()
@Before
override def setUp() {
@@ -38,12 +39,20 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
config3 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(3, zkConnect))
}
+ @After
+ override def tearDown() {
+ TestUtils.shutdownServers(servers)
+ super.tearDown()
+ }
+
+
@Test
def testAutoGenerateClusterId() {
// Make sure that the cluster id doesn't exist yet.
assertFalse(zkUtils.pathExists(ZkUtils.ClusterIdPath))
var server1 = TestUtils.createServer(config1)
+ servers = Seq(server1)
// Validate the cluster id
val clusterIdOnFirstBoot = server1.clusterId
@@ -56,8 +65,8 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
assertEquals(zkUtils.getClusterId, Some(clusterIdOnFirstBoot))
// Restart the server check to confirm that it uses the clusterId generated previously
- server1 = new KafkaServer(config1)
- server1.startup()
+ server1 = TestUtils.createServer(config1)
+ servers = Seq(server1)
val clusterIdOnSecondBoot = server1.clusterId
assertEquals(clusterIdOnFirstBoot, clusterIdOnSecondBoot)
@@ -68,7 +77,6 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
assertTrue(zkUtils.pathExists(ZkUtils.ClusterIdPath))
assertEquals(zkUtils.getClusterId, Some(clusterIdOnFirstBoot))
- CoreUtils.delete(server1.config.logDirs)
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
@@ -82,10 +90,9 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
val server3 = TestUtils.createServer(config3)
val clusterIdFromServer3 = server3.clusterId
+ servers = Seq(server1, server2, server3)
- server1.shutdown()
- server2.shutdown()
- server3.shutdown()
+ servers.foreach(_.shutdown())
isValidClusterId(clusterIdFromServer1)
assertEquals(clusterIdFromServer1, clusterIdFromServer2, clusterIdFromServer3)
@@ -97,28 +104,23 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
assertEquals(clusterIdFromServer2, server2.clusterId)
server3.startup()
assertEquals(clusterIdFromServer3, server3.clusterId)
- server1.shutdown()
- server2.shutdown()
- server3.shutdown()
- CoreUtils.delete(server1.config.logDirs)
- CoreUtils.delete(server2.config.logDirs)
- CoreUtils.delete(server3.config.logDirs)
+ servers.foreach(_.shutdown())
+
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
@Test
def testAutoGenerateClusterIdForKafkaClusterParallel() {
val firstBoot = Future.traverse(Seq(config1, config2, config3))(config => Future(TestUtils.createServer(config)))
- val Seq(server1, server2, server3) = Await.result(firstBoot, 100 second)
+ servers = Await.result(firstBoot, 100 second)
+ val Seq(server1, server2, server3) = servers
val clusterIdFromServer1 = server1.clusterId
val clusterIdFromServer2 = server2.clusterId
val clusterIdFromServer3 = server3.clusterId
- server1.shutdown()
- server2.shutdown()
- server3.shutdown()
+ servers.foreach(_.shutdown())
isValidClusterId(clusterIdFromServer1)
assertEquals(clusterIdFromServer1, clusterIdFromServer2, clusterIdFromServer3)
@@ -127,13 +129,11 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
server.startup()
server
})
- val servers = Await.result(secondBoot, 100 second)
+ servers = Await.result(secondBoot, 100 second)
servers.foreach(server => assertEquals(clusterIdFromServer1, server.clusterId))
servers.foreach(_.shutdown())
- CoreUtils.delete(server1.config.logDirs)
- CoreUtils.delete(server2.config.logDirs)
- CoreUtils.delete(server3.config.logDirs)
+
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index ac757d0..a25569f 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -17,14 +17,23 @@
package kafka.server
-import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.utils.{TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.easymock.EasyMock
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Test}
class ServerStartupTest extends ZooKeeperTestHarness {
+ private var server: KafkaServer = null
+
+ @After
+ override def tearDown() {
+ if (server != null)
+ TestUtils.shutdownServers(Seq(server))
+ super.tearDown()
+ }
+
@Test
def testBrokerCreatesZKChroot {
val brokerId = 0
@@ -32,13 +41,10 @@ class ServerStartupTest extends ZooKeeperTestHarness {
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
val zooKeeperConnect = props.get("zookeeper.connect")
props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
- val server = TestUtils.createServer(KafkaConfig.fromProps(props))
+ server = TestUtils.createServer(KafkaConfig.fromProps(props))
val pathExists = zkUtils.pathExists(zookeeperChroot)
assertTrue(pathExists)
-
- server.shutdown()
- CoreUtils.delete(server.config.logDirs)
}
@Test
@@ -46,8 +52,8 @@ class ServerStartupTest extends ZooKeeperTestHarness {
// Create and start first broker
val brokerId1 = 0
val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect)
- val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1))
- val port = TestUtils.boundPort(server1)
+ server = TestUtils.createServer(KafkaConfig.fromProps(props1))
+ val port = TestUtils.boundPort(server)
// Create a second broker with same port
val brokerId2 = 1
@@ -57,9 +63,6 @@ class ServerStartupTest extends ZooKeeperTestHarness {
fail("Starting a broker with the same port should fail")
} catch {
case _: RuntimeException => // expected
- } finally {
- server1.shutdown()
- CoreUtils.delete(server1.config.logDirs)
}
}
@@ -70,7 +73,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
val brokerId = 0
val props1 = TestUtils.createBrokerConfig(brokerId, zkConnect)
- val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1))
+ server = TestUtils.createServer(KafkaConfig.fromProps(props1))
val brokerRegistration = zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1
val props2 = TestUtils.createBrokerConfig(brokerId, zkConnect)
@@ -84,23 +87,17 @@ class ServerStartupTest extends ZooKeeperTestHarness {
// broker registration shouldn't change
assertEquals(brokerRegistration, zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
-
- server1.shutdown()
- CoreUtils.delete(server1.config.logDirs)
}
@Test
def testBrokerSelfAware {
val brokerId = 0
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
- val server = TestUtils.createServer(KafkaConfig.fromProps(props))
+ server = TestUtils.createServer(KafkaConfig.fromProps(props))
TestUtils.waitUntilTrue(() => server.metadataCache.getAliveBrokers.nonEmpty, "Wait for cache to update")
assertEquals(1, server.metadataCache.getAliveBrokers.size)
assertEquals(brokerId, server.metadataCache.getAliveBrokers.head.id)
-
- server.shutdown()
- CoreUtils.delete(server.config.logDirs)
}
@Test
@@ -119,13 +116,11 @@ class ServerStartupTest extends ZooKeeperTestHarness {
class MockKafkaServer(override val config: KafkaConfig, override val brokerState: BrokerState = mockBrokerState) extends KafkaServer(config) {}
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
- val server = new MockKafkaServer(KafkaConfig.fromProps(props))
+ server = new MockKafkaServer(KafkaConfig.fromProps(props))
EasyMock.expect(mockBrokerState.newState(RunningAsBroker)).andDelegateTo(new BrokerStateInterceptor).once()
EasyMock.replay(mockBrokerState)
server.startup()
- server.shutdown()
- CoreUtils.delete(server.config.logDirs)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 4edfbaf..182e904 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -66,8 +66,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
@After
override def tearDown() {
- brokers.par.foreach(_.shutdown())
producer.close()
+ TestUtils.shutdownServers(brokers)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index c5bb5e4..f7110ee 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
-import org.junit.{After, Before, Test}
+import org.junit.{After, Test}
import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
import scala.collection.JavaConverters._
@@ -51,23 +51,18 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
val tp = t1p0
var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
- @Before
- override def setUp() {
- super.setUp()
- val props = createBrokerConfigs(2, zkConnect)
- brokers = props.map(KafkaConfig.fromProps).map(TestUtils.createServer(_))
- }
-
@After
override def tearDown() {
- brokers.foreach(_.shutdown())
if (producer != null)
producer.close()
+ TestUtils.shutdownServers(brokers)
super.tearDown()
}
@Test
def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() {
+ brokers = (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+
// Given two topics with replication of a single partition
for (topic <- List(topic1, topic2)) {
createTopic(zkUtils, topic, Map(0 -> Seq(0, 1)), servers = brokers)
@@ -280,4 +275,4 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 01ff83d..a51a07c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -174,6 +174,16 @@ object TestUtils extends Logging {
}
/**
+ * Shutdown `servers` and delete their log directories.
+ */
+ def shutdownServers(servers: Seq[KafkaServer]) {
+ servers.par.foreach { s =>
+ s.shutdown()
+ CoreUtils.delete(s.config.logDirs)
+ }
+ }
+
+ /**
* Create a test config for the provided parameters.
*
* Note that if `interBrokerSecurityProtocol` is defined, the listener for the `SecurityProtocol` will be enabled.
@@ -939,9 +949,10 @@ object TestUtils extends Logging {
}
def verifyNonDaemonThreadsStatus(threadNamePrefix: String) {
- assertEquals(0, Thread.getAllStackTraces.keySet().toArray
- .map(_.asInstanceOf[Thread])
- .count(t => !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)))
+ val threadCount = Thread.getAllStackTraces.keySet.asScala.count { t =>
+ !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
+ }
+ assertEquals(0, threadCount)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 2805b3b..b3b10f3 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -21,11 +21,14 @@ import javax.security.auth.login.Configuration
import kafka.utils.{CoreUtils, Logging, ZkUtils}
import org.junit.{After, Before}
+import org.junit.Assert.assertEquals
import org.scalatest.junit.JUnitSuite
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.test.IntegrationTest
import org.junit.experimental.categories.Category
+import scala.collection.JavaConverters._
+
@Category(Array(classOf[IntegrationTest]))
abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@@ -41,6 +44,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@Before
def setUp() {
+ assertNoBrokerControllersRunning()
zookeeper = new EmbeddedZookeeper()
zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled()))
}
@@ -52,6 +56,22 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown())
Configuration.setConfiguration(null)
+ assertNoBrokerControllersRunning()
}
+ // Tests using this class start ZooKeeper before starting any brokers and shutdown ZK after
+ // shutting down brokers. If tests leave broker controllers running, subsequent tests may fail in
+ // unexpected ways if ZK port is reused. This method ensures that there is no Controller event thread
+ // since the controller loads default JAAS configuration to make connections to brokers on this thread.
+ //
+ // Any tests that use this class and invoke ZooKeeperTestHarness#tearDown() will fail in the tearDown()
+ // if controller event thread is found. Tests with missing broker shutdown which don't use ZooKeeperTestHarness
+ // or its tearDown() will cause an assertion failure in the subsequent test that invokes ZooKeeperTestHarness#setUp(),
+ // making it easier to identify the test with missing shutdown from the test sequence.
+ private def assertNoBrokerControllersRunning() {
+ val threads = Thread.getAllStackTraces.keySet.asScala
+ .map(_.getName)
+ .filter(_.contains("controller-event-thread"))
+ assertEquals(Set(), threads)
+ }
}