You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/09/17 19:54:04 UTC

[3/6] storm git commit: STORM-3197: Make StormMetricsRegistry non-static

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
index ee7f621..2ac1c20 100644
--- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
@@ -55,12 +55,14 @@ public class RocksDbStore implements MetricStore, AutoCloseable {
      * Create metric store instance using the configurations provided via the config map.
      *
      * @param config Storm config map
+     * @param metricsRegistry The Nimbus daemon metrics registry
      * @throws MetricException on preparation error
      */
-    public void prepare(Map<String, Object> config) throws MetricException {
+    @Override
+    public void prepare(Map<String, Object> config, StormMetricsRegistry metricsRegistry) throws MetricException {
         validateConfig(config);
 
-        this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
+        this.failureMeter = metricsRegistry.registerMeter("RocksDB:metric-failures");
 
         RocksDB.loadLibrary();
         boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
@@ -87,7 +89,7 @@ public class RocksDbStore implements MetricStore, AutoCloseable {
         if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
             deletionPeriod = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS).toString());
         }
-        metricsCleaner = new MetricsCleaner(this, retentionHours, deletionPeriod, failureMeter);
+        metricsCleaner = new MetricsCleaner(this, retentionHours, deletionPeriod, failureMeter, metricsRegistry);
 
         // create thread to process insertion of all metrics
         metricsWriter = new RocksDbMetricsWriter(this, this.queue, this.failureMeter);

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
index 3783fdb..edd7444 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -48,12 +48,14 @@ import org.slf4j.LoggerFactory;
  * A callback function when nimbus gains leadership.
  */
 public class LeaderListenerCallback {
-    private static final Meter numGainedLeader = StormMetricsRegistry.registerMeter("nimbus:num-gained-leadership");
-    private static final Meter numLostLeader = StormMetricsRegistry.registerMeter("nimbus:num-lost-leadership");
     private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallback.class);
     private static final String STORM_JAR_SUFFIX = "-stormjar.jar";
     private static final String STORM_CODE_SUFFIX = "-stormcode.ser";
     private static final String STORM_CONF_SUFFIX = "-stormconf.ser";
+    
+    private final Meter numGainedLeader;
+    private final Meter numLostLeader;
+    
     private final BlobStore blobStore;
     private final TopoCache tc;
     private final IStormClusterState clusterState;
@@ -73,7 +75,7 @@ public class LeaderListenerCallback {
      * @param acls zookeeper acls
      */
     public LeaderListenerCallback(Map conf, CuratorFramework zk, LeaderLatch leaderLatch, BlobStore blobStore,
-                                  TopoCache tc, IStormClusterState clusterState, List<ACL> acls) {
+                                  TopoCache tc, IStormClusterState clusterState, List<ACL> acls, StormMetricsRegistry metricsRegistry) {
         this.blobStore = blobStore;
         this.tc = tc;
         this.clusterState = clusterState;
@@ -81,6 +83,8 @@ public class LeaderListenerCallback {
         this.leaderLatch = leaderLatch;
         this.conf = conf;
         this.acls = acls;
+        this.numGainedLeader = metricsRegistry.registerMeter("nimbus:num-gained-leadership");
+        this.numLostLeader = metricsRegistry.registerMeter("nimbus:num-lost-leadership");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java b/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
index 2eea634..5144151 100644
--- a/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
+++ b/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.pacemaker;
 
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import java.util.ArrayList;
@@ -27,35 +28,41 @@ import org.apache.storm.generated.HBServerMessageType;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.shade.uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.VersionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class Pacemaker implements IServerMessageHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(Pacemaker.class);
-    private final static Meter meterSendPulseCount = StormMetricsRegistry.registerMeter("pacemaker:send-pulse-count");
-    private final static Meter meterTotalReceivedSize = StormMetricsRegistry.registerMeter("pacemaker:total-receive-size");
-    private final static Meter meterGetPulseCount = StormMetricsRegistry.registerMeter("pacemaker:get-pulse=count");
-    private final static Meter meterTotalSentSize = StormMetricsRegistry.registerMeter("pacemaker:total-sent-size");
-    private final static Histogram histogramHeartbeatSize = StormMetricsRegistry.registerHistogram("pacemaker:heartbeat-size");
+    private final Meter meterSendPulseCount;
+    private final Meter meterTotalReceivedSize;
+    private final Meter meterGetPulseCount;
+    private final Meter meterTotalSentSize;
+    private final Histogram histogramHeartbeatSize;
     private final Map<String, byte[]> heartbeats;
     private final Map<String, Object> conf;
 
-
-    public Pacemaker(Map<String, Object> conf) {
+    public Pacemaker(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
         heartbeats = new ConcurrentHashMap<>();
         this.conf = conf;
-        StormMetricsRegistry.registerGauge("pacemaker:size-total-keys", heartbeats::size);
-        StormMetricsRegistry.startMetricsReporters(conf);
+        this.meterSendPulseCount = metricsRegistry.registerMeter("pacemaker:send-pulse-count");
+        this.meterTotalReceivedSize = metricsRegistry.registerMeter("pacemaker:total-receive-size");
+        this.meterGetPulseCount = metricsRegistry.registerMeter("pacemaker:get-pulse=count");
+        this.meterTotalSentSize = metricsRegistry.registerMeter("pacemaker:total-sent-size");
+        this.histogramHeartbeatSize = metricsRegistry.registerHistogram("pacemaker:heartbeat-size", new ExponentiallyDecayingReservoir());
+        metricsRegistry.registerGauge("pacemaker:size-total-keys", heartbeats::size);
     }
 
     public static void main(String[] args) {
         SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
         Map<String, Object> conf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readStormConfig());
-        final Pacemaker serverHandler = new Pacemaker(conf);
+        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+        final Pacemaker serverHandler = new Pacemaker(conf, metricsRegistry);
         serverHandler.launchServer();
+        metricsRegistry.startMetricsReporters(conf);
+        Utils.addShutdownHookWithForceKillIn1Sec(metricsRegistry::stopMetricsReporters);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index e8d771d..b8bc69a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -39,6 +39,7 @@ import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
@@ -80,17 +81,19 @@ public class Cluster implements ISchedulingState {
     private final Map<String, Map<WorkerSlot, NormalizedResourceRequest>> nodeToScheduledResourcesCache;
     private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
     private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
+    private final ResourceMetrics resourceMetrics;
     private SchedulerAssignmentImpl assignment;
     private Set<String> blackListedHosts = new HashSet<>();
     private INimbus inimbus;
 
     public Cluster(
         INimbus nimbus,
+        ResourceMetrics resourceMetrics,
         Map<String, SupervisorDetails> supervisors,
         Map<String, ? extends SchedulerAssignment> map,
         Topologies topologies,
         Map<String, Object> conf) {
-        this(nimbus, supervisors, map, topologies, conf, null, null, null);
+        this(nimbus, resourceMetrics, supervisors, map, topologies, conf, null, null, null);
     }
 
     /**
@@ -99,6 +102,7 @@ public class Cluster implements ISchedulingState {
     public Cluster(Cluster src) {
         this(
             src.inimbus,
+            src.resourceMetrics,
             src.supervisors,
             src.assignments,
             src.topologies,
@@ -118,6 +122,7 @@ public class Cluster implements ISchedulingState {
     public Cluster(Cluster src, Topologies topologies) {
         this(
             src.inimbus,
+            src.resourceMetrics,
             src.supervisors,
             src.assignments,
             topologies,
@@ -129,6 +134,7 @@ public class Cluster implements ISchedulingState {
 
     private Cluster(
         INimbus nimbus,
+        ResourceMetrics resourceMetrics,
         Map<String, SupervisorDetails> supervisors,
         Map<String, ? extends SchedulerAssignment> assignments,
         Topologies topologies,
@@ -137,6 +143,7 @@ public class Cluster implements ISchedulingState {
         Set<String> blackListedHosts,
         Map<String, List<String>> networkTopography) {
         this.inimbus = nimbus;
+        this.resourceMetrics = resourceMetrics;
         this.supervisors.putAll(supervisors);
         this.nodeToScheduledResourcesCache = new HashMap<>(this.supervisors.size());
         this.nodeToUsedSlotsCache = new HashMap<>(this.supervisors.size());
@@ -466,7 +473,7 @@ public class Cluster implements ISchedulingState {
         for (SchedulerAssignment assignment: assignments.values()) {
             for (Entry<WorkerSlot, WorkerResources> entry: assignment.getScheduledResources().entrySet()) {
                 if (sd.getId().equals(entry.getKey().getNodeId())) {
-                   ret.remove(entry.getValue());
+                   ret.remove(entry.getValue(), getResourceMetrics());
                 }
             }
         }
@@ -812,7 +819,7 @@ public class Cluster implements ISchedulingState {
         for (SupervisorDetails sup : supervisors.values()) {
             if (!isBlackListed(sup.getId()) && !blacklistedSupervisorIds.contains(sup.getId())) {
                 available.add(sup.getTotalResources());
-                available.remove(getAllScheduledResourcesForNode(sup.getId()));
+                available.remove(getAllScheduledResourcesForNode(sup.getId()), getResourceMetrics());
             }
         }
         return available;
@@ -971,6 +978,10 @@ public class Cluster implements ISchedulingState {
         nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).add(workerSlot);
     }
 
+    public ResourceMetrics getResourceMetrics() {
+        return resourceMetrics;
+    }
+
     @Override
     public NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId) {
         return totalResourcesPerNodeCache.computeIfAbsent(nodeId, (nid) -> {

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
index 4ac1057..2700871 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
@@ -17,6 +17,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index 5d781aa..757f256 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -26,8 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.storm.Config;
-import org.apache.storm.Constants;
-import org.apache.storm.daemon.Acker;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.ComponentType;

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index 5034f42..1e5b04d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -42,6 +42,7 @@ public class BlacklistScheduler implements IScheduler {
     public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300;
     private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
     private final IScheduler underlyingScheduler;
+    private final StormMetricsRegistry metricsRegistry;
     protected int toleranceTime;
     protected int toleranceCount;
     protected int resumeTime;
@@ -55,8 +56,9 @@ public class BlacklistScheduler implements IScheduler {
     protected Set<String> blacklistHost;
     private Map<String, Object> conf;
 
-    public BlacklistScheduler(IScheduler underlyingScheduler) {
+    public BlacklistScheduler(IScheduler underlyingScheduler, StormMetricsRegistry metricsRegistry) {
         this.underlyingScheduler = underlyingScheduler;
+        this.metricsRegistry = metricsRegistry;
     }
 
     @Override
@@ -89,7 +91,7 @@ public class BlacklistScheduler implements IScheduler {
         blacklistHost = new HashSet<>();
 
         //nimbus:num-blacklisted-supervisor + non-blacklisted supervisor = nimbus:num-supervisors
-        StormMetricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", () -> blacklistHost.size());
+        metricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", () -> blacklistHost.size());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
index f7abc04..6c823ab 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
@@ -71,7 +71,7 @@ public class RasBlacklistStrategy extends DefaultBlacklistStrategy {
             //Now we need to free up some resources...
             Map<String, SupervisorDetails> availableSupervisors = cluster.getSupervisors();
             NormalizedResourceOffer shortage = new NormalizedResourceOffer(needed);
-            shortage.remove(available);
+            shortage.remove(available, cluster.getResourceMetrics());
             int shortageSlots = neededSlots - availableSlots;
             LOG.debug("Need {} and {} slots.", needed, neededSlots);
             LOG.debug("Available {} and {} slots.", available, availableSlots);
@@ -86,7 +86,7 @@ public class RasBlacklistStrategy extends DefaultBlacklistStrategy {
                         NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
                         int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
                         readyToRemove.add(supervisor);
-                        shortage.remove(sdAvailable);
+                        shortage.remove(sdAvailable, cluster.getResourceMetrics());
                         shortageSlots -= sdAvailableSlots;
                         LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
                             sdAvailable, sdAvailableSlots, shortage, shortageSlots);

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 5975780..89c0460 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -445,7 +445,7 @@ public class RAS_Node {
     public NormalizedResourceOffer getTotalAvailableResources() {
         if (sup != null) {
             NormalizedResourceOffer availableResources = new NormalizedResourceOffer(sup.getTotalResources());
-            if (availableResources.remove(cluster.getAllScheduledResourcesForNode(sup.getId()))) {
+            if (availableResources.remove(cluster.getAllScheduledResourcesForNode(sup.getId()), cluster.getResourceMetrics())) {
                 if (!loggedUnderageUsage) {
                     LOG.error("Resources on {} became negative and was clamped to 0 {}.", hostname, availableResources);
                     loggedUnderageUsage = true;

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index 7ef5f74..366995e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -39,8 +39,7 @@ public class ResourceUtils {
         return null;
     }
 
-    public static Map<String, NormalizedResourceRequest> getBoltsResources(StormTopology topology,
-                                                                           Map<String, Object> topologyConf) {
+    public static Map<String, NormalizedResourceRequest> getBoltsResources(StormTopology topology, Map<String, Object> topologyConf) {
         Map<String, NormalizedResourceRequest> boltResources = new HashMap<>();
         if (topology.get_bolts() != null) {
             for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
@@ -55,8 +54,8 @@ public class ResourceUtils {
         return boltResources;
     }
 
-    public static NormalizedResourceRequest getSpoutResources(StormTopology topology, Map<String, Object> topologyConf,
-                                                              String componentId) {
+    public static NormalizedResourceRequest getSpoutResources(StormTopology topology,
+        Map<String, Object> topologyConf, String componentId) {
         if (topology.get_spouts() != null) {
             SpoutSpec spout = topology.get_spouts().get(componentId);
             return new NormalizedResourceRequest(spout.get_common(), topologyConf, componentId);
@@ -65,7 +64,7 @@ public class ResourceUtils {
     }
 
     public static Map<String, NormalizedResourceRequest> getSpoutsResources(StormTopology topology,
-                                                                            Map<String, Object> topologyConf) {
+        Map<String, Object> topologyConf) {
         Map<String, NormalizedResourceRequest> spoutResources = new HashMap<>();
         if (topology.get_spouts() != null) {
             for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
index c680be7..707c893 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -84,14 +84,15 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
     /**
      * Remove the resources in other from this.
      * @param other the resources to be removed.
+     * @param resourceMetrics The resource related metrics
      * @return true if one or more resources in other were larger than available resources in this, else false.
      */
-    public boolean remove(NormalizedResourcesWithMemory other) {
-        boolean negativeResources = normalizedResources.remove(other.getNormalizedResources());
+    public boolean remove(NormalizedResourcesWithMemory other, ResourceMetrics resourceMetrics) {
+        boolean negativeResources = normalizedResources.remove(other.getNormalizedResources(), resourceMetrics);
         totalMemoryMb -= other.getTotalMemoryMb();
         if (totalMemoryMb < 0.0) {
             negativeResources = true;
-            NormalizedResources.numNegativeResourceEvents.mark();
+            resourceMetrics.getNegativeResourceEventsMeter().mark();
             totalMemoryMb = 0.0;
         }
         return negativeResources;
@@ -100,14 +101,15 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
     /**
      * Remove the resources in other from this.
      * @param other the resources to be removed.
+     * @param resourceMetrics The resource related metrics
      * @return true if one or more resources in other were larger than available resources in this, else false.
      */
-    public boolean remove(WorkerResources other) {
+    public boolean remove(WorkerResources other, ResourceMetrics resourceMetrics) {
         boolean negativeResources = normalizedResources.remove(other);
         totalMemoryMb -= (other.get_mem_off_heap() + other.get_mem_on_heap());
         if (totalMemoryMb < 0.0) {
             negativeResources = true;
-            NormalizedResources.numNegativeResourceEvents.mark();
+            resourceMetrics.getNegativeResourceEventsMeter().mark();
             totalMemoryMb = 0.0;
         }
         return negativeResources;

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
index 14c6846..478a8be 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -43,7 +43,7 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
     private double offHeap;
 
     private NormalizedResourceRequest(Map<String, ? extends Number> resources,
-                                      Map<String, Double> defaultResources) {
+        Map<String, Double> defaultResources) {
         if (resources == null && defaultResources == null) {
             onHeap = 0.0;
             offHeap = 0.0;

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index 1e6af6f..6419406 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -35,16 +35,13 @@ import org.slf4j.LoggerFactory;
 public class NormalizedResources {
 
     private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
-    public static final Meter numNegativeResourceEvents = StormMetricsRegistry.registerMeter("nimbus:num-negative-resource-events");
-
-
     public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
     private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
 
     static {
         resetResourceNames();
     }
-
+    
     private double cpu;
     private double[] otherResources;
 
@@ -130,14 +127,15 @@ public class NormalizedResources {
      * Remove the other resources from this. This is the same as subtracting the resources in other from this.
      *
      * @param other the resources we want removed.
+     * @param resourceMetrics The resource related metrics
      * @return true if the resources would have gone negative, but were clamped to 0.
      */
-    public boolean remove(NormalizedResources other) {
+    public boolean remove(NormalizedResources other, ResourceMetrics resourceMetrics) {
         boolean ret = false;
         this.cpu -= other.cpu;
         if (cpu < 0.0) {
             ret = true;
-            numNegativeResourceEvents.mark();
+            resourceMetrics.getNegativeResourceEventsMeter().mark();
             cpu = 0.0;
         }
         int otherLength = other.otherResources.length;
@@ -146,7 +144,7 @@ public class NormalizedResources {
             otherResources[i] -= other.otherResources[i];
             if (otherResources[i] < 0.0) {
                 ret = true;
-                numNegativeResourceEvents.mark();
+                resourceMetrics.getNegativeResourceEventsMeter().mark();
                 otherResources[i]  = 0.0;
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMetrics.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMetrics.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMetrics.java
new file mode 100644
index 0000000..851499b
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMetrics.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import com.codahale.metrics.Meter;
+import org.apache.storm.metric.StormMetricsRegistry;
+
+public class ResourceMetrics {
+
+    private final Meter numNegativeResourceEvents;
+    
+    public ResourceMetrics(StormMetricsRegistry metricsRegistry) {
+        numNegativeResourceEvents = metricsRegistry.registerMeter("nimbus:num-negative-resource-events");
+    }
+
+    public Meter getNegativeResourceEventsMeter() {
+        return numNegativeResourceEvents;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 59b0f31..b5da8db 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -45,6 +45,7 @@ import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.SchedulingStatus;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.collect.Sets;
 import org.slf4j.Logger;
@@ -632,12 +633,14 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
      */
     static class AllResources {
         List<ObjectResources> objectResources = new LinkedList<>();
-        NormalizedResourceOffer availableResourcesOverall = new NormalizedResourceOffer();
-        NormalizedResourceOffer totalResourcesOverall = new NormalizedResourceOffer();
+        final NormalizedResourceOffer availableResourcesOverall;
+        final NormalizedResourceOffer totalResourcesOverall;
         String identifier;
 
         public AllResources(String identifier) {
             this.identifier = identifier;
+            this.availableResourcesOverall = new NormalizedResourceOffer();
+            this.totalResourcesOverall = new NormalizedResourceOffer();
         }
 
         public AllResources(AllResources other) {
@@ -666,12 +669,14 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
      */
     static class ObjectResources {
         public final String id;
-        public NormalizedResourceOffer availableResources = new NormalizedResourceOffer();
-        public NormalizedResourceOffer totalResources = new NormalizedResourceOffer();
+        public NormalizedResourceOffer availableResources;
+        public NormalizedResourceOffer totalResources;
         public double effectiveResources = 0.0;
 
         public ObjectResources(String id) {
             this.id = id;
+            this.availableResources = new NormalizedResourceOffer();
+            this.totalResources = new NormalizedResourceOffer();
         }
 
         public ObjectResources(ObjectResources other) {

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
index 03fec1f..6dc33e8 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.nimbus.TopoCache;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.LeaderListenerCallback;
 import org.apache.storm.nimbus.NimbusInfo;
@@ -45,10 +46,12 @@ public class LeaderElectorImp implements ILeaderElector {
     private final TopoCache tc;
     private final IStormClusterState clusterState;
     private final List<ACL> acls;
+    private final StormMetricsRegistry metricsRegistry;
 
     public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,
                             AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,
-                            BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls) {
+                            BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
+                            StormMetricsRegistry metricsRegistry) {
         this.conf = conf;
         this.servers = servers;
         this.zk = zk;
@@ -60,6 +63,7 @@ public class LeaderElectorImp implements ILeaderElector {
         this.tc = tc;
         this.clusterState = clusterState;
         this.acls = acls;
+        this.metricsRegistry = metricsRegistry;
     }
 
     @Override
@@ -72,7 +76,8 @@ public class LeaderElectorImp implements ILeaderElector {
         // if this latch is already closed, we need to create new instance.
         if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
             leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
-            LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls);
+            LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls,
+                metricsRegistry);
             leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(callback));
             LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
index f30c246..5468573 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
@@ -31,6 +31,7 @@ import org.apache.storm.Config;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.nimbus.TopoCache;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.LeaderListenerCallback;
 import org.apache.storm.nimbus.NimbusInfo;
@@ -146,13 +147,14 @@ public class Zookeeper {
      * @throws UnknownHostException
      */
     public static ILeaderElector zkLeaderElector(Map<String, Object> conf, CuratorFramework zkClient, BlobStore blobStore,
-                                                 final TopoCache tc, IStormClusterState clusterState, List<ACL> acls) throws
-        UnknownHostException {
-        return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore, tc, clusterState, acls);
+                                                 final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
+                                                 StormMetricsRegistry metricsRegistry) throws UnknownHostException {
+        return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore, tc, clusterState, acls, metricsRegistry);
     }
 
     protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, CuratorFramework zk, BlobStore blobStore,
-                                                 final TopoCache tc, IStormClusterState clusterState, List<ACL> acls) throws
+                                                 final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
+                                                 StormMetricsRegistry metricsRegistry) throws
         UnknownHostException {
         List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
         String leaderLockPath = "/leader-lock";
@@ -160,9 +162,9 @@ public class Zookeeper {
         AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
         AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
             new AtomicReference<>(leaderLatchListenerImpl(
-                new LeaderListenerCallback(conf, zk, leaderLatchAtomicReference.get(), blobStore, tc, clusterState, acls)));
+                new LeaderListenerCallback(conf, zk, leaderLatchAtomicReference.get(), blobStore, tc, clusterState, acls, metricsRegistry)));
         return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
-                                    leaderLatchListenerAtomicReference, blobStore, tc, clusterState, acls);
+                                    leaderLatchListenerAtomicReference, blobStore, tc, clusterState, acls, metricsRegistry);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/PacemakerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/PacemakerTest.java b/storm-server/src/test/java/org/apache/storm/PacemakerTest.java
index 5f7f8e9..dfd53be 100644
--- a/storm-server/src/test/java/org/apache/storm/PacemakerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/PacemakerTest.java
@@ -20,6 +20,7 @@ import org.apache.storm.generated.HBMessage;
 import org.apache.storm.generated.HBMessageData;
 import org.apache.storm.generated.HBPulse;
 import org.apache.storm.generated.HBServerMessageType;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.pacemaker.Pacemaker;
 import org.apache.storm.utils.Utils;
 import org.junit.Assert;
@@ -31,15 +32,16 @@ public class PacemakerTest {
     private HBMessage hbMessage;
     private int mid;
     private Random random;
+    private Pacemaker handler;
 
     @Before
     public void init() {
         random = new Random(100);
+        handler = new Pacemaker(new ConcurrentHashMap<>(), new StormMetricsRegistry());
     }
 
     @Test
     public void testServerCreatePath() {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         messageWithRandId(HBServerMessageType.CREATE_PATH, HBMessageData.path("/testpath"));
         HBMessage response = handler.handleMessage(hbMessage, true);
         Assert.assertEquals(mid, response.get_message_id());
@@ -49,7 +51,6 @@ public class PacemakerTest {
 
     @Test
     public void testServerExistsFalse() {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         messageWithRandId(HBServerMessageType.EXISTS, HBMessageData.path("/testpath"));
         HBMessage badResponse = handler.handleMessage(hbMessage, false);
         HBMessage goodResponse = handler.handleMessage(hbMessage, true);
@@ -65,7 +66,6 @@ public class PacemakerTest {
     public void testServerExistsTrue() {
         String path = "/exists_path";
         String dataString = "pulse data";
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         HBPulse hbPulse = new HBPulse();
         hbPulse.set_id(path);
         hbPulse.set_details(Utils.javaSerialize(dataString));
@@ -87,7 +87,6 @@ public class PacemakerTest {
     public void testServerSendPulseGetPulse() throws UnsupportedEncodingException {
         String path = "/pulsepath";
         String dataString = "pulse data";
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         HBPulse hbPulse = new HBPulse();
         hbPulse.set_id(path);
         hbPulse.set_details(dataString.getBytes("UTF-8"));
@@ -106,7 +105,6 @@ public class PacemakerTest {
 
     @Test
     public void testServerGetAllPulseForPath() {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         messageWithRandId(HBServerMessageType.GET_ALL_PULSE_FOR_PATH, HBMessageData.path("/testpath"));
         HBMessage badResponse = handler.handleMessage(hbMessage, false);
         HBMessage goodResponse = handler.handleMessage(hbMessage, true);
@@ -120,7 +118,6 @@ public class PacemakerTest {
 
     @Test
     public void testServerGetAllNodesForPath() throws UnsupportedEncodingException {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         makeNode(handler, "/some-root-path/foo");
         makeNode(handler, "/some-root-path/bar");
         makeNode(handler, "/some-root-path/baz");
@@ -162,7 +159,6 @@ public class PacemakerTest {
 
     @Test
     public void testServerGetPulse() throws UnsupportedEncodingException {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         makeNode(handler, "/some-root/GET_PULSE");
         messageWithRandId(HBServerMessageType.GET_PULSE, HBMessageData.path("/some-root/GET_PULSE"));
         HBMessage badResponse = handler.handleMessage(hbMessage, false);
@@ -180,7 +176,6 @@ public class PacemakerTest {
 
     @Test
     public void testServerDeletePath() throws UnsupportedEncodingException {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         makeNode(handler, "/some-root/DELETE_PATH/foo");
         makeNode(handler, "/some-root/DELETE_PATH/bar");
         makeNode(handler, "/some-root/DELETE_PATH/baz");
@@ -202,7 +197,6 @@ public class PacemakerTest {
 
     @Test
     public void testServerDeletePulseId() throws UnsupportedEncodingException {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         makeNode(handler, "/some-root/DELETE_PULSE_ID/foo");
         makeNode(handler, "/some-root/DELETE_PULSE_ID/bar");
         makeNode(handler, "/some-root/DELETE_PULSE_ID/baz");

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java b/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java
index e4b586b..a0825d9 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java
@@ -47,6 +47,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.storm.metric.StormMetricsRegistry;
+
 public class DRPCTest {
     private static final ExecutorService exec = Executors.newCachedThreadPool();
 
@@ -80,7 +82,7 @@ public class DRPCTest {
 
     @Test
     public void testGoodBlocking() throws Exception {
-        try (DRPC server = new DRPC(null, 100)) {
+        try (DRPC server = new DRPC(new StormMetricsRegistry(), null, 100)) {
             Future<String> found = exec.submit(() -> server.executeBlocking("testing", "test"));
             DRPCRequest request = getNextAvailableRequest(server, "testing");
             assertNotNull(request);
@@ -94,7 +96,7 @@ public class DRPCTest {
 
     @Test
     public void testFailedBlocking() throws Exception {
-        try (DRPC server = new DRPC(null, 100)) {
+        try (DRPC server = new DRPC(new StormMetricsRegistry(), null, 100)) {
             Future<String> found = exec.submit(() -> server.executeBlocking("testing", "test"));
             DRPCRequest request = getNextAvailableRequest(server, "testing");
             assertNotNull(request);
@@ -116,7 +118,7 @@ public class DRPCTest {
     @Test
     public void testDequeueAfterTimeout() throws Exception {
         long timeout = 1000;
-        try (DRPC server = new DRPC(null, timeout)) {
+        try (DRPC server = new DRPC(new StormMetricsRegistry(), null, timeout)) {
             long start = Time.currentTimeMillis();
             try {
                 server.executeBlocking("testing", "test");
@@ -136,7 +138,7 @@ public class DRPCTest {
 
     @Test
     public void testDeny() throws Exception {
-        try (DRPC server = new DRPC(new DenyAuthorizer(), 100)) {
+        try (DRPC server = new DRPC(new StormMetricsRegistry(), new DenyAuthorizer(), 100)) {
             assertThrows(() -> server.executeBlocking("testing", "test"), AuthorizationException.class);
             assertThrows(() -> server.fetchRequest("testing"), AuthorizationException.class);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
index 1094e1c..1c313be 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -41,6 +41,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ConfigUtils;
 
 public class BasicContainerTest {
@@ -102,8 +103,8 @@ public class BasicContainerTest {
         LocalState ls = mock(LocalState.class);
 
         MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-                                                       "SUPERVISOR", supervisorPort, port, la, null, ls, null, new HashMap<>(), ops,
-                                                       "profile");
+            "SUPERVISOR", supervisorPort, port, la, null, ls, null, new StormMetricsRegistry(),
+            new HashMap<>(), ops, "profile");
         //null worker id means generate one...
 
         assertNotNull(mc._workerId);
@@ -133,8 +134,8 @@ public class BasicContainerTest {
         when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
 
         MockBasicContainer mc = new MockBasicContainer(ContainerType.RECOVER_FULL, superConf,
-                                                       "SUPERVISOR", supervisorPort, port, la, null, ls, null, new HashMap<>(), ops,
-                                                       "profile");
+            "SUPERVISOR", supervisorPort, port, la, null, ls, null, new StormMetricsRegistry(),
+            new HashMap<>(), ops, "profile");
 
         assertEquals(workerId, mc._workerId);
     }
@@ -155,7 +156,8 @@ public class BasicContainerTest {
 
         try {
             new MockBasicContainer(ContainerType.RECOVER_FULL, new HashMap<String, Object>(),
-                                   "SUPERVISOR", supervisorPort, port, la, null, ls, null, new HashMap<>(), null, "profile");
+                "SUPERVISOR", supervisorPort, port, la, null, ls, null, new StormMetricsRegistry(),
+                new HashMap<>(), null, "profile");
             fail("Container recovered worker incorrectly");
         } catch (ContainerRecoveryException e) {
             //Expected
@@ -182,7 +184,7 @@ public class BasicContainerTest {
         when(ls.getApprovedWorkers()).thenReturn(new HashMap<>(workerState));
 
         MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-                                                       "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops,
+            "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(), new HashMap<>(), ops,
                                                        "profile");
 
         mc.cleanUp();
@@ -218,8 +220,8 @@ public class BasicContainerTest {
         LocalState ls = mock(LocalState.class);
 
         MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-                                                       "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops,
-                                                       "profile");
+            "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+            new HashMap<>(), ops, "profile");
 
         //HEAP DUMP
         ProfileRequest req = new ProfileRequest();
@@ -329,9 +331,8 @@ public class BasicContainerTest {
 
         checkpoint(() -> {
                        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-                                                                      "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new
-                                                                          HashMap<>(), ops,
-                                                                      "profile");
+                "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+                new HashMap<>(), ops, "profile");
 
                        mc.launch();
 
@@ -432,9 +433,8 @@ public class BasicContainerTest {
 
         checkpoint(() -> {
                        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-                                                                      "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new
-                                                                          HashMap<>(), ops,
-                                                                      "profile");
+                "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+                new HashMap<>(), ops, "profile");
 
                        mc.launch();
 
@@ -534,9 +534,8 @@ public class BasicContainerTest {
 
         checkpoint(() -> {
                        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-                                                                      "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new
-                                                                          HashMap<>(), ops,
-                                                                      "profile");
+                "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+                new HashMap<>(), ops, "profile");
 
                        mc.launch();
 
@@ -612,8 +611,8 @@ public class BasicContainerTest {
         LocalState ls = mock(LocalState.class);
 
         MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
-                                                       "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops,
-                                                       "profile");
+            "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+            new HashMap<>(), ops, "profile");
 
         assertListEquals(Arrays.asList(
             "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
@@ -656,10 +655,10 @@ public class BasicContainerTest {
         public final List<CommandRun> workerCmds = new ArrayList<>();
         public MockBasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort,
                                   int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
-                                  LocalState localState, String workerId, Map<String, Object> topoConf, AdvancedFSOps ops,
-                                  String profileCmd) throws IOException {
+                                  LocalState localState, String workerId, StormMetricsRegistry metricsRegistry, 
+                                  Map<String, Object> topoConf, AdvancedFSOps ops, String profileCmd) throws IOException {
             super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState,
-                  workerId, topoConf, ops, profileCmd);
+                  workerId, metricsRegistry,new ContainerMemoryTracker(metricsRegistry), topoConf, ops, profileCmd);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
index b510838..cf5ec8d 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
@@ -42,6 +42,8 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import org.apache.storm.metric.StormMetricsRegistry;
+
 public class ContainerTest {
     private static final Joiner PATH_JOIN = Joiner.on(File.separator).skipNulls();
     private static final String DOUBLE_SEP = File.separator + File.separator;
@@ -71,7 +73,7 @@ public class ContainerTest {
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
         MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
-                                             "SUPERVISOR", 6628, 8080, la, null, "worker", new HashMap<>(), ops);
+                                             "SUPERVISOR", 6628, 8080, la, null, "worker", new HashMap<>(), ops, new StormMetricsRegistry());
         mc.kill();
         assertEquals(Collections.EMPTY_LIST, mc.killedPids);
         assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
@@ -134,7 +136,7 @@ public class ContainerTest {
         la.set_topology_id(topoId);
         la.set_owner(user);
         MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
-                                             "SUPERVISOR", 6628, 8080, la, null, workerId, topoConf, ops);
+                                             "SUPERVISOR", 6628, 8080, la, null, workerId, topoConf, ops, new StormMetricsRegistry());
 
         mc.setup();
 
@@ -205,7 +207,7 @@ public class ContainerTest {
         la.set_owner(user);
         la.set_topology_id(topoId);
         MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
-                                             "SUPERVISOR", supervisorPort, port, la, iso, workerId, topoConf, ops);
+                                             "SUPERVISOR", supervisorPort, port, la, iso, workerId, topoConf, ops, new StormMetricsRegistry());
         mc.allPids.add(pid);
 
         mc.cleanUp();
@@ -226,9 +228,9 @@ public class ContainerTest {
         public final Set<Long> allPids = new HashSet<>();
         protected MockContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort,
                                 int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
-                                String workerId, Map<String, Object> topoConf, AdvancedFSOps ops) throws IOException {
+                                String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, StormMetricsRegistry metricsRegistry) throws IOException {
             super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, workerId,
-                  topoConf, ops);
+                  topoConf, ops, metricsRegistry, new ContainerMemoryTracker(new StormMetricsRegistry()));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index 6f09f3f..21a9bc2 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -52,6 +52,8 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
+import org.apache.storm.metric.StormMetricsRegistry;
+
 public class SlotTest {
     private static final Logger LOG = LoggerFactory.getLogger(SlotTest.class);
 
@@ -147,9 +149,11 @@ public class SlotTest {
             BlobChangingCallback cb = mock(BlobChangingCallback.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
             ISupervisor iSuper = mock(ISupervisor.class);
+            SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
             StaticState staticState = new StaticState(localizer, 1000, 1000, 1000, 1000,
-                                                      containerLauncher, "localhost", 8080, iSuper, state, cb, null, null);
-            DynamicState dynamicState = new DynamicState(null, null, null);
+                containerLauncher, "localhost", 8080, iSuper, state, cb, null, null,
+                slotMetrics);
+            DynamicState dynamicState = new DynamicState(null, null, null, slotMetrics);
             DynamicState nextState = Slot.handleEmpty(dynamicState, staticState);
             assertEquals(MachineState.EMPTY, nextState.state);
             assertTrue(Time.currentTimeMillis() > 1000);
@@ -179,9 +183,10 @@ public class SlotTest {
             when(localizer.requestDownloadTopologyBlobs(newAssignment, port, cb)).thenReturn(blobFuture);
 
             ISupervisor iSuper = mock(ISupervisor.class);
+            SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
             StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
-                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null);
-            DynamicState dynamicState = new DynamicState(null, null, null)
+                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null, slotMetrics);
+            DynamicState dynamicState = new DynamicState(null, null, null, slotMetrics)
                 .withNewAssignment(newAssignment);
 
             DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
@@ -248,9 +253,10 @@ public class SlotTest {
 
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
+            SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
             StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
-                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null);
-            DynamicState dynamicState = new DynamicState(assignment, container, assignment);
+                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null, slotMetrics);
+            DynamicState dynamicState = new DynamicState(assignment, container, assignment, slotMetrics);
 
             DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
             assertEquals(MachineState.KILL_AND_RELAUNCH, nextState.state);
@@ -309,9 +315,10 @@ public class SlotTest {
             when(localizer.requestDownloadTopologyBlobs(nAssignment, port, cb)).thenReturn(blobFuture);
 
             ISupervisor iSuper = mock(ISupervisor.class);
+            SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
             StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
-                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null);
-            DynamicState dynamicState = new DynamicState(cAssignment, cContainer, nAssignment);
+                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null, slotMetrics);
+            DynamicState dynamicState = new DynamicState(cAssignment, cContainer, nAssignment, slotMetrics);
 
             DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
             assertEquals(MachineState.KILL, nextState.state);
@@ -390,9 +397,10 @@ public class SlotTest {
 
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
+            SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
             StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
-                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null);
-            DynamicState dynamicState = new DynamicState(cAssignment, cContainer, null);
+                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null, slotMetrics);
+            DynamicState dynamicState = new DynamicState(cAssignment, cContainer, null, slotMetrics);
 
             DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
             assertEquals(MachineState.KILL, nextState.state);
@@ -452,7 +460,7 @@ public class SlotTest {
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
-                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null);
+                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null, new SlotMetrics(new StormMetricsRegistry()));
             Set<TopoProfileAction> profileActions = new HashSet<>();
             ProfileRequest request = new ProfileRequest();
             request.set_action(ProfileAction.JPROFILE_STOP);
@@ -467,8 +475,8 @@ public class SlotTest {
             Set<TopoProfileAction> expectedPending = new HashSet<>();
             expectedPending.add(profile);
 
-
-            DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment)
+            SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
+            DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment, slotMetrics)
                 .withProfileActions(profileActions, Collections.<TopoProfileAction>emptySet());
 
             DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
@@ -534,7 +542,7 @@ public class SlotTest {
             ISupervisor iSuper = mock(ISupervisor.class);
             long heartbeatTimeoutMs = 5000;
             StaticState staticState = new StaticState(localizer, heartbeatTimeoutMs, 120_000, 1000, 1000,
-                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null);
+                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null, new SlotMetrics(new StormMetricsRegistry()));
 
             Set<Slot.BlobChanging> changing = new HashSet<>();
 
@@ -549,7 +557,8 @@ public class SlotTest {
             GoodToGo.GoodToGoLatch otherJarLatch = mock(GoodToGo.GoodToGoLatch.class);
             changing.add(new Slot.BlobChanging(otherAssignment, otherJar, otherJarLatch));
 
-            DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment).withChangingBlobs(changing);
+            SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
+            DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment, slotMetrics).withChangingBlobs(changing);
 
             DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
             assertEquals(MachineState.KILL_BLOB_UPDATE, nextState.state);

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 8654d25..243499a 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -79,6 +79,8 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import org.apache.storm.metric.StormMetricsRegistry;
+
 public class AsyncLocalizerTest {
     private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizerTest.class);
     private final String user1 = "user1";
@@ -117,7 +119,7 @@ public class AsyncLocalizerTest {
         ReflectionUtils mockedRU = mock(ReflectionUtils.class);
         ServerUtils mockedU = mock(ServerUtils.class);
 
-        AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, getTestLocalizerRoot()));
+        AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, getTestLocalizerRoot(), new StormMetricsRegistry()));
         LocallyCachedTopologyBlob jarBlob = mock(LocallyCachedTopologyBlob.class);
         doReturn(jarBlob).when(bl).getTopoJar(topoId, la.get_owner());
         when(jarBlob.getLocalVersion()).thenReturn(-1L);
@@ -213,10 +215,11 @@ public class AsyncLocalizerTest {
         topoConf.put(Config.TOPOLOGY_NAME, topoName);
 
         List<LocalizedResource> localizedList = new ArrayList<>();
-        LocalizedResource simpleLocal = new LocalizedResource(simpleKey, Paths.get(localizerRoot), false, ops, conf, user);
+        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+        LocalizedResource simpleLocal = new LocalizedResource(simpleKey, Paths.get(localizerRoot), false, ops, conf, user, metricsRegistry);
         localizedList.add(simpleLocal);
 
-        AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, localizerRoot));
+        AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, localizerRoot, metricsRegistry));
         ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
         try {
             when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot);
@@ -889,7 +892,7 @@ public class AsyncLocalizerTest {
     class TestLocalizer extends AsyncLocalizer {
 
         TestLocalizer(Map<String, Object> conf, String baseDir) throws IOException {
-            super(conf, AdvancedFSOps.make(conf), baseDir);
+            super(conf, AdvancedFSOps.make(conf), baseDir, new StormMetricsRegistry());
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java b/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
index f41c3df..db67e04 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
@@ -32,6 +32,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import org.apache.storm.metric.StormMetricsRegistry;
+
 public class LocalizedResourceRetentionSetTest {
 
     @Test
@@ -43,9 +45,10 @@ public class LocalizedResourceRetentionSetTest {
         IAdvancedFSOps ops = mock(IAdvancedFSOps.class);
         LocalizedResourceRetentionSet lrretset = new LocalizedResourceRetentionSet(10);
         ConcurrentMap<String, LocalizedResource> lrset = new ConcurrentHashMap<>();
-        LocalizedResource localresource1 = new LocalizedResource("key1", Paths.get("testfile1"), false, ops, conf, user);
+        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+        LocalizedResource localresource1 = new LocalizedResource("key1", Paths.get("testfile1"), false, ops, conf, user, metricsRegistry);
         localresource1.addReference(pna1, null);
-        LocalizedResource localresource2 = new LocalizedResource("key2", Paths.get("testfile2"), false, ops, conf, user);
+        LocalizedResource localresource2 = new LocalizedResource("key2", Paths.get("testfile2"), false, ops, conf, user, metricsRegistry);
         localresource2.addReference(pna1, null);
         // check adding reference to local resource with topology of same name
         localresource2.addReference(pna2, null);
@@ -81,17 +84,18 @@ public class LocalizedResourceRetentionSetTest {
         LocalizedResourceRetentionSet lrretset = spy(new LocalizedResourceRetentionSet(10));
         ConcurrentMap<String, LocalizedResource> lrFiles = new ConcurrentHashMap<>();
         ConcurrentMap<String, LocalizedResource> lrArchives = new ConcurrentHashMap<>();
+        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
         // no reference to key1
         LocalizedResource localresource1 = new LocalizedResource("key1", Paths.get("./target/TESTING/testfile1"), false, ops, conf,
-                                                                 user);
+                                                                 user, metricsRegistry);
         localresource1.setSize(10);
         // no reference to archive1
         LocalizedResource archiveresource1 = new LocalizedResource("archive1", Paths.get("./target/TESTING/testarchive1"), true, ops,
-                                                                   conf, user);
+                                                                   conf, user, metricsRegistry);
         archiveresource1.setSize(20);
         // reference to key2
         LocalizedResource localresource2 = new LocalizedResource("key2", Paths.get("./target/TESTING/testfile2"), false, ops, conf,
-                                                                 user);
+                                                                 user, metricsRegistry);
         localresource2.addReference(pna1, null);
         // check adding reference to local resource with topology of same name
         localresource2.addReference(pna1, null);

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java b/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
deleted file mode 100644
index 5d9b3e4..0000000
--- a/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.metric;
-
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricSet;
-import com.codahale.metrics.Timer;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static com.codahale.metrics.MetricRegistry.name;
-import static org.junit.jupiter.api.Assertions.*;
-
-class StormMetricsRegistryTest {
-    private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistryTest.class);
-
-    private static final String OUTER_METER = "outerMeter";
-    private static final String INNER_SET = "innerSet";
-    private static final String OUTER_TIMER = "outerTimer";
-    private static final String INNER_METER = "innerMeter";
-    private static final String INNER_TIMER = "innerTimer";
-    private static final MetricSet OUTER = newMetricSetInstance();
-
-    @Test
-    void registerMetricSet() {
-        Meter existingInnerMeter = StormMetricsRegistry.registerMeter(name(INNER_SET, INNER_METER));
-
-        LOG.info("register outer set");
-        StormMetricsRegistry.registerMetricSet(OUTER);
-        assertSame(OUTER.getMetrics().get(OUTER_TIMER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_TIMER));
-        assertSame(OUTER.getMetrics().get(OUTER_METER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_METER));
-        assertSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_TIMER),
-            StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_TIMER)));
-
-        assertNotSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_METER),
-            StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
-        assertSame(existingInnerMeter, StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
-
-        //Ensure idempotency
-        LOG.info("twice register outer set");
-        MetricSet newOuter = newMetricSetInstance();
-        StormMetricsRegistry.registerMetricSet(newOuter);
-        assertSame(OUTER.getMetrics().get(OUTER_TIMER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_TIMER));
-        assertSame(OUTER.getMetrics().get(OUTER_METER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_METER));
-        assertSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_TIMER),
-            StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_TIMER)));
-        assertSame(existingInnerMeter, StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
-
-        LOG.info("name collision");
-        assertThrows(IllegalArgumentException.class, () -> StormMetricsRegistry.registerGauge(name(INNER_SET, INNER_METER), () -> 0));
-    }
-
-    @Test
-    void unregisterMetricSet() {
-        StormMetricsRegistry.registerMetricSet(OUTER);
-        StormMetricsRegistry.unregisterMetricSet(OUTER);
-        assertTrue(StormMetricsRegistry.REGISTRY.getMetrics().isEmpty());
-
-    }
-
-    private static MetricSet newMetricSetInstance() {
-        return new MetricSet() {
-            private final MetricSet inner = new MetricSet() {
-                private final Map<String, Metric> map = new HashMap<>();
-
-                {
-                    map.put(INNER_METER, new Meter());
-                    map.put(INNER_TIMER, new Timer());
-                }
-
-                @Override
-                public Map<String, Metric> getMetrics() {
-                    return map;
-                }
-            };
-            private final Map<String, Metric> outerMap = new HashMap<>();
-
-            {
-                outerMap.put(OUTER_METER, new Meter());
-                outerMap.put(INNER_SET, inner);
-                outerMap.put(OUTER_TIMER, new Timer());
-            }
-
-            @Override
-            public Map<String, Metric> getMetrics() {
-                return outerMap;
-            }
-        };
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java
index 63df80a..9609c3a 100644
--- a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java
+++ b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.storm.metric.StormMetricsRegistry;
 
 public class RocksDbStoreTest {
     private final static Logger LOG = LoggerFactory.getLogger(RocksDbStoreTest.class);
@@ -57,7 +58,7 @@ public class RocksDbStoreTest {
         conf.put(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING, true);
         conf.put(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY, 4000);
         conf.put(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS, 240);
-        store = MetricStoreConfig.configure(conf);
+        store = MetricStoreConfig.configure(conf, new StormMetricsRegistry());
     }
 
     @AfterClass
@@ -307,7 +308,7 @@ public class RocksDbStoreTest {
         Assert.assertTrue(list.size() >= 2);
 
         // delete anything older than an hour
-        MetricsCleaner cleaner = new MetricsCleaner((RocksDbStore)store, 1, 1, null);
+        MetricsCleaner cleaner = new MetricsCleaner((RocksDbStore)store, 1, 1, null, new StormMetricsRegistry());
         cleaner.purgeMetrics();
         list = getMetricsFromScan(filter);
         Assert.assertEquals(1, list.size());

http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
index daf8671..9a34080 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
@@ -27,6 +27,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
 
 public class FaultGenerateUtils {
 
@@ -58,6 +60,6 @@ public class FaultGenerateUtils {
         } else {
             assignment = TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments());
         }
-        return new Cluster(iNimbus, supervisors, assignment, topologies, config);
+        return new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supervisors, assignment, topologies, config);
     }
 }