You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/08/11 22:32:06 UTC
[kafka] branch trunk updated: KAFKA-8782: Close metrics in QuotaManagerTests (#7191)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 558bb1d KAFKA-8782: Close metrics in QuotaManagerTests (#7191)
558bb1d is described below
commit 558bb1d0692b3ba6cf9c46d7170be6d31656024b
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sun Aug 11 15:31:42 2019 -0700
KAFKA-8782: Close metrics in QuotaManagerTests (#7191)
Since `Metrics` was constructed with `enableExpiration=false`, not closing
it was not a source of flakiness given the current implementation. This could
change in the future, so it is good to follow the class contract and close it.
Included a few clean-ups of redundant casts and type parameters, as well as
the use of try-with-resources where `Metrics` is created inline.
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../org/apache/kafka/common/metrics/Metrics.java | 6 ++--
.../apache/kafka/common/metrics/MetricsTest.java | 7 ++---
.../apache/kafka/common/metrics/SensorTest.java | 36 ++++++++++------------
.../common/metrics/stats/FrequenciesTest.java | 3 +-
.../unit/kafka/server/ClientQuotaManagerTest.scala | 28 ++++++-----------
.../kafka/server/ReplicationQuotaManagerTest.scala | 17 +++++-----
6 files changed, 43 insertions(+), 54 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 91c245d..e512e8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -90,7 +90,7 @@ public class Metrics implements Closeable {
* Expiration of Sensors is disabled.
*/
public Metrics(Time time) {
- this(new MetricConfig(), new ArrayList<MetricsReporter>(0), time);
+ this(new MetricConfig(), new ArrayList<>(0), time);
}
/**
@@ -98,7 +98,7 @@ public class Metrics implements Closeable {
* Expiration of Sensors is disabled.
*/
public Metrics(MetricConfig defaultConfig, Time time) {
- this(defaultConfig, new ArrayList<MetricsReporter>(0), time);
+ this(defaultConfig, new ArrayList<>(0), time);
}
@@ -108,7 +108,7 @@ public class Metrics implements Closeable {
* @param defaultConfig The default config to use for all metrics that don't override their config
*/
public Metrics(MetricConfig defaultConfig) {
- this(defaultConfig, new ArrayList<MetricsReporter>(0), Time.SYSTEM);
+ this(defaultConfig, new ArrayList<>(0), Time.SYSTEM);
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 0b1ea85..0357045 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -63,7 +63,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@SuppressWarnings("deprecation")
public class MetricsTest {
private static final Logger log = LoggerFactory.getLogger(MetricsTest.class);
@@ -75,7 +74,7 @@ public class MetricsTest {
@Before
public void setup() {
- this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true);
+ this.metrics = new Metrics(config, Arrays.asList(new JmxReporter()), time, true);
}
@After
@@ -631,7 +630,7 @@ public class MetricsTest {
Map<String, String> childTagsWithValues = new HashMap<>();
childTagsWithValues.put("child-tag", "child-tag-value");
- try (Metrics inherited = new Metrics(new MetricConfig().tags(parentTagsWithValues), Arrays.asList((MetricsReporter) new JmxReporter()), time, true)) {
+ try (Metrics inherited = new Metrics(new MetricConfig().tags(parentTagsWithValues), Arrays.asList(new JmxReporter()), time, true)) {
MetricName inheritedMetric = inherited.metricInstance(SampleMetrics.METRIC_WITH_INHERITED_TAGS, childTagsWithValues);
Map<String, String> filledOutTags = inheritedMetric.tags();
@@ -731,7 +730,7 @@ public class MetricsTest {
final LockingReporter reporter = new LockingReporter();
this.metrics.close();
- this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) reporter), new MockTime(10), true);
+ this.metrics = new Metrics(config, Arrays.asList(reporter), new MockTime(10), true);
final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
SensorCreator sensorCreator = new SensorCreator(metrics);
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index fc7cfe2..d1f3a91 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -86,25 +86,23 @@ public class SensorTest {
public void testExpiredSensor() {
MetricConfig config = new MetricConfig();
Time mockTime = new MockTime();
- Metrics metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), mockTime, true);
-
- long inactiveSensorExpirationTimeSeconds = 60L;
- Sensor sensor = new Sensor(metrics, "sensor", null, config, mockTime,
- inactiveSensorExpirationTimeSeconds, Sensor.RecordingLevel.INFO);
-
- assertTrue(sensor.add(metrics.metricName("test1", "grp1"), new Avg()));
-
- Map<String, String> emptyTags = Collections.emptyMap();
- MetricName rateMetricName = new MetricName("rate", "test", "", emptyTags);
- MetricName totalMetricName = new MetricName("total", "test", "", emptyTags);
- Meter meter = new Meter(rateMetricName, totalMetricName);
- assertTrue(sensor.add(meter));
-
- mockTime.sleep(TimeUnit.SECONDS.toMillis(inactiveSensorExpirationTimeSeconds + 1));
- assertFalse(sensor.add(metrics.metricName("test3", "grp1"), new Avg()));
- assertFalse(sensor.add(meter));
-
- metrics.close();
+ try (Metrics metrics = new Metrics(config, Arrays.asList(new JmxReporter()), mockTime, true)) {
+ long inactiveSensorExpirationTimeSeconds = 60L;
+ Sensor sensor = new Sensor(metrics, "sensor", null, config, mockTime,
+ inactiveSensorExpirationTimeSeconds, Sensor.RecordingLevel.INFO);
+
+ assertTrue(sensor.add(metrics.metricName("test1", "grp1"), new Avg()));
+
+ Map<String, String> emptyTags = Collections.emptyMap();
+ MetricName rateMetricName = new MetricName("rate", "test", "", emptyTags);
+ MetricName totalMetricName = new MetricName("total", "test", "", emptyTags);
+ Meter meter = new Meter(rateMetricName, totalMetricName);
+ assertTrue(sensor.add(meter));
+
+ mockTime.sleep(TimeUnit.SECONDS.toMillis(inactiveSensorExpirationTimeSeconds + 1));
+ assertFalse(sensor.add(metrics.metricName("test3", "grp1"), new Avg()));
+ assertFalse(sensor.add(meter));
+ }
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
index e3e623f..bf7647a 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -46,7 +45,7 @@ public class FrequenciesTest {
public void setup() {
config = new MetricConfig().eventWindow(50).samples(2);
time = new MockTime();
- metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true);
+ metrics = new Metrics(config, Arrays.asList(new JmxReporter()), time, true);
}
@After
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index e10d4b2..d52a8a6 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -33,14 +33,20 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.{MockTime, Sanitizer}
import org.easymock.EasyMock
import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Before, Test}
+import org.junit.{After, Test}
class ClientQuotaManagerTest {
private val time = new MockTime
-
+ private val metrics = new Metrics(new MetricConfig(), Collections.emptyList(), time)
private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500)
var numCallbacks: Int = 0
+
+ @After
+ def tearDown(): Unit = {
+ metrics.close()
+ }
+
def callback (response: RequestChannel.Response) {
// Count how many times this callback is called for notifyThrottlingDone().
response match {
@@ -49,11 +55,6 @@ class ClientQuotaManagerTest {
}
}
- @Before
- def beforeMethod() {
- numCallbacks = 0
- }
-
private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T],
listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
@@ -81,7 +82,7 @@ class ClientQuotaManagerTest {
}
private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: UserClient) {
- val clientMetrics = new ClientQuotaManager(config, newMetrics, Produce, time, "")
+ val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
try {
// Case 1: Update the quota. Assert that the new quota value is returned
@@ -193,7 +194,7 @@ class ClientQuotaManagerTest {
@Test
def testQuotaConfigPrecedence() {
val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
- newMetrics, Produce, time, "")
+ metrics, Produce, time, "")
def checkQuota(user: String, clientId: String, expectedBound: Int, value: Int, expectThrottle: Boolean) {
assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0)
@@ -265,7 +266,6 @@ class ClientQuotaManagerTest {
@Test
def testQuotaViolation() {
- val metrics = newMetrics
val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", ""))
try {
@@ -313,7 +313,6 @@ class ClientQuotaManagerTest {
@Test
def testRequestPercentageQuotaViolation() {
- val metrics = newMetrics
val quotaManager = new ClientRequestQuotaManager(config, metrics, time, "", None)
quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some("test-client"), Some(Quota.upperBound(1)))
val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Request", ""))
@@ -376,7 +375,6 @@ class ClientQuotaManagerTest {
@Test
def testExpireThrottleTimeSensor() {
- val metrics = newMetrics
val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
try {
maybeRecord(clientMetrics, "ANONYMOUS", "client1", 100)
@@ -396,7 +394,6 @@ class ClientQuotaManagerTest {
@Test
def testExpireQuotaSensors() {
- val metrics = newMetrics
val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
try {
maybeRecord(clientMetrics, "ANONYMOUS", "client1", 100)
@@ -420,7 +417,6 @@ class ClientQuotaManagerTest {
@Test
def testClientIdNotSanitized() {
- val metrics = newMetrics
val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
val clientId = "client@#$%"
try {
@@ -437,10 +433,6 @@ class ClientQuotaManagerTest {
}
}
- def newMetrics: Metrics = {
- new Metrics(new MetricConfig(), Collections.emptyList(), time)
- }
-
private case class UserClient(val user: String, val clientId: String, val configUser: Option[String] = None, val configClientId: Option[String] = None) {
// The class under test expects only sanitized client configs. We pass both the default value (which should not be
// sanitized to ensure it remains unique) and non-default values, so we need to take care in generating the sanitized
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
index 284d9ab..da9c534 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -23,16 +23,22 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
import org.apache.kafka.common.utils.MockTime
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
-import org.junit.Test
+import org.junit.{After, Test}
import scala.collection.JavaConverters._
class ReplicationQuotaManagerTest {
private val time = new MockTime
+ private val metrics = new Metrics(new MetricConfig(), Collections.emptyList(), time)
+
+ @After
+ def tearDown: Unit = {
+ metrics.close()
+ }
@Test
def shouldThrottleOnlyDefinedReplicas(): Unit = {
- val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), newMetrics, QuotaType.Fetch, time)
+ val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), metrics, QuotaType.Fetch, time)
quota.markThrottled("topic1", Seq(1, 2, 3))
assertTrue(quota.isThrottled(tp1(1)))
@@ -43,7 +49,6 @@ class ReplicationQuotaManagerTest {
@Test
def shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses(): Unit = {
- val metrics = newMetrics()
val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(numQuotaSamples = 10, quotaWindowSizeSeconds = 1), metrics, LeaderReplication, time)
//Given
@@ -105,7 +110,7 @@ class ReplicationQuotaManagerTest {
@Test
def shouldSupportWildcardThrottledReplicas(): Unit = {
- val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), newMetrics, LeaderReplication, time)
+ val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), metrics, LeaderReplication, time)
//When
quota.markThrottled("MyTopic")
@@ -116,8 +121,4 @@ class ReplicationQuotaManagerTest {
}
private def tp1(id: Int): TopicPartition = new TopicPartition("topic1", id)
-
- private def newMetrics(): Metrics = {
- new Metrics(new MetricConfig(), Collections.emptyList(), time)
- }
}