You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/03/28 16:18:40 UTC

[flink] branch master updated: [FLINK-26849][metrics] Deduplicate metric (un)registration logic

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 865299d  [FLINK-26849][metrics] Deduplicate metric (un)registration logic
865299d is described below

commit 865299d7cf49ed0bf1e2897afe251cafd0c547a9
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Mar 23 11:33:36 2022 +0100

    [FLINK-26849][metrics] Deduplicate metric (un)registration logic
---
 .../flink/runtime/metrics/MetricRegistryImpl.java  | 57 ++++++++++------------
 1 file changed, 27 insertions(+), 30 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index f1150d7..e8108b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -41,11 +41,13 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TimeUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.QuadConsumer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -379,21 +381,7 @@ public class MetricRegistryImpl implements MetricRegistry {
                         "Cannot register metric, because the MetricRegistry has already been shut down.");
             } else {
                 if (reporters != null) {
-                    for (int i = 0; i < reporters.size(); i++) {
-                        ReporterAndSettings reporterAndSettings = reporters.get(i);
-                        try {
-                            if (reporterAndSettings != null) {
-                                FrontMetricGroup front =
-                                        new FrontMetricGroup<AbstractMetricGroup<?>>(
-                                                reporterAndSettings.getSettings(), group);
-                                reporterAndSettings
-                                        .getReporter()
-                                        .notifyOfAddedMetric(metric, metricName, front);
-                            }
-                        } catch (Exception e) {
-                            LOG.warn("Error while registering metric: {}.", metricName, e);
-                        }
-                    }
+                    forAllReporters(MetricReporter::notifyOfAddedMetric, metric, metricName, group);
                 }
                 try {
                     if (queryService != null) {
@@ -424,21 +412,8 @@ public class MetricRegistryImpl implements MetricRegistry {
                         "Cannot unregister metric, because the MetricRegistry has already been shut down.");
             } else {
                 if (reporters != null) {
-                    for (int i = 0; i < reporters.size(); i++) {
-                        try {
-                            ReporterAndSettings reporterAndSettings = reporters.get(i);
-                            if (reporterAndSettings != null) {
-                                FrontMetricGroup front =
-                                        new FrontMetricGroup<AbstractMetricGroup<?>>(
-                                                reporterAndSettings.getSettings(), group);
-                                reporterAndSettings
-                                        .getReporter()
-                                        .notifyOfRemovedMetric(metric, metricName, front);
-                            }
-                        } catch (Exception e) {
-                            LOG.warn("Error while unregistering metric: {}.", metricName, e);
-                        }
-                    }
+                    forAllReporters(
+                            MetricReporter::notifyOfRemovedMetric, metric, metricName, group);
                 }
                 try {
                     if (queryService != null) {
@@ -460,6 +435,28 @@ public class MetricRegistryImpl implements MetricRegistry {
         }
     }
 
+    @GuardedBy("lock")
+    private void forAllReporters(
+            QuadConsumer<MetricReporter, Metric, String, MetricGroup> operation,
+            Metric metric,
+            String metricName,
+            AbstractMetricGroup group) {
+        for (int i = 0; i < reporters.size(); i++) {
+            try {
+                ReporterAndSettings reporterAndSettings = reporters.get(i);
+                if (reporterAndSettings != null) {
+                    FrontMetricGroup front =
+                            new FrontMetricGroup<AbstractMetricGroup<?>>(
+                                    reporterAndSettings.getSettings(), group);
+
+                    operation.accept(reporterAndSettings.getReporter(), metric, metricName, front);
+                }
+            } catch (Exception e) {
+                LOG.warn("Error while handling metric: {}.", metricName, e);
+            }
+        }
+    }
+
     // ------------------------------------------------------------------------
 
     @VisibleForTesting