You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/12/18 17:12:57 UTC

[2/3] storm git commit: STORM-3295 Fix blacklisting of multiple supervisors per host using DefaultBlacklistStrategy

STORM-3295 Fix blacklisting of multiple supervisors per host using DefaultBlacklistStrategy


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f594c20d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f594c20d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f594c20d

Branch: refs/heads/master
Commit: f594c20dc5f3547578835e32c2a48cc37f2d6cfd
Parents: b4d3df2
Author: Aaron Gresch <ag...@yahoo-inc.com>
Authored: Thu Dec 6 11:02:10 2018 -0600
Committer: Aaron Gresch <ag...@yahoo-inc.com>
Committed: Thu Dec 6 11:02:10 2018 -0600

----------------------------------------------------------------------
 .../strategies/DefaultBlacklistStrategy.java    | 44 ++++++++++++++------
 .../strategies/RasBlacklistStrategy.java        | 17 --------
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f594c20d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index a0c5b6c..7748d6f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -24,7 +24,6 @@ import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.blacklist.reporters.IReporter;
 import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
 import org.apache.storm.utils.ObjectReader;
@@ -142,20 +141,23 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
             if (shortageSlots > 0) {
                 LOG.info("Need {} slots more. Releasing some blacklisted nodes to cover it.", shortageSlots);
 
-                //release earliest blacklist
-                for (String supervisor : blacklistedNodeIds) {
-                    SupervisorDetails sd = availableSupervisors.get(supervisor);
-                    if (sd != null) {
-                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
-                        readyToRemove.add(supervisor);
-                        shortageSlots -= sdAvailableSlots;
-                        LOG.debug("Releasing {} with {} slots leaving {} slots to go", supervisor,
-                            sdAvailableSlots, shortageSlots);
-                        if (shortageSlots <= 0) {
-                            // we have enough resources now...
-                            break;
+                //release earliest blacklist - but release all supervisors on a given blacklisted host.
+                Map<String, Set<String>> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster);
+                for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
+                    for (String supervisorId : supervisorIds) {
+                        SupervisorDetails sd = availableSupervisors.get(supervisorId);
+                        if (sd != null) {
+                            int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+                            readyToRemove.add(supervisorId);
+                            shortageSlots -= sdAvailableSlots;
+                            LOG.debug("Releasing {} with {} slots leaving {} slots to go", supervisorId,
+                                    sdAvailableSlots, shortageSlots);
                         }
                     }
+                    if (shortageSlots <= 0) {
+                        // we have enough resources now...
+                        break;
+                    }
                 }
             }
         }
@@ -176,4 +178,20 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
             throw new RuntimeException(e);
         }
     }
+
+    protected Map<String, Set<String>> createHostToSupervisorMap(final List<String> blacklistedNodeIds, Cluster cluster) {
+        Map<String, Set<String>> hostToSupervisorMap = new TreeMap<>();
+        for (String supervisorId : blacklistedNodeIds) {
+            String hostname = cluster.getHost(supervisorId);
+            if (hostname != null) {
+                Set<String> supervisorIds = hostToSupervisorMap.get(hostname);
+                if (supervisorIds == null) {
+                    supervisorIds = new HashSet<>();
+                    hostToSupervisorMap.put(hostname, supervisorIds);
+                }
+                supervisorIds.add(supervisorId);
+            }
+        }
+        return hostToSupervisorMap;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f594c20d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
index ac7abb1..574fe91 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 
 import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.scheduler.Cluster;
@@ -107,20 +106,4 @@ public class RasBlacklistStrategy extends DefaultBlacklistStrategy {
         }
         return readyToRemove;
     }
-
-    private Map<String, Set<String>> createHostToSupervisorMap(final List<String> blacklistedNodeIds, Cluster cluster) {
-        Map<String, Set<String>> hostToSupervisorMap = new TreeMap<>();
-        for (String supervisorId : blacklistedNodeIds) {
-            String hostname = cluster.getHost(supervisorId);
-            if (hostname != null) {
-                Set<String> supervisorIds = hostToSupervisorMap.get(hostname);
-                if (supervisorIds == null) {
-                    supervisorIds = new HashSet<>();
-                    hostToSupervisorMap.put(hostname, supervisorIds);
-                }
-                supervisorIds.add(supervisorId);
-            }
-        }
-        return hostToSupervisorMap;
-    }
 }