You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/10/20 20:09:41 UTC

[storm] branch master updated: STORM-3801 add option for gauges to report values per reporter if supported (#3417)

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 2092e61  STORM-3801 add option for gauges to report values per reporter if supported (#3417)
2092e61 is described below

commit 2092e61a80d236b9ebb58b1ca36a0bb297a2f3db
Author: agresch <ag...@gmail.com>
AuthorDate: Wed Oct 20 15:09:27 2021 -0500

    STORM-3801 add option for gauges to report values per reporter if supported (#3417)
---
 .../jvm/org/apache/storm/executor/Executor.java    |  9 +++-
 .../jvm/org/apache/storm/metric/SystemBolt.java    | 55 ++++++++++++++--------
 .../apache/storm/metrics2/PerReporterGauge.java    | 20 ++++++++
 3 files changed, 63 insertions(+), 21 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index ffe6fc5..7ebddfe 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -65,6 +65,7 @@ import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
 import org.apache.storm.grouping.LoadMapping;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.metrics2.PerReporterGauge;
 import org.apache.storm.metrics2.RateCounter;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.collect.Lists;
@@ -378,7 +379,13 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
     private void processGauges(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
         Map<String, Gauge> gauges = workerData.getMetricRegistry().getTaskGauges(taskId);
         for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
-            Object v = entry.getValue().getValue();
+            Gauge gauge = entry.getValue();
+            Object v;
+            if (gauge instanceof PerReporterGauge) {
+                v = ((PerReporterGauge) gauge).getValueForReporter(this);
+            } else {
+                v = gauge.getValue();
+            }
             if (v instanceof Number) {
                 IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), v);
                 dataPoints.add(dataPoint);
diff --git a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
index e9483e7..29f1219 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
@@ -18,9 +18,11 @@ import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
 import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.storm.Config;
 import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metrics2.PerReporterGauge;
 import org.apache.storm.metrics2.WorkerMetricRegistrant;
 import org.apache.storm.task.IBolt;
 import org.apache.storm.task.OutputCollector;
@@ -63,33 +65,46 @@ public class SystemBolt implements IBolt {
             }
         });
 
-        // newWorkerEvent: 1 when a worker is first started and 0 all other times.
-        // This can be used to tell when a worker has crashed and is restarted.
-        final IMetric newWorkerEvent = new IMetric() {
-            boolean doEvent = true;
 
-            @Override
-            public Object getValueAndReset() {
-                if (doEvent) {
-                    doEvent = false;
-                    return 1;
-                } else {
-                    return 0;
-                }
-            }
-        };
-        context.registerGauge("newWorkerEvent", new Gauge<Integer>() {
-            @Override
-            public Integer getValue() {
-                return (Integer) newWorkerEvent.getValueAndReset();
-            }
-        });
+        context.registerGauge("newWorkerEvent", new NewWorkerGauge());
 
         int bucketSize = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
         registerMetrics(context, (Map<String, String>) topoConf.get(Config.WORKER_METRICS), bucketSize, topoConf);
         registerMetrics(context, (Map<String, String>) topoConf.get(Config.TOPOLOGY_WORKER_METRICS), bucketSize, topoConf);
     }
 
+    // newWorkerEvent: 1 when a worker is first started and 0 all other times.
+    // This can be used to tell when a worker has crashed and is restarted.
+    private class NewWorkerImetric implements IMetric {
+        boolean doEvent = true;
+
+        @Override
+        public Object getValueAndReset() {
+            if (doEvent) {
+                doEvent = false;
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+    }
+
+    // allow reporting new worker metric for multiple reporters if they support getValueForReporter().
+    private class NewWorkerGauge extends PerReporterGauge<Integer> {
+        private final NewWorkerImetric defaultValue = new NewWorkerImetric();
+        private final Map<Object, NewWorkerImetric> reporterValues = new HashMap<>();
+
+        @Override
+        public Integer getValue() {
+            return (Integer) defaultValue.getValueAndReset();
+        }
+
+        @Override
+        public Integer getValueForReporter(Object reporter) {
+            return (Integer) reporterValues.computeIfAbsent(reporter, (rep) -> new NewWorkerImetric()).getValueAndReset();
+        }
+    }
+
     private void registerMetrics(TopologyContext context, Map<String, String> metrics, int bucketSize, Map<String, Object> conf) {
         if (metrics == null) {
             return;
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/PerReporterGauge.java b/storm-client/src/jvm/org/apache/storm/metrics2/PerReporterGauge.java
new file mode 100644
index 0000000..c5507cd
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/PerReporterGauge.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Gauge;
+
+public abstract class PerReporterGauge<T> implements Gauge<T> {
+
+    public abstract T getValueForReporter(Object reporter);
+}