You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/06/13 17:36:56 UTC
[kafka] branch trunk updated: KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5cab11cf52 KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)
5cab11cf52 is described below
commit 5cab11cf525f6c06fcf9eb43f7f95ef33fe1cdbb
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Mon Jun 13 23:06:39 2022 +0530
KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)
Reviewers: David Jacot <dj...@confluent.io>, Justine Olshan <jo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../org/apache/kafka/common/metrics/Metrics.java | 40 +++++++++++++++++--
.../org/apache/kafka/common/metrics/Sensor.java | 10 ++++-
.../kafka/connect/runtime/ConnectMetrics.java | 8 +---
.../internals/metrics/StreamsMetricsImplTest.java | 46 ++++++++++++++++++++++
4 files changed, 92 insertions(+), 12 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 52b7794a4c..398819016c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -509,7 +509,10 @@ public class Metrics implements Closeable {
Objects.requireNonNull(metricValueProvider),
config == null ? this.config : config,
time);
- registerMetric(m);
+ KafkaMetric existingMetric = registerMetric(m);
+ if (existingMetric != null) {
+ throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
+ }
}
/**
@@ -524,6 +527,26 @@ public class Metrics implements Closeable {
addMetric(metricName, null, metricValueProvider);
}
+ /**
+ * Create or get an existing metric to monitor an object that implements MetricValueProvider.
+ * This metric won't be associated with any sensor. This is a way to expose existing values as metrics.
+ * This method is safe to call from concurrent threads: registration is synchronized, so at most one metric is registered per name.
+ *
+ * @param metricName The name of the metric
+ * @param metricValueProvider The metric value provider associated with this metric
+ * @return Existing KafkaMetric if already registered or else a newly created one
+ */
+ public KafkaMetric addMetricIfAbsent(MetricName metricName, MetricConfig config, MetricValueProvider<?> metricValueProvider) {
+ KafkaMetric metric = new KafkaMetric(new Object(),
+ Objects.requireNonNull(metricName),
+ Objects.requireNonNull(metricValueProvider),
+ config == null ? this.config : config,
+ time);
+
+ KafkaMetric existingMetric = registerMetric(metric);
+ return existingMetric == null ? metric : existingMetric;
+ }
+
/**
* Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval`
* will be invoked for each reporter.
@@ -563,10 +586,18 @@ public class Metrics implements Closeable {
}
}
- synchronized void registerMetric(KafkaMetric metric) {
+ /**
+ * Register a metric if not present or return an already existing metric otherwise.
+ * When a metric is newly registered, this method returns null.
+ *
+ * @param metric The KafkaMetric to register
+ * @return the already registered KafkaMetric if one with the same name exists, null otherwise
+ */
+ synchronized KafkaMetric registerMetric(KafkaMetric metric) {
MetricName metricName = metric.metricName();
- if (this.metrics.containsKey(metricName))
- throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
+ if (this.metrics.containsKey(metricName)) {
+ return this.metrics.get(metricName);
+ }
this.metrics.put(metricName, metric);
for (MetricsReporter reporter : reporters) {
try {
@@ -576,6 +607,7 @@ public class Metrics implements Closeable {
}
}
log.trace("Registered metric named {}", metricName);
+ return null;
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 5ae3b8d997..25f3c21a31 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -297,7 +297,10 @@ public final class Sensor {
for (NamedMeasurable m : stat.stats()) {
final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), statConfig, time);
if (!metrics.containsKey(metric.metricName())) {
- registry.registerMetric(metric);
+ KafkaMetric existingMetric = registry.registerMetric(metric);
+ if (existingMetric != null) {
+ throw new IllegalArgumentException("A metric named '" + metric.metricName() + "' already exists, can't register another one.");
+ }
metrics.put(metric.metricName(), metric);
}
}
@@ -336,7 +339,10 @@ public final class Sensor {
statConfig,
time
);
- registry.registerMetric(metric);
+ KafkaMetric existingMetric = registry.registerMetric(metric);
+ if (existingMetric != null) {
+ throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
+ }
metrics.put(metric.metricName(), metric);
stats.add(new StatAndConfig(Objects.requireNonNull(stat), metric::config));
return true;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 7dad6aec0a..ed81be657a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -319,9 +319,7 @@ public class ConnectMetrics {
*/
public <T> void addValueMetric(MetricNameTemplate nameTemplate, final LiteralSupplier<T> supplier) {
MetricName metricName = metricName(nameTemplate);
- if (metrics().metric(metricName) == null) {
- metrics().addMetric(metricName, (Gauge<T>) (config, now) -> supplier.metricValue(now));
- }
+ metrics().addMetricIfAbsent(metricName, null, (Gauge<T>) (config, now) -> supplier.metricValue(now));
}
/**
@@ -333,9 +331,7 @@ public class ConnectMetrics {
*/
public <T> void addImmutableValueMetric(MetricNameTemplate nameTemplate, final T value) {
MetricName metricName = metricName(nameTemplate);
- if (metrics().metric(metricName) == null) {
- metrics().addMetric(metricName, (Gauge<T>) (config, now) -> value);
- }
+ metrics().addMetricIfAbsent(metricName, null, (Gauge<T>) (config, now) -> value);
}
/**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index b8d3d92e62..5ec834a11f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -45,6 +45,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -84,6 +85,8 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
import static org.powermock.api.easymock.PowerMock.createMock;
@RunWith(PowerMockRunner.class)
@@ -497,6 +500,17 @@ public class StreamsMetricsImplTest {
verify(metrics);
}
+ @Test
+ public void shouldCreateNewStoreLevelMutableMetric() {
+ final MetricName metricName =
+ new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
+ final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
+ final Metrics metrics = new Metrics(metricConfig);
+ assertNull(metrics.metric(metricName));
+ metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER);
+ assertNotNull(metrics.metric(metricName));
+ }
+
@Test
public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() {
final Metrics metrics = mock(Metrics.class);
@@ -521,6 +535,38 @@ public class StreamsMetricsImplTest {
verify(metrics);
}
+ @Test
+ public void shouldReturnSameMetricIfAlreadyCreated() {
+ final MetricName metricName =
+ new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
+ final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
+ final Metrics metrics = new Metrics(metricConfig);
+ assertNull(metrics.metric(metricName));
+ final KafkaMetric kafkaMetric = metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER);
+ assertEquals(kafkaMetric, metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER));
+ }
+
+ @Test
+ public void shouldCreateMetricOnceDuringConcurrentMetricCreationRequest() throws InterruptedException {
+ final MetricName metricName =
+ new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
+ final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
+ final Metrics metrics = new Metrics(metricConfig);
+ assertNull(metrics.metric(metricName));
+ final AtomicReference<KafkaMetric> metricCreatedViaThread1 = new AtomicReference<>();
+ final AtomicReference<KafkaMetric> metricCreatedViaThread2 = new AtomicReference<>();
+
+ final Thread thread1 = new Thread(() -> metricCreatedViaThread1.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)));
+ final Thread thread2 = new Thread(() -> metricCreatedViaThread2.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)));
+
+ thread1.start();
+ thread2.start();
+
+ thread1.join();
+ thread2.join();
+ assertEquals(metricCreatedViaThread1.get(), metricCreatedViaThread2.get());
+ }
+
@Test
public void shouldRemoveStateStoreLevelSensors() {
final Metrics metrics = niceMock(Metrics.class);