You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/12/18 17:12:57 UTC
[2/3] storm git commit: STORM-3295 Fix blacklisting of multiple
supervisors per host using DefaultBlacklistStrategy
STORM-3295 Fix blacklisting of multiple supervisors per host using DefaultBlacklistStrategy
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f594c20d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f594c20d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f594c20d
Branch: refs/heads/master
Commit: f594c20dc5f3547578835e32c2a48cc37f2d6cfd
Parents: b4d3df2
Author: Aaron Gresch <ag...@yahoo-inc.com>
Authored: Thu Dec 6 11:02:10 2018 -0600
Committer: Aaron Gresch <ag...@yahoo-inc.com>
Committed: Thu Dec 6 11:02:10 2018 -0600
----------------------------------------------------------------------
.../strategies/DefaultBlacklistStrategy.java | 44 ++++++++++++++------
.../strategies/RasBlacklistStrategy.java | 17 --------
2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f594c20d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index a0c5b6c..7748d6f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -24,7 +24,6 @@ import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.blacklist.reporters.IReporter;
import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
import org.apache.storm.utils.ObjectReader;
@@ -142,20 +141,23 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
if (shortageSlots > 0) {
LOG.info("Need {} slots more. Releasing some blacklisted nodes to cover it.", shortageSlots);
- //release earliest blacklist
- for (String supervisor : blacklistedNodeIds) {
- SupervisorDetails sd = availableSupervisors.get(supervisor);
- if (sd != null) {
- int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
- readyToRemove.add(supervisor);
- shortageSlots -= sdAvailableSlots;
- LOG.debug("Releasing {} with {} slots leaving {} slots to go", supervisor,
- sdAvailableSlots, shortageSlots);
- if (shortageSlots <= 0) {
- // we have enough resources now...
- break;
+ //release earliest blacklist - but release all supervisors on a given blacklisted host.
+ Map<String, Set<String>> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster);
+ for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
+ for (String supervisorId : supervisorIds) {
+ SupervisorDetails sd = availableSupervisors.get(supervisorId);
+ if (sd != null) {
+ int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+ readyToRemove.add(supervisorId);
+ shortageSlots -= sdAvailableSlots;
+ LOG.debug("Releasing {} with {} slots leaving {} slots to go", supervisorId,
+ sdAvailableSlots, shortageSlots);
}
}
+ if (shortageSlots <= 0) {
+ // we have enough resources now...
+ break;
+ }
}
}
}
@@ -176,4 +178,20 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
throw new RuntimeException(e);
}
}
+
+ protected Map<String, Set<String>> createHostToSupervisorMap(final List<String> blacklistedNodeIds, Cluster cluster) {
+ Map<String, Set<String>> hostToSupervisorMap = new TreeMap<>();
+ for (String supervisorId : blacklistedNodeIds) {
+ String hostname = cluster.getHost(supervisorId);
+ if (hostname != null) {
+ Set<String> supervisorIds = hostToSupervisorMap.get(hostname);
+ if (supervisorIds == null) {
+ supervisorIds = new HashSet<>();
+ hostToSupervisorMap.put(hostname, supervisorIds);
+ }
+ supervisorIds.add(supervisorId);
+ }
+ }
+ return hostToSupervisorMap;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f594c20d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
index ac7abb1..574fe91 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.scheduler.Cluster;
@@ -107,20 +106,4 @@ public class RasBlacklistStrategy extends DefaultBlacklistStrategy {
}
return readyToRemove;
}
-
- private Map<String, Set<String>> createHostToSupervisorMap(final List<String> blacklistedNodeIds, Cluster cluster) {
- Map<String, Set<String>> hostToSupervisorMap = new TreeMap<>();
- for (String supervisorId : blacklistedNodeIds) {
- String hostname = cluster.getHost(supervisorId);
- if (hostname != null) {
- Set<String> supervisorIds = hostToSupervisorMap.get(hostname);
- if (supervisorIds == null) {
- supervisorIds = new HashSet<>();
- hostToSupervisorMap.put(hostname, supervisorIds);
- }
- supervisorIds.add(supervisorId);
- }
- }
- return hostToSupervisorMap;
- }
}