You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2020/04/28 18:47:14 UTC
[storm] branch master updated: STORM-3623 executors should only
report their metrics for V2 metric tick
This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 6976624 STORM-3623 executors should only report their metrics for V2 metric tick
new 38d714b Merge pull request #3251 from agresch/agresch_storm_3623
6976624 is described below
commit 6976624228386c25e9ac6dd2b8ef87a9deeb1ad9
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Fri Apr 17 09:53:05 2020 -0500
STORM-3623 executors should only report their metrics for V2 metric tick
---
.../jvm/org/apache/storm/executor/Executor.java | 74 +++++++-----
.../apache/storm/metrics2/StormMetricRegistry.java | 131 +++++++++++++++++++--
.../jvm/org/apache/storm/task/IMetricsContext.java | 2 +-
.../jvm/org/apache/storm/task/TopologyContext.java | 18 ++-
.../trident/operation/TridentOperationContext.java | 4 +-
5 files changed, 174 insertions(+), 55 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index bc7ce20..6d5b9f2 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -63,7 +63,6 @@ import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IMetricsConsumer;
-import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.collect.Lists;
import org.apache.storm.shade.org.jctools.queues.MpscChunkedArrayQueue;
@@ -317,7 +316,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
}
}
}
- addV2Metrics(dataPoints);
+ addV2Metrics(taskId, dataPoints);
if (!dataPoints.isEmpty()) {
IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
@@ -333,51 +332,66 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
}
// updates v1 metric dataPoints with v2 metric API data
- private void addV2Metrics(List<IMetricsConsumer.DataPoint> dataPoints) {
+ private void addV2Metrics(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
boolean enableV2MetricsDataPoints = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_ENABLE_V2_METRICS_TICK), false);
if (!enableV2MetricsDataPoints) {
return;
}
- StormMetricRegistry stormMetricRegistry = workerData.getMetricRegistry();
- for (Map.Entry<String, Gauge> entry : stormMetricRegistry.registry().getGauges().entrySet()) {
- String name = entry.getKey();
+ processGauges(taskId, dataPoints);
+ processCounters(taskId, dataPoints);
+ processHistograms(taskId, dataPoints);
+ processMeters(taskId, dataPoints);
+ processTimers(taskId, dataPoints);
+ }
+
+ private void processGauges(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
+ Map<String, Gauge> gauges = workerData.getMetricRegistry().getTaskGauges(taskId);
+ for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
Object v = entry.getValue().getValue();
if (v instanceof Number) {
- IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(name, v);
+ IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), v);
dataPoints.add(dataPoint);
- } else {
- LOG.warn("Cannot report {}, its value is not a Number {}", name, v);
}
}
- for (Map.Entry<String, Counter> entry : stormMetricRegistry.registry().getCounters().entrySet()) {
+ }
+
+ private void processCounters(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
+ Map<String, Counter> counters = workerData.getMetricRegistry().getTaskCounters(taskId);
+ for (Map.Entry<String, Counter> entry : counters.entrySet()) {
Object value = entry.getValue().getCount();
IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
dataPoints.add(dataPoint);
}
- for (Map.Entry<String, Histogram> entry: stormMetricRegistry.registry().getHistograms().entrySet()) {
- String baseName = entry.getKey();
- Histogram histogram = entry.getValue();
- Snapshot snapshot = histogram.getSnapshot();
- addSnapshotDatapoints(baseName, snapshot, dataPoints);
- IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", histogram.getCount());
+ }
+
+ private void processHistograms(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
+ Map<String, Histogram> histograms = workerData.getMetricRegistry().getTaskHistograms(taskId);
+ for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+ Snapshot snapshot = entry.getValue().getSnapshot();
+ addSnapshotDatapoints(entry.getKey(), snapshot, dataPoints);
+ IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey() + ".count", entry.getValue().getCount());
dataPoints.add(dataPoint);
}
- for (Map.Entry<String, Meter> entry: stormMetricRegistry.registry().getMeters().entrySet()) {
- String baseName = entry.getKey();
- Meter meter = entry.getValue();
- IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", meter.getCount());
+ }
+
+ private void processMeters(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
+ Map<String, Meter> meters = workerData.getMetricRegistry().getTaskMeters(taskId);
+ for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+ IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey() + ".count", entry.getValue().getCount());
dataPoints.add(dataPoint);
- addConvertedMetric(baseName, ".m1_rate", meter.getOneMinuteRate(), dataPoints);
- addConvertedMetric(baseName, ".m5_rate", meter.getFiveMinuteRate(), dataPoints);
- addConvertedMetric(baseName, ".m15_rate", meter.getFifteenMinuteRate(), dataPoints);
- addConvertedMetric(baseName, ".mean_rate", meter.getMeanRate(), dataPoints);
+ addConvertedMetric(entry.getKey(), ".m1_rate", entry.getValue().getOneMinuteRate(), dataPoints);
+ addConvertedMetric(entry.getKey(), ".m5_rate", entry.getValue().getFiveMinuteRate(), dataPoints);
+ addConvertedMetric(entry.getKey(), ".m15_rate", entry.getValue().getFifteenMinuteRate(), dataPoints);
+ addConvertedMetric(entry.getKey(), ".mean_rate", entry.getValue().getMeanRate(), dataPoints);
}
- for (Map.Entry<String, Timer> entry : stormMetricRegistry.registry().getTimers().entrySet()) {
- String baseName = entry.getKey();
- Timer timer = entry.getValue();
- Snapshot snapshot = timer.getSnapshot();
- addSnapshotDatapoints(baseName, snapshot, dataPoints);
- IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", timer.getCount());
+ }
+
+ private void processTimers(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
+ Map<String, Timer> timers = workerData.getMetricRegistry().getTaskTimers(taskId);
+ for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+ Snapshot snapshot = entry.getValue().getSnapshot();
+ addSnapshotDatapoints(entry.getKey(), snapshot, dataPoints);
+ IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey() + ".count", entry.getValue().getCount());
dataPoints.add(dataPoint);
}
}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index df064a0..6501a4f 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -13,12 +13,21 @@
package org.apache.storm.metrics2;
import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.Timer;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.storm.Config;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.metrics2.reporters.StormReporter;
@@ -35,38 +44,138 @@ public class StormMetricRegistry {
private final MetricRegistry registry = new MetricRegistry();
private final List<StormReporter> reporters = new ArrayList<>();
+ private final ConcurrentMap<Integer, Map<String, Gauge>> taskIdGauges = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, Map<String, Meter>> taskIdMeters = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, Map<String, Counter>> taskIdCounters = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, Map<String, Timer>> taskIdTimers = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, Map<String, Histogram>> taskIdHistograms = new ConcurrentHashMap<>();
private String hostName = null;
public <T> SimpleGauge<T> gauge(
T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port) {
String metricName = metricName(name, topologyId, componentId, taskId, port);
- return (SimpleGauge<T>) registry.gauge(metricName, () -> new SimpleGauge<>(initialValue));
+ Gauge gauge = registry.gauge(metricName, () -> new SimpleGauge<>(initialValue));
+ saveMetricTaskIdMapping(taskId, metricName, gauge, taskIdGauges);
+ return (SimpleGauge<T>) gauge;
+ }
+
+ public <T> Gauge<T> gauge(String name, Gauge<T> gauge, TopologyContext context) {
+ String metricName = metricName(name, context);
+ gauge = registry.register(metricName, gauge);
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricName, gauge, taskIdGauges);
+ return gauge;
}
public JcMetrics jcMetrics(String name, String topologyId, String componentId, Integer taskId, Integer port) {
- return new JcMetrics(
- gauge(0L, name + "-capacity", topologyId, componentId, taskId, port),
- gauge(0L, name + "-population", topologyId, componentId, taskId, port)
- );
+ SimpleGauge<Long> capacityGauge = gauge(0L, name + "-capacity", topologyId, componentId, taskId, port);
+ SimpleGauge<Long> populationGauge = gauge(0L, name + "-population", topologyId, componentId, taskId, port);
+ return new JcMetrics(capacityGauge, populationGauge);
}
public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
String metricName = metricName(name, context.getStormId(), componentId, streamId, taskId, context.getThisWorkerPort());
- return registry.meter(metricName);
+ Meter meter = registry.meter(metricName);
+ saveMetricTaskIdMapping(taskId, metricName, meter, taskIdMeters);
+ return meter;
+ }
+
+ public Meter meter(String name, TopologyContext context) {
+ String metricName = metricName(name, context);
+ Meter meter = registry.meter(metricName);
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricName, meter, taskIdMeters);
+ return meter;
}
public Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
String metricName = metricName(name, context.getStormId(), componentId, streamId, taskId, context.getThisWorkerPort());
- return registry.counter(metricName);
+ Counter counter = registry.counter(metricName);
+ saveMetricTaskIdMapping(taskId, metricName, counter, taskIdCounters);
+ return counter;
}
public Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId) {
String metricName = metricName(name, topologyId, componentId, streamId, taskId, workerPort);
- return registry.counter(metricName);
+ Counter counter = registry.counter(metricName);
+ saveMetricTaskIdMapping(taskId, metricName, counter, taskIdCounters);
+ return counter;
}
-
- public MetricRegistry registry() {
- return registry;
+
+ public Counter counter(String name, TopologyContext context) {
+ String metricName = metricName(name, context);
+ Counter counter = registry.counter(metricName);
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricName, counter, taskIdCounters);
+ return counter;
+ }
+
+ public void metricSet(String prefix, MetricSet set, TopologyContext context) {
+ String baseName = metricName(prefix, context);
+ // Instead of registering the metrics as a set, register them individually.
+ // This allows fetching the individual metrics by type (getTaskGauges())
+ // to work as expected.
+ for (Map.Entry<String, Metric> entry : set.getMetrics().entrySet()) {
+ String metricName = baseName + "." + entry.getKey();
+ Metric metric = registry.register(metricName, entry.getValue());
+ if (metric instanceof Gauge) {
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricName, (Gauge) metric, taskIdGauges);
+ } else if (metric instanceof Meter) {
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricName, (Meter) metric, taskIdMeters);
+ } else if (metric instanceof Counter) {
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricName, (Counter) metric, taskIdCounters);
+ } else if (metric instanceof Timer) {
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricName, (Timer) metric, taskIdTimers);
+ } else if (metric instanceof Histogram) {
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricName, (Histogram) metric, taskIdHistograms);
+ } else {
+ LOG.error("Unable to save taskId mapping for metric {} named {}", metric, metricName);
+ }
+ }
+ }
+
+ public Timer timer(String name, TopologyContext context) {
+ String metricName = metricName(name, context);
+ Timer timer = registry.timer(metricName);
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricName, timer, taskIdTimers);
+ return timer;
+ }
+
+ public Histogram histogram(String name, TopologyContext context) {
+ String metricName = metricName(name, context);
+ Histogram histogram = registry.histogram(metricName);
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricName, histogram, taskIdHistograms);
+ return histogram;
+ }
+
+ private static <T extends Metric> void saveMetricTaskIdMapping(Integer taskId, String name, T metric, Map<Integer,
+ Map<String, T>> taskIdMetrics) {
+ Map<String, T> metrics = taskIdMetrics.computeIfAbsent(taskId, (tid) -> new HashMap<>());
+ metrics.put(name, metric);
+ }
+
+ private <T extends Metric> Map<String, T> getMetricNameMap(int taskId, Map<Integer, Map<String, T>> taskIdMetrics) {
+ Map<String, T> ret = new HashMap<>();
+ Map<String, T> taskMetrics = taskIdMetrics.getOrDefault(taskId, Collections.emptyMap());
+ ret.putAll(taskMetrics);
+ return ret;
+ }
+
+ public Map<String, Gauge> getTaskGauges(int taskId) {
+ return getMetricNameMap(taskId, taskIdGauges);
+ }
+
+ public Map<String, Counter> getTaskCounters(int taskId) {
+ return getMetricNameMap(taskId, taskIdCounters);
+ }
+
+ public Map<String, Histogram> getTaskHistograms(int taskId) {
+ return getMetricNameMap(taskId, taskIdHistograms);
+ }
+
+ public Map<String, Meter> getTaskMeters(int taskId) {
+ return getMetricNameMap(taskId, taskIdMeters);
+ }
+
+ public Map<String, Timer> getTaskTimers(int taskId) {
+ return getMetricNameMap(taskId, taskIdTimers);
}
public void start(Map<String, Object> stormConfig, DaemonType type) {
diff --git a/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java b/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java
index de5cbc5..802ec5a 100644
--- a/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java
@@ -57,5 +57,5 @@ public interface IMetricsContext {
<T> Gauge<T> registerGauge(String name, Gauge<T> gauge);
- MetricSet registerMetricSet(String prefix, MetricSet set);
+ void registerMetricSet(String prefix, MetricSet set);
}
diff --git a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
index b46e981..a79b874 100644
--- a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
@@ -402,35 +402,31 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
@Override
public Timer registerTimer(String name) {
- return metricRegistry.registry().timer(metricName(name));
+ return metricRegistry.timer(name, this);
}
@Override
public Histogram registerHistogram(String name) {
- return metricRegistry.registry().histogram(metricName(name));
+ return metricRegistry.histogram(name, this);
}
@Override
public Meter registerMeter(String name) {
- return metricRegistry.registry().meter(metricName(name));
+ return metricRegistry.meter(name, this);
}
@Override
public Counter registerCounter(String name) {
- return metricRegistry.registry().counter(metricName(name));
+ return metricRegistry.counter(name, this);
}
@Override
public <T> Gauge<T> registerGauge(String name, Gauge<T> gauge) {
- return metricRegistry.registry().register(metricName(name), gauge);
+ return metricRegistry.gauge(name, gauge, this);
}
@Override
- public MetricSet registerMetricSet(String prefix, MetricSet set) {
- return metricRegistry.registry().register(metricName(prefix), set);
- }
-
- private String metricName(String name) {
- return metricRegistry.metricName(name, this);
+ public void registerMetricSet(String prefix, MetricSet set) {
+ metricRegistry.metricSet(prefix, set, this);
}
}
diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
index b1cc163..e802903 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
@@ -95,7 +95,7 @@ public class TridentOperationContext implements IMetricsContext {
}
@Override
- public MetricSet registerMetricSet(String prefix, MetricSet set) {
- return topoContext.registerMetricSet(prefix, set);
+ public void registerMetricSet(String prefix, MetricSet set) {
+ topoContext.registerMetricSet(prefix, set);
}
}