You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2019/03/15 18:45:55 UTC
[storm] 01/04: STORM-3344 add unit test to verify blacklist scheduler handles supervisors with changing ports
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
commit cd1062969bace192f1ea6985cb0aea92bd5aae3f
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Wed Feb 27 08:32:26 2019 -0600
STORM-3344 add unit test to verify blacklist scheduler handles supervisors with changing ports
---
.../scheduler/blacklist/BlacklistScheduler.java | 8 ++--
.../strategies/DefaultBlacklistStrategy.java | 10 ++---
.../scheduler/blacklist/FaultGenerateUtils.java | 2 +-
.../blacklist/TestBlacklistScheduler.java | 46 ++++++++++++++++++++--
.../blacklist/TestUtilsForBlacklistScheduler.java | 8 +++-
.../resource/TestResourceAwareScheduler.java | 2 +-
6 files changed, 61 insertions(+), 15 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index 73bc5a0..1061a3f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -193,9 +193,11 @@ public class BlacklistScheduler implements IScheduler {
for (String supervisor : supervisors) {
int supervisorCount = supervisorCountMap.getOrDefault(supervisor, 0);
Set<Integer> slots = item.get(supervisor);
- if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all slots are bad
+ if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all of its slots matched the cached supervisor
+ // track how many times a cached supervisor has been marked bad
supervisorCountMap.put(supervisor, supervisorCount + 1);
}
+ // track how many times each supervisor slot has been listed as bad
for (Integer slot : slots) {
WorkerSlot workerSlot = new WorkerSlot(supervisor, slot);
int slotCount = slotCountMap.getOrDefault(workerSlot, 0);
@@ -217,8 +219,8 @@ public class BlacklistScheduler implements IScheduler {
WorkerSlot workerSlot = entry.getKey();
String supervisorKey = workerSlot.getNodeId();
Integer slot = workerSlot.getPort();
- int value = entry.getValue();
- if (value == windowSize) { // worker slot which was never back to normal in tolerance period will be removed from cache
+ int slotFailures = entry.getValue();
+ if (slotFailures == windowSize) { // worker slot which was never back to normal in tolerance period will be removed from cache
Set<Integer> slots = cachedSupervisors.get(supervisorKey);
if (slots != null) { // slots will be null while supervisor has been removed from cached supervisors
slots.remove(slot);
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index 7748d6f..8a36669 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -98,12 +98,12 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
public void resumeFromBlacklist() {
Set<String> readyToRemove = new HashSet<>();
for (Map.Entry<String, Integer> entry : blacklist.entrySet()) {
- String key = entry.getKey();
- int value = entry.getValue() - 1;
- if (value == 0) {
- readyToRemove.add(key);
+ String supervisor = entry.getKey();
+ int countUntilResume = entry.getValue() - 1;
+ if (countUntilResume == 0) {
+ readyToRemove.add(supervisor);
} else {
- blacklist.put(key, value);
+ blacklist.put(supervisor, countUntilResume);
}
}
for (String key : readyToRemove) {
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
index 9a34080..c962039 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
@@ -43,7 +43,7 @@ public class FaultGenerateUtils {
supervisors = TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, "sup-" + supervisor);
} else {
for (int slot : slots) {
- supervisors = TestUtilsForBlacklistScheduler.removePortFromSupervisors(supervisors, "sup-" + supervisor, slot);
+ supervisors = TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supervisors, "sup-" + supervisor, slot, false);
}
}
}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
index 76ad01f..f55cbb1 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
@@ -120,9 +120,9 @@ public class TestBlacklistScheduler {
scheduler.prepare(config);
scheduler.schedule(topologies, cluster);
- cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap, "sup-0", 0), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+ cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supMap, "sup-0", 0, false), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
scheduler.schedule(topologies, cluster);
- cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap, "sup-0", 0), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+ cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supMap, "sup-0", 0, false), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
scheduler.schedule(topologies, cluster);
cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
scheduler.schedule(topologies, cluster);
@@ -329,11 +329,51 @@ public class TestBlacklistScheduler {
Assert.assertEquals(cached, bs.cachedSupervisors.keySet());
cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
bs.schedule(topologies,cluster);
- cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap,"sup-0",0),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+ cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supMap,"sup-0",0, false),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
for (int i = 0 ;i < 20 ; i++){
bs.schedule(topologies, cluster);
}
Set<Integer> cachedPorts = Sets.newHashSet(1, 2, 3);
Assert.assertEquals(cachedPorts, bs.cachedSupervisors.get("sup-0"));
}
+
+ @Test
+ public void blacklistSupervisorWithAddedPort() {
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME,10);
+ config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT,2);
+ config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME,300);
+
+ StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+ scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
+ scheduler.prepare(config);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5,
+ 15, 1, 1, currentTime - 2,true);
+ topoMap.put(topo1.getId(), topo1);
+ Topologies topologies = new Topologies(topoMap);
+
+ INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+ ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
+ Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3,4);
+ Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(),
+ topologies, config);
+
+ // allow blacklist scheduler to cache the supervisor
+ scheduler.schedule(topologies, cluster);
+ cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supMap,
+ "sup-0",4, true),TestUtilsForBlacklistScheduler.assignmentMapToImpl(
+ cluster.getAssignments()), topologies, config);
+ // allow blacklist scheduler to cache the supervisor with an added port
+ scheduler.schedule(topologies, cluster);
+ // remove the port from the supervisor and make sure the blacklist scheduler can remove the port without
+ // throwing an exception
+ cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supMap,
+ "sup-0",4, false),TestUtilsForBlacklistScheduler.assignmentMapToImpl(
+ cluster.getAssignments()), topologies, config);
+ scheduler.schedule(topologies, cluster);
+ }
+
}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
index 553032d..ded11d2 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
@@ -67,7 +67,7 @@ public class TestUtilsForBlacklistScheduler {
return retList;
}
- public static Map<String, SupervisorDetails> removePortFromSupervisors(Map<String, SupervisorDetails> supervisorDetailsMap, String supervisor, int port) {
+ public static Map<String, SupervisorDetails> modifyPortFromSupervisors(Map<String, SupervisorDetails> supervisorDetailsMap, String supervisor, int port, boolean add) {
Map<String, SupervisorDetails> retList = new HashMap<String, SupervisorDetails>();
for (Map.Entry<String, SupervisorDetails> supervisorDetailsEntry : supervisorDetailsMap.entrySet()) {
String supervisorKey = supervisorDetailsEntry.getKey();
@@ -75,7 +75,11 @@ public class TestUtilsForBlacklistScheduler {
Set<Integer> ports = new HashSet<>();
ports.addAll(supervisorDetails.getAllPorts());
if (supervisorKey.equals(supervisor)) {
- ports.remove(port);
+ if (add) {
+ ports.add(port);
+ } else {
+ ports.remove(port);
+ }
}
SupervisorDetails sup = new SupervisorDetails(supervisorDetails.getId(), supervisorDetails.getHost(), null, (HashSet) ports, null);
retList.put(sup.getId(), sup);
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 1791a11..592a3b6 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -716,7 +716,7 @@ public class TestResourceAwareScheduler {
// wordSpout2 is going to contain 5 executors that needs scheduling. Each of those executors has a memory requirement of 128.0 MB
// The cluster contains 4 free WorkerSlots. For this topolology each worker is limited to a max heap size of 128.0
// Thus, one executor not going to be able to get scheduled thus failing the scheduling of this topology and no executors of this
- // topology will be scheduleded
+ // topology will be scheduled
TopologyBuilder builder2 = new TopologyBuilder();
builder2.setSpout("wordSpout2", new TestWordSpout(), 5);
StormTopology stormTopology2 = builder2.createTopology();