You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2020/04/28 18:47:14 UTC

[storm] branch master updated: STORM-3623 executors should only report their metrics for V2 metric tick

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 6976624  STORM-3623 executors should only report their metrics for V2 metric tick
     new 38d714b  Merge pull request #3251 from agresch/agresch_storm_3623
6976624 is described below

commit 6976624228386c25e9ac6dd2b8ef87a9deeb1ad9
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Fri Apr 17 09:53:05 2020 -0500

    STORM-3623 executors should only report their metrics for V2 metric tick
---
 .../jvm/org/apache/storm/executor/Executor.java    |  74 +++++++-----
 .../apache/storm/metrics2/StormMetricRegistry.java | 131 +++++++++++++++++++--
 .../jvm/org/apache/storm/task/IMetricsContext.java |   2 +-
 .../jvm/org/apache/storm/task/TopologyContext.java |  18 ++-
 .../trident/operation/TridentOperationContext.java |   4 +-
 5 files changed, 174 insertions(+), 55 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index bc7ce20..6d5b9f2 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -63,7 +63,6 @@ import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
 import org.apache.storm.grouping.LoadMapping;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IMetricsConsumer;
-import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.collect.Lists;
 import org.apache.storm.shade.org.jctools.queues.MpscChunkedArrayQueue;
@@ -317,7 +316,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
                     }
                 }
             }
-            addV2Metrics(dataPoints);
+            addV2Metrics(taskId, dataPoints);
 
             if (!dataPoints.isEmpty()) {
                 IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
@@ -333,51 +332,66 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
     }
 
     // updates v1 metric dataPoints with v2 metric API data
-    private void addV2Metrics(List<IMetricsConsumer.DataPoint> dataPoints) {
+    private void addV2Metrics(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
         boolean enableV2MetricsDataPoints = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_ENABLE_V2_METRICS_TICK), false);
         if (!enableV2MetricsDataPoints) {
             return;
         }
-        StormMetricRegistry stormMetricRegistry = workerData.getMetricRegistry();
-        for (Map.Entry<String, Gauge> entry : stormMetricRegistry.registry().getGauges().entrySet()) {
-            String name = entry.getKey();
+        processGauges(taskId, dataPoints);
+        processCounters(taskId, dataPoints);
+        processHistograms(taskId, dataPoints);
+        processMeters(taskId, dataPoints);
+        processTimers(taskId, dataPoints);
+    }
+
+    private void processGauges(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
+        Map<String, Gauge> gauges = workerData.getMetricRegistry().getTaskGauges(taskId);
+        for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
             Object v = entry.getValue().getValue();
             if (v instanceof Number) {
-                IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(name, v);
+                IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), v);
                 dataPoints.add(dataPoint);
-            } else {
-                LOG.warn("Cannot report {}, its value is not a Number {}", name, v);
             }
         }
-        for (Map.Entry<String, Counter> entry : stormMetricRegistry.registry().getCounters().entrySet()) {
+    }
+
+    private void processCounters(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
+        Map<String, Counter> counters = workerData.getMetricRegistry().getTaskCounters(taskId);
+        for (Map.Entry<String, Counter> entry : counters.entrySet()) {
             Object value = entry.getValue().getCount();
             IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
             dataPoints.add(dataPoint);
         }
-        for (Map.Entry<String, Histogram> entry: stormMetricRegistry.registry().getHistograms().entrySet()) {
-            String baseName = entry.getKey();
-            Histogram histogram = entry.getValue();
-            Snapshot snapshot =  histogram.getSnapshot();
-            addSnapshotDatapoints(baseName, snapshot, dataPoints);
-            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", histogram.getCount());
+    }
+
+    private void processHistograms(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
+        Map<String, Histogram> histograms = workerData.getMetricRegistry().getTaskHistograms(taskId);
+        for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+            Snapshot snapshot =  entry.getValue().getSnapshot();
+            addSnapshotDatapoints(entry.getKey(), snapshot, dataPoints);
+            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey() + ".count", entry.getValue().getCount());
             dataPoints.add(dataPoint);
         }
-        for (Map.Entry<String, Meter> entry: stormMetricRegistry.registry().getMeters().entrySet()) {
-            String baseName = entry.getKey();
-            Meter meter = entry.getValue();
-            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", meter.getCount());
+    }
+
+    private void processMeters(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
+        Map<String, Meter> meters = workerData.getMetricRegistry().getTaskMeters(taskId);
+        for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey() + ".count", entry.getValue().getCount());
             dataPoints.add(dataPoint);
-            addConvertedMetric(baseName, ".m1_rate", meter.getOneMinuteRate(), dataPoints);
-            addConvertedMetric(baseName, ".m5_rate", meter.getFiveMinuteRate(), dataPoints);
-            addConvertedMetric(baseName, ".m15_rate", meter.getFifteenMinuteRate(), dataPoints);
-            addConvertedMetric(baseName, ".mean_rate", meter.getMeanRate(), dataPoints);
+            addConvertedMetric(entry.getKey(), ".m1_rate", entry.getValue().getOneMinuteRate(), dataPoints);
+            addConvertedMetric(entry.getKey(), ".m5_rate", entry.getValue().getFiveMinuteRate(), dataPoints);
+            addConvertedMetric(entry.getKey(), ".m15_rate", entry.getValue().getFifteenMinuteRate(), dataPoints);
+            addConvertedMetric(entry.getKey(), ".mean_rate", entry.getValue().getMeanRate(), dataPoints);
         }
-        for (Map.Entry<String, Timer> entry : stormMetricRegistry.registry().getTimers().entrySet()) {
-            String baseName = entry.getKey();
-            Timer timer = entry.getValue();
-            Snapshot snapshot =  timer.getSnapshot();
-            addSnapshotDatapoints(baseName, snapshot, dataPoints);
-            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", timer.getCount());
+    }
+
+    private void processTimers(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
+        Map<String, Timer> timers = workerData.getMetricRegistry().getTaskTimers(taskId);
+        for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+            Snapshot snapshot =  entry.getValue().getSnapshot();
+            addSnapshotDatapoints(entry.getKey(), snapshot, dataPoints);
+            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey() + ".count", entry.getValue().getCount());
             dataPoints.add(dataPoint);
         }
     }
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index df064a0..6501a4f 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -13,12 +13,21 @@
 package org.apache.storm.metrics2;
 
 import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.Timer;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.Config;
 import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.metrics2.reporters.StormReporter;
@@ -35,38 +44,138 @@ public class StormMetricRegistry {
     
     private final MetricRegistry registry = new MetricRegistry();
     private final List<StormReporter> reporters = new ArrayList<>();
+    private final ConcurrentMap<Integer, Map<String, Gauge>> taskIdGauges = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Integer, Map<String, Meter>> taskIdMeters = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Integer, Map<String, Counter>> taskIdCounters = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Integer, Map<String, Timer>> taskIdTimers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Integer, Map<String, Histogram>> taskIdHistograms = new ConcurrentHashMap<>();
     private String hostName = null;
 
     public <T> SimpleGauge<T> gauge(
         T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port) {
         String metricName = metricName(name, topologyId, componentId, taskId, port);
-        return (SimpleGauge<T>) registry.gauge(metricName, () -> new SimpleGauge<>(initialValue));
+        Gauge gauge = registry.gauge(metricName, () -> new SimpleGauge<>(initialValue));
+        saveMetricTaskIdMapping(taskId, metricName, gauge, taskIdGauges);
+        return (SimpleGauge<T>) gauge;
+    }
+
+    public <T> Gauge<T> gauge(String name, Gauge<T> gauge, TopologyContext context) {
+        String metricName = metricName(name, context);
+        gauge = registry.register(metricName, gauge);
+        saveMetricTaskIdMapping(context.getThisTaskId(), metricName, gauge, taskIdGauges);
+        return gauge;
     }
 
     public JcMetrics jcMetrics(String name, String topologyId, String componentId, Integer taskId, Integer port) {
-        return new JcMetrics(
-            gauge(0L, name + "-capacity", topologyId, componentId, taskId, port),
-            gauge(0L, name + "-population", topologyId, componentId, taskId, port)
-        );
+        SimpleGauge<Long> capacityGauge = gauge(0L, name + "-capacity", topologyId, componentId, taskId, port);
+        SimpleGauge<Long> populationGauge = gauge(0L, name + "-population", topologyId, componentId, taskId, port);
+        return new JcMetrics(capacityGauge, populationGauge);
     }
 
     public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
         String metricName = metricName(name, context.getStormId(), componentId, streamId, taskId, context.getThisWorkerPort());
-        return registry.meter(metricName);
+        Meter meter = registry.meter(metricName);
+        saveMetricTaskIdMapping(taskId, metricName, meter, taskIdMeters);
+        return meter;
+    }
+
+    public Meter meter(String name, TopologyContext context) {
+        String metricName = metricName(name, context);
+        Meter meter = registry.meter(metricName);
+        saveMetricTaskIdMapping(context.getThisTaskId(), metricName, meter, taskIdMeters);
+        return meter;
     }
 
     public Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
         String metricName = metricName(name, context.getStormId(), componentId, streamId, taskId, context.getThisWorkerPort());
-        return registry.counter(metricName);
+        Counter counter = registry.counter(metricName);
+        saveMetricTaskIdMapping(taskId, metricName, counter, taskIdCounters);
+        return counter;
     }
 
     public Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId) {
         String metricName = metricName(name, topologyId, componentId, streamId, taskId, workerPort);
-        return registry.counter(metricName);
+        Counter counter = registry.counter(metricName);
+        saveMetricTaskIdMapping(taskId, metricName, counter, taskIdCounters);
+        return counter;
     }
-    
-    public MetricRegistry registry() {
-        return registry;
+
+    public Counter counter(String name, TopologyContext context) {
+        String metricName = metricName(name, context);
+        Counter counter = registry.counter(metricName);
+        saveMetricTaskIdMapping(context.getThisTaskId(), metricName, counter, taskIdCounters);
+        return counter;
+    }
+
+    public void metricSet(String prefix, MetricSet set, TopologyContext context) {
+        String baseName = metricName(prefix, context);
+        // Instead of registering the metrics as a set, register them individually.
+        // This allows fetching the individual metrics by type (getTaskGauges())
+        // to work as expected.
+        for (Map.Entry<String, Metric> entry : set.getMetrics().entrySet()) {
+            String metricName = baseName + "." + entry.getKey();
+            Metric metric = registry.register(metricName, entry.getValue());
+            if (metric instanceof Gauge) {
+                saveMetricTaskIdMapping(context.getThisTaskId(), metricName, (Gauge) metric, taskIdGauges);
+            } else if (metric instanceof Meter) {
+                saveMetricTaskIdMapping(context.getThisTaskId(), metricName, (Meter) metric, taskIdMeters);
+            } else if (metric instanceof Counter) {
+                saveMetricTaskIdMapping(context.getThisTaskId(), metricName, (Counter) metric, taskIdCounters);
+            } else if (metric instanceof Timer) {
+                saveMetricTaskIdMapping(context.getThisTaskId(), metricName, (Timer) metric, taskIdTimers);
+            } else if (metric instanceof Histogram) {
+                saveMetricTaskIdMapping(context.getThisTaskId(), metricName, (Histogram) metric, taskIdHistograms);
+            } else {
+                LOG.error("Unable to save taskId mapping for metric {} named {}", metric, metricName);
+            }
+        }
+    }
+
+    public Timer timer(String name, TopologyContext context) {
+        String metricName = metricName(name, context);
+        Timer timer = registry.timer(metricName);
+        saveMetricTaskIdMapping(context.getThisTaskId(), metricName, timer, taskIdTimers);
+        return timer;
+    }
+
+    public Histogram histogram(String name, TopologyContext context) {
+        String metricName = metricName(name, context);
+        Histogram histogram = registry.histogram(metricName);
+        saveMetricTaskIdMapping(context.getThisTaskId(), metricName, histogram, taskIdHistograms);
+        return histogram;
+    }
+
+    private static <T extends Metric> void saveMetricTaskIdMapping(Integer taskId, String name, T metric, Map<Integer,
+            Map<String, T>> taskIdMetrics) {
+        Map<String, T> metrics = taskIdMetrics.computeIfAbsent(taskId, (tid) -> new HashMap<>());
+        metrics.put(name, metric);
+    }
+
+    private <T extends Metric> Map<String, T> getMetricNameMap(int taskId, Map<Integer, Map<String, T>> taskIdMetrics) {
+        Map<String, T> ret = new HashMap<>();
+        Map<String, T> taskMetrics = taskIdMetrics.getOrDefault(taskId, Collections.emptyMap());
+        ret.putAll(taskMetrics);
+        return ret;
+    }
+
+    public Map<String, Gauge> getTaskGauges(int taskId) {
+        return getMetricNameMap(taskId, taskIdGauges);
+    }
+
+    public Map<String, Counter> getTaskCounters(int taskId) {
+        return getMetricNameMap(taskId, taskIdCounters);
+    }
+
+    public Map<String, Histogram> getTaskHistograms(int taskId) {
+        return getMetricNameMap(taskId, taskIdHistograms);
+    }
+
+    public Map<String, Meter> getTaskMeters(int taskId) {
+        return getMetricNameMap(taskId, taskIdMeters);
+    }
+
+    public Map<String, Timer> getTaskTimers(int taskId) {
+        return getMetricNameMap(taskId, taskIdTimers);
     }
 
     public void start(Map<String, Object> stormConfig, DaemonType type) {
diff --git a/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java b/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java
index de5cbc5..802ec5a 100644
--- a/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/IMetricsContext.java
@@ -57,5 +57,5 @@ public interface IMetricsContext {
 
     <T> Gauge<T> registerGauge(String name, Gauge<T> gauge);
 
-    MetricSet registerMetricSet(String prefix, MetricSet set);
+    void registerMetricSet(String prefix, MetricSet set);
 }
diff --git a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
index b46e981..a79b874 100644
--- a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
@@ -402,35 +402,31 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
 
     @Override
     public Timer registerTimer(String name) {
-        return metricRegistry.registry().timer(metricName(name));
+        return metricRegistry.timer(name, this);
     }
 
     @Override
     public Histogram registerHistogram(String name) {
-        return metricRegistry.registry().histogram(metricName(name));
+        return metricRegistry.histogram(name, this);
     }
 
     @Override
     public Meter registerMeter(String name) {
-        return metricRegistry.registry().meter(metricName(name));
+        return metricRegistry.meter(name, this);
     }
 
     @Override
     public Counter registerCounter(String name) {
-        return metricRegistry.registry().counter(metricName(name));
+        return metricRegistry.counter(name, this);
     }
 
     @Override
     public <T> Gauge<T> registerGauge(String name, Gauge<T> gauge) {
-        return metricRegistry.registry().register(metricName(name), gauge);
+        return metricRegistry.gauge(name, gauge, this);
     }
 
     @Override
-    public MetricSet registerMetricSet(String prefix, MetricSet set) {
-        return metricRegistry.registry().register(metricName(prefix), set);
-    }
-
-    private String metricName(String name) {
-        return metricRegistry.metricName(name, this);
+    public void registerMetricSet(String prefix, MetricSet set) {
+        metricRegistry.metricSet(prefix, set, this);
     }
 }
diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
index b1cc163..e802903 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
@@ -95,7 +95,7 @@ public class TridentOperationContext implements IMetricsContext {
     }
 
     @Override
-    public MetricSet registerMetricSet(String prefix, MetricSet set) {
-        return topoContext.registerMetricSet(prefix, set);
+    public void registerMetricSet(String prefix, MetricSet set) {
+        topoContext.registerMetricSet(prefix, set);
     }
 }