You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2020/04/03 16:28:33 UTC

[storm] branch master updated: STORM-3605 add meter to track scheduling timeouts

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 25c9070  STORM-3605 add meter to track scheduling timeouts
     new 4a9fc42  Merge pull request #3235 from agresch/agresch_storm_3605
25c9070 is described below

commit 25c9070601fc5d58b71b7acdb9d58d7d3af6ebeb
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Wed Mar 25 13:26:26 2020 -0500

    STORM-3605 add meter to track scheduling timeouts
---
 docs/ClusterMetrics.md                             |  1 +
 .../storm/scheduler/multitenant_scheduler_test.clj | 12 +++---
 .../org/apache/storm/daemon/nimbus/Nimbus.java     |  4 +-
 .../apache/storm/scheduler/DefaultScheduler.java   |  3 +-
 .../org/apache/storm/scheduler/EvenScheduler.java  |  3 +-
 .../org/apache/storm/scheduler/IScheduler.java     |  3 +-
 .../apache/storm/scheduler/IsolationScheduler.java |  3 +-
 .../scheduler/blacklist/BlacklistScheduler.java    | 10 ++---
 .../multitenant/MultitenantScheduler.java          |  3 +-
 .../scheduler/resource/ResourceAwareScheduler.java |  7 +++-
 .../blacklist/TestBlacklistScheduler.java          | 32 +++++++--------
 .../resource/TestResourceAwareScheduler.java       | 48 +++++++++++-----------
 .../eviction/TestDefaultEvictionStrategy.java      | 10 ++---
 .../TestFIFOSchedulingPriorityStrategy.java        |  2 +-
 .../scheduling/TestConstraintSolverStrategy.java   |  6 +--
 .../TestDefaultResourceAwareStrategy.java          |  8 ++--
 .../TestGenericResourceAwareStrategy.java          |  8 ++--
 17 files changed, 87 insertions(+), 76 deletions(-)

diff --git a/docs/ClusterMetrics.md b/docs/ClusterMetrics.md
index 7a3adea..7760f51 100644
--- a/docs/ClusterMetrics.md
+++ b/docs/ClusterMetrics.md
@@ -92,6 +92,7 @@ These are metrics that are specific to a nimbus instance.  In many instances onl
 | nimbus:num-net-slots-increase-per-scheduling | histogram | added slots minus removed slots after a scheduling run |
 | nimbus:num-rebalance-calls | meter | calls to rebalance thrift method. |
 | nimbus:num-removed-executors-per-scheduling | histogram | number of executors removed after a scheduling run |
+| nimbus:num-scheduling-timeouts | meter | number of times scheduling a topology timed out (marked by ResourceAwareScheduler) |
 | nimbus:num-removed-slots-per-scheduling | histogram | number of slots removed after a scheduling run |
 | nimbus:num-setLogConfig-calls | meter | calls to setLogConfig thrift method. |
 | nimbus:num-setWorkerProfiler-calls | meter | calls to setWorkerProfiler thrift method. |
diff --git a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
index 2808770..3e11aeb 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
@@ -674,7 +674,7 @@
        scheduler (MultitenantScheduler.)]
     (.assign (.get node-map "super0") "topology1" (list (ed 1)) cluster)
     (.assign (.get node-map "super1") "topology2" (list (ed 5)) cluster)
-    (.prepare scheduler conf)
+    (.prepare scheduler conf (StormMetricsRegistry.))
     (try
     (.schedule scheduler topologies cluster)
     (let [assignment (.getAssignmentById cluster "topology1")
@@ -713,7 +713,7 @@
         conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 5 "userB" 5}}
         scheduler (MultitenantScheduler.)]
     (.assign (.get node-map "super0") "topology1" (list (ed 1)) cluster)
-    (.prepare scheduler conf)
+    (.prepare scheduler conf (StormMetricsRegistry.))
     (try
     (.schedule scheduler topologies cluster)
     (let [assignment (.getAssignmentById cluster "topology1")
@@ -755,7 +755,7 @@
           cluster (Cluster. (Nimbus$StandaloneINimbus.) (ResourceMetrics. (StormMetricsRegistry.)) supers existing-assignments topologies {})
           conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 2 "userB" 1}}
           scheduler (MultitenantScheduler.)]
-      (.prepare scheduler conf)
+      (.prepare scheduler conf (StormMetricsRegistry.))
       (try
       (.schedule scheduler topologies cluster)
       (let [assignment (.getAssignmentById cluster "topology1")
@@ -784,7 +784,7 @@
           cluster (Cluster. (Nimbus$StandaloneINimbus.) (ResourceMetrics. (StormMetricsRegistry.)) supers existing-assignments topologies {})
           conf {}
           scheduler (MultitenantScheduler.)]
-      (.prepare scheduler conf)
+      (.prepare scheduler conf (StormMetricsRegistry.))
       (try
       (.schedule scheduler topologies cluster)
       (let [assignment (.getAssignmentById cluster "topology1")
@@ -824,7 +824,7 @@
           cluster (Cluster. (Nimbus$StandaloneINimbus.) (ResourceMetrics. (StormMetricsRegistry.)) supers existing-assignments topologies {})
           conf {}
           scheduler (MultitenantScheduler.)]
-      (.prepare scheduler conf)
+      (.prepare scheduler conf (StormMetricsRegistry.))
       (try
       (.schedule scheduler topologies cluster)
       (let [assignment (.getAssignmentById cluster "topology1")
@@ -864,7 +864,7 @@
         cluster (Cluster. (Nimbus$StandaloneINimbus.) (ResourceMetrics. (StormMetricsRegistry.)) supers existing-assignments topologies {})
         conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 2}}
         scheduler (MultitenantScheduler.)]
-    (.prepare scheduler conf)
+    (.prepare scheduler conf (StormMetricsRegistry.))
     (try
     (.schedule scheduler topologies cluster)
     (finally (.cleanup scheduler)))
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index daccf15..8153bf6 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -645,8 +645,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
 
     private static IScheduler wrapAsBlacklistScheduler(Map<String, Object> conf, IScheduler scheduler,
         StormMetricsRegistry metricsRegistry) {
-        BlacklistScheduler blacklistWrappedScheduler = new BlacklistScheduler(scheduler, metricsRegistry);
-        blacklistWrappedScheduler.prepare(conf);
+        BlacklistScheduler blacklistWrappedScheduler = new BlacklistScheduler(scheduler);
+        blacklistWrappedScheduler.prepare(conf, metricsRegistry);
         return blacklistWrappedScheduler;
     }
 
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
index 84498a3..c137521 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.Utils;
 
 public class DefaultScheduler implements IScheduler {
@@ -97,7 +98,7 @@ public class DefaultScheduler implements IScheduler {
     }
 
     @Override
-    public void prepare(Map<String, Object> conf) {
+    public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
         //noop
     }
 
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
index e6b9e20..e4431a9 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.collect.Sets;
 import org.apache.storm.utils.ServerUtils;
@@ -161,7 +162,7 @@ public class EvenScheduler implements IScheduler {
     }
 
     @Override
-    public void prepare(Map<String, Object> conf) {
+    public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
         //noop
     }
 
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
index f2c0d26..3e150ec 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
@@ -13,10 +13,11 @@
 package org.apache.storm.scheduler;
 
 import java.util.Map;
+import org.apache.storm.metric.StormMetricsRegistry;
 
 public interface IScheduler {
 
-    void prepare(Map<String, Object> conf);
+    void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry);
 
     /**
      * Set assignments for the topologies which needs scheduling. The new assignments is available 
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
index 85e752b..75d8d9b 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import org.apache.commons.lang.Validate;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +46,7 @@ public class IsolationScheduler implements IScheduler {
     private Map<String, Number> isoMachines;
 
     @Override
-    public void prepare(Map<String, Object> conf) {
+    public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
         this.isoMachines = (Map<String, Number>) conf.get(DaemonConfig.ISOLATION_SCHEDULER_MACHINES);
         Validate.notEmpty(isoMachines);
     }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index 179f3d0..b5c18ca 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -43,7 +43,7 @@ public class BlacklistScheduler implements IScheduler {
     public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300;
     private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
     private final IScheduler underlyingScheduler;
-    private final StormMetricsRegistry metricsRegistry;
+    private StormMetricsRegistry metricsRegistry;
     protected int toleranceTime;
     protected int toleranceCount;
     protected int resumeTime;
@@ -58,16 +58,16 @@ public class BlacklistScheduler implements IScheduler {
     private boolean blacklistOnBadSlots;
     private Map<String, Object> conf;
 
-    public BlacklistScheduler(IScheduler underlyingScheduler, StormMetricsRegistry metricsRegistry) {
+    public BlacklistScheduler(IScheduler underlyingScheduler) {
         this.underlyingScheduler = underlyingScheduler;
-        this.metricsRegistry = metricsRegistry;
     }
 
     @Override
-    public void prepare(Map<String, Object> conf) {
+    public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
         LOG.info("Preparing black list scheduler");
-        underlyingScheduler.prepare(conf);
+        underlyingScheduler.prepare(conf, metricsRegistry);
         this.conf = conf;
+        this.metricsRegistry = metricsRegistry;
 
         toleranceTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME),
                                             DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
index 24c008e..aa75ff8 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -15,6 +15,7 @@ package org.apache.storm.scheduler.multitenant;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.IScheduler;
 import org.apache.storm.scheduler.Topologies;
@@ -31,7 +32,7 @@ public class MultitenantScheduler implements IScheduler {
     private Map<String, Object> conf;
 
     @Override
-    public void prepare(Map<String, Object> conf) {
+    public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
         this.conf = conf;
         configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
 
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 5808dd8..f7e34ec 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.scheduler.resource;
 
+import com.codahale.metrics.Meter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -26,6 +27,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.IScheduler;
 import org.apache.storm.scheduler.SchedulerAssignment;
@@ -55,6 +57,7 @@ public class ResourceAwareScheduler implements IScheduler {
     private int maxSchedulingAttempts;
     private int schedulingTimeoutSeconds;
     private ExecutorService backgroundScheduling;
+    private Meter schedulingTimeoutMeter;
 
     private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) {
         markFailedTopology(u, c, td, message, null);
@@ -72,8 +75,9 @@ public class ResourceAwareScheduler implements IScheduler {
     }
 
     @Override
-    public void prepare(Map<String, Object> conf) {
+    public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
         this.conf = conf;
+        schedulingTimeoutMeter = metricsRegistry.registerMeter("nimbus:num-scheduling-timeouts");
         schedulingPriorityStrategy = ReflectionUtils.newInstance(
             (String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
         configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
@@ -172,6 +176,7 @@ public class ResourceAwareScheduler implements IScheduler {
                                 + td.getId() + " using strategy " + rasStrategy.getClass().getName() + " timeout after "
                                 + schedulingTimeoutSeconds + " seconds using config "
                                 + DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY + ".");
+                        schedulingTimeoutMeter.mark();
                         schedulingFuture.cancel(true);
                         return;
                     }
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
index aad9d98..ce9bdce 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
@@ -87,8 +87,8 @@ public class TestBlacklistScheduler {
         StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
         ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
         Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<>(), topologies, config);
-        scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
-        scheduler.prepare(config);
+        scheduler = new BlacklistScheduler(new DefaultScheduler());
+        scheduler.prepare(config, metricsRegistry);
         scheduler.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
         scheduler.schedule(topologies, cluster);
@@ -122,8 +122,8 @@ public class TestBlacklistScheduler {
         StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
         ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
         Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
-        scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
-        scheduler.prepare(config);
+        scheduler = new BlacklistScheduler(new DefaultScheduler());
+        scheduler.prepare(config, metricsRegistry);
         scheduler.schedule(topologies, cluster);
 
         cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap,
@@ -163,8 +163,8 @@ public class TestBlacklistScheduler {
         StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
         ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
         Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
-        scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
-        scheduler.prepare(config);
+        scheduler = new BlacklistScheduler(new DefaultScheduler());
+        scheduler.prepare(config, metricsRegistry);
         scheduler.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
         scheduler.schedule(topologies, cluster);
@@ -205,8 +205,8 @@ public class TestBlacklistScheduler {
         StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
         ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
         Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
-        scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
-        scheduler.prepare(config);
+        scheduler = new BlacklistScheduler(new DefaultScheduler());
+        scheduler.prepare(config, metricsRegistry);
         scheduler.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
         scheduler.schedule(topologies, cluster);
@@ -250,9 +250,9 @@ public class TestBlacklistScheduler {
         StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
         ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
         Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
-        scheduler = new BlacklistScheduler(new ResourceAwareScheduler(), metricsRegistry);
+        scheduler = new BlacklistScheduler(new ResourceAwareScheduler());
 
-        scheduler.prepare(config);
+        scheduler.prepare(config, metricsRegistry);
         scheduler.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
         scheduler.schedule(topologies, cluster);
@@ -288,8 +288,8 @@ public class TestBlacklistScheduler {
         topoMap.put(topo1.getId(), topo1);
         topoMap.put(topo2.getId(), topo2);
         Topologies topologies = new Topologies(topoMap);
-        scheduler = new BlacklistScheduler(new DefaultScheduler(), new StormMetricsRegistry());
-        scheduler.prepare(config);
+        scheduler = new BlacklistScheduler(new DefaultScheduler());
+        scheduler.prepare(config, new StormMetricsRegistry());
 
         List<Map<Integer, List<Integer>>> faultList = new ArrayList<>();
 
@@ -377,9 +377,9 @@ public class TestBlacklistScheduler {
         StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
         ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
         Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
-        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
+        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
         scheduler = bs;
-        bs.prepare(config);
+        bs.prepare(config, metricsRegistry);
         bs.schedule(topologies,cluster);
         cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"),
                 TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
@@ -410,8 +410,8 @@ public class TestBlacklistScheduler {
         config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME,300);
 
         StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
-        scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
-        scheduler.prepare(config);
+        scheduler = new BlacklistScheduler(new DefaultScheduler());
+        scheduler.prepare(config, metricsRegistry);
 
         Map<String, TopologyDetails> topoMap = new HashMap<>();
         TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5,
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index d8ea9ae..9a98411 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -174,7 +174,7 @@ public class TestResourceAwareScheduler {
         Topologies topologies = new Topologies(topology1);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
 
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         SchedulerAssignment assignment = cluster.getAssignmentById(topology1.getId());
@@ -223,7 +223,7 @@ public class TestResourceAwareScheduler {
         Topologies topologies = new Topologies(topology1, topology2);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
 
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
@@ -273,7 +273,7 @@ public class TestResourceAwareScheduler {
         Topologies topologies = new Topologies(topology1);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
 
-        rs.prepare(config);
+        rs.prepare(config, new StormMetricsRegistry());
         rs.schedule(topologies, cluster);
 
         SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
@@ -318,7 +318,7 @@ public class TestResourceAwareScheduler {
         Topologies topologies = new Topologies(topology1);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
 
-        rs.prepare(config);
+        rs.prepare(config, new StormMetricsRegistry());
         rs.schedule(topologies, cluster);
 
         SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
@@ -414,7 +414,7 @@ public class TestResourceAwareScheduler {
         Topologies topologies = new Topologies(topology2);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config1);
 
-        scheduler.prepare(config1);
+        scheduler.prepare(config1, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         SchedulerAssignment assignment = cluster.getAssignmentById(topology2.getId());
@@ -594,7 +594,7 @@ public class TestResourceAwareScheduler {
         LOG.info("\n\n\t\tScheduling topologies 1, 2 and 3");
         Topologies topologies = new Topologies(topology1, topology2, topology3);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config1);
-        rs.prepare(config1);
+        rs.prepare(config1, new StormMetricsRegistry());
 
         Map<SupervisorDetails, Double> superToCpu = null;
         Map<SupervisorDetails, Double> superToMem = null;
@@ -630,7 +630,7 @@ public class TestResourceAwareScheduler {
         // scheduled
         topologies = new Topologies(topology1, topology2, topology4);
         cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config1);
-        rs.prepare(config1);
+        rs.prepare(config1, new StormMetricsRegistry());
         try {
             rs.schedule(topologies, cluster);
             int numTopologiesAssigned = 0;
@@ -656,7 +656,7 @@ public class TestResourceAwareScheduler {
         //Test3: "Launch topo5 only, both mem and cpu should be exactly used up"
         topologies = new Topologies(topology5);
         cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config1);
-        rs.prepare(config1);
+        rs.prepare(config1, new StormMetricsRegistry());
         try {
             rs.schedule(topologies, cluster);
             superToCpu = getSupervisorToCpuUsage(cluster, topologies);
@@ -704,7 +704,7 @@ public class TestResourceAwareScheduler {
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
         Topologies topologies = new Topologies(topology1);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config1);
-        rs.prepare(config1);
+        rs.prepare(config1, new StormMetricsRegistry());
         try {
             rs.schedule(topologies, cluster);
             assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
@@ -728,7 +728,7 @@ public class TestResourceAwareScheduler {
         TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0, "user");
         topologies = new Topologies(topology2);
         cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config2);
-        rs.prepare(config2);
+        rs.prepare(config2, new StormMetricsRegistry());
         try {
             rs.schedule(topologies, cluster);
             String status = cluster.getStatusMap().get(topology2.getId());
@@ -765,7 +765,7 @@ public class TestResourceAwareScheduler {
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
 
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
         assertTopologiesFullyScheduled(cluster, "topo-1", "topo-2", "topo-3", "topo-4");
         assertTopologiesNotScheduled(cluster, "topo-5");
@@ -800,7 +800,7 @@ public class TestResourceAwareScheduler {
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
 
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
         for (TopologyDetails td : topologies) {
             assertTopologiesFullyScheduled(cluster, td.getName());
@@ -822,7 +822,7 @@ public class TestResourceAwareScheduler {
             genTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29, "jerry"));
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         assertTopologiesFullyScheduled(cluster, "topo-1");
@@ -853,7 +853,7 @@ public class TestResourceAwareScheduler {
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
 
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         assertTopologiesFullyScheduled(cluster, "topo-1", "topo-2", "topo-3", "topo-4", "topo-5", "topo-6");
@@ -901,7 +901,7 @@ public class TestResourceAwareScheduler {
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
 
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         Map<String, RasNode> nodes = RasNodes.getAllNodesFrom(cluster);
@@ -943,7 +943,7 @@ public class TestResourceAwareScheduler {
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
 
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
@@ -967,7 +967,7 @@ public class TestResourceAwareScheduler {
         Topologies topologies = new Topologies(topo1);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
         assertTrue("Topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
     }
@@ -988,7 +988,7 @@ public class TestResourceAwareScheduler {
         Topologies topologies = new Topologies(topo1, topo2, topo3);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
         assertTrue("topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
         assertTrue("topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
@@ -1034,7 +1034,7 @@ public class TestResourceAwareScheduler {
         Topologies topologies = new Topologies(topo0, topo1, topo2, topo3, topo4, topo5, topo6, topo7, topo8, topo9);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
         assertTrue("topo-0 scheduled?", cluster.getAssignmentById(topo0.getId()) != null);
         assertTrue("topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
@@ -1063,7 +1063,7 @@ public class TestResourceAwareScheduler {
         Topologies topologies = new Topologies(topo1);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
         assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
     }
@@ -1243,7 +1243,7 @@ public class TestResourceAwareScheduler {
         // schedule first block (0% - 10%)
         {
             scheduler = new ResourceAwareScheduler();
-            scheduler.prepare(config);
+            scheduler.prepare(config, new StormMetricsRegistry());
 
             long time = Time.currentTimeMillis();
             scheduler.schedule(topologies, cluster);
@@ -1308,7 +1308,7 @@ public class TestResourceAwareScheduler {
         Topologies topologies = new Topologies(topo);
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         assertTrue("Topo scheduled?", cluster.getAssignmentById(topo.getId()) != null);
@@ -1405,7 +1405,7 @@ public class TestResourceAwareScheduler {
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
 
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         assertTrue("Topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
@@ -1454,7 +1454,7 @@ public class TestResourceAwareScheduler {
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
 
         long startTime = Time.currentTimeMillis();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
         long schedulingDuration = Time.currentTimeMillis() - startTime;
         LOG.info("Scheduling took " + schedulingDuration + " ms");
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java
index 7baca51..854dce1 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java
@@ -73,7 +73,7 @@ public class TestDefaultEvictionStrategy {
 
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
         
         assertTopologiesFullyScheduled(cluster, "topo-1", "topo-2", "topo-3", "topo-4");
@@ -105,7 +105,7 @@ public class TestDefaultEvictionStrategy {
             genTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29, "derek"));
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         LOG.info("\n\n\t\tScheduling topos 2 to 5...");
         scheduler.schedule(topologies, cluster);
         LOG.info("\n\n\t\tDone scheduling...");
@@ -139,7 +139,7 @@ public class TestDefaultEvictionStrategy {
             genTopology("topo-5", config, 1, 0, 1, 0, currentTime - 15, 29, "derek"));
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
         
         assertTopologiesFullyScheduled(cluster, "topo-2", "topo-3", "topo-4", "topo-5");
@@ -195,7 +195,7 @@ public class TestDefaultEvictionStrategy {
             genTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 29, "derek"));
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         LOG.info("\n\n\t\tScheduling topos 1,2,5,6");
         scheduler.schedule(topologies, cluster);
         LOG.info("\n\n\t\tDone Scheduling...");
@@ -247,7 +247,7 @@ public class TestDefaultEvictionStrategy {
             genTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29, "derek"));
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         LOG.info("\n\n\t\tScheduling topos 1,3,4,5");
         scheduler.schedule(topologies, cluster);
         LOG.info("\n\n\t\tDone scheduling...");
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestFIFOSchedulingPriorityStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestFIFOSchedulingPriorityStrategy.java
index 96d8da9..1de1b02 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestFIFOSchedulingPriorityStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestFIFOSchedulingPriorityStrategy.java
@@ -59,7 +59,7 @@ public class TestFIFOSchedulingPriorityStrategy {
             Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
 
             ResourceAwareScheduler rs = new ResourceAwareScheduler();
-            rs.prepare(config);
+            rs.prepare(config, new StormMetricsRegistry());
             try {
                 rs.schedule(topologies, cluster);
 
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index 65d230d..d3a7438 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -410,7 +410,7 @@ public class TestConstraintSolverStrategy {
         Cluster cluster = makeCluster(topologies, supMap);
 
         ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         boolean scheduleSuccess = isStatusSuccess(cluster.getStatus(topo.getId()));
@@ -454,7 +454,7 @@ public class TestConstraintSolverStrategy {
         Map<String, SupervisorDetails> supMap = genSupervisors(37, 16, 400, 1024 * 4);
         Cluster cluster = makeCluster(topologies, supMap);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
-        rs.prepare(config);
+        rs.prepare(config, new StormMetricsRegistry());
         try {
             rs.schedule(topologies, cluster);
             assertStatusSuccess(cluster, topo.getId());
@@ -476,7 +476,7 @@ public class TestConstraintSolverStrategy {
         newAssignments.put(topo.getId(), new SchedulerAssignmentImpl(topo.getId(), newExecToSlot, null, null));
         cluster.setAssignments(newAssignments, false);
 
-        rs.prepare(config);
+        rs.prepare(config, new StormMetricsRegistry());
         try {
             rs.schedule(topologies, cluster);
             assertStatusSuccess(cluster, topo.getId());
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 30f2bb7..a2f2437 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -161,7 +161,7 @@ public class TestDefaultResourceAwareStrategy {
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
 
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(conf);
+        scheduler.prepare(conf, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
@@ -257,7 +257,7 @@ public class TestDefaultResourceAwareStrategy {
 
         // schedule 1st topology
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(conf);
+        scheduler.prepare(conf, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
         assertTopologiesFullyScheduled(cluster, topo[0].getName());
 
@@ -324,7 +324,7 @@ public class TestDefaultResourceAwareStrategy {
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
 
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(conf);
+        scheduler.prepare(conf, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         // [3,3] [7,7], [0,0] [2,2] [6,6] [1,1] [5,5] [4,4] sorted executor ordering
@@ -452,7 +452,7 @@ public class TestDefaultResourceAwareStrategy {
 
         scheduler = new ResourceAwareScheduler();
 
-        scheduler.prepare(conf);
+        scheduler.prepare(conf, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
index 5c28a87..51252ad 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
@@ -118,7 +118,7 @@ public class TestGenericResourceAwareStrategy {
 
         scheduler = new ResourceAwareScheduler();
 
-        scheduler.prepare(conf);
+        scheduler.prepare(conf, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
         
         for (Entry<String, SupervisorResources> entry: cluster.getSupervisorsResourcesMap().entrySet()) {
@@ -206,7 +206,7 @@ public class TestGenericResourceAwareStrategy {
 
         scheduler = new ResourceAwareScheduler();
 
-        scheduler.prepare(conf);
+        scheduler.prepare(conf, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
         //We need to have 3 slots on 3 separate hosts. The topology needs 6 GPUs 3500 MB memory and 350% CPU
@@ -285,7 +285,7 @@ public class TestGenericResourceAwareStrategy {
 
         // should schedule gpu1 and noGpu successfully
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(conf);
+        scheduler.prepare(conf, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
         assertTopologiesFullyScheduled(cluster, gpu1);
         assertTopologiesFullyScheduled(cluster, noGpu);
@@ -313,7 +313,7 @@ public class TestGenericResourceAwareStrategy {
         config.putAll(createGrasClusterConfig(88, 775, 25, null, null));
 
         scheduler = new ResourceAwareScheduler();
-        scheduler.prepare(config);
+        scheduler.prepare(config, new StormMetricsRegistry());
 
         TopologyDetails tdSimple = genTopology("topology-simple", config, 1,
             5, 100, 300, 0, 0, "user", 8192);