You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2020/11/11 10:50:47 UTC

[kafka] branch trunk updated: KAFKA-10693: Close quota managers created in tests (#9573)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ad7390f  KAFKA-10693: Close quota managers created in tests (#9573)
ad7390f is described below

commit ad7390f079fd5809672389d7e0b4c25ea2162c23
Author: David Mao <47...@users.noreply.github.com>
AuthorDate: Wed Nov 11 04:47:40 2020 -0600

    KAFKA-10693: Close quota managers created in tests (#9573)
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../server/HighwatermarkPersistenceTest.scala      |  8 +++--
 .../unit/kafka/server/IsrExpirationTest.scala      |  8 +++--
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  9 ++++--
 .../unit/kafka/server/ReplicaManagerTest.scala     | 37 +++++++++++-----------
 .../server/epoch/OffsetsForLeaderEpochTest.scala   | 29 +++++++++++++----
 5 files changed, 59 insertions(+), 32 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index dd8c04f..95c7671 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -65,9 +65,10 @@ class HighwatermarkPersistenceTest {
     scheduler.startup()
     val metrics = new Metrics
     val time = new MockTime
+    val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, scheduler,
-      logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
+      logManagers.head, new AtomicBoolean(false), quotaManager,
       new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head, alterIsrManager)
     replicaManager.startup()
     try {
@@ -99,6 +100,7 @@ class HighwatermarkPersistenceTest {
     } finally {
       // shutdown the replica manager upon test completion
       replicaManager.shutdown(false)
+      quotaManager.shutdown()
       metrics.close()
       scheduler.shutdown()
     }
@@ -115,9 +117,10 @@ class HighwatermarkPersistenceTest {
     scheduler.startup()
     val metrics = new Metrics
     val time = new MockTime
+    val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient,
-      scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
+      scheduler, logManagers.head, new AtomicBoolean(false), quotaManager,
       new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head, alterIsrManager)
     replicaManager.startup()
     try {
@@ -169,6 +172,7 @@ class HighwatermarkPersistenceTest {
     } finally {
       // shutdown the replica manager upon test completion
       replicaManager.shutdown(false)
+      quotaManager.shutdown()
       metrics.close()
       scheduler.shutdown()
     }
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 6a501f7..7fc4903 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.Partition
 import kafka.log.{Log, LogManager}
+import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.TestUtils.MockAlterIsrManager
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
@@ -51,6 +52,7 @@ class IsrExpirationTest {
   val time = new MockTime
   val metrics = new Metrics
 
+  var quotaManager: QuotaManagers = null
   var replicaManager: ReplicaManager = null
 
   var alterIsrManager: MockAlterIsrManager = _
@@ -62,14 +64,16 @@ class IsrExpirationTest {
     EasyMock.replay(logManager)
 
     alterIsrManager = TestUtils.createAlterIsrManager()
+    quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
     replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, logManager, new AtomicBoolean(false),
-      QuotaFactory.instantiate(configs.head, metrics, time, ""), new BrokerTopicStats, new MetadataCache(configs.head.brokerId),
+      quotaManager, new BrokerTopicStats, new MetadataCache(configs.head.brokerId),
       new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager)
   }
 
   @After
   def tearDown(): Unit = {
-    replicaManager.shutdown(false)
+    Option(replicaManager).foreach(_.shutdown(false))
+    Option(quotaManager).foreach(_.shutdown())
     metrics.close()
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 76cc470..25ee84e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.easymock.EasyMock
 import EasyMock._
+import kafka.server.QuotaFactory.QuotaManagers
 import org.junit.Assert._
 import org.junit.{After, Test}
 
@@ -45,6 +46,7 @@ class ReplicaManagerQuotasTest {
   val fetchInfo = Seq(
     topicPartition1 -> new PartitionData(0, 0, 100, Optional.empty()),
     topicPartition2 -> new PartitionData(0, 0, 100, Optional.empty()))
+  var quotaManager: QuotaManagers = _
   var replicaManager: ReplicaManager = _
 
   @Test
@@ -240,8 +242,9 @@ class ReplicaManagerQuotasTest {
     val alterIsrManager: AlterIsrManager = createMock(classOf[AlterIsrManager])
 
     val leaderBrokerId = configs.head.brokerId
+    quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
     replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, scheduler, logManager,
-      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
+      new AtomicBoolean(false), quotaManager,
       new BrokerTopicStats, new MetadataCache(leaderBrokerId), new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager)
 
     //create the two replicas
@@ -262,8 +265,8 @@ class ReplicaManagerQuotasTest {
 
   @After
   def tearDown(): Unit = {
-    if (replicaManager != null)
-      replicaManager.shutdown(false)
+    Option(replicaManager).foreach(_.shutdown(false))
+    Option(quotaManager).foreach(_.shutdown())
     metrics.close()
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6a0f94c..761b58f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -27,7 +27,7 @@ import kafka.api._
 import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.log.LeaderOffsetIncremented
-import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.checkpoints.LazyOffsetCheckpoints
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
@@ -66,6 +66,8 @@ class ReplicaManagerTest {
   val metrics = new Metrics
   var kafkaZkClient: KafkaZkClient = _
   var alterIsrManager: AlterIsrManager = _
+  var config: KafkaConfig = _
+  var quotaManager: QuotaManagers = _
 
   // Constants defined for readability
   val zkVersion = 0
@@ -79,22 +81,24 @@ class ReplicaManagerTest {
     EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new Properties()).anyTimes()
     EasyMock.replay(kafkaZkClient)
 
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+    config = KafkaConfig.fromProps(props)
     alterIsrManager = EasyMock.createMock(classOf[AlterIsrManager])
+    quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
   }
 
   @After
   def tearDown(): Unit = {
     TestUtils.clearYammerMetrics()
+    Option(quotaManager).foreach(_.shutdown())
     metrics.close()
   }
 
   @Test
   def testHighWaterMarkDirectoryMapping(): Unit = {
-    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
-    val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
     val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
@@ -114,7 +118,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
     val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
@@ -129,11 +133,9 @@ class ReplicaManagerTest {
 
   @Test
   def testIllegalRequiredAcks(): Unit = {
-    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
-    val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
     val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager, Option(this.getClass.getName))
     try {
       def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
@@ -166,7 +168,7 @@ class ReplicaManagerTest {
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
     EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
       metadataCache, new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
 
     try {
@@ -1506,23 +1508,22 @@ class ReplicaManagerTest {
       purgatoryName = "ElectLeader", timer, reaperEnabled = false)
 
     // Mock network client to show leader offset of 5
-    val quota = QuotaFactory.instantiate(config, metrics, time, "")
     val blockingSend = new ReplicaFetcherMockBlockingSend(Map(topicPartitionObj ->
       new EpochEndOffset(leaderEpochFromLeader, offsetFromLeader)).asJava, BrokerEndPoint(1, "host1" ,1), time)
     val replicaManager = new ReplicaManager(config, metrics, time, kafkaZkClient, mockScheduler, mockLogMgr,
-      new AtomicBoolean(false), quota, mockBrokerTopicStats,
+      new AtomicBoolean(false), quotaManager, mockBrokerTopicStats,
       metadataCache, mockLogDirFailureChannel, mockProducePurgatory, mockFetchPurgatory,
       mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, Option(this.getClass.getName), alterIsrManager) {
 
       override protected def createReplicaFetcherManager(metrics: Metrics,
                                                      time: Time,
                                                      threadNamePrefix: Option[String],
-                                                     quotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
-        new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
+                                                     replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
+        new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager) {
 
           override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
             new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", fetcherId,
-              sourceBroker, config, failedPartitions, replicaManager, metrics, time, quota.follower, Some(blockingSend)) {
+              sourceBroker, config, failedPartitions, replicaManager, metrics, time, quotaManager.follower, Some(blockingSend)) {
 
               override def doWork() = {
                 // In case the thread starts before the partition is added by AbstractFetcherManager,
@@ -1684,7 +1685,7 @@ class ReplicaManagerTest {
       purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false)
 
     new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
       metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
       mockDeleteRecordsPurgatory, mockDelayedElectLeaderPurgatory, Option(this.getClass.getName), alterIsrManager)
   }
@@ -1892,10 +1893,10 @@ class ReplicaManagerTest {
 
     // each replica manager is for a broker
     val rm0 = new ReplicaManager(config0, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr0,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config0, metrics, time, ""),
+      new AtomicBoolean(false), quotaManager,
       brokerTopicStats1, metadataCache0, new LogDirFailureChannel(config0.logDirs.size), alterIsrManager)
     val rm1 = new ReplicaManager(config1, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr1,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config1, metrics, time, ""),
+      new AtomicBoolean(false), quotaManager,
       brokerTopicStats2, metadataCache1, new LogDirFailureChannel(config1.logDirs.size), alterIsrManager)
 
     (rm0, rm1)
@@ -2135,7 +2136,7 @@ class ReplicaManagerTest {
       val config = KafkaConfig.fromProps(props)
       val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
       new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
-        new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+        new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
         new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager) {
         override def getPartitionOrException(topicPartition: TopicPartition): Partition = {
           throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 68f2252..d080c64 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -21,6 +21,7 @@ import java.util.Optional
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.log.{Log, LogManager}
+import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server._
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -30,7 +31,7 @@ import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRe
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.easymock.EasyMock._
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Before, Test}
 
 class OffsetsForLeaderEpochTest {
   private val config = TestUtils.createBrokerConfigs(1, TestUtils.MockZkConnect).map(KafkaConfig.fromProps).head
@@ -38,6 +39,13 @@ class OffsetsForLeaderEpochTest {
   private val metrics = new Metrics
   private val alterIsrManager = TestUtils.createAlterIsrManager()
   private val tp = new TopicPartition("topic", 1)
+  private var replicaManager: ReplicaManager = _
+  private var quotaManager: QuotaManagers = _
+
+  @Before
+  def setUp(): Unit = {
+    quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
+  }
 
   @Test
   def shouldGetEpochsFromReplica(): Unit = {
@@ -54,8 +62,8 @@ class OffsetsForLeaderEpochTest {
     replay(mockLog, logManager)
 
     // create a replica manager with 1 partition that has 1 replica
-    val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
-      QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+    replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
+      quotaManager, new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
     val partition = replicaManager.createPartition(tp)
     partition.setLog(mockLog, isFutureLog = false)
@@ -75,8 +83,8 @@ class OffsetsForLeaderEpochTest {
     replay(logManager)
 
     //create a replica manager with 1 partition that has 0 replica
-    val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
-      QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+    replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
+      quotaManager, new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
     replicaManager.createPartition(tp)
 
@@ -98,8 +106,8 @@ class OffsetsForLeaderEpochTest {
     replay(logManager)
 
     //create a replica manager with 0 partition
-    val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
-      QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+    replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
+      quotaManager, new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), alterIsrManager)
 
     //Given
@@ -112,4 +120,11 @@ class OffsetsForLeaderEpochTest {
     //Then
     assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp))
   }
+
+  @After
+  def tearDown(): Unit = {
+    Option(replicaManager).foreach(_.shutdown(checkpointHW = false))
+    Option(quotaManager).foreach(_.shutdown())
+    metrics.close()
+  }
 }