You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/01/19 17:07:56 UTC
[storm] branch master updated: STORM-3682 Upgrade netty client
metrics to use V2 API (#3371)
This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 3cc4797 STORM-3682 Upgrade netty client metrics to use V2 API (#3371)
3cc4797 is described below
commit 3cc4797e56cf30dc04c8c8dcf133534522cc87eb
Author: agresch <ag...@gmail.com>
AuthorDate: Tue Jan 19 11:06:56 2021 -0600
STORM-3682 Upgrade netty client metrics to use V2 API (#3371)
* STORM-3682 Upgrade netty client metrics to use V2 API
---
docs/Metrics.md | 20 +-----
.../storm/daemon/metrics/BuiltinMetricsUtil.java | 23 -------
.../apache/storm/daemon/worker/WorkerState.java | 3 +-
.../apache/storm/executor/bolt/BoltExecutor.java | 2 -
.../jvm/org/apache/storm/messaging/IContext.java | 12 ++++
.../apache/storm/messaging/TransportFactory.java | 6 +-
.../org/apache/storm/messaging/netty/Client.java | 78 ++++++++++++++++------
.../org/apache/storm/messaging/netty/Context.java | 10 ++-
.../jvm/org/apache/storm/metrics2/RateCounter.java | 4 ++
.../apache/storm/metrics2/StormMetricRegistry.java | 45 +++++++++++++
.../org/apache/storm/metrics2/TaskMetricRepo.java | 77 +++++++++++----------
.../apache/storm/messaging/netty/NettyTest.java | 14 ++--
12 files changed, 188 insertions(+), 106 deletions(-)
diff --git a/docs/Metrics.md b/docs/Metrics.md
index 561f2f0..9d21f50 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -268,25 +268,11 @@ Be aware that the `__system` bolt is an actual bolt so regular bolt metrics desc
##### Send (Netty Client)
-The `__send-iconnection` metric holds information about all of the clients for this worker. It is of the form
+The `__send-iconnection` metrics report information about all of the clients for this worker. They are named __send-iconnection-METRIC_TYPE-HOST:PORT for a given Client that is
+connected to a worker with the given host/port.
-```
-{
- NodeInfo(node:7decee4b-c314-41f4-b362-fd1358c985b3-127.0.01, port:[6701]): {
- "reconnects": 0,
- "src": "/127.0.0.1:49951",
- "pending": 0,
- "dest": "localhost/127.0.0.1:6701",
- "sent": 420779,
- "lostOnSend": 0
- }
-}
-```
-
-The value is a map where the key is a NodeInfo class for the downstream worker it is sending messages to. This is the SupervisorId + port. The value is another map with the fields
+The metric types reported for each client are:
- * `src` What host/port this client has used to connect to the receiving worker.
- * `dest` What host/port this client has connected to.
* `reconnects` the number of reconnections that have happened.
* `pending` the number of messages that have not been sent. (This corresponds to messages, not tuples)
* `sent` the number of messages that have been sent. (This is messages not tuples)
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
index 72d660f..828d7ea 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
@@ -12,16 +12,12 @@
package org.apache.storm.daemon.metrics;
-import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
-import org.apache.storm.generated.NodeInfo;
-import org.apache.storm.messaging.IConnection;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.metric.api.StateMetric;
import org.apache.storm.task.TopologyContext;
-import org.apache.storm.utils.JCQueue;
public class BuiltinMetricsUtil {
public static void registerIconnectionServerMetric(Object server, Map<String, Object> topoConf, TopologyContext context) {
@@ -30,25 +26,6 @@ public class BuiltinMetricsUtil {
}
}
- public static void registerIconnectionClientMetrics(final Map<NodeInfo, IConnection> nodePortToSocket, Map<String, Object> topoConf,
- TopologyContext context) {
- IMetric metric = new IMetric() {
- @Override
- public Object getValueAndReset() {
- Map<Object, Object> ret = new HashMap<>();
- for (Map.Entry<NodeInfo, IConnection> entry : nodePortToSocket.entrySet()) {
- NodeInfo nodePort = entry.getKey();
- IConnection connection = entry.getValue();
- if (connection instanceof IStatefulObject) {
- ret.put(nodePort, ((IStatefulObject) connection).getState());
- }
- }
- return ret;
- }
- };
- registerMetric("__send-iconnection", metric, topoConf, context);
- }
-
public static void registerMetric(String name, IMetric metric, Map<String, Object> topoConf, TopologyContext context) {
int bucketSize = ((Number) topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)).intValue();
context.registerMetric(name, metric, bucketSize);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index ab4a8b6..f7aa451 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -174,7 +174,8 @@ public class WorkerState {
this.credentialsAtom = new AtomicReference(initialCredentials);
this.conf = conf;
this.supervisorIfaceSupplier = supervisorIfaceSupplier;
- this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
+ this.mqContext = (null != mqContext) ? mqContext :
+ TransportFactory.makeContext(topologyConf, metricRegistry);
this.topologyId = topologyId;
this.assignmentId = assignmentId;
this.port = port;
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index ddd830d..293d756 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -114,8 +114,6 @@ public class BoltExecutor extends Executor {
((ICredentialsListener) boltObject).setCredentials(credentials);
}
if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
- Map<NodeInfo, IConnection> cachedNodePortToSocket = workerData.getCachedNodeToPortSocket().get();
- BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, topoConf, userContext);
BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), topoConf, userContext);
// add any autocredential expiry metrics from the worker
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index ac56a8a..8b1183b 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -15,6 +15,7 @@ package org.apache.storm.messaging;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
+import org.apache.storm.metrics2.StormMetricRegistry;
/**
* This interface needs to be implemented for messaging plugin.
@@ -30,9 +31,20 @@ public interface IContext {
*
* @param topoConf storm configuration
*/
+ @Deprecated
void prepare(Map<String, Object> topoConf);
/**
+ * This method is invoked at the startup of messaging plugin.
+ *
+ * @param topoConf storm configuration
+ * @param metricRegistry storm metric registry
+ */
+ default void prepare(Map<String, Object> topoConf, StormMetricRegistry metricRegistry) {
+ prepare(topoConf);
+ }
+
+ /**
* This method is invoked when a worker is unloading a messaging plugin.
*/
void term();
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java
index cc48eca..fde5344 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java
@@ -15,13 +15,14 @@ package org.apache.storm.messaging;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.storm.Config;
+import org.apache.storm.metrics2.StormMetricRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TransportFactory {
public static final Logger LOG = LoggerFactory.getLogger(TransportFactory.class);
- public static IContext makeContext(Map<String, Object> topoConf) {
+ public static IContext makeContext(Map<String, Object> topoConf, StormMetricRegistry metricRegistry) {
//get factory class name
String transportPluginClassName = (String) topoConf.get(Config.STORM_MESSAGING_TRANSPORT);
@@ -37,9 +38,10 @@ public class TransportFactory {
//case 1: plugin is a IContext class
transport = (IContext) obj;
//initialize with storm configuration
- transport.prepare(topoConf);
+ transport.prepare(topoConf, metricRegistry);
} else {
//case 2: Non-IContext plugin must have a makeContext(topoConf) method that returns IContext object
+ // StormMetricRegistry is ignored if IContext is created this way
Method method = klass.getMethod("makeContext", Map.class);
LOG.debug("object:" + obj + " method:" + method);
transport = (IContext) method.invoke(obj, topoConf);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index 5819924..9f54fbd 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -14,13 +14,17 @@ package org.apache.storm.messaging.netty;
import static org.apache.storm.shade.com.google.common.base.Preconditions.checkState;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,10 +32,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.Config;
+import org.apache.storm.Constants;
import org.apache.storm.grouping.Load;
import org.apache.storm.messaging.ConnectionWithStatus;
import org.apache.storm.messaging.TaskMessage;
-import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.policy.IWaitStrategy.WaitSituation;
import org.apache.storm.policy.WaitStrategyProgressive;
@@ -63,7 +68,7 @@ import org.slf4j.LoggerFactory;
* asynchronously. Note: The current implementation drops any messages that are being enqueued for sending if the connection to the remote
* destination is currently unavailable.
*/
-public class Client extends ConnectionWithStatus implements IStatefulObject, ISaslClient {
+public class Client extends ConnectionWithStatus implements ISaslClient {
private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
/**
@@ -119,10 +124,12 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
* This flag is set to true if and only if a client instance is being closed.
*/
private volatile boolean closing = false;
+ StormMetricRegistry metricRegistry;
+ private Set<Metric> metrics = new HashSet<>();
Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus,
EventLoopGroup eventLoopGroup, HashedWheelTimer scheduler, String host,
- int port) {
+ int port, StormMetricRegistry metricRegistry) {
this.topoConf = topoConf;
closing = false;
this.scheduler = scheduler;
@@ -163,6 +170,50 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
waitStrategy = ReflectionUtils.newInstance(clazz);
}
waitStrategy.prepare(topoConf, WaitSituation.BACK_PRESSURE_WAIT);
+ this.metricRegistry = metricRegistry;
+
+ // it's possible to be passed a null metric registry if users are using their own IContext implementation.
+ if (this.metricRegistry != null) {
+ Gauge<Integer> reconnects = new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return totalConnectionAttempts.get();
+ }
+ };
+ metricRegistry.gauge("__send-iconnection-reconnects-" + host + ":" + port, reconnects,
+ Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
+ metrics.add(reconnects);
+
+ Gauge<Integer> sent = new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return messagesSent.get();
+ }
+ };
+ metricRegistry.gauge("__send-iconnection-sent-" + host + ":" + port, sent,
+ Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
+ metrics.add(sent);
+
+ Gauge<Long> pending = new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return pendingMessages.get();
+ }
+ };
+ metricRegistry.gauge("__send-iconnection-pending-" + host + ":" + port, pending,
+ Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
+ metrics.add(pending);
+
+ Gauge<Integer> lostOnSend = new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return messagesLost.get();
+ }
+ };
+ metricRegistry.gauge("__send-iconnection-lostOnSend-" + host + ":" + port, lostOnSend,
+ Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
+ metrics.add(lostOnSend);
+ }
}
/**
@@ -415,6 +466,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
closing = true;
waitForPendingMessagesToBeSent();
closeChannel();
+
+ // stop tracking metrics for this client
+ if (this.metricRegistry != null) {
+ this.metricRegistry.deregister(this.metrics);
+ }
}
}
@@ -467,22 +523,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
return ret;
}
- @Override
- public Object getState() {
- LOG.debug("Getting metrics for client connection to {}", dstAddressPrefixedName);
- HashMap<String, Object> ret = new HashMap<String, Object>();
- ret.put("reconnects", totalConnectionAttempts.getAndSet(0));
- ret.put("sent", messagesSent.getAndSet(0));
- ret.put("pending", pendingMessages.get());
- ret.put("lostOnSend", messagesLost.getAndSet(0));
- ret.put("dest", dstAddress.toString());
- String src = srcAddressName();
- if (src != null) {
- ret.put("src", src);
- }
- return ret;
- }
-
public Map<String, Object> getConfig() {
return topoConf;
}
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
index 03feaf8..a1384cb 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -22,6 +22,7 @@ import org.apache.storm.Config;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IConnectionCallback;
import org.apache.storm.messaging.IContext;
+import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.storm.shade.io.netty.util.HashedWheelTimer;
@@ -32,12 +33,18 @@ public class Context implements IContext {
private List<Server> serverConnections;
private EventLoopGroup workerEventLoopGroup;
private HashedWheelTimer clientScheduleService;
+ private StormMetricRegistry metricRegistry = null;
/**
* initialization per Storm configuration.
*/
@Override
public void prepare(Map<String, Object> topoConf) {
+ prepare(topoConf, null);
+ }
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, StormMetricRegistry metricRegistry) {
this.topoConf = topoConf;
serverConnections = new ArrayList<>();
@@ -49,6 +56,7 @@ public class Context implements IContext {
this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory);
clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-service"));
+ this.metricRegistry = metricRegistry;
}
/**
@@ -67,7 +75,7 @@ public class Context implements IContext {
@Override
public IConnection connect(String stormId, String host, int port, AtomicBoolean[] remoteBpStatus) {
return new Client(topoConf, remoteBpStatus, workerEventLoopGroup,
- clientScheduleService, host, port);
+ clientScheduleService, host, port, metricRegistry);
}
/**
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
index 77b6720..6f5c32c 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
@@ -59,4 +59,8 @@ public class RateCounter implements Gauge<Double> {
values[time] = counter.getCount();
currentRate = ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds);
}
+
+ Counter getCounter() {
+ return counter;
+ }
}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 6863503..3ea7a69 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -18,6 +18,7 @@ import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
@@ -25,6 +26,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -84,6 +86,7 @@ public class StormMetricRegistry implements MetricRegistryProvider {
return gauge;
}
+ @Deprecated
public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId, Integer taskId, Integer port) {
MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port);
gauge = registerGauge(metricNames, gauge, taskId, componentId, null);
@@ -91,6 +94,13 @@ public class StormMetricRegistry implements MetricRegistryProvider {
return gauge;
}
+ public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String componentId, Integer taskId) {
+ MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port);
+ gauge = registerGauge(metricNames, gauge, taskId, componentId, null);
+ saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
+ return gauge;
+ }
+
public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId,
String streamId, Integer taskId, Integer port) {
MetricNames metricNames = workerMetricName(name, topologyId, componentId, streamId, taskId, port);
@@ -235,6 +245,14 @@ public class StormMetricRegistry implements MetricRegistryProvider {
return histogram;
}
+ public void deregister(Set<Metric> toRemove) {
+ MetricFilter metricFilter = new RemoveMetricFilter(toRemove);
+ for (TaskMetricRepo taskMetricRepo : taskMetrics.values()) {
+ taskMetricRepo.degister(metricFilter);
+ }
+ registry.removeMatching(metricFilter);
+ }
+
private <T extends Metric> Map<String, T> getMetricNameMap(int taskId, Map<Integer, Map<String, T>> taskIdMetrics) {
Map<String, T> ret = new HashMap<>();
Map<String, T> taskMetrics = taskIdMetrics.getOrDefault(taskId, Collections.emptyMap());
@@ -432,4 +450,31 @@ public class StormMetricRegistry implements MetricRegistryProvider {
}
}
}
+
+ private static class RemoveMetricFilter implements MetricFilter {
+ private Set<Metric> metrics = new HashSet<>();
+
+ RemoveMetricFilter(Set<Metric> toRemove) {
+ this.metrics.addAll(toRemove);
+ for (Metric metric : toRemove) {
+ // RateCounters are gauges, but also have internal Counters that should also be removed
+ if (metric instanceof RateCounter) {
+ RateCounter rateCounter = (RateCounter) metric;
+ this.metrics.add(rateCounter.getCounter());
+ }
+ }
+ }
+
+ /**
+ * Returns {@code true} if the metric matches the filter; {@code false} otherwise.
+ *
+ * @param name the metric's name
+ * @param metric the metric
+ * @return {@code true} if the metric matches the filter
+ */
+ @Override
+ public boolean matches(String name, Metric metric) {
+ return this.metrics.contains(metric);
+ }
+ }
}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java
index e88b64f..785271c 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java
@@ -22,16 +22,17 @@ import com.codahale.metrics.Timer;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Metric repository to allow reporting of task-specific metrics.
*/
public class TaskMetricRepo {
- private SortedMap<String, Gauge> gauges = new TreeMap<>();
- private SortedMap<String, Counter> counters = new TreeMap<>();
- private SortedMap<String, Histogram> histograms = new TreeMap<>();
- private SortedMap<String, Meter> meters = new TreeMap<>();
- private SortedMap<String, Timer> timers = new TreeMap<>();
+ private ConcurrentHashMap<String, Gauge> gauges = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, Histogram> histograms = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, Meter> meters = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, Timer> timers = new ConcurrentHashMap<>();
public void addCounter(String name, Counter counter) {
counters.put(name, counter);
@@ -54,41 +55,49 @@ public class TaskMetricRepo {
}
public void report(ScheduledReporter reporter, MetricFilter filter) {
- if (filter != null) {
- SortedMap<String, Gauge> filteredGauges = new TreeMap<>();
- SortedMap<String, Counter> filteredCounters = new TreeMap<>();
- SortedMap<String, Histogram> filteredHistograms = new TreeMap<>();
- SortedMap<String, Meter> filteredMeters = new TreeMap<>();
- SortedMap<String, Timer> filteredTimers = new TreeMap<>();
+ if (filter == null) {
+ filter = MetricFilter.ALL;
+ }
+
+ SortedMap<String, Gauge> filteredGauges = new TreeMap<>();
+ SortedMap<String, Counter> filteredCounters = new TreeMap<>();
+ SortedMap<String, Histogram> filteredHistograms = new TreeMap<>();
+ SortedMap<String, Meter> filteredMeters = new TreeMap<>();
+ SortedMap<String, Timer> filteredTimers = new TreeMap<>();
- for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
- if (filter.matches(entry.getKey(), entry.getValue())) {
- filteredGauges.put(entry.getKey(), entry.getValue());
- }
+ for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+ if (filter.matches(entry.getKey(), entry.getValue())) {
+ filteredGauges.put(entry.getKey(), entry.getValue());
}
- for (Map.Entry<String, Counter> entry : counters.entrySet()) {
- if (filter.matches(entry.getKey(), entry.getValue())) {
- filteredCounters.put(entry.getKey(), entry.getValue());
- }
+ }
+ for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+ if (filter.matches(entry.getKey(), entry.getValue())) {
+ filteredCounters.put(entry.getKey(), entry.getValue());
}
- for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
- if (filter.matches(entry.getKey(), entry.getValue())) {
- filteredHistograms.put(entry.getKey(), entry.getValue());
- }
+ }
+ for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+ if (filter.matches(entry.getKey(), entry.getValue())) {
+ filteredHistograms.put(entry.getKey(), entry.getValue());
}
- for (Map.Entry<String, Meter> entry : meters.entrySet()) {
- if (filter.matches(entry.getKey(), entry.getValue())) {
- filteredMeters.put(entry.getKey(), entry.getValue());
- }
+ }
+ for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+ if (filter.matches(entry.getKey(), entry.getValue())) {
+ filteredMeters.put(entry.getKey(), entry.getValue());
}
- for (Map.Entry<String, Timer> entry : timers.entrySet()) {
- if (filter.matches(entry.getKey(), entry.getValue())) {
- filteredTimers.put(entry.getKey(), entry.getValue());
- }
+ }
+ for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+ if (filter.matches(entry.getKey(), entry.getValue())) {
+ filteredTimers.put(entry.getKey(), entry.getValue());
}
- reporter.report(filteredGauges, filteredCounters, filteredHistograms, filteredMeters, filteredTimers);
- } else {
- reporter.report(gauges, counters, histograms, meters, timers);
}
+ reporter.report(filteredGauges, filteredCounters, filteredHistograms, filteredMeters, filteredTimers);
+ }
+
+ void degister(MetricFilter metricFilter) {
+ gauges.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue()));
+ counters.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue()));
+ histograms.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue()));
+ meters.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue()));
+ timers.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue()));
}
}
\ No newline at end of file
diff --git a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
index 19f016f..3336737 100644
--- a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
+++ b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
@@ -110,7 +110,7 @@ public class NettyTest {
private void doTestBasic(Map<String, Object> stormConf) throws Exception {
LOG.info("1. Should send and receive a basic message");
String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
- IContext context = TransportFactory.makeContext(stormConf);
+ IContext context = TransportFactory.makeContext(stormConf, null);
try {
AtomicReference<TaskMessage> response = new AtomicReference<>();
try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
@@ -171,7 +171,7 @@ public class NettyTest {
private void doTestLoad(Map<String, Object> stormConf) throws Exception {
LOG.info("2 test load");
String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
- IContext context = TransportFactory.makeContext(stormConf);
+ IContext context = TransportFactory.makeContext(stormConf, null);
try {
AtomicReference<TaskMessage> response = new AtomicReference<>();
try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
@@ -225,7 +225,7 @@ public class NettyTest {
private void doTestLargeMessage(Map<String, Object> stormConf) throws Exception {
LOG.info("3 Should send and receive a large message");
String reqMessage = StringUtils.repeat("c", 2_048_000);
- IContext context = TransportFactory.makeContext(stormConf);
+ IContext context = TransportFactory.makeContext(stormConf, null);
try {
AtomicReference<TaskMessage> response = new AtomicReference<>();
try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
@@ -264,7 +264,7 @@ public class NettyTest {
private void doTestServerDelayed(Map<String, Object> stormConf) throws Exception {
LOG.info("4. test server delayed");
String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
- IContext context = TransportFactory.makeContext(stormConf);
+ IContext context = TransportFactory.makeContext(stormConf, null);
try {
AtomicReference<TaskMessage> response = new AtomicReference<>();
int port = Utils.getAvailablePort(6700);
@@ -315,7 +315,7 @@ public class NettyTest {
LOG.info("Should send and receive many messages (testing with " + numMessages + " messages)");
ArrayList<TaskMessage> responses = new ArrayList<>();
AtomicInteger received = new AtomicInteger();
- IContext context = TransportFactory.makeContext(stormConf);
+ IContext context = TransportFactory.makeContext(stormConf, null);
try {
try (IConnection server = context.bind(null, 0, mkConnectionCallback((message) -> {
responses.add(message);
@@ -362,7 +362,7 @@ public class NettyTest {
private void doTestServerAlwaysReconnects(Map<String, Object> stormConf) throws Exception {
LOG.info("6. test server always reconnects");
String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
- IContext context = TransportFactory.makeContext(stormConf);
+ IContext context = TransportFactory.makeContext(stormConf, null);
try {
AtomicReference<TaskMessage> response = new AtomicReference<>();
int port = Utils.getAvailablePort(6700);
@@ -396,7 +396,7 @@ public class NettyTest {
private void connectToFixedPort(Map<String, Object> stormConf, int port) throws Exception {
LOG.info("7. Should be able to rebind to a port quickly");
String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
- IContext context = TransportFactory.makeContext(stormConf);
+ IContext context = TransportFactory.makeContext(stormConf, null);
try {
AtomicReference<TaskMessage> response = new AtomicReference<>();
try (IConnection server = context.bind(null, port, mkConnectionCallback(response::set), null);