You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/09/21 18:30:00 UTC
[storm] branch master updated: STORM-3694 allow reporting V2
metrics with dimensions and short names (#3329)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 39a6fba STORM-3694 allow reporting V2 metrics with dimensions and short names (#3329)
39a6fba is described below
commit 39a6fba632a70f1d4684489b5b4dc6cb98a34662
Author: agresch <ag...@gmail.com>
AuthorDate: Mon Sep 21 13:28:49 2020 -0500
STORM-3694 allow reporting V2 metrics with dimensions and short names (#3329)
---
docs/metrics_v2.md | 6 +-
.../jvm/org/apache/storm/daemon/worker/Worker.java | 2 +-
.../apache/storm/metrics2/DimensionalReporter.java | 97 ++++++++++++++
...rmReporter.java => MetricRegistryProvider.java} | 15 +--
.../apache/storm/metrics2/StormMetricRegistry.java | 146 ++++++++++++++++-----
.../storm/metrics2/TaskMetricDimensions.java | 71 ++++++++++
.../org/apache/storm/metrics2/TaskMetricRepo.java | 94 +++++++++++++
.../metrics2/reporters/ConsoleStormReporter.java | 47 ++++++-
.../metrics2/reporters/ScheduledStormReporter.java | 4 +
.../storm/metrics2/reporters/StormReporter.java | 9 ++
10 files changed, 441 insertions(+), 50 deletions(-)
diff --git a/docs/metrics_v2.md b/docs/metrics_v2.md
index 77aa167..c46459d 100644
--- a/docs/metrics_v2.md
+++ b/docs/metrics_v2.md
@@ -138,6 +138,10 @@ public interface StormMetricsFilter extends MetricFilter {
}
```
+V2 metrics can be reported with a long name (such as `storm.topology.mytopologyname-17-1595349167.hostname.__system.-1.6700-memory.pools.Code-Cache.max`) or with a short
+name and dimensions (such as `memory.pools.Code-Cache.max` with a task ID dimension of -1 and a component ID dimension of `__system`) if the reporter supports this. Each reporter defaults
+to using the long metric name, but can report the short name with dimensions when `report.dimensions.enabled` is set to `true` in that reporter's configuration.
+
## Backwards Compatibility Notes
1. V2 metrics can also be reported to the Metrics Consumers registered with `topology.metrics.consumer.register` by enabling the `topology.enable.v2.metrics.tick` configuration.
@@ -169,4 +173,4 @@ However, the reporters configured with `topology.metrics.reporters` (or `storm.m
duration.unit: "SECONDS"
```
Default values will be used if they are not set or set to `null`.
-
\ No newline at end of file
+
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index eaa19c2..aef4997 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -180,7 +180,7 @@ public class Worker implements Shutdownable, DaemonCommon {
IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
- metricRegistry.start(topologyConf);
+ metricRegistry.start(topologyConf, port);
Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
Map<String, String> initCreds = new HashMap<>();
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/DimensionalReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/DimensionalReporter.java
new file mode 100644
index 0000000..0e6c487
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/DimensionalReporter.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class that allows using a ScheduledReporter to report V2 task metrics with support for dimensions.
+ * <p>
+ * This reporter will be started and scheduled with the MetricRegistry. Once it is called on to report,
+ * it will query the MetricRegistryProvider for the sets of task metrics, one set per unique dimensions. The
+ * underlying ScheduledReporter will perform the actual reporting with the help of a DimensionHandler to
+ * deal with the dimensions.
+ */
+public class DimensionalReporter extends ScheduledReporter {
+ private ScheduledReporter underlyingReporter;
+ private MetricRegistryProvider metricRegistryProvider;
+ private MetricFilter filter;
+ private DimensionHandler dimensionHandler;
+
+ /**
+ * Constructor.
+ *
+ * @param metricRegistryProvider MetricRegistryProvider tracking task-specific metrics.
+ * @param unstartedReporter ScheduledReporter to perform the actual reporting. It should NOT be started.
+ * @param dimensionHandler handler that is given the dimensions before each set of metrics is reported; may be null to skip this.
+ * @param name the reporter's name.
+ * @param filter the filter for which metrics to report.
+ * @param rateUnit rate unit for the reporter.
+ * @param durationUnit duration unit for the reporter.
+ * @param executor the executor to use while scheduling reporting of metrics.
+ * @param shutdownExecutorOnStop if true, the executor will be stopped at the same time as this reporter.
+ */
+ public DimensionalReporter(MetricRegistryProvider metricRegistryProvider,
+ ScheduledReporter unstartedReporter,
+ DimensionHandler dimensionHandler,
+ String name,
+ MetricFilter filter,
+ TimeUnit rateUnit,
+ TimeUnit durationUnit,
+ ScheduledExecutorService executor,
+ boolean shutdownExecutorOnStop) {
+ super(metricRegistryProvider.getRegistry(), name, filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop);
+ underlyingReporter = unstartedReporter;
+ this.metricRegistryProvider = metricRegistryProvider;
+ this.filter = filter;
+ this.dimensionHandler = dimensionHandler;
+ }
+
+ @Override
+ public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters,
+ SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters,
+ SortedMap<String, Timer> timers) {
+ report();
+ }
+
+ @Override
+ public void report() {
+ for (Map.Entry<TaskMetricDimensions, TaskMetricRepo> entry : metricRegistryProvider.getTaskMetrics().entrySet()) {
+ TaskMetricRepo repo = entry.getValue();
+ if (dimensionHandler != null) {
+ TaskMetricDimensions dimensions = entry.getKey();
+ dimensionHandler.setDimensions(dimensions.getDimensions());
+ }
+ repo.report(underlyingReporter, filter);
+ }
+ }
+
+ public interface DimensionHandler {
+ /**
+ * Sets the dimensions to be used when reporting the next batch of metrics.
+ *
+ * @param dimensions dimensions that apply to the metrics reported immediately after this call.
+ */
+ void setDimensions(Map<String, String> dimensions);
+ }
+}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/MetricRegistryProvider.java
similarity index 69%
copy from storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
copy to storm-client/src/jvm/org/apache/storm/metrics2/MetricRegistryProvider.java
index 8957608..889b236 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/MetricRegistryProvider.java
@@ -10,19 +10,14 @@
* and limitations under the License.
*/
-package org.apache.storm.metrics2.reporters;
+package org.apache.storm.metrics2;
import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Reporter;
import java.util.Map;
-public interface StormReporter extends Reporter {
- String REPORT_PERIOD = "report.period";
- String REPORT_PERIOD_UNITS = "report.period.units";
+public interface MetricRegistryProvider {
- void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf, Map<String, Object> reporterConf);
+ MetricRegistry getRegistry();
- void start();
-
- void stop();
-}
\ No newline at end of file
+ Map<TaskMetricDimensions, TaskMetricRepo> getTaskMetrics();
+}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 43cfcc7..5bd6b92 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -13,6 +13,7 @@
package org.apache.storm.metrics2;
import com.codahale.metrics.Counter;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
@@ -29,7 +30,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.storm.Config;
-import org.apache.storm.cluster.DaemonType;
import org.apache.storm.metrics2.reporters.StormReporter;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.task.WorkerTopologyContext;
@@ -38,7 +38,7 @@ import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StormMetricRegistry {
+public class StormMetricRegistry implements MetricRegistryProvider {
private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class);
private static final String WORKER_METRIC_PREFIX = "storm.worker.";
private static final String TOPOLOGY_METRIC_PREFIX = "storm.topology.";
@@ -50,26 +50,30 @@ public class StormMetricRegistry {
private final ConcurrentMap<Integer, Map<String, Counter>> taskIdCounters = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Map<String, Timer>> taskIdTimers = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Map<String, Histogram>> taskIdHistograms = new ConcurrentHashMap<>();
+ private final ConcurrentMap<TaskMetricDimensions, TaskMetricRepo> taskMetrics = new ConcurrentHashMap<>();
private String hostName = null;
+ private int port = -1;
+ private String topologyId = null;
public <T> SimpleGauge<T> gauge(
T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port) {
+ Gauge gauge = new SimpleGauge<>(initialValue);
MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port);
- Gauge gauge = registry.gauge(metricNames.getLongName(), () -> new SimpleGauge<>(initialValue));
+ gauge = registerGauge(metricNames, gauge, taskId, componentId, null);
saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
return (SimpleGauge<T>) gauge;
}
public <T> Gauge<T> gauge(String name, Gauge<T> gauge, TopologyContext context) {
MetricNames metricNames = topologyMetricName(name, context);
- gauge = registry.register(metricNames.getLongName(), gauge);
+ gauge = registerGauge(metricNames, gauge, context.getThisTaskId(), context.getThisComponentId(), null);
saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, gauge, taskIdGauges);
return gauge;
}
public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId, Integer taskId, Integer port) {
MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port);
- gauge = registry.register(metricNames.getLongName(), gauge);
+ gauge = registerGauge(metricNames, gauge, taskId, componentId, null);
saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
return gauge;
}
@@ -77,69 +81,91 @@ public class StormMetricRegistry {
public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId,
String streamId, Integer taskId, Integer port) {
MetricNames metricNames = workerMetricName(name, topologyId, componentId, streamId, taskId, port);
- gauge = registry.register(metricNames.getLongName(), gauge);
+ gauge = registerGauge(metricNames, gauge, taskId, componentId, streamId);
saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
return gauge;
}
public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
MetricNames metricNames = workerMetricName(name, context.getStormId(), componentId, streamId, taskId, context.getThisWorkerPort());
- Meter meter = registry.meter(metricNames.getLongName());
+ Meter meter = registerMeter(metricNames, new Meter(), taskId, componentId, streamId);
saveMetricTaskIdMapping(taskId, metricNames, meter, taskIdMeters);
return meter;
}
public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId) {
MetricNames metricNames = workerMetricName(name, context.getStormId(), componentId, taskId, context.getThisWorkerPort());
- Meter meter = registry.meter(metricNames.getLongName());
+ Meter meter = registerMeter(metricNames, new Meter(), taskId, componentId, null);
saveMetricTaskIdMapping(taskId, metricNames, meter, taskIdMeters);
return meter;
}
public Meter meter(String name, TopologyContext context) {
MetricNames metricNames = topologyMetricName(name, context);
- Meter meter = registry.meter(metricNames.getLongName());
+ Meter meter = registerMeter(metricNames, new Meter(), context.getThisTaskId(), context.getThisComponentId(), null);
saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, meter, taskIdMeters);
return meter;
}
public Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
MetricNames metricNames = workerMetricName(name, context.getStormId(), componentId, streamId, taskId, context.getThisWorkerPort());
- Counter counter = registry.counter(metricNames.getLongName());
+ Counter counter = registerCounter(metricNames, new Counter(), taskId, componentId, streamId);
saveMetricTaskIdMapping(taskId, metricNames, counter, taskIdCounters);
return counter;
}
public Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId) {
MetricNames metricNames = workerMetricName(name, topologyId, componentId, streamId, taskId, workerPort);
- Counter counter = registry.counter(metricNames.getLongName());
+ Counter counter = registerCounter(metricNames, new Counter(), taskId, componentId, streamId);
saveMetricTaskIdMapping(taskId, metricNames, counter, taskIdCounters);
return counter;
}
public Counter counter(String name, TopologyContext context) {
MetricNames metricNames = topologyMetricName(name, context);
- Counter counter = registry.counter(metricNames.getLongName());
+ Counter counter = registerCounter(metricNames, new Counter(), context.getThisTaskId(), context.getThisComponentId(), null);
saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, counter, taskIdCounters);
return counter;
}
+ public Timer timer(String name, TopologyContext context) {
+ MetricNames metricNames = topologyMetricName(name, context);
+ Timer timer = registerTimer(metricNames, new Timer(), context.getThisTaskId(), context.getThisComponentId(), null);
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, timer, taskIdTimers);
+ return timer;
+ }
+
+ public Histogram histogram(String name, TopologyContext context) {
+ MetricNames metricNames = topologyMetricName(name, context);
+ Histogram histogram = registerHistogram(metricNames, new Histogram(new ExponentiallyDecayingReservoir()),
+ context.getThisTaskId(), context.getThisComponentId(), null);
+ saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, histogram, taskIdHistograms);
+ return histogram;
+ }
+
public void metricSet(String prefix, MetricSet set, TopologyContext context) {
// Instead of registering the metrics as a set, register them individually.
// This allows fetching the individual metrics by type (getTaskGauges())
// to work as expected.
for (Map.Entry<String, Metric> entry : set.getMetrics().entrySet()) {
MetricNames metricNames = topologyMetricName(prefix + "." + entry.getKey(), context);
- Metric metric = registry.register(metricNames.getLongName(), entry.getValue());
+ Metric metric = entry.getValue();
if (metric instanceof Gauge) {
+ registerGauge(metricNames, (Gauge) metric, context.getThisTaskId(), context.getThisComponentId(), null);
saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, (Gauge) metric, taskIdGauges);
} else if (metric instanceof Meter) {
+ registerMeter(metricNames, (Meter) metric, context.getThisTaskId(), context.getThisComponentId(), null);
saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, (Meter) metric, taskIdMeters);
} else if (metric instanceof Counter) {
+ registerCounter(metricNames, (Counter) metric, context.getThisTaskId(),
+ context.getThisComponentId(), null);
saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, (Counter) metric, taskIdCounters);
} else if (metric instanceof Timer) {
+ registerTimer(metricNames, (Timer) metric, context.getThisTaskId(), context.getThisComponentId(), null);
saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, (Timer) metric, taskIdTimers);
} else if (metric instanceof Histogram) {
+ registerHistogram(metricNames, (Histogram) metric, context.getThisTaskId(),
+ context.getThisComponentId(), null);
saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, (Histogram) metric, taskIdHistograms);
} else {
LOG.error("Unable to save taskId mapping for metric {} named {}", metric, metricNames.getLongName());
@@ -147,28 +173,53 @@ public class StormMetricRegistry {
}
}
- public Timer timer(String name, TopologyContext context) {
- MetricNames metricNames = topologyMetricName(name, context);
- Timer timer = registry.timer(metricNames.getLongName());
- saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, timer, taskIdTimers);
- return timer;
+ private static <T extends Metric> void saveMetricTaskIdMapping(Integer taskId, MetricNames names, T metric, Map<Integer,
+ Map<String, T>> taskIdMetrics) {
+ Map<String, T> metrics = taskIdMetrics.computeIfAbsent(taskId, (tid) -> new HashMap<>());
+ metrics.put(names.getShortName(), metric);
}
- public Histogram histogram(String name, TopologyContext context) {
- MetricNames metricNames = topologyMetricName(name, context);
- Histogram histogram = registry.histogram(metricNames.getLongName());
- saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, histogram, taskIdHistograms);
- return histogram;
+ private <T> Gauge<T> registerGauge(MetricNames metricNames, Gauge<T> gauge, int taskId,
+ String componentId, String streamId) {
+ TaskMetricDimensions taskMetricDimensions = new TaskMetricDimensions(taskId, componentId, streamId, this);
+ TaskMetricRepo repo = taskMetrics.computeIfAbsent(taskMetricDimensions, (k) -> new TaskMetricRepo());
+ repo.addGauge(metricNames.getShortName(), gauge);
+ gauge = registry.register(metricNames.getLongName(), gauge);
+ return gauge;
}
- private static <T extends Metric> void saveMetricTaskIdMapping(Integer taskId, MetricNames names, T metric, Map<Integer,
- Map<String, T>> taskIdMetrics) {
- Map<String, T> metrics = taskIdMetrics.computeIfAbsent(taskId, (tid) -> new HashMap<>());
- if (metrics.get(names.getV2TickName()) != null) {
- LOG.warn("Adding duplicate short metric for {} with long name {}, only the last metric "
- + "will be reported during the V2 metrics tick.", names.getV2TickName(), names.longName);
- }
- metrics.put(names.getV2TickName(), metric);
+ private Meter registerMeter(MetricNames metricNames, Meter meter, int taskId, String componentId, String streamId) {
+ TaskMetricDimensions taskMetricDimensions = new TaskMetricDimensions(taskId, componentId, streamId, this);
+ TaskMetricRepo repo = taskMetrics.computeIfAbsent(taskMetricDimensions, (k) -> new TaskMetricRepo());
+ repo.addMeter(metricNames.getShortName(), meter);
+ meter = registry.register(metricNames.getLongName(), meter);
+ return meter;
+ }
+
+ private Counter registerCounter(MetricNames metricNames, Counter counter, int taskId,
+ String componentId, String streamId) {
+ TaskMetricDimensions taskMetricDimensions = new TaskMetricDimensions(taskId, componentId, streamId, this);
+ TaskMetricRepo repo = taskMetrics.computeIfAbsent(taskMetricDimensions, (k) -> new TaskMetricRepo());
+ repo.addCounter(metricNames.getShortName(), counter);
+ counter = registry.register(metricNames.getLongName(), counter);
+ return counter;
+ }
+
+ private Timer registerTimer(MetricNames metricNames, Timer timer, int taskId, String componentId, String streamId) {
+ TaskMetricDimensions taskMetricDimensions = new TaskMetricDimensions(taskId, componentId, streamId, this);
+ TaskMetricRepo repo = taskMetrics.computeIfAbsent(taskMetricDimensions, (k) -> new TaskMetricRepo());
+ repo.addTimer(metricNames.getShortName(), timer);
+ timer = registry.register(metricNames.getLongName(), timer);
+ return timer;
+ }
+
+ private Histogram registerHistogram(MetricNames metricNames, Histogram histogram, int taskId,
+ String componentId, String streamId) {
+ TaskMetricDimensions taskMetricDimensions = new TaskMetricDimensions(taskId, componentId, streamId, this);
+ TaskMetricRepo repo = taskMetrics.computeIfAbsent(taskMetricDimensions, (k) -> new TaskMetricRepo());
+ repo.addHistogram(metricNames.getShortName(), histogram);
+ histogram = registry.register(metricNames.getLongName(), histogram);
+ return histogram;
}
private <T extends Metric> Map<String, T> getMetricNameMap(int taskId, Map<Integer, Map<String, T>> taskIdMetrics) {
@@ -198,7 +249,7 @@ public class StormMetricRegistry {
return getMetricNameMap(taskId, taskIdTimers);
}
- public void start(Map<String, Object> topoConf) {
+ public void start(Map<String, Object> topoConf, int port) {
try {
hostName = dotToUnderScore(Utils.localHostname());
} catch (UnknownHostException e) {
@@ -206,6 +257,9 @@ public class StormMetricRegistry {
+ " as 'localhost'.");
}
+ this.topologyId = (String) topoConf.get(Config.STORM_ID);
+ this.port = port;
+
LOG.info("Starting metrics reporters...");
List<Map<String, Object>> reporterList = (List<Map<String, Object>>) topoConf.get(Config.TOPOLOGY_METRICS_REPORTERS);
@@ -221,7 +275,7 @@ public class StormMetricRegistry {
LOG.info("Attempting to instantiate reporter class: {}", clazz);
StormReporter reporter = ReflectionUtils.newInstance(clazz);
if (reporter != null) {
- reporter.prepare(registry, topoConf, reporterConfig);
+ reporter.prepare(this, topoConf, reporterConfig);
reporter.start();
reporters.add(reporter);
}
@@ -234,6 +288,26 @@ public class StormMetricRegistry {
}
}
+ public MetricRegistry getRegistry() {
+ return registry;
+ }
+
+ public Map<TaskMetricDimensions, TaskMetricRepo> getTaskMetrics() {
+ return taskMetrics;
+ }
+
+ String getHostName() {
+ return hostName;
+ }
+
+ String getTopologyId() {
+ return topologyId;
+ }
+
+ Integer getPort() {
+ return port;
+ }
+
private MetricNames workerMetricName(String name, String stormId, String componentId, String streamId,
Integer taskId, Integer workerPort) {
StringBuilder sb = new StringBuilder(WORKER_METRIC_PREFIX);
@@ -313,10 +387,10 @@ public class StormMetricRegistry {
}
/**
- * Returns the short metric name to be used for reporting during the V2 metrics tick.
- * @return The V2 metrics tick name.
+ * Returns the short metric name (without dimensions).
+ * @return The short metric name.
*/
- String getV2TickName() {
+ String getShortName() {
return shortName;
}
}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricDimensions.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricDimensions.java
new file mode 100644
index 0000000..20c4389
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricDimensions.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Class to store task-related V2 metrics dimensions. Equality and hash code use only taskId, componentId and streamId (null ids become "").
+ */
+public class TaskMetricDimensions {
+ private int taskId;
+ private String componentId;
+ private String streamId;
+ private Map<String, String> dimensions = new HashMap<>();
+
+ public TaskMetricDimensions(int taskId, String componentId, String streamId, StormMetricRegistry metricRegistry) {
+ this.taskId = taskId;
+ dimensions.put("taskid", Integer.toString(this.taskId));
+
+ this.componentId = componentId;
+ if (this.componentId == null) {
+ this.componentId = "";
+ } else {
+ dimensions.put("componentId", this.componentId);
+ }
+
+ this.streamId = streamId;
+ if (this.streamId == null) {
+ this.streamId = "";
+ } else {
+ dimensions.put("streamId", this.streamId);
+ }
+
+ dimensions.put("hostname", metricRegistry.getHostName());
+ dimensions.put("topologyId", metricRegistry.getTopologyId());
+ dimensions.put("port", metricRegistry.getPort().toString());
+ }
+
+ public Map<String, String> getDimensions() {
+ return dimensions;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TaskMetricDimensions otherD = (TaskMetricDimensions) o;
+ return (taskId == otherD.taskId && componentId.equals(otherD.componentId)
+ && streamId.equals(otherD.streamId));
+ }
+
+ @Override
+ public int hashCode() {
+ return taskId + componentId.hashCode() + streamId.hashCode();
+ }
+}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java
new file mode 100644
index 0000000..e88b64f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Per-task metric repository with one name-sorted map per metric type. NOTE(review): plain TreeMaps - confirm adds never race with report().
+ */
+public class TaskMetricRepo {
+ private SortedMap<String, Gauge> gauges = new TreeMap<>();
+ private SortedMap<String, Counter> counters = new TreeMap<>();
+ private SortedMap<String, Histogram> histograms = new TreeMap<>();
+ private SortedMap<String, Meter> meters = new TreeMap<>();
+ private SortedMap<String, Timer> timers = new TreeMap<>();
+
+ public void addCounter(String name, Counter counter) {
+ counters.put(name, counter);
+ }
+
+ public void addGauge(String name, Gauge gauge) {
+ gauges.put(name, gauge);
+ }
+
+ public void addMeter(String name, Meter meter) {
+ meters.put(name, meter);
+ }
+
+ public void addHistogram(String name, Histogram histogram) {
+ histograms.put(name, histogram);
+ }
+
+ public void addTimer(String name, Timer timer) {
+ timers.put(name, timer);
+ }
+
+ public void report(ScheduledReporter reporter, MetricFilter filter) {
+ if (filter != null) {
+ SortedMap<String, Gauge> filteredGauges = new TreeMap<>();
+ SortedMap<String, Counter> filteredCounters = new TreeMap<>();
+ SortedMap<String, Histogram> filteredHistograms = new TreeMap<>();
+ SortedMap<String, Meter> filteredMeters = new TreeMap<>();
+ SortedMap<String, Timer> filteredTimers = new TreeMap<>();
+
+ for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+ if (filter.matches(entry.getKey(), entry.getValue())) {
+ filteredGauges.put(entry.getKey(), entry.getValue());
+ }
+ }
+ for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+ if (filter.matches(entry.getKey(), entry.getValue())) {
+ filteredCounters.put(entry.getKey(), entry.getValue());
+ }
+ }
+ for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+ if (filter.matches(entry.getKey(), entry.getValue())) {
+ filteredHistograms.put(entry.getKey(), entry.getValue());
+ }
+ }
+ for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+ if (filter.matches(entry.getKey(), entry.getValue())) {
+ filteredMeters.put(entry.getKey(), entry.getValue());
+ }
+ }
+ for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+ if (filter.matches(entry.getKey(), entry.getValue())) {
+ filteredTimers.put(entry.getKey(), entry.getValue());
+ }
+ }
+ reporter.report(filteredGauges, filteredCounters, filteredHistograms, filteredMeters, filteredTimers);
+ } else {
+ reporter.report(gauges, counters, histograms, meters, timers);
+ }
+ }
+}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
index ffd278b..bfa84fb 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
@@ -14,19 +14,35 @@ package org.apache.storm.metrics2.reporters;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.storm.Config;
import org.apache.storm.daemon.metrics.ClientMetricsUtils;
+import org.apache.storm.metrics2.DimensionalReporter;
+import org.apache.storm.metrics2.MetricRegistryProvider;
+import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConsoleStormReporter extends ScheduledStormReporter {
+public class ConsoleStormReporter extends ScheduledStormReporter implements DimensionalReporter.DimensionHandler {
private static final Logger LOG = LoggerFactory.getLogger(ConsoleStormReporter.class);
+ /**
+ * Prepares this reporter from a bare {@link MetricRegistry}.
+ *
+ * <p>No {@link MetricRegistryProvider} is available on this path, so {@code null} is handed to
+ * {@code init}; {@code init} throws if dimension reporting is enabled in {@code reporterConf}.
+ *
+ * @param registry registry the console reporter is built for
+ * @param topoConf topology configuration (not read by this method)
+ * @param reporterConf reporter configuration forwarded to {@code init}
+ */
@Override
public void prepare(MetricRegistry registry, Map<String, Object> topoConf, Map<String, Object> reporterConf) {
+ init(registry, null, reporterConf);
+ }
+
+ /**
+ * Prepares this reporter from a {@link MetricRegistryProvider}.
+ *
+ * <p>The provider's registry is used to build the console reporter, and the provider itself is
+ * passed along so that {@code init} can create a {@link DimensionalReporter} when dimension
+ * reporting is enabled.
+ *
+ * @param metricRegistryProvider source of the registry; must not be null ({@code getRegistry()} is called on it)
+ * @param topoConf topology configuration (not read by this method)
+ * @param reporterConf reporter configuration forwarded to {@code init}
+ */
+ @Override
+ public void prepare(MetricRegistryProvider metricRegistryProvider, Map<String, Object> topoConf,
+ Map<String, Object> reporterConf) {
+ init(metricRegistryProvider.getRegistry(), metricRegistryProvider, reporterConf);
+ }
+
+ /**
+ * Shared setup for both {@code prepare} overloads: builds the Codahale {@link ConsoleReporter} and
+ * assigns either it, or a {@link DimensionalReporter} wrapping it, to {@code reporter}.
+ *
+ * @param registry registry the {@link ConsoleReporter} builder is created for
+ * @param metricRegistryProvider provider passed to the {@link DimensionalReporter}; may be null,
+ *     but only when dimension reporting is disabled
+ * @param reporterConf reporter configuration; read here for the reporting period unit and the
+ *     dimension-reporting flag
+ * @throws RuntimeException if dimension reporting is enabled and {@code metricRegistryProvider} is null
+ */
+ private void init(MetricRegistry registry, MetricRegistryProvider metricRegistryProvider, Map<String, Object> reporterConf) {
LOG.debug("Preparing ConsoleReporter");
ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(registry);
@@ -57,7 +73,34 @@ public class ConsoleStormReporter extends ScheduledStormReporter {
//defaults to seconds
reportingPeriodUnit = getReportPeriodUnit(reporterConf);
- reporter = builder.build();
+ ScheduledReporter consoleReporter = builder.build();
+
+ boolean reportDimensions = isReportDimensionsEnabled(reporterConf);
+ if (reportDimensions) {
+ // The deprecated prepare(MetricRegistry, ...) path passes null here, so fail fast with a clear message.
+ if (metricRegistryProvider == null) {
+ throw new RuntimeException("MetricRegistryProvider is required to enable reporting dimensions");
+ }
+ // DimensionalReporter is handed explicit units below; fall back to seconds / milliseconds when unset.
+ if (rateUnit == null) {
+ rateUnit = TimeUnit.SECONDS;
+ }
+ if (durationUnit == null) {
+ durationUnit = TimeUnit.MILLISECONDS;
+ }
+ DimensionalReporter dimensionalReporter = new DimensionalReporter(metricRegistryProvider, consoleReporter, this,
+ "ConsoleDimensionalReporter",
+ filter, rateUnit, durationUnit, null, true);
+ reporter = dimensionalReporter;
+ } else {
+ reporter = consoleReporter;
+ }
}
+ // We're unable to extend ConsoleReporter to handle dimensions, so we'll report dimensions here
+ /**
+ * Prints the given dimensions to standard output: a "Using dimensions: " header line followed by
+ * one "key : value" line per entry, in the map's iteration order.
+ *
+ * <p>NOTE(review): presumably invoked by {@link DimensionalReporter} ahead of each report for a
+ * set of dimensions - confirm against DimensionalReporter.
+ *
+ * @param dimensions dimension names mapped to their values; must not be null
+ */
+ @Override
+ public void setDimensions(Map<String, String> dimensions) {
+ System.out.println("Using dimensions: ");
+ for (Map.Entry<String, String> entry : dimensions.entrySet()) {
+ System.out.println(entry.getKey() + " : " + entry.getValue());
+ }
+ }
}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
index 0fe5c7d..3a7c990 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
@@ -37,6 +37,10 @@ public abstract class ScheduledStormReporter implements StormReporter {
return ObjectReader.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
}
+ /**
+ * Reads the {@code REPORT_DIMENSIONS_ENABLED} entry of the reporter configuration as a boolean.
+ *
+ * @param reporterConf reporter configuration map; must not be null
+ * @return the configured flag; {@code false} is passed to {@code ObjectReader.getBoolean} as the fallback
+ */
+ public static boolean isReportDimensionsEnabled(Map<String, Object> reporterConf) {
+ return ObjectReader.getBoolean(reporterConf.get(REPORT_DIMENSIONS_ENABLED), false);
+ }
+
public static StormMetricsFilter getMetricsFilter(Map<String, Object> reporterConf) {
StormMetricsFilter filter = null;
Map<String, Object> filterConf = (Map<String, Object>) reporterConf.get("filter");
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
index 8957608..4e81667 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
@@ -15,13 +15,22 @@ package org.apache.storm.metrics2.reporters;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Reporter;
import java.util.Map;
+import org.apache.storm.metrics2.MetricRegistryProvider;
+
public interface StormReporter extends Reporter {
String REPORT_PERIOD = "report.period";
String REPORT_PERIOD_UNITS = "report.period.units";
+ String REPORT_DIMENSIONS_ENABLED = "report.dimensions.enabled";
+ /**
+ * Prepares the reporter from a bare {@link MetricRegistry}.
+ *
+ * @param metricsRegistry registry to report from
+ * @param topoConf topology configuration
+ * @param reporterConf reporter-specific configuration
+ * @deprecated see {@link #prepare(MetricRegistryProvider, Map, Map)}, whose default implementation
+ *     delegates to this method.
+ */
+ @Deprecated
void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf, Map<String, Object> reporterConf);
+ /**
+ * Prepares the reporter from a {@link MetricRegistryProvider}.
+ *
+ * <p>The default implementation extracts the provider's registry and delegates to
+ * {@link #prepare(MetricRegistry, Map, Map)}; implementations that need the provider itself
+ * override this method.
+ *
+ * @param metricRegistryProvider source of the registry; must not be null
+ * @param topoConf topology configuration
+ * @param reporterConf reporter-specific configuration
+ */
+ default void prepare(MetricRegistryProvider metricRegistryProvider, Map<String, Object> topoConf,
+ Map<String, Object> reporterConf) {
+ prepare(metricRegistryProvider.getRegistry(), topoConf, reporterConf);
+ }
+
void start();
void stop();