You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2020/11/11 10:50:47 UTC
[kafka] branch trunk updated: KAFKA-10693: Close quota managers created in tests (#9573)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ad7390f KAFKA-10693: Close quota managers created in tests (#9573)
ad7390f is described below
commit ad7390f079fd5809672389d7e0b4c25ea2162c23
Author: David Mao <47...@users.noreply.github.com>
AuthorDate: Wed Nov 11 04:47:40 2020 -0600
KAFKA-10693: Close quota managers created in tests (#9573)
Reviewers: David Jacot <dj...@confluent.io>
---
.../server/HighwatermarkPersistenceTest.scala | 8 +++--
.../unit/kafka/server/IsrExpirationTest.scala | 8 +++--
.../kafka/server/ReplicaManagerQuotasTest.scala | 9 ++++--
.../unit/kafka/server/ReplicaManagerTest.scala | 37 +++++++++++-----------
.../server/epoch/OffsetsForLeaderEpochTest.scala | 29 +++++++++++++----
5 files changed, 59 insertions(+), 32 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index dd8c04f..95c7671 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -65,9 +65,10 @@ class HighwatermarkPersistenceTest {
scheduler.startup()
val metrics = new Metrics
val time = new MockTime
+ val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, scheduler,
- logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
+ logManagers.head, new AtomicBoolean(false), quotaManager,
new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head, alterIsrManager)
replicaManager.startup()
try {
@@ -99,6 +100,7 @@ class HighwatermarkPersistenceTest {
} finally {
// shutdown the replica manager upon test completion
replicaManager.shutdown(false)
+ quotaManager.shutdown()
metrics.close()
scheduler.shutdown()
}
@@ -115,9 +117,10 @@ class HighwatermarkPersistenceTest {
scheduler.startup()
val metrics = new Metrics
val time = new MockTime
+ val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient,
- scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
+ scheduler, logManagers.head, new AtomicBoolean(false), quotaManager,
new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head, alterIsrManager)
replicaManager.startup()
try {
@@ -169,6 +172,7 @@ class HighwatermarkPersistenceTest {
} finally {
// shutdown the replica manager upon test completion
replicaManager.shutdown(false)
+ quotaManager.shutdown()
metrics.close()
scheduler.shutdown()
}
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 6a501f7..7fc4903 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.cluster.Partition
import kafka.log.{Log, LogManager}
+import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.TestUtils.MockAlterIsrManager
import kafka.utils._
import org.apache.kafka.common.TopicPartition
@@ -51,6 +52,7 @@ class IsrExpirationTest {
val time = new MockTime
val metrics = new Metrics
+ var quotaManager: QuotaManagers = null
var replicaManager: ReplicaManager = null
var alterIsrManager: MockAlterIsrManager = _
@@ -62,14 +64,16 @@ class IsrExpirationTest {
EasyMock.replay(logManager)
alterIsrManager = TestUtils.createAlterIsrManager()
+ quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, logManager, new AtomicBoolean(false),
- QuotaFactory.instantiate(configs.head, metrics, time, ""), new BrokerTopicStats, new MetadataCache(configs.head.brokerId),
+ quotaManager, new BrokerTopicStats, new MetadataCache(configs.head.brokerId),
new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager)
}
@After
def tearDown(): Unit = {
- replicaManager.shutdown(false)
+ Option(replicaManager).foreach(_.shutdown(false))
+ Option(quotaManager).foreach(_.shutdown())
metrics.close()
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 76cc470..25ee84e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.easymock.EasyMock
import EasyMock._
+import kafka.server.QuotaFactory.QuotaManagers
import org.junit.Assert._
import org.junit.{After, Test}
@@ -45,6 +46,7 @@ class ReplicaManagerQuotasTest {
val fetchInfo = Seq(
topicPartition1 -> new PartitionData(0, 0, 100, Optional.empty()),
topicPartition2 -> new PartitionData(0, 0, 100, Optional.empty()))
+ var quotaManager: QuotaManagers = _
var replicaManager: ReplicaManager = _
@Test
@@ -240,8 +242,9 @@ class ReplicaManagerQuotasTest {
val alterIsrManager: AlterIsrManager = createMock(classOf[AlterIsrManager])
val leaderBrokerId = configs.head.brokerId
+ quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, scheduler, logManager,
- new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
+ new AtomicBoolean(false), quotaManager,
new BrokerTopicStats, new MetadataCache(leaderBrokerId), new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager)
//create the two replicas
@@ -262,8 +265,8 @@ class ReplicaManagerQuotasTest {
@After
def tearDown(): Unit = {
- if (replicaManager != null)
- replicaManager.shutdown(false)
+ Option(replicaManager).foreach(_.shutdown(false))
+ Option(quotaManager).foreach(_.shutdown())
metrics.close()
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6a0f94c..761b58f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -27,7 +27,7 @@ import kafka.api._
import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, ProducerStateManager}
import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.log.LeaderOffsetIncremented
-import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.LazyOffsetCheckpoints
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
@@ -66,6 +66,8 @@ class ReplicaManagerTest {
val metrics = new Metrics
var kafkaZkClient: KafkaZkClient = _
var alterIsrManager: AlterIsrManager = _
+ var config: KafkaConfig = _
+ var quotaManager: QuotaManagers = _
// Constants defined for readability
val zkVersion = 0
@@ -79,22 +81,24 @@ class ReplicaManagerTest {
EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new Properties()).anyTimes()
EasyMock.replay(kafkaZkClient)
+ val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+ config = KafkaConfig.fromProps(props)
alterIsrManager = EasyMock.createMock(classOf[AlterIsrManager])
+ quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
}
@After
def tearDown(): Unit = {
TestUtils.clearYammerMetrics()
+ Option(quotaManager).foreach(_.shutdown())
metrics.close()
}
@Test
def testHighWaterMarkDirectoryMapping(): Unit = {
- val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
- val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+ new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
try {
val partition = rm.createPartition(new TopicPartition(topic, 1))
@@ -114,7 +118,7 @@ class ReplicaManagerTest {
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+ new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
try {
val partition = rm.createPartition(new TopicPartition(topic, 1))
@@ -129,11 +133,9 @@ class ReplicaManagerTest {
@Test
def testIllegalRequiredAcks(): Unit = {
- val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
- val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+ new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager, Option(this.getClass.getName))
try {
def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
@@ -166,7 +168,7 @@ class ReplicaManagerTest {
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
EasyMock.replay(metadataCache)
val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+ new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
metadataCache, new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
try {
@@ -1506,23 +1508,22 @@ class ReplicaManagerTest {
purgatoryName = "ElectLeader", timer, reaperEnabled = false)
// Mock network client to show leader offset of 5
- val quota = QuotaFactory.instantiate(config, metrics, time, "")
val blockingSend = new ReplicaFetcherMockBlockingSend(Map(topicPartitionObj ->
new EpochEndOffset(leaderEpochFromLeader, offsetFromLeader)).asJava, BrokerEndPoint(1, "host1" ,1), time)
val replicaManager = new ReplicaManager(config, metrics, time, kafkaZkClient, mockScheduler, mockLogMgr,
- new AtomicBoolean(false), quota, mockBrokerTopicStats,
+ new AtomicBoolean(false), quotaManager, mockBrokerTopicStats,
metadataCache, mockLogDirFailureChannel, mockProducePurgatory, mockFetchPurgatory,
mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, Option(this.getClass.getName), alterIsrManager) {
override protected def createReplicaFetcherManager(metrics: Metrics,
time: Time,
threadNamePrefix: Option[String],
- quotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
- new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
+ replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
+ new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager) {
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", fetcherId,
- sourceBroker, config, failedPartitions, replicaManager, metrics, time, quota.follower, Some(blockingSend)) {
+ sourceBroker, config, failedPartitions, replicaManager, metrics, time, quotaManager.follower, Some(blockingSend)) {
override def doWork() = {
// In case the thread starts before the partition is added by AbstractFetcherManager,
@@ -1684,7 +1685,7 @@ class ReplicaManagerTest {
purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false)
new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+ new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
mockDeleteRecordsPurgatory, mockDelayedElectLeaderPurgatory, Option(this.getClass.getName), alterIsrManager)
}
@@ -1892,10 +1893,10 @@ class ReplicaManagerTest {
// each replica manager is for a broker
val rm0 = new ReplicaManager(config0, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr0,
- new AtomicBoolean(false), QuotaFactory.instantiate(config0, metrics, time, ""),
+ new AtomicBoolean(false), quotaManager,
brokerTopicStats1, metadataCache0, new LogDirFailureChannel(config0.logDirs.size), alterIsrManager)
val rm1 = new ReplicaManager(config1, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr1,
- new AtomicBoolean(false), QuotaFactory.instantiate(config1, metrics, time, ""),
+ new AtomicBoolean(false), quotaManager,
brokerTopicStats2, metadataCache1, new LogDirFailureChannel(config1.logDirs.size), alterIsrManager)
(rm0, rm1)
@@ -2135,7 +2136,7 @@ class ReplicaManagerTest {
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+ new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager) {
override def getPartitionOrException(topicPartition: TopicPartition): Partition = {
throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 68f2252..d080c64 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -21,6 +21,7 @@ import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean
import kafka.log.{Log, LogManager}
+import kafka.server.QuotaFactory.QuotaManagers
import kafka.server._
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.TopicPartition
@@ -30,7 +31,7 @@ import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRe
import org.apache.kafka.common.requests.EpochEndOffset._
import org.easymock.EasyMock._
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Before, Test}
class OffsetsForLeaderEpochTest {
private val config = TestUtils.createBrokerConfigs(1, TestUtils.MockZkConnect).map(KafkaConfig.fromProps).head
@@ -38,6 +39,13 @@ class OffsetsForLeaderEpochTest {
private val metrics = new Metrics
private val alterIsrManager = TestUtils.createAlterIsrManager()
private val tp = new TopicPartition("topic", 1)
+ private var replicaManager: ReplicaManager = _
+ private var quotaManager: QuotaManagers = _
+
+ @Before
+ def setUp(): Unit = {
+ quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
+ }
@Test
def shouldGetEpochsFromReplica(): Unit = {
@@ -54,8 +62,8 @@ class OffsetsForLeaderEpochTest {
replay(mockLog, logManager)
// create a replica manager with 1 partition that has 1 replica
- val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
- QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+ replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
+ quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
val partition = replicaManager.createPartition(tp)
partition.setLog(mockLog, isFutureLog = false)
@@ -75,8 +83,8 @@ class OffsetsForLeaderEpochTest {
replay(logManager)
//create a replica manager with 1 partition that has 0 replica
- val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
- QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+ replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
+ quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
replicaManager.createPartition(tp)
@@ -98,8 +106,8 @@ class OffsetsForLeaderEpochTest {
replay(logManager)
//create a replica manager with 0 partition
- val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
- QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+ replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
+ quotaManager, new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
//Given
@@ -112,4 +120,11 @@ class OffsetsForLeaderEpochTest {
//Then
assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp))
}
+
+ @After
+ def tearDown(): Unit = {
+ Option(replicaManager).foreach(_.shutdown(checkpointHW = false))
+ Option(quotaManager).foreach(_.shutdown())
+ metrics.close()
+ }
}