You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2019/03/15 18:45:55 UTC

[storm] 01/04: STORM-3344 add unit test to verify blacklist scheduler handles supervisors with changing ports

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git

commit cd1062969bace192f1ea6985cb0aea92bd5aae3f
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Wed Feb 27 08:32:26 2019 -0600

    STORM-3344 add unit test to verify blacklist scheduler handles supervisors with changing ports
---
 .../scheduler/blacklist/BlacklistScheduler.java    |  8 ++--
 .../strategies/DefaultBlacklistStrategy.java       | 10 ++---
 .../scheduler/blacklist/FaultGenerateUtils.java    |  2 +-
 .../blacklist/TestBlacklistScheduler.java          | 46 ++++++++++++++++++++--
 .../blacklist/TestUtilsForBlacklistScheduler.java  |  8 +++-
 .../resource/TestResourceAwareScheduler.java       |  2 +-
 6 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index 73bc5a0..1061a3f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -193,9 +193,11 @@ public class BlacklistScheduler implements IScheduler {
             for (String supervisor : supervisors) {
                 int supervisorCount = supervisorCountMap.getOrDefault(supervisor, 0);
                 Set<Integer> slots = item.get(supervisor);
-                if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all slots are bad
+                if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all of its slots matched the cached supervisor
+                    // track how many times a cached supervisor has been marked bad
                     supervisorCountMap.put(supervisor, supervisorCount + 1);
                 }
+                // track how many times each supervisor slot has been listed as bad
                 for (Integer slot : slots) {
                     WorkerSlot workerSlot = new WorkerSlot(supervisor, slot);
                     int slotCount = slotCountMap.getOrDefault(workerSlot, 0);
@@ -217,8 +219,8 @@ public class BlacklistScheduler implements IScheduler {
             WorkerSlot workerSlot = entry.getKey();
             String supervisorKey = workerSlot.getNodeId();
             Integer slot = workerSlot.getPort();
-            int value = entry.getValue();
-            if (value == windowSize) { // worker slot which was never back to normal in tolerance period will be removed from cache
+            int slotFailures = entry.getValue();
+            if (slotFailures == windowSize) { // worker slot which was never back to normal in tolerance period will be removed from cache
                 Set<Integer> slots = cachedSupervisors.get(supervisorKey);
                 if (slots != null) { // slots will be null while supervisor has been removed from cached supervisors
                     slots.remove(slot);
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index 7748d6f..8a36669 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -98,12 +98,12 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
     public void resumeFromBlacklist() {
         Set<String> readyToRemove = new HashSet<>();
         for (Map.Entry<String, Integer> entry : blacklist.entrySet()) {
-            String key = entry.getKey();
-            int value = entry.getValue() - 1;
-            if (value == 0) {
-                readyToRemove.add(key);
+            String supervisor = entry.getKey();
+            int countUntilResume = entry.getValue() - 1;
+            if (countUntilResume == 0) {
+                readyToRemove.add(supervisor);
             } else {
-                blacklist.put(key, value);
+                blacklist.put(supervisor, countUntilResume);
             }
         }
         for (String key : readyToRemove) {
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
index 9a34080..c962039 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
@@ -43,7 +43,7 @@ public class FaultGenerateUtils {
                     supervisors = TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, "sup-" + supervisor);
                 } else {
                     for (int slot : slots) {
-                        supervisors = TestUtilsForBlacklistScheduler.removePortFromSupervisors(supervisors, "sup-" + supervisor, slot);
+                        supervisors = TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supervisors, "sup-" + supervisor, slot, false);
                     }
                 }
             }
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
index 76ad01f..f55cbb1 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
@@ -120,9 +120,9 @@ public class TestBlacklistScheduler {
         scheduler.prepare(config);
         scheduler.schedule(topologies, cluster);
 
-        cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap, "sup-0", 0), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+        cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supMap, "sup-0", 0, false), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
         scheduler.schedule(topologies, cluster);
-        cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap, "sup-0", 0), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+        cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supMap, "sup-0", 0, false), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
         scheduler.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
         scheduler.schedule(topologies, cluster);
@@ -329,11 +329,51 @@ public class TestBlacklistScheduler {
         Assert.assertEquals(cached, bs.cachedSupervisors.keySet());
         cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
         bs.schedule(topologies,cluster);
-        cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap,"sup-0",0),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+        cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supMap,"sup-0",0, false),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
         for (int i = 0 ;i < 20 ; i++){
             bs.schedule(topologies, cluster);
         }
         Set<Integer> cachedPorts = Sets.newHashSet(1, 2, 3);
         Assert.assertEquals(cachedPorts, bs.cachedSupervisors.get("sup-0"));
     }
+
+    @Test
+    public void blacklistSupervisorWithAddedPort() {
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME,10);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT,2);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME,300);
+
+        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+        scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
+        scheduler.prepare(config);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5,
+                15, 1, 1, currentTime - 2,true);
+        topoMap.put(topo1.getId(), topo1);
+        Topologies topologies = new Topologies(topoMap);
+
+        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+        ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
+        Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3,4);
+        Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(),
+                topologies, config);
+
+        // allow blacklist scheduler to cache the supervisor
+        scheduler.schedule(topologies, cluster);
+        cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supMap,
+                "sup-0",4, true),TestUtilsForBlacklistScheduler.assignmentMapToImpl(
+                        cluster.getAssignments()), topologies, config);
+        // allow blacklist scheduler to cache the supervisor with an added port
+        scheduler.schedule(topologies, cluster);
+        // remove the port from the supervisor and make sure the blacklist scheduler can remove the port without
+        // throwing an exception
+        cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supMap,
+                "sup-0",4, false),TestUtilsForBlacklistScheduler.assignmentMapToImpl(
+                        cluster.getAssignments()), topologies, config);
+        scheduler.schedule(topologies, cluster);
+    }
+
 }
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
index 553032d..ded11d2 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
@@ -67,7 +67,7 @@ public class TestUtilsForBlacklistScheduler {
         return retList;
     }
 
-    public static Map<String, SupervisorDetails> removePortFromSupervisors(Map<String, SupervisorDetails> supervisorDetailsMap, String supervisor, int port) {
+    public static Map<String, SupervisorDetails> modifyPortFromSupervisors(Map<String, SupervisorDetails> supervisorDetailsMap, String supervisor, int port, boolean add) {
         Map<String, SupervisorDetails> retList = new HashMap<String, SupervisorDetails>();
         for (Map.Entry<String, SupervisorDetails> supervisorDetailsEntry : supervisorDetailsMap.entrySet()) {
             String supervisorKey = supervisorDetailsEntry.getKey();
@@ -75,7 +75,11 @@ public class TestUtilsForBlacklistScheduler {
             Set<Integer> ports = new HashSet<>();
             ports.addAll(supervisorDetails.getAllPorts());
             if (supervisorKey.equals(supervisor)) {
-                ports.remove(port);
+                if (add) {
+                    ports.add(port);
+                } else {
+                    ports.remove(port);
+                }
             }
             SupervisorDetails sup = new SupervisorDetails(supervisorDetails.getId(), supervisorDetails.getHost(), null, (HashSet) ports, null);
             retList.put(sup.getId(), sup);
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 1791a11..592a3b6 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -716,7 +716,7 @@ public class TestResourceAwareScheduler {
         // wordSpout2 is going to contain 5 executors that needs scheduling. Each of those executors has a memory requirement of 128.0 MB
         // The cluster contains 4 free WorkerSlots. For this topolology each worker is limited to a max heap size of 128.0
         // Thus, one executor not going to be able to get scheduled thus failing the scheduling of this topology and no executors of this
-        // topology will be scheduleded
+        // topology will be scheduled
         TopologyBuilder builder2 = new TopologyBuilder();
         builder2.setSpout("wordSpout2", new TestWordSpout(), 5);
         StormTopology stormTopology2 = builder2.createTopology();