You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2020/04/03 16:28:33 UTC
[storm] branch master updated: STORM-3605 add meter to track
scheduling timeouts
This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 25c9070 STORM-3605 add meter to track scheduling timeouts
new 4a9fc42 Merge pull request #3235 from agresch/agresch_storm_3605
25c9070 is described below
commit 25c9070601fc5d58b71b7acdb9d58d7d3af6ebeb
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Wed Mar 25 13:26:26 2020 -0500
STORM-3605 add meter to track scheduling timeouts
---
docs/ClusterMetrics.md | 1 +
.../storm/scheduler/multitenant_scheduler_test.clj | 12 +++---
.../org/apache/storm/daemon/nimbus/Nimbus.java | 4 +-
.../apache/storm/scheduler/DefaultScheduler.java | 3 +-
.../org/apache/storm/scheduler/EvenScheduler.java | 3 +-
.../org/apache/storm/scheduler/IScheduler.java | 3 +-
.../apache/storm/scheduler/IsolationScheduler.java | 3 +-
.../scheduler/blacklist/BlacklistScheduler.java | 10 ++---
.../multitenant/MultitenantScheduler.java | 3 +-
.../scheduler/resource/ResourceAwareScheduler.java | 7 +++-
.../blacklist/TestBlacklistScheduler.java | 32 +++++++--------
.../resource/TestResourceAwareScheduler.java | 48 +++++++++++-----------
.../eviction/TestDefaultEvictionStrategy.java | 10 ++---
.../TestFIFOSchedulingPriorityStrategy.java | 2 +-
.../scheduling/TestConstraintSolverStrategy.java | 6 +--
.../TestDefaultResourceAwareStrategy.java | 8 ++--
.../TestGenericResourceAwareStrategy.java | 8 ++--
17 files changed, 87 insertions(+), 76 deletions(-)
diff --git a/docs/ClusterMetrics.md b/docs/ClusterMetrics.md
index 7a3adea..7760f51 100644
--- a/docs/ClusterMetrics.md
+++ b/docs/ClusterMetrics.md
@@ -92,6 +92,7 @@ These are metrics that are specific to a nimbus instance. In many instances onl
| nimbus:num-net-slots-increase-per-scheduling | histogram | added slots minus removed slots after a scheduling run |
| nimbus:num-rebalance-calls | meter | calls to rebalance thrift method. |
| nimbus:num-removed-executors-per-scheduling | histogram | number of executors removed after a scheduling run |
+| nimbus:num-scheduling-timeouts | meter | number of timeouts during scheduling |
| nimbus:num-removed-slots-per-scheduling | histogram | number of slots removed after a scheduling run |
| nimbus:num-setLogConfig-calls | meter | calls to setLogConfig thrift method. |
| nimbus:num-setWorkerProfiler-calls | meter | calls to setWorkerProfiler thrift method. |
diff --git a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
index 2808770..3e11aeb 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
@@ -674,7 +674,7 @@
scheduler (MultitenantScheduler.)]
(.assign (.get node-map "super0") "topology1" (list (ed 1)) cluster)
(.assign (.get node-map "super1") "topology2" (list (ed 5)) cluster)
- (.prepare scheduler conf)
+ (.prepare scheduler conf (StormMetricsRegistry.))
(try
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology1")
@@ -713,7 +713,7 @@
conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 5 "userB" 5}}
scheduler (MultitenantScheduler.)]
(.assign (.get node-map "super0") "topology1" (list (ed 1)) cluster)
- (.prepare scheduler conf)
+ (.prepare scheduler conf (StormMetricsRegistry.))
(try
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology1")
@@ -755,7 +755,7 @@
cluster (Cluster. (Nimbus$StandaloneINimbus.) (ResourceMetrics. (StormMetricsRegistry.)) supers existing-assignments topologies {})
conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 2 "userB" 1}}
scheduler (MultitenantScheduler.)]
- (.prepare scheduler conf)
+ (.prepare scheduler conf (StormMetricsRegistry.))
(try
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology1")
@@ -784,7 +784,7 @@
cluster (Cluster. (Nimbus$StandaloneINimbus.) (ResourceMetrics. (StormMetricsRegistry.)) supers existing-assignments topologies {})
conf {}
scheduler (MultitenantScheduler.)]
- (.prepare scheduler conf)
+ (.prepare scheduler conf (StormMetricsRegistry.))
(try
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology1")
@@ -824,7 +824,7 @@
cluster (Cluster. (Nimbus$StandaloneINimbus.) (ResourceMetrics. (StormMetricsRegistry.)) supers existing-assignments topologies {})
conf {}
scheduler (MultitenantScheduler.)]
- (.prepare scheduler conf)
+ (.prepare scheduler conf (StormMetricsRegistry.))
(try
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology1")
@@ -864,7 +864,7 @@
cluster (Cluster. (Nimbus$StandaloneINimbus.) (ResourceMetrics. (StormMetricsRegistry.)) supers existing-assignments topologies {})
conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 2}}
scheduler (MultitenantScheduler.)]
- (.prepare scheduler conf)
+ (.prepare scheduler conf (StormMetricsRegistry.))
(try
(.schedule scheduler topologies cluster)
(finally (.cleanup scheduler)))
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index daccf15..8153bf6 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -645,8 +645,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private static IScheduler wrapAsBlacklistScheduler(Map<String, Object> conf, IScheduler scheduler,
StormMetricsRegistry metricsRegistry) {
- BlacklistScheduler blacklistWrappedScheduler = new BlacklistScheduler(scheduler, metricsRegistry);
- blacklistWrappedScheduler.prepare(conf);
+ BlacklistScheduler blacklistWrappedScheduler = new BlacklistScheduler(scheduler);
+ blacklistWrappedScheduler.prepare(conf, metricsRegistry);
return blacklistWrappedScheduler;
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
index 84498a3..c137521 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.Utils;
public class DefaultScheduler implements IScheduler {
@@ -97,7 +98,7 @@ public class DefaultScheduler implements IScheduler {
}
@Override
- public void prepare(Map<String, Object> conf) {
+ public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
//noop
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
index e6b9e20..e4431a9 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.utils.ServerUtils;
@@ -161,7 +162,7 @@ public class EvenScheduler implements IScheduler {
}
@Override
- public void prepare(Map<String, Object> conf) {
+ public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
//noop
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
index f2c0d26..3e150ec 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
@@ -13,10 +13,11 @@
package org.apache.storm.scheduler;
import java.util.Map;
+import org.apache.storm.metric.StormMetricsRegistry;
public interface IScheduler {
- void prepare(Map<String, Object> conf);
+ void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry);
/**
* Set assignments for the topologies which needs scheduling. The new assignments is available
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
index 85e752b..75d8d9b 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang.Validate;
import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,7 @@ public class IsolationScheduler implements IScheduler {
private Map<String, Number> isoMachines;
@Override
- public void prepare(Map<String, Object> conf) {
+ public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
this.isoMachines = (Map<String, Number>) conf.get(DaemonConfig.ISOLATION_SCHEDULER_MACHINES);
Validate.notEmpty(isoMachines);
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index 179f3d0..b5c18ca 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -43,7 +43,7 @@ public class BlacklistScheduler implements IScheduler {
public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300;
private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
private final IScheduler underlyingScheduler;
- private final StormMetricsRegistry metricsRegistry;
+ private StormMetricsRegistry metricsRegistry;
protected int toleranceTime;
protected int toleranceCount;
protected int resumeTime;
@@ -58,16 +58,16 @@ public class BlacklistScheduler implements IScheduler {
private boolean blacklistOnBadSlots;
private Map<String, Object> conf;
- public BlacklistScheduler(IScheduler underlyingScheduler, StormMetricsRegistry metricsRegistry) {
+ public BlacklistScheduler(IScheduler underlyingScheduler) {
this.underlyingScheduler = underlyingScheduler;
- this.metricsRegistry = metricsRegistry;
}
@Override
- public void prepare(Map<String, Object> conf) {
+ public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
LOG.info("Preparing black list scheduler");
- underlyingScheduler.prepare(conf);
+ underlyingScheduler.prepare(conf, metricsRegistry);
this.conf = conf;
+ this.metricsRegistry = metricsRegistry;
toleranceTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME),
DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
index 24c008e..aa75ff8 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -15,6 +15,7 @@ package org.apache.storm.scheduler.multitenant;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.Topologies;
@@ -31,7 +32,7 @@ public class MultitenantScheduler implements IScheduler {
private Map<String, Object> conf;
@Override
- public void prepare(Map<String, Object> conf) {
+ public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
this.conf = conf;
configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 5808dd8..f7e34ec 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -12,6 +12,7 @@
package org.apache.storm.scheduler.resource;
+import com.codahale.metrics.Meter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -26,6 +27,7 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
@@ -55,6 +57,7 @@ public class ResourceAwareScheduler implements IScheduler {
private int maxSchedulingAttempts;
private int schedulingTimeoutSeconds;
private ExecutorService backgroundScheduling;
+ private Meter schedulingTimeoutMeter;
private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) {
markFailedTopology(u, c, td, message, null);
@@ -72,8 +75,9 @@ public class ResourceAwareScheduler implements IScheduler {
}
@Override
- public void prepare(Map<String, Object> conf) {
+ public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
this.conf = conf;
+ schedulingTimeoutMeter = metricsRegistry.registerMeter("nimbus:num-scheduling-timeouts");
schedulingPriorityStrategy = ReflectionUtils.newInstance(
(String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
@@ -172,6 +176,7 @@ public class ResourceAwareScheduler implements IScheduler {
+ td.getId() + " using strategy " + rasStrategy.getClass().getName() + " timeout after "
+ schedulingTimeoutSeconds + " seconds using config "
+ DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY + ".");
+ schedulingTimeoutMeter.mark();
schedulingFuture.cancel(true);
return;
}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
index aad9d98..ce9bdce 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
@@ -87,8 +87,8 @@ public class TestBlacklistScheduler {
StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<>(), topologies, config);
- scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
- scheduler.prepare(config);
+ scheduler = new BlacklistScheduler(new DefaultScheduler());
+ scheduler.prepare(config, metricsRegistry);
scheduler.schedule(topologies, cluster);
cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
scheduler.schedule(topologies, cluster);
@@ -122,8 +122,8 @@ public class TestBlacklistScheduler {
StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
- scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
- scheduler.prepare(config);
+ scheduler = new BlacklistScheduler(new DefaultScheduler());
+ scheduler.prepare(config, metricsRegistry);
scheduler.schedule(topologies, cluster);
cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap,
@@ -163,8 +163,8 @@ public class TestBlacklistScheduler {
StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
- scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
- scheduler.prepare(config);
+ scheduler = new BlacklistScheduler(new DefaultScheduler());
+ scheduler.prepare(config, metricsRegistry);
scheduler.schedule(topologies, cluster);
cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
scheduler.schedule(topologies, cluster);
@@ -205,8 +205,8 @@ public class TestBlacklistScheduler {
StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
- scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
- scheduler.prepare(config);
+ scheduler = new BlacklistScheduler(new DefaultScheduler());
+ scheduler.prepare(config, metricsRegistry);
scheduler.schedule(topologies, cluster);
cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
scheduler.schedule(topologies, cluster);
@@ -250,9 +250,9 @@ public class TestBlacklistScheduler {
StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
- scheduler = new BlacklistScheduler(new ResourceAwareScheduler(), metricsRegistry);
+ scheduler = new BlacklistScheduler(new ResourceAwareScheduler());
- scheduler.prepare(config);
+ scheduler.prepare(config, metricsRegistry);
scheduler.schedule(topologies, cluster);
cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
scheduler.schedule(topologies, cluster);
@@ -288,8 +288,8 @@ public class TestBlacklistScheduler {
topoMap.put(topo1.getId(), topo1);
topoMap.put(topo2.getId(), topo2);
Topologies topologies = new Topologies(topoMap);
- scheduler = new BlacklistScheduler(new DefaultScheduler(), new StormMetricsRegistry());
- scheduler.prepare(config);
+ scheduler = new BlacklistScheduler(new DefaultScheduler());
+ scheduler.prepare(config, new StormMetricsRegistry());
List<Map<Integer, List<Integer>>> faultList = new ArrayList<>();
@@ -377,9 +377,9 @@ public class TestBlacklistScheduler {
StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
- BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
+ BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
scheduler = bs;
- bs.prepare(config);
+ bs.prepare(config, metricsRegistry);
bs.schedule(topologies,cluster);
cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"),
TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
@@ -410,8 +410,8 @@ public class TestBlacklistScheduler {
config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME,300);
StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
- scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
- scheduler.prepare(config);
+ scheduler = new BlacklistScheduler(new DefaultScheduler());
+ scheduler.prepare(config, metricsRegistry);
Map<String, TopologyDetails> topoMap = new HashMap<>();
TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5,
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index d8ea9ae..9a98411 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -174,7 +174,7 @@ public class TestResourceAwareScheduler {
Topologies topologies = new Topologies(topology1);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
SchedulerAssignment assignment = cluster.getAssignmentById(topology1.getId());
@@ -223,7 +223,7 @@ public class TestResourceAwareScheduler {
Topologies topologies = new Topologies(topology1, topology2);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
@@ -273,7 +273,7 @@ public class TestResourceAwareScheduler {
Topologies topologies = new Topologies(topology1);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
- rs.prepare(config);
+ rs.prepare(config, new StormMetricsRegistry());
rs.schedule(topologies, cluster);
SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
@@ -318,7 +318,7 @@ public class TestResourceAwareScheduler {
Topologies topologies = new Topologies(topology1);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
- rs.prepare(config);
+ rs.prepare(config, new StormMetricsRegistry());
rs.schedule(topologies, cluster);
SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
@@ -414,7 +414,7 @@ public class TestResourceAwareScheduler {
Topologies topologies = new Topologies(topology2);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config1);
- scheduler.prepare(config1);
+ scheduler.prepare(config1, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
SchedulerAssignment assignment = cluster.getAssignmentById(topology2.getId());
@@ -594,7 +594,7 @@ public class TestResourceAwareScheduler {
LOG.info("\n\n\t\tScheduling topologies 1, 2 and 3");
Topologies topologies = new Topologies(topology1, topology2, topology3);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config1);
- rs.prepare(config1);
+ rs.prepare(config1, new StormMetricsRegistry());
Map<SupervisorDetails, Double> superToCpu = null;
Map<SupervisorDetails, Double> superToMem = null;
@@ -630,7 +630,7 @@ public class TestResourceAwareScheduler {
// scheduled
topologies = new Topologies(topology1, topology2, topology4);
cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config1);
- rs.prepare(config1);
+ rs.prepare(config1, new StormMetricsRegistry());
try {
rs.schedule(topologies, cluster);
int numTopologiesAssigned = 0;
@@ -656,7 +656,7 @@ public class TestResourceAwareScheduler {
//Test3: "Launch topo5 only, both mem and cpu should be exactly used up"
topologies = new Topologies(topology5);
cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config1);
- rs.prepare(config1);
+ rs.prepare(config1, new StormMetricsRegistry());
try {
rs.schedule(topologies, cluster);
superToCpu = getSupervisorToCpuUsage(cluster, topologies);
@@ -704,7 +704,7 @@ public class TestResourceAwareScheduler {
ResourceAwareScheduler rs = new ResourceAwareScheduler();
Topologies topologies = new Topologies(topology1);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config1);
- rs.prepare(config1);
+ rs.prepare(config1, new StormMetricsRegistry());
try {
rs.schedule(topologies, cluster);
assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
@@ -728,7 +728,7 @@ public class TestResourceAwareScheduler {
TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0, "user");
topologies = new Topologies(topology2);
cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config2);
- rs.prepare(config2);
+ rs.prepare(config2, new StormMetricsRegistry());
try {
rs.schedule(topologies, cluster);
String status = cluster.getStatusMap().get(topology2.getId());
@@ -765,7 +765,7 @@ public class TestResourceAwareScheduler {
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertTopologiesFullyScheduled(cluster, "topo-1", "topo-2", "topo-3", "topo-4");
assertTopologiesNotScheduled(cluster, "topo-5");
@@ -800,7 +800,7 @@ public class TestResourceAwareScheduler {
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
for (TopologyDetails td : topologies) {
assertTopologiesFullyScheduled(cluster, td.getName());
@@ -822,7 +822,7 @@ public class TestResourceAwareScheduler {
genTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29, "jerry"));
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertTopologiesFullyScheduled(cluster, "topo-1");
@@ -853,7 +853,7 @@ public class TestResourceAwareScheduler {
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertTopologiesFullyScheduled(cluster, "topo-1", "topo-2", "topo-3", "topo-4", "topo-5", "topo-6");
@@ -901,7 +901,7 @@ public class TestResourceAwareScheduler {
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
Map<String, RasNode> nodes = RasNodes.getAllNodesFrom(cluster);
@@ -943,7 +943,7 @@ public class TestResourceAwareScheduler {
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
@@ -967,7 +967,7 @@ public class TestResourceAwareScheduler {
Topologies topologies = new Topologies(topo1);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertTrue("Topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
}
@@ -988,7 +988,7 @@ public class TestResourceAwareScheduler {
Topologies topologies = new Topologies(topo1, topo2, topo3);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertTrue("topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
assertTrue("topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
@@ -1034,7 +1034,7 @@ public class TestResourceAwareScheduler {
Topologies topologies = new Topologies(topo0, topo1, topo2, topo3, topo4, topo5, topo6, topo7, topo8, topo9);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertTrue("topo-0 scheduled?", cluster.getAssignmentById(topo0.getId()) != null);
assertTrue("topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
@@ -1063,7 +1063,7 @@ public class TestResourceAwareScheduler {
Topologies topologies = new Topologies(topo1);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
}
@@ -1243,7 +1243,7 @@ public class TestResourceAwareScheduler {
// schedule first block (0% - 10%)
{
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
long time = Time.currentTimeMillis();
scheduler.schedule(topologies, cluster);
@@ -1308,7 +1308,7 @@ public class TestResourceAwareScheduler {
Topologies topologies = new Topologies(topo);
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertTrue("Topo scheduled?", cluster.getAssignmentById(topo.getId()) != null);
@@ -1405,7 +1405,7 @@ public class TestResourceAwareScheduler {
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertTrue("Topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
@@ -1454,7 +1454,7 @@ public class TestResourceAwareScheduler {
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
long startTime = Time.currentTimeMillis();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
long schedulingDuration = Time.currentTimeMillis() - startTime;
LOG.info("Scheduling took " + schedulingDuration + " ms");
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java
index 7baca51..854dce1 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/eviction/TestDefaultEvictionStrategy.java
@@ -73,7 +73,7 @@ public class TestDefaultEvictionStrategy {
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertTopologiesFullyScheduled(cluster, "topo-1", "topo-2", "topo-3", "topo-4");
@@ -105,7 +105,7 @@ public class TestDefaultEvictionStrategy {
genTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29, "derek"));
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
LOG.info("\n\n\t\tScheduling topos 2 to 5...");
scheduler.schedule(topologies, cluster);
LOG.info("\n\n\t\tDone scheduling...");
@@ -139,7 +139,7 @@ public class TestDefaultEvictionStrategy {
genTopology("topo-5", config, 1, 0, 1, 0, currentTime - 15, 29, "derek"));
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertTopologiesFullyScheduled(cluster, "topo-2", "topo-3", "topo-4", "topo-5");
@@ -195,7 +195,7 @@ public class TestDefaultEvictionStrategy {
genTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 29, "derek"));
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
LOG.info("\n\n\t\tScheduling topos 1,2,5,6");
scheduler.schedule(topologies, cluster);
LOG.info("\n\n\t\tDone Scheduling...");
@@ -247,7 +247,7 @@ public class TestDefaultEvictionStrategy {
genTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29, "derek"));
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
LOG.info("\n\n\t\tScheduling topos 1,3,4,5");
scheduler.schedule(topologies, cluster);
LOG.info("\n\n\t\tDone scheduling...");
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestFIFOSchedulingPriorityStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestFIFOSchedulingPriorityStrategy.java
index 96d8da9..1de1b02 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestFIFOSchedulingPriorityStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/priority/TestFIFOSchedulingPriorityStrategy.java
@@ -59,7 +59,7 @@ public class TestFIFOSchedulingPriorityStrategy {
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
ResourceAwareScheduler rs = new ResourceAwareScheduler();
- rs.prepare(config);
+ rs.prepare(config, new StormMetricsRegistry());
try {
rs.schedule(topologies, cluster);
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index 65d230d..d3a7438 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -410,7 +410,7 @@ public class TestConstraintSolverStrategy {
Cluster cluster = makeCluster(topologies, supMap);
ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
boolean scheduleSuccess = isStatusSuccess(cluster.getStatus(topo.getId()));
@@ -454,7 +454,7 @@ public class TestConstraintSolverStrategy {
Map<String, SupervisorDetails> supMap = genSupervisors(37, 16, 400, 1024 * 4);
Cluster cluster = makeCluster(topologies, supMap);
ResourceAwareScheduler rs = new ResourceAwareScheduler();
- rs.prepare(config);
+ rs.prepare(config, new StormMetricsRegistry());
try {
rs.schedule(topologies, cluster);
assertStatusSuccess(cluster, topo.getId());
@@ -476,7 +476,7 @@ public class TestConstraintSolverStrategy {
newAssignments.put(topo.getId(), new SchedulerAssignmentImpl(topo.getId(), newExecToSlot, null, null));
cluster.setAssignments(newAssignments, false);
- rs.prepare(config);
+ rs.prepare(config, new StormMetricsRegistry());
try {
rs.schedule(topologies, cluster);
assertStatusSuccess(cluster, topo.getId());
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 30f2bb7..a2f2437 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -161,7 +161,7 @@ public class TestDefaultResourceAwareStrategy {
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(conf);
+ scheduler.prepare(conf, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
@@ -257,7 +257,7 @@ public class TestDefaultResourceAwareStrategy {
// schedule 1st topology
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(conf);
+ scheduler.prepare(conf, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertTopologiesFullyScheduled(cluster, topo[0].getName());
@@ -324,7 +324,7 @@ public class TestDefaultResourceAwareStrategy {
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(conf);
+ scheduler.prepare(conf, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
// [3,3] [7,7], [0,0] [2,2] [6,6] [1,1] [5,5] [4,4] sorted executor ordering
@@ -452,7 +452,7 @@ public class TestDefaultResourceAwareStrategy {
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(conf);
+ scheduler.prepare(conf, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
index 5c28a87..51252ad 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
@@ -118,7 +118,7 @@ public class TestGenericResourceAwareStrategy {
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(conf);
+ scheduler.prepare(conf, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
for (Entry<String, SupervisorResources> entry: cluster.getSupervisorsResourcesMap().entrySet()) {
@@ -206,7 +206,7 @@ public class TestGenericResourceAwareStrategy {
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(conf);
+ scheduler.prepare(conf, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
//We need to have 3 slots on 3 separate hosts. The topology needs 6 GPUs 3500 MB memory and 350% CPU
@@ -285,7 +285,7 @@ public class TestGenericResourceAwareStrategy {
// should schedule gpu1 and noGpu successfully
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(conf);
+ scheduler.prepare(conf, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
assertTopologiesFullyScheduled(cluster, gpu1);
assertTopologiesFullyScheduled(cluster, noGpu);
@@ -313,7 +313,7 @@ public class TestGenericResourceAwareStrategy {
config.putAll(createGrasClusterConfig(88, 775, 25, null, null));
scheduler = new ResourceAwareScheduler();
- scheduler.prepare(config);
+ scheduler.prepare(config, new StormMetricsRegistry());
TopologyDetails tdSimple = genTopology("topology-simple", config, 1,
5, 100, 300, 0, 0, "user", 8192);