You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/01/06 22:48:59 UTC
[storm] branch master updated: [STORM-3507] add greylist for
supervisors which are forced out of blacklist due to low resources
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 8277bb6 [STORM-3507] add greylist for superviosrs which are forced out of blacklist due to low resources
new 1aec14d Merge pull request #3127 from RuiLi8080/STORM-3507
8277bb6 is described below
commit 8277bb6ecd27499b9cff39116c2c964594bc173c
Author: Rui Li <ru...@gmail.com>
AuthorDate: Tue Sep 10 09:25:33 2019 -0500
[STORM-3507] add greylist for superviosrs which are forced out of blacklist due to low resources
---
.../java/org/apache/storm/scheduler/Cluster.java | 20 ++++++++-
.../scheduler/blacklist/BlacklistScheduler.java | 1 +
.../strategies/DefaultBlacklistStrategy.java | 5 ++-
.../scheduling/BaseResourceAwareStrategy.java | 12 ++++-
.../blacklist/TestBlacklistScheduler.java | 51 ++++++++++++++++++++++
5 files changed, 85 insertions(+), 4 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 04f0960..e4a9110 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.storm.Config;
import org.apache.storm.Constants;
@@ -84,6 +83,7 @@ public class Cluster implements ISchedulingState {
private final ResourceMetrics resourceMetrics;
private SchedulerAssignmentImpl assignment;
private Set<String> blackListedHosts = new HashSet<>();
+ private List<String> greyListedSupervisors = new ArrayList<>();
private INimbus inimbus;
private double minWorkerCpu = 0.0;
@@ -102,7 +102,7 @@ public class Cluster implements ISchedulingState {
Map<String, ? extends SchedulerAssignment> map,
Topologies topologies,
Map<String, Object> conf) {
- this(nimbus, resourceMetrics, supervisors, map, topologies, conf, null, null, null);
+ this(nimbus, resourceMetrics, supervisors, map, topologies, conf, null, null, null, null);
}
/**
@@ -118,6 +118,7 @@ public class Cluster implements ISchedulingState {
new HashMap<>(src.conf),
src.status,
src.blackListedHosts,
+ src.greyListedSupervisors,
src.networkTopography);
}
@@ -138,6 +139,7 @@ public class Cluster implements ISchedulingState {
new HashMap<>(src.conf),
src.status,
src.blackListedHosts,
+ src.greyListedSupervisors,
src.networkTopography);
}
@@ -150,6 +152,7 @@ public class Cluster implements ISchedulingState {
Map<String, Object> conf,
Map<String, String> status,
Set<String> blackListedHosts,
+ List<String> greyListedSupervisors,
Map<String, List<String>> networkTopography) {
this.inimbus = nimbus;
this.resourceMetrics = resourceMetrics;
@@ -201,6 +204,10 @@ public class Cluster implements ISchedulingState {
if (blackListedHosts != null) {
this.blackListedHosts.addAll(blackListedHosts);
}
+
+ if (greyListedSupervisors != null) {
+ this.greyListedSupervisors.addAll(greyListedSupervisors);
+ }
setAssignments(assignments, true);
}
@@ -1085,4 +1092,13 @@ public class Cluster implements ISchedulingState {
public double getMinWorkerCpu() {
return minWorkerCpu;
}
+
+ public List<String> getGreyListedSupervisors() {
+ return greyListedSupervisors;
+ }
+
+ public void setGreyListedSupervisors(Set<String> greyListedSupervisors) {
+ this.greyListedSupervisors.clear();
+ this.greyListedSupervisors.addAll(greyListedSupervisors);
+ }
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index 0527335..179f3d0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -115,6 +115,7 @@ public class BlacklistScheduler implements IScheduler {
Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
blacklistStrategy.resumeFromBlacklist();
badSupervisors(supervisors);
+ // this step also frees up some bad supervisors to greylist due to resource shortage
blacklistedSupervisorIds = refreshBlacklistedSupervisorIds(cluster, topologies);
Set<String> blacklistHosts = getBlacklistHosts(cluster, blacklistedSupervisorIds);
cluster.setBlacklistedHosts(blacklistHosts);
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index 8a36669..7115503 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -85,9 +85,12 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
}
}
Set<String> toRelease = releaseBlacklistWhenNeeded(cluster, new ArrayList<>(blacklist.keySet()));
+ // After having computed the final blacklist,
+ // the nodes which are released due to resource shortage will be put to the "greylist".
if (toRelease != null) {
LOG.debug("Releasing {} nodes because of low resources", toRelease.size());
- for (String key: toRelease) {
+ cluster.setGreyListedSupervisors(toRelease);
+ for (String key : toRelease) {
blacklist.remove(key);
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 4e5ec5c..9cfaabc 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -31,6 +31,9 @@ import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
import org.apache.storm.generated.ComponentType;
import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.scheduler.Cluster;
@@ -242,7 +245,9 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
this.parent = parent;
rackIterator = sortedRacks.iterator();
pre = parent.favoredNodeIds.iterator();
- post = parent.unFavoredNodeIds.iterator();
+ post = Stream.concat(parent.unFavoredNodeIds.stream(), parent.greyListedSupervisorIds.stream())
+ .collect(Collectors.toList())
+ .iterator();
skip = parent.skippedNodeIds;
}
@@ -305,15 +310,20 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
private final TopologyDetails td;
private final List<String> favoredNodeIds;
private final List<String> unFavoredNodeIds;
+ private final List<String> greyListedSupervisorIds;
private final Set<String> skippedNodeIds = new HashSet<>();
LazyNodeSorting(TopologyDetails td, ExecutorDetails exec,
List<String> favoredNodeIds, List<String> unFavoredNodeIds) {
this.favoredNodeIds = favoredNodeIds;
this.unFavoredNodeIds = unFavoredNodeIds;
+ this.greyListedSupervisorIds = cluster.getGreyListedSupervisors();
this.unFavoredNodeIds.removeAll(favoredNodeIds);
+ this.favoredNodeIds.removeAll(greyListedSupervisorIds);
+ this.unFavoredNodeIds.removeAll(greyListedSupervisorIds);
skippedNodeIds.addAll(favoredNodeIds);
skippedNodeIds.addAll(unFavoredNodeIds);
+ skippedNodeIds.addAll(greyListedSupervisorIds);
this.td = td;
this.exec = exec;
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
index 8ac0e13..aad9d98 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
@@ -30,6 +30,8 @@ import org.apache.storm.scheduler.SchedulerAssignmentImpl;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import org.apache.storm.utils.Utils;
import org.junit.After;
import org.junit.Assert;
@@ -223,6 +225,55 @@ public class TestBlacklistScheduler {
}
@Test
+ public void TestGreylist() {
+ INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+
+ Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(2, 3);
+
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
+ config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
+ config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 0.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0);
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, DefaultResourceAwareStrategy.class.getName());
+ config.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, true);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+
+ TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 1, 1, 1, 1, currentTime - 2, true);
+ TopologyDetails topo2 = TestUtilsForBlacklistScheduler.getTopology("topo-2", config, 1, 1, 1, 1, currentTime - 8, true);
+ Topologies topologies = new Topologies(topoMap);
+
+ StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+ ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
+ Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+ scheduler = new BlacklistScheduler(new ResourceAwareScheduler(), metricsRegistry);
+
+ scheduler.prepare(config);
+ scheduler.schedule(topologies, cluster);
+ cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+ scheduler.schedule(topologies, cluster);
+ cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+ scheduler.schedule(topologies, cluster);
+ cluster = new Cluster(iNimbus, resourceMetrics, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+ scheduler.schedule(topologies, cluster);
+ Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts());
+
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo2.getId(), topo2);
+ topologies = new Topologies(topoMap);
+ cluster = new Cluster(iNimbus, resourceMetrics, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+ scheduler.schedule(topologies, cluster);
+ Assert.assertEquals("blacklist", Collections.emptySet(), cluster.getBlacklistedHosts());
+ Assert.assertEquals("greylist", Collections.singletonList("sup-0"), cluster.getGreyListedSupervisors());
+ LOG.debug("Now only these slots remain available: {}", cluster.getAvailableSlots());
+ Assert.assertTrue(cluster.getAvailableSlots(supMap.get("sup-0")).containsAll(cluster.getAvailableSlots()));
+ }
+
+ @Test
public void TestList() {
INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
Config config = new Config();