You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/03/28 16:18:40 UTC
[flink] branch master updated: [FLINK-26849][metrics] Deduplicate metric (un)registration logic
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 865299d [FLINK-26849][metrics] Deduplicate metric (un)registration logic
865299d is described below
commit 865299d7cf49ed0bf1e2897afe251cafd0c547a9
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Mar 23 11:33:36 2022 +0100
[FLINK-26849][metrics] Deduplicate metric (un)registration logic
---
.../flink/runtime/metrics/MetricRegistryImpl.java | 57 ++++++++++------------
1 file changed, 27 insertions(+), 30 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index f1150d7..e8108b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -41,11 +41,13 @@ import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.QuadConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import java.time.Duration;
import java.util.ArrayList;
@@ -379,21 +381,7 @@ public class MetricRegistryImpl implements MetricRegistry {
"Cannot register metric, because the MetricRegistry has already been shut down.");
} else {
if (reporters != null) {
- for (int i = 0; i < reporters.size(); i++) {
- ReporterAndSettings reporterAndSettings = reporters.get(i);
- try {
- if (reporterAndSettings != null) {
- FrontMetricGroup front =
- new FrontMetricGroup<AbstractMetricGroup<?>>(
- reporterAndSettings.getSettings(), group);
- reporterAndSettings
- .getReporter()
- .notifyOfAddedMetric(metric, metricName, front);
- }
- } catch (Exception e) {
- LOG.warn("Error while registering metric: {}.", metricName, e);
- }
- }
+ forAllReporters(MetricReporter::notifyOfAddedMetric, metric, metricName, group);
}
try {
if (queryService != null) {
@@ -424,21 +412,8 @@ public class MetricRegistryImpl implements MetricRegistry {
"Cannot unregister metric, because the MetricRegistry has already been shut down.");
} else {
if (reporters != null) {
- for (int i = 0; i < reporters.size(); i++) {
- try {
- ReporterAndSettings reporterAndSettings = reporters.get(i);
- if (reporterAndSettings != null) {
- FrontMetricGroup front =
- new FrontMetricGroup<AbstractMetricGroup<?>>(
- reporterAndSettings.getSettings(), group);
- reporterAndSettings
- .getReporter()
- .notifyOfRemovedMetric(metric, metricName, front);
- }
- } catch (Exception e) {
- LOG.warn("Error while unregistering metric: {}.", metricName, e);
- }
- }
+ forAllReporters(
+ MetricReporter::notifyOfRemovedMetric, metric, metricName, group);
}
try {
if (queryService != null) {
@@ -460,6 +435,28 @@ public class MetricRegistryImpl implements MetricRegistry {
}
}
+ @GuardedBy("lock")
+ private void forAllReporters(
+ QuadConsumer<MetricReporter, Metric, String, MetricGroup> operation,
+ Metric metric,
+ String metricName,
+ AbstractMetricGroup group) {
+ for (int i = 0; i < reporters.size(); i++) {
+ try {
+ ReporterAndSettings reporterAndSettings = reporters.get(i);
+ if (reporterAndSettings != null) {
+ FrontMetricGroup front =
+ new FrontMetricGroup<AbstractMetricGroup<?>>(
+ reporterAndSettings.getSettings(), group);
+
+ operation.accept(reporterAndSettings.getReporter(), metric, metricName, front);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error while handling metric: {}.", metricName, e);
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
@VisibleForTesting