You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/08/11 22:32:06 UTC

[kafka] branch trunk updated: KAFKA-8782: Close metrics in QuotaManagerTests (#7191)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 558bb1d  KAFKA-8782: Close metrics in QuotaManagerTests (#7191)
558bb1d is described below

commit 558bb1d0692b3ba6cf9c46d7170be6d31656024b
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sun Aug 11 15:31:42 2019 -0700

    KAFKA-8782: Close metrics in QuotaManagerTests (#7191)
    
    Since `Metrics` was constructed with `enableExpiration=false`, leaving it
    unclosed was not a source of flakiness given the current implementation.
    This could change in the future, so it is good to follow the class
    contract and close it.
    
    Also included a few clean-ups: removed redundant casts and type parameters,
    and used try-with-resources where `Metrics` is created inline.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../org/apache/kafka/common/metrics/Metrics.java   |  6 ++--
 .../apache/kafka/common/metrics/MetricsTest.java   |  7 ++---
 .../apache/kafka/common/metrics/SensorTest.java    | 36 ++++++++++------------
 .../common/metrics/stats/FrequenciesTest.java      |  3 +-
 .../unit/kafka/server/ClientQuotaManagerTest.scala | 28 ++++++-----------
 .../kafka/server/ReplicationQuotaManagerTest.scala | 17 +++++-----
 6 files changed, 43 insertions(+), 54 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 91c245d..e512e8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -90,7 +90,7 @@ public class Metrics implements Closeable {
      * Expiration of Sensors is disabled.
      */
     public Metrics(Time time) {
-        this(new MetricConfig(), new ArrayList<MetricsReporter>(0), time);
+        this(new MetricConfig(), new ArrayList<>(0), time);
     }
 
     /**
@@ -98,7 +98,7 @@ public class Metrics implements Closeable {
      * Expiration of Sensors is disabled.
      */
     public Metrics(MetricConfig defaultConfig, Time time) {
-        this(defaultConfig, new ArrayList<MetricsReporter>(0), time);
+        this(defaultConfig, new ArrayList<>(0), time);
     }
 
 
@@ -108,7 +108,7 @@ public class Metrics implements Closeable {
      * @param defaultConfig The default config to use for all metrics that don't override their config
      */
     public Metrics(MetricConfig defaultConfig) {
-        this(defaultConfig, new ArrayList<MetricsReporter>(0), Time.SYSTEM);
+        this(defaultConfig, new ArrayList<>(0), Time.SYSTEM);
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 0b1ea85..0357045 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -63,7 +63,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("deprecation")
 public class MetricsTest {
     private static final Logger log = LoggerFactory.getLogger(MetricsTest.class);
 
@@ -75,7 +74,7 @@ public class MetricsTest {
 
     @Before
     public void setup() {
-        this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true);
+        this.metrics = new Metrics(config, Arrays.asList(new JmxReporter()), time, true);
     }
 
     @After
@@ -631,7 +630,7 @@ public class MetricsTest {
         Map<String, String> childTagsWithValues = new HashMap<>();
         childTagsWithValues.put("child-tag", "child-tag-value");
 
-        try (Metrics inherited = new Metrics(new MetricConfig().tags(parentTagsWithValues), Arrays.asList((MetricsReporter) new JmxReporter()), time, true)) {
+        try (Metrics inherited = new Metrics(new MetricConfig().tags(parentTagsWithValues), Arrays.asList(new JmxReporter()), time, true)) {
             MetricName inheritedMetric = inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, childTagsWithValues);
 
             Map<String, String> filledOutTags = inheritedMetric.tags();
@@ -731,7 +730,7 @@ public class MetricsTest {
 
         final LockingReporter reporter = new LockingReporter();
         this.metrics.close();
-        this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) reporter), new MockTime(10), true);
+        this.metrics = new Metrics(config, Arrays.asList(reporter), new MockTime(10), true);
         final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
         SensorCreator sensorCreator = new SensorCreator(metrics);
 
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index fc7cfe2..d1f3a91 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -86,25 +86,23 @@ public class SensorTest {
     public void testExpiredSensor() {
         MetricConfig config = new MetricConfig();
         Time mockTime = new MockTime();
-        Metrics metrics =  new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), mockTime, true);
-
-        long inactiveSensorExpirationTimeSeconds = 60L;
-        Sensor sensor = new Sensor(metrics, "sensor", null, config, mockTime,
-            inactiveSensorExpirationTimeSeconds, Sensor.RecordingLevel.INFO);
-
-        assertTrue(sensor.add(metrics.metricName("test1", "grp1"), new Avg()));
-
-        Map<String, String> emptyTags = Collections.emptyMap();
-        MetricName rateMetricName = new MetricName("rate", "test", "", emptyTags);
-        MetricName totalMetricName = new MetricName("total", "test", "", emptyTags);
-        Meter meter = new Meter(rateMetricName, totalMetricName);
-        assertTrue(sensor.add(meter));
-
-        mockTime.sleep(TimeUnit.SECONDS.toMillis(inactiveSensorExpirationTimeSeconds + 1));
-        assertFalse(sensor.add(metrics.metricName("test3", "grp1"), new Avg()));
-        assertFalse(sensor.add(meter));
-
-        metrics.close();
+        try (Metrics metrics = new Metrics(config, Arrays.asList(new JmxReporter()), mockTime, true)) {
+            long inactiveSensorExpirationTimeSeconds = 60L;
+            Sensor sensor = new Sensor(metrics, "sensor", null, config, mockTime,
+                    inactiveSensorExpirationTimeSeconds, Sensor.RecordingLevel.INFO);
+
+            assertTrue(sensor.add(metrics.metricName("test1", "grp1"), new Avg()));
+
+            Map<String, String> emptyTags = Collections.emptyMap();
+            MetricName rateMetricName = new MetricName("rate", "test", "", emptyTags);
+            MetricName totalMetricName = new MetricName("total", "test", "", emptyTags);
+            Meter meter = new Meter(rateMetricName, totalMetricName);
+            assertTrue(sensor.add(meter));
+
+            mockTime.sleep(TimeUnit.SECONDS.toMillis(inactiveSensorExpirationTimeSeconds + 1));
+            assertFalse(sensor.add(metrics.metricName("test3", "grp1"), new Avg()));
+            assertFalse(sensor.add(meter));
+        }
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
index e3e623f..bf7647a 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -46,7 +45,7 @@ public class FrequenciesTest {
     public void setup() {
         config = new MetricConfig().eventWindow(50).samples(2);
         time = new MockTime();
-        metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true);
+        metrics = new Metrics(config, Arrays.asList(new JmxReporter()), time, true);
     }
 
     @After
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index e10d4b2..d52a8a6 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -33,14 +33,20 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.{MockTime, Sanitizer}
 import org.easymock.EasyMock
 import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Before, Test}
+import org.junit.{After, Test}
 
 class ClientQuotaManagerTest {
   private val time = new MockTime
-
+  private val metrics = new Metrics(new MetricConfig(), Collections.emptyList(), time)
   private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500)
 
   var numCallbacks: Int = 0
+
+  @After
+  def tearDown(): Unit = {
+    metrics.close()
+  }
+
   def callback (response: RequestChannel.Response) {
     // Count how many times this callback is called for notifyThrottlingDone().
     response match {
@@ -49,11 +55,6 @@ class ClientQuotaManagerTest {
     }
   }
 
-  @Before
-  def beforeMethod() {
-    numCallbacks = 0
-  }
-
   private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T],
                                                  listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
 
@@ -81,7 +82,7 @@ class ClientQuotaManagerTest {
   }
 
   private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: UserClient) {
-    val clientMetrics = new ClientQuotaManager(config, newMetrics, Produce, time, "")
+    val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
 
     try {
       // Case 1: Update the quota. Assert that the new quota value is returned
@@ -193,7 +194,7 @@ class ClientQuotaManagerTest {
   @Test
   def testQuotaConfigPrecedence() {
     val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
-        newMetrics, Produce, time, "")
+      metrics, Produce, time, "")
 
     def checkQuota(user: String, clientId: String, expectedBound: Int, value: Int, expectThrottle: Boolean) {
       assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0)
@@ -265,7 +266,6 @@ class ClientQuotaManagerTest {
 
   @Test
   def testQuotaViolation() {
-    val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", ""))
     try {
@@ -313,7 +313,6 @@ class ClientQuotaManagerTest {
 
   @Test
   def testRequestPercentageQuotaViolation() {
-    val metrics = newMetrics
     val quotaManager = new ClientRequestQuotaManager(config, metrics, time, "", None)
     quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some("test-client"), Some(Quota.upperBound(1)))
     val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Request", ""))
@@ -376,7 +375,6 @@ class ClientQuotaManagerTest {
 
   @Test
   def testExpireThrottleTimeSensor() {
-    val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     try {
       maybeRecord(clientMetrics, "ANONYMOUS", "client1", 100)
@@ -396,7 +394,6 @@ class ClientQuotaManagerTest {
 
   @Test
   def testExpireQuotaSensors() {
-    val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     try {
       maybeRecord(clientMetrics, "ANONYMOUS", "client1", 100)
@@ -420,7 +417,6 @@ class ClientQuotaManagerTest {
 
   @Test
   def testClientIdNotSanitized() {
-    val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     val clientId = "client@#$%"
     try {
@@ -437,10 +433,6 @@ class ClientQuotaManagerTest {
     }
   }
 
-  def newMetrics: Metrics = {
-    new Metrics(new MetricConfig(), Collections.emptyList(), time)
-  }
-
   private case class UserClient(val user: String, val clientId: String, val configUser: Option[String] = None, val configClientId: Option[String] = None) {
     // The class under test expects only sanitized client configs. We pass both the default value (which should not be
     // sanitized to ensure it remains unique) and non-default values, so we need to take care in generating the sanitized
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
index 284d9ab..da9c534 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -23,16 +23,22 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
 import org.apache.kafka.common.utils.MockTime
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
-import org.junit.Test
+import org.junit.{After, Test}
 
 import scala.collection.JavaConverters._
 
 class ReplicationQuotaManagerTest {
   private val time = new MockTime
+  private val metrics = new Metrics(new MetricConfig(), Collections.emptyList(), time)
+
+  @After
+  def tearDown: Unit = {
+    metrics.close()
+  }
 
   @Test
   def shouldThrottleOnlyDefinedReplicas(): Unit = {
-    val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), newMetrics, QuotaType.Fetch, time)
+    val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), metrics, QuotaType.Fetch, time)
     quota.markThrottled("topic1", Seq(1, 2, 3))
 
     assertTrue(quota.isThrottled(tp1(1)))
@@ -43,7 +49,6 @@ class ReplicationQuotaManagerTest {
 
   @Test
   def shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses(): Unit = {
-    val metrics = newMetrics()
     val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(numQuotaSamples = 10, quotaWindowSizeSeconds = 1), metrics, LeaderReplication, time)
 
     //Given
@@ -105,7 +110,7 @@ class ReplicationQuotaManagerTest {
 
   @Test
   def shouldSupportWildcardThrottledReplicas(): Unit = {
-    val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), newMetrics, LeaderReplication, time)
+    val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), metrics, LeaderReplication, time)
 
     //When
     quota.markThrottled("MyTopic")
@@ -116,8 +121,4 @@ class ReplicationQuotaManagerTest {
   }
 
   private def tp1(id: Int): TopicPartition = new TopicPartition("topic1", id)
-
-  private def newMetrics(): Metrics = {
-    new Metrics(new MetricConfig(), Collections.emptyList(), time)
-  }
 }