You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/01/19 17:07:56 UTC

[storm] branch master updated: STORM-3682 Upgrade netty client metrics to use V2 API (#3371)

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cc4797  STORM-3682 Upgrade netty client metrics to use V2 API (#3371)
3cc4797 is described below

commit 3cc4797e56cf30dc04c8c8dcf133534522cc87eb
Author: agresch <ag...@gmail.com>
AuthorDate: Tue Jan 19 11:06:56 2021 -0600

    STORM-3682 Upgrade netty client metrics to use V2 API (#3371)
    
    * STORM-3682 Upgrade netty client metrics to use V2 API
---
 docs/Metrics.md                                    | 20 +-----
 .../storm/daemon/metrics/BuiltinMetricsUtil.java   | 23 -------
 .../apache/storm/daemon/worker/WorkerState.java    |  3 +-
 .../apache/storm/executor/bolt/BoltExecutor.java   |  2 -
 .../jvm/org/apache/storm/messaging/IContext.java   | 12 ++++
 .../apache/storm/messaging/TransportFactory.java   |  6 +-
 .../org/apache/storm/messaging/netty/Client.java   | 78 ++++++++++++++++------
 .../org/apache/storm/messaging/netty/Context.java  | 10 ++-
 .../jvm/org/apache/storm/metrics2/RateCounter.java |  4 ++
 .../apache/storm/metrics2/StormMetricRegistry.java | 45 +++++++++++++
 .../org/apache/storm/metrics2/TaskMetricRepo.java  | 77 +++++++++++----------
 .../apache/storm/messaging/netty/NettyTest.java    | 14 ++--
 12 files changed, 188 insertions(+), 106 deletions(-)

diff --git a/docs/Metrics.md b/docs/Metrics.md
index 561f2f0..9d21f50 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -268,25 +268,11 @@ Be aware that the `__system` bolt is an actual bolt so regular bolt metrics desc
 
 ##### Send (Netty Client)
 
-The `__send-iconnection` metric holds information about all of the clients for this worker.  It is of the form
+The `__send-iconnection` metrics report information about all of the clients for this worker.  They are named __send-iconnection-METRIC_TYPE-HOST:PORT for a given Client that is
+connected to a worker with the given host/port.
 
-```
-{
-    NodeInfo(node:7decee4b-c314-41f4-b362-fd1358c985b3-127.0.01, port:[6701]): {
-        "reconnects": 0,
-        "src": "/127.0.0.1:49951",
-        "pending": 0,
-        "dest": "localhost/127.0.0.1:6701",
-        "sent": 420779,
-        "lostOnSend": 0
-    }
-}
-```
-
-The value is a map where the key is a NodeInfo class for the downstream worker it is sending messages to.  This is the SupervisorId + port.  The value is another map with the fields
+The metric types reported for each client are:
 
- * `src`  What host/port this client has used to connect to the receiving worker.
- * `dest` What host/port this client has connected to.
  * `reconnects` the number of reconnections that have happened.
  * `pending` the number of messages that have not been sent.  (This corresponds to messages, not tuples)
  * `sent` the number of messages that have been sent.  (This is messages not tuples)
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
index 72d660f..828d7ea 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
@@ -12,16 +12,12 @@
 
 package org.apache.storm.daemon.metrics;
 
-import java.util.HashMap;
 import java.util.Map;
 import org.apache.storm.Config;
-import org.apache.storm.generated.NodeInfo;
-import org.apache.storm.messaging.IConnection;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IStatefulObject;
 import org.apache.storm.metric.api.StateMetric;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.utils.JCQueue;
 
 public class BuiltinMetricsUtil {
     public static void registerIconnectionServerMetric(Object server, Map<String, Object> topoConf, TopologyContext context) {
@@ -30,25 +26,6 @@ public class BuiltinMetricsUtil {
         }
     }
 
-    public static void registerIconnectionClientMetrics(final Map<NodeInfo, IConnection> nodePortToSocket, Map<String, Object> topoConf,
-                                                        TopologyContext context) {
-        IMetric metric = new IMetric() {
-            @Override
-            public Object getValueAndReset() {
-                Map<Object, Object> ret = new HashMap<>();
-                for (Map.Entry<NodeInfo, IConnection> entry : nodePortToSocket.entrySet()) {
-                    NodeInfo nodePort = entry.getKey();
-                    IConnection connection = entry.getValue();
-                    if (connection instanceof IStatefulObject) {
-                        ret.put(nodePort, ((IStatefulObject) connection).getState());
-                    }
-                }
-                return ret;
-            }
-        };
-        registerMetric("__send-iconnection", metric, topoConf, context);
-    }
-
     public static void registerMetric(String name, IMetric metric, Map<String, Object> topoConf, TopologyContext context) {
         int bucketSize = ((Number) topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)).intValue();
         context.registerMetric(name, metric, bucketSize);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index ab4a8b6..f7aa451 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -174,7 +174,8 @@ public class WorkerState {
         this.credentialsAtom = new AtomicReference(initialCredentials);
         this.conf = conf;
         this.supervisorIfaceSupplier = supervisorIfaceSupplier;
-        this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
+        this.mqContext = (null != mqContext) ? mqContext :
+                TransportFactory.makeContext(topologyConf, metricRegistry);
         this.topologyId = topologyId;
         this.assignmentId = assignmentId;
         this.port = port;
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index ddd830d..293d756 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -114,8 +114,6 @@ public class BoltExecutor extends Executor {
                 ((ICredentialsListener) boltObject).setCredentials(credentials);
             }
             if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
-                Map<NodeInfo, IConnection> cachedNodePortToSocket = workerData.getCachedNodeToPortSocket().get();
-                BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, topoConf, userContext);
                 BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), topoConf, userContext);
 
                 // add any autocredential expiry metrics from the worker
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index ac56a8a..8b1183b 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -15,6 +15,7 @@ package org.apache.storm.messaging;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
+import org.apache.storm.metrics2.StormMetricRegistry;
 
 /**
  * This interface needs to be implemented for messaging plugin.
@@ -30,9 +31,20 @@ public interface IContext {
      *
      * @param topoConf storm configuration
      */
+    @Deprecated
     void prepare(Map<String, Object> topoConf);
 
     /**
+     * This method is invoked at the startup of messaging plugin.
+     *
+     * @param topoConf storm configuration
+     * @param metricRegistry storm metric registry
+     */
+    default void prepare(Map<String, Object> topoConf, StormMetricRegistry metricRegistry) {
+        prepare(topoConf);
+    }
+
+    /**
      * This method is invoked when a worker is unloading a messaging plugin.
      */
     void term();
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java
index cc48eca..fde5344 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java
@@ -15,13 +15,14 @@ package org.apache.storm.messaging;
 import java.lang.reflect.Method;
 import java.util.Map;
 import org.apache.storm.Config;
+import org.apache.storm.metrics2.StormMetricRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TransportFactory {
     public static final Logger LOG = LoggerFactory.getLogger(TransportFactory.class);
 
-    public static IContext makeContext(Map<String, Object> topoConf) {
+    public static IContext makeContext(Map<String, Object> topoConf, StormMetricRegistry metricRegistry) {
 
         //get factory class name
         String transportPluginClassName = (String) topoConf.get(Config.STORM_MESSAGING_TRANSPORT);
@@ -37,9 +38,10 @@ public class TransportFactory {
                 //case 1: plugin is a IContext class
                 transport = (IContext) obj;
                 //initialize with storm configuration
-                transport.prepare(topoConf);
+                transport.prepare(topoConf, metricRegistry);
             } else {
                 //case 2: Non-IContext plugin must have a makeContext(topoConf) method that returns IContext object
+                // StormMetricRegistry is ignored if IContext is created this way
                 Method method = klass.getMethod("makeContext", Map.class);
                 LOG.debug("object:" + obj + " method:" + method);
                 transport = (IContext) method.invoke(obj, topoConf);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index 5819924..9f54fbd 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -14,13 +14,17 @@ package org.apache.storm.messaging.netty;
 
 import static org.apache.storm.shade.com.google.common.base.Preconditions.checkState;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,10 +32,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.storm.Config;
+import org.apache.storm.Constants;
 import org.apache.storm.grouping.Load;
 import org.apache.storm.messaging.ConnectionWithStatus;
 import org.apache.storm.messaging.TaskMessage;
-import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.policy.IWaitStrategy;
 import org.apache.storm.policy.IWaitStrategy.WaitSituation;
 import org.apache.storm.policy.WaitStrategyProgressive;
@@ -63,7 +68,7 @@ import org.slf4j.LoggerFactory;
  * asynchronously. Note: The current implementation drops any messages that are being enqueued for sending if the connection to the remote
  * destination is currently unavailable.
  */
-public class Client extends ConnectionWithStatus implements IStatefulObject, ISaslClient {
+public class Client extends ConnectionWithStatus implements ISaslClient {
     private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
     private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
     /**
@@ -119,10 +124,12 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
      * This flag is set to true if and only if a client instance is being closed.
      */
     private volatile boolean closing = false;
+    StormMetricRegistry metricRegistry;
+    private Set<Metric> metrics = new HashSet<>();
 
     Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus,
         EventLoopGroup eventLoopGroup, HashedWheelTimer scheduler, String host,
-           int port) {
+           int port, StormMetricRegistry metricRegistry) {
         this.topoConf = topoConf;
         closing = false;
         this.scheduler = scheduler;
@@ -163,6 +170,50 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
             waitStrategy = ReflectionUtils.newInstance(clazz);
         }
         waitStrategy.prepare(topoConf, WaitSituation.BACK_PRESSURE_WAIT);
+        this.metricRegistry = metricRegistry;
+
+        // it's possible to be passed a null metric registry if users are using their own IContext implementation.
+        if (this.metricRegistry != null) {
+            Gauge<Integer> reconnects = new Gauge<Integer>() {
+                @Override
+                public Integer getValue() {
+                    return totalConnectionAttempts.get();
+                }
+            };
+            metricRegistry.gauge("__send-iconnection-reconnects-" + host + ":" + port, reconnects,
+                    Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
+            metrics.add(reconnects);
+
+            Gauge<Integer> sent = new Gauge<Integer>() {
+                @Override
+                public Integer getValue() {
+                    return messagesSent.get();
+                }
+            };
+            metricRegistry.gauge("__send-iconnection-sent-" + host + ":" + port, sent,
+                    Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
+            metrics.add(sent);
+
+            Gauge<Long> pending = new Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    return pendingMessages.get();
+                }
+            };
+            metricRegistry.gauge("__send-iconnection-pending-" + host + ":" + port, pending,
+                    Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
+            metrics.add(pending);
+
+            Gauge<Integer> lostOnSend = new Gauge<Integer>() {
+                @Override
+                public Integer getValue() {
+                    return messagesLost.get();
+                }
+            };
+            metricRegistry.gauge("__send-iconnection-lostOnSend-" + host + ":" + port, lostOnSend,
+                    Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
+            metrics.add(lostOnSend);
+        }
     }
 
     /**
@@ -415,6 +466,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
             closing = true;
             waitForPendingMessagesToBeSent();
             closeChannel();
+
+            // stop tracking metrics for this client
+            if (this.metricRegistry != null) {
+                this.metricRegistry.deregister(this.metrics);
+            }
         }
     }
 
@@ -467,22 +523,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         return ret;
     }
 
-    @Override
-    public Object getState() {
-        LOG.debug("Getting metrics for client connection to {}", dstAddressPrefixedName);
-        HashMap<String, Object> ret = new HashMap<String, Object>();
-        ret.put("reconnects", totalConnectionAttempts.getAndSet(0));
-        ret.put("sent", messagesSent.getAndSet(0));
-        ret.put("pending", pendingMessages.get());
-        ret.put("lostOnSend", messagesLost.getAndSet(0));
-        ret.put("dest", dstAddress.toString());
-        String src = srcAddressName();
-        if (src != null) {
-            ret.put("src", src);
-        }
-        return ret;
-    }
-
     public Map<String, Object> getConfig() {
         return topoConf;
     }
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
index 03feaf8..a1384cb 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -22,6 +22,7 @@ import org.apache.storm.Config;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IConnectionCallback;
 import org.apache.storm.messaging.IContext;
+import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
 import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.storm.shade.io.netty.util.HashedWheelTimer;
@@ -32,12 +33,18 @@ public class Context implements IContext {
     private List<Server> serverConnections;
     private EventLoopGroup workerEventLoopGroup;
     private HashedWheelTimer clientScheduleService;
+    private StormMetricRegistry metricRegistry = null;
 
     /**
      * initialization per Storm configuration.
      */
     @Override
     public void prepare(Map<String, Object> topoConf) {
+        prepare(topoConf, null);
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, StormMetricRegistry metricRegistry) {
         this.topoConf = topoConf;
         serverConnections = new ArrayList<>();
 
@@ -49,6 +56,7 @@ public class Context implements IContext {
         this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory);
 
         clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-service"));
+        this.metricRegistry = metricRegistry;
     }
 
     /**
@@ -67,7 +75,7 @@ public class Context implements IContext {
     @Override
     public IConnection connect(String stormId, String host, int port, AtomicBoolean[] remoteBpStatus) {
         return new Client(topoConf, remoteBpStatus, workerEventLoopGroup,
-                                        clientScheduleService, host, port);
+                                        clientScheduleService, host, port, metricRegistry);
     }
 
     /**
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
index 77b6720..6f5c32c 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
@@ -59,4 +59,8 @@ public class RateCounter implements Gauge<Double> {
         values[time] = counter.getCount();
         currentRate =  ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds);
     }
+
+    Counter getCounter() {
+        return counter;
+    }
 }
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 6863503..3ea7a69 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -18,6 +18,7 @@ import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Timer;
@@ -25,6 +26,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -84,6 +86,7 @@ public class StormMetricRegistry implements MetricRegistryProvider {
         return gauge;
     }
 
+    @Deprecated
     public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId, Integer taskId, Integer port) {
         MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port);
         gauge = registerGauge(metricNames, gauge, taskId, componentId, null);
@@ -91,6 +94,13 @@ public class StormMetricRegistry implements MetricRegistryProvider {
         return gauge;
     }
 
+    public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String componentId, Integer taskId) {
+        MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port);
+        gauge = registerGauge(metricNames, gauge, taskId, componentId, null);
+        saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
+        return gauge;
+    }
+
     public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId,
                               String streamId, Integer taskId, Integer port) {
         MetricNames metricNames = workerMetricName(name, topologyId, componentId, streamId, taskId, port);
@@ -235,6 +245,14 @@ public class StormMetricRegistry implements MetricRegistryProvider {
         return histogram;
     }
 
+    public void deregister(Set<Metric> toRemove) {
+        MetricFilter metricFilter = new RemoveMetricFilter(toRemove);
+        for (TaskMetricRepo taskMetricRepo : taskMetrics.values()) {
+            taskMetricRepo.degister(metricFilter);
+        }
+        registry.removeMatching(metricFilter);
+    }
+
     private <T extends Metric> Map<String, T> getMetricNameMap(int taskId, Map<Integer, Map<String, T>> taskIdMetrics) {
         Map<String, T> ret = new HashMap<>();
         Map<String, T> taskMetrics = taskIdMetrics.getOrDefault(taskId, Collections.emptyMap());
@@ -432,4 +450,31 @@ public class StormMetricRegistry implements MetricRegistryProvider {
             }
         }
     }
+
+    private static class RemoveMetricFilter implements MetricFilter {
+        private Set<Metric> metrics = new HashSet<>();
+
+        RemoveMetricFilter(Set<Metric> toRemove) {
+            this.metrics.addAll(toRemove);
+            for (Metric metric : toRemove) {
+                // RateCounters are gauges, but also have internal Counters that should also be removed
+                if (metric instanceof RateCounter) {
+                    RateCounter rateCounter = (RateCounter) metric;
+                    this.metrics.add(rateCounter.getCounter());
+                }
+            }
+        }
+
+        /**
+         * Returns {@code true} if the metric matches the filter; {@code false} otherwise.
+         *
+         * @param name   the metric's name
+         * @param metric the metric
+         * @return {@code true} if the metric matches the filter
+         */
+        @Override
+        public boolean matches(String name, Metric metric) {
+            return this.metrics.contains(metric);
+        }
+    }
 }
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java
index e88b64f..785271c 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java
@@ -22,16 +22,17 @@ import com.codahale.metrics.Timer;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Metric repository to allow reporting of task-specific metrics.
  */
 public class TaskMetricRepo {
-    private SortedMap<String, Gauge> gauges = new TreeMap<>();
-    private SortedMap<String, Counter> counters = new TreeMap<>();
-    private SortedMap<String, Histogram> histograms = new TreeMap<>();
-    private SortedMap<String, Meter> meters = new TreeMap<>();
-    private SortedMap<String, Timer> timers = new TreeMap<>();
+    private ConcurrentHashMap<String, Gauge> gauges = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<String, Histogram> histograms = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<String, Meter> meters = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<String, Timer> timers = new ConcurrentHashMap<>();
 
     public void addCounter(String name, Counter counter) {
         counters.put(name, counter);
@@ -54,41 +55,49 @@ public class TaskMetricRepo {
     }
 
     public void report(ScheduledReporter reporter, MetricFilter filter) {
-        if (filter != null) {
-            SortedMap<String, Gauge> filteredGauges = new TreeMap<>();
-            SortedMap<String, Counter> filteredCounters = new TreeMap<>();
-            SortedMap<String, Histogram> filteredHistograms = new TreeMap<>();
-            SortedMap<String, Meter> filteredMeters = new TreeMap<>();
-            SortedMap<String, Timer> filteredTimers = new TreeMap<>();
+        if (filter == null) {
+            filter = MetricFilter.ALL;
+        }
+
+        SortedMap<String, Gauge> filteredGauges = new TreeMap<>();
+        SortedMap<String, Counter> filteredCounters = new TreeMap<>();
+        SortedMap<String, Histogram> filteredHistograms = new TreeMap<>();
+        SortedMap<String, Meter> filteredMeters = new TreeMap<>();
+        SortedMap<String, Timer> filteredTimers = new TreeMap<>();
 
-            for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
-                if (filter.matches(entry.getKey(), entry.getValue())) {
-                    filteredGauges.put(entry.getKey(), entry.getValue());
-                }
+        for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+            if (filter.matches(entry.getKey(), entry.getValue())) {
+                filteredGauges.put(entry.getKey(), entry.getValue());
             }
-            for (Map.Entry<String, Counter> entry : counters.entrySet()) {
-                if (filter.matches(entry.getKey(), entry.getValue())) {
-                    filteredCounters.put(entry.getKey(), entry.getValue());
-                }
+        }
+        for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+            if (filter.matches(entry.getKey(), entry.getValue())) {
+                filteredCounters.put(entry.getKey(), entry.getValue());
             }
-            for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
-                if (filter.matches(entry.getKey(), entry.getValue())) {
-                    filteredHistograms.put(entry.getKey(), entry.getValue());
-                }
+        }
+        for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+            if (filter.matches(entry.getKey(), entry.getValue())) {
+                filteredHistograms.put(entry.getKey(), entry.getValue());
             }
-            for (Map.Entry<String, Meter> entry : meters.entrySet()) {
-                if (filter.matches(entry.getKey(), entry.getValue())) {
-                    filteredMeters.put(entry.getKey(), entry.getValue());
-                }
+        }
+        for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+            if (filter.matches(entry.getKey(), entry.getValue())) {
+                filteredMeters.put(entry.getKey(), entry.getValue());
             }
-            for (Map.Entry<String, Timer> entry : timers.entrySet()) {
-                if (filter.matches(entry.getKey(), entry.getValue())) {
-                    filteredTimers.put(entry.getKey(), entry.getValue());
-                }
+        }
+        for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+            if (filter.matches(entry.getKey(), entry.getValue())) {
+                filteredTimers.put(entry.getKey(), entry.getValue());
             }
-            reporter.report(filteredGauges, filteredCounters, filteredHistograms, filteredMeters, filteredTimers);
-        } else {
-            reporter.report(gauges, counters, histograms, meters, timers);
         }
+        reporter.report(filteredGauges, filteredCounters, filteredHistograms, filteredMeters, filteredTimers);
+    }
+
+    void degister(MetricFilter metricFilter) {
+        gauges.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue()));
+        counters.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue()));
+        histograms.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue()));
+        meters.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue()));
+        timers.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue()));
     }
 }
\ No newline at end of file
diff --git a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
index 19f016f..3336737 100644
--- a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
+++ b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
@@ -110,7 +110,7 @@ public class NettyTest {
     private void doTestBasic(Map<String, Object> stormConf) throws Exception {
         LOG.info("1. Should send and receive a basic message");
         String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
-        IContext context = TransportFactory.makeContext(stormConf);
+        IContext context = TransportFactory.makeContext(stormConf, null);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
             try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
@@ -171,7 +171,7 @@ public class NettyTest {
     private void doTestLoad(Map<String, Object> stormConf) throws Exception {
         LOG.info("2 test load");
         String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
-        IContext context = TransportFactory.makeContext(stormConf);
+        IContext context = TransportFactory.makeContext(stormConf, null);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
             try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
@@ -225,7 +225,7 @@ public class NettyTest {
     private void doTestLargeMessage(Map<String, Object> stormConf) throws Exception {
         LOG.info("3 Should send and receive a large message");
         String reqMessage = StringUtils.repeat("c", 2_048_000);
-        IContext context = TransportFactory.makeContext(stormConf);
+        IContext context = TransportFactory.makeContext(stormConf, null);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
             try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
@@ -264,7 +264,7 @@ public class NettyTest {
     private void doTestServerDelayed(Map<String, Object> stormConf) throws Exception {
         LOG.info("4. test server delayed");
         String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
-        IContext context = TransportFactory.makeContext(stormConf);
+        IContext context = TransportFactory.makeContext(stormConf, null);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
             int port = Utils.getAvailablePort(6700);
@@ -315,7 +315,7 @@ public class NettyTest {
         LOG.info("Should send and receive many messages (testing with " + numMessages + " messages)");
         ArrayList<TaskMessage> responses = new ArrayList<>();
         AtomicInteger received = new AtomicInteger();
-        IContext context = TransportFactory.makeContext(stormConf);
+        IContext context = TransportFactory.makeContext(stormConf, null);
         try {
             try (IConnection server = context.bind(null, 0, mkConnectionCallback((message) -> {
                     responses.add(message);
@@ -362,7 +362,7 @@ public class NettyTest {
     private void doTestServerAlwaysReconnects(Map<String, Object> stormConf) throws Exception {
         LOG.info("6. test server always reconnects");
         String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
-        IContext context = TransportFactory.makeContext(stormConf);
+        IContext context = TransportFactory.makeContext(stormConf, null);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
             int port = Utils.getAvailablePort(6700);
@@ -396,7 +396,7 @@ public class NettyTest {
     private void connectToFixedPort(Map<String, Object> stormConf, int port) throws Exception {
         LOG.info("7. Should be able to rebind to a port quickly");
         String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
-        IContext context = TransportFactory.makeContext(stormConf);
+        IContext context = TransportFactory.makeContext(stormConf, null);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
             try (IConnection server = context.bind(null, port, mkConnectionCallback(response::set), null);