You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/10/20 20:09:41 UTC
[storm] branch master updated: STORM-3801 add option for gauges to
report values per reporter if supported (#3417)
This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 2092e61 STORM-3801 add option for gauges to report values per reporter if supported (#3417)
2092e61 is described below
commit 2092e61a80d236b9ebb58b1ca36a0bb297a2f3db
Author: agresch <ag...@gmail.com>
AuthorDate: Wed Oct 20 15:09:27 2021 -0500
STORM-3801 add option for gauges to report values per reporter if supported (#3417)
---
.../jvm/org/apache/storm/executor/Executor.java | 9 +++-
.../jvm/org/apache/storm/metric/SystemBolt.java | 55 ++++++++++++++--------
.../apache/storm/metrics2/PerReporterGauge.java | 20 ++++++++
3 files changed, 63 insertions(+), 21 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index ffe6fc5..7ebddfe 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -65,6 +65,7 @@ import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.metrics2.PerReporterGauge;
import org.apache.storm.metrics2.RateCounter;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.collect.Lists;
@@ -378,7 +379,13 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
private void processGauges(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
Map<String, Gauge> gauges = workerData.getMetricRegistry().getTaskGauges(taskId);
for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
- Object v = entry.getValue().getValue();
+ Gauge gauge = entry.getValue();
+ Object v;
+ if (gauge instanceof PerReporterGauge) {
+ v = ((PerReporterGauge) gauge).getValueForReporter(this);
+ } else {
+ v = gauge.getValue();
+ }
if (v instanceof Number) {
IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), v);
dataPoints.add(dataPoint);
diff --git a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
index e9483e7..29f1219 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
@@ -18,9 +18,11 @@ import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
+import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metrics2.PerReporterGauge;
import org.apache.storm.metrics2.WorkerMetricRegistrant;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
@@ -63,33 +65,46 @@ public class SystemBolt implements IBolt {
}
});
- // newWorkerEvent: 1 when a worker is first started and 0 all other times.
- // This can be used to tell when a worker has crashed and is restarted.
- final IMetric newWorkerEvent = new IMetric() {
- boolean doEvent = true;
- @Override
- public Object getValueAndReset() {
- if (doEvent) {
- doEvent = false;
- return 1;
- } else {
- return 0;
- }
- }
- };
- context.registerGauge("newWorkerEvent", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return (Integer) newWorkerEvent.getValueAndReset();
- }
- });
+ context.registerGauge("newWorkerEvent", new NewWorkerGauge());
int bucketSize = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
registerMetrics(context, (Map<String, String>) topoConf.get(Config.WORKER_METRICS), bucketSize, topoConf);
registerMetrics(context, (Map<String, String>) topoConf.get(Config.TOPOLOGY_WORKER_METRICS), bucketSize, topoConf);
}
+ // newWorkerEvent: 1 when a worker is first started and 0 all other times.
+ // This can be used to tell when a worker has crashed and is restarted.
+ private class NewWorkerImetric implements IMetric {
+ boolean doEvent = true;
+
+ @Override
+ public Object getValueAndReset() {
+ if (doEvent) {
+ doEvent = false;
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ }
+
+ // allow reporting new worker metric for multiple reporters if they support getValueForReporter().
+ private class NewWorkerGauge extends PerReporterGauge<Integer> {
+ private final NewWorkerImetric defaultValue = new NewWorkerImetric();
+ private final Map<Object, NewWorkerImetric> reporterValues = new HashMap<>();
+
+ @Override
+ public Integer getValue() {
+ return (Integer) defaultValue.getValueAndReset();
+ }
+
+ @Override
+ public Integer getValueForReporter(Object reporter) {
+ return (Integer) reporterValues.computeIfAbsent(reporter, (rep) -> new NewWorkerImetric()).getValueAndReset();
+ }
+ }
+
private void registerMetrics(TopologyContext context, Map<String, String> metrics, int bucketSize, Map<String, Object> conf) {
if (metrics == null) {
return;
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/PerReporterGauge.java b/storm-client/src/jvm/org/apache/storm/metrics2/PerReporterGauge.java
new file mode 100644
index 0000000..c5507cd
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/PerReporterGauge.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Gauge;
+
+public abstract class PerReporterGauge<T> implements Gauge<T> {
+
+ public abstract T getValueForReporter(Object reporter);
+}