You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/01/06 22:48:59 UTC

[storm] branch master updated: [STORM-3507] add greylist for supervisors which are forced out of blacklist due to low resources

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 8277bb6  [STORM-3507] add greylist for supervisors which are forced out of blacklist due to low resources
     new 1aec14d  Merge pull request #3127 from RuiLi8080/STORM-3507
8277bb6 is described below

commit 8277bb6ecd27499b9cff39116c2c964594bc173c
Author: Rui Li <ru...@gmail.com>
AuthorDate: Tue Sep 10 09:25:33 2019 -0500

    [STORM-3507] add greylist for supervisors which are forced out of blacklist due to low resources
---
 .../java/org/apache/storm/scheduler/Cluster.java   | 20 ++++++++-
 .../scheduler/blacklist/BlacklistScheduler.java    |  1 +
 .../strategies/DefaultBlacklistStrategy.java       |  5 ++-
 .../scheduling/BaseResourceAwareStrategy.java      | 12 ++++-
 .../blacklist/TestBlacklistScheduler.java          | 51 ++++++++++++++++++++++
 5 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 04f0960..e4a9110 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
@@ -84,6 +83,7 @@ public class Cluster implements ISchedulingState {
     private final ResourceMetrics resourceMetrics;
     private SchedulerAssignmentImpl assignment;
     private Set<String> blackListedHosts = new HashSet<>();
+    private List<String> greyListedSupervisors = new ArrayList<>();
     private INimbus inimbus;
     private double minWorkerCpu = 0.0;
 
@@ -102,7 +102,7 @@ public class Cluster implements ISchedulingState {
         Map<String, ? extends SchedulerAssignment> map,
         Topologies topologies,
         Map<String, Object> conf) {
-        this(nimbus, resourceMetrics, supervisors, map, topologies, conf, null, null, null);
+        this(nimbus, resourceMetrics, supervisors, map, topologies, conf, null, null, null, null);
     }
 
     /**
@@ -118,6 +118,7 @@ public class Cluster implements ISchedulingState {
             new HashMap<>(src.conf),
             src.status,
             src.blackListedHosts,
+            src.greyListedSupervisors,
             src.networkTopography);
     }
 
@@ -138,6 +139,7 @@ public class Cluster implements ISchedulingState {
             new HashMap<>(src.conf),
             src.status,
             src.blackListedHosts,
+            src.greyListedSupervisors,
             src.networkTopography);
     }
 
@@ -150,6 +152,7 @@ public class Cluster implements ISchedulingState {
         Map<String, Object> conf,
         Map<String, String> status,
         Set<String> blackListedHosts,
+        List<String> greyListedSupervisors,
         Map<String, List<String>> networkTopography) {
         this.inimbus = nimbus;
         this.resourceMetrics = resourceMetrics;
@@ -201,6 +204,10 @@ public class Cluster implements ISchedulingState {
         if (blackListedHosts != null) {
             this.blackListedHosts.addAll(blackListedHosts);
         }
+
+        if (greyListedSupervisors != null) {
+            this.greyListedSupervisors.addAll(greyListedSupervisors);
+        }
         setAssignments(assignments, true);
     }
 
@@ -1085,4 +1092,13 @@ public class Cluster implements ISchedulingState {
     public double getMinWorkerCpu() {
         return minWorkerCpu;
     }
+
+    public List<String> getGreyListedSupervisors() {
+        return greyListedSupervisors;
+    }
+
+    public void setGreyListedSupervisors(Set<String> greyListedSupervisors) {
+        this.greyListedSupervisors.clear();
+        this.greyListedSupervisors.addAll(greyListedSupervisors);
+    }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index 0527335..179f3d0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -115,6 +115,7 @@ public class BlacklistScheduler implements IScheduler {
         Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
         blacklistStrategy.resumeFromBlacklist();
         badSupervisors(supervisors);
+        // this step also releases some bad supervisors from the blacklist to the greylist when resources are short
         blacklistedSupervisorIds = refreshBlacklistedSupervisorIds(cluster, topologies);
         Set<String> blacklistHosts = getBlacklistHosts(cluster, blacklistedSupervisorIds);
         cluster.setBlacklistedHosts(blacklistHosts);
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index 8a36669..7115503 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -85,9 +85,12 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
             }
         }
         Set<String> toRelease = releaseBlacklistWhenNeeded(cluster, new ArrayList<>(blacklist.keySet()));
+        // After the final blacklist has been computed,
+        // the nodes that are released due to resource shortage are put on the "greylist".
         if (toRelease != null) {
             LOG.debug("Releasing {} nodes because of low resources", toRelease.size());
-            for (String key: toRelease) {
+            cluster.setGreyListedSupervisors(toRelease);
+            for (String key : toRelease) {
                 blacklist.remove(key);
             }
         }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 4e5ec5c..9cfaabc 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -31,6 +31,9 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 import org.apache.storm.generated.ComponentType;
 import org.apache.storm.networktopography.DNSToSwitchMapping;
 import org.apache.storm.scheduler.Cluster;
@@ -242,7 +245,9 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
             this.parent = parent;
             rackIterator = sortedRacks.iterator();
             pre = parent.favoredNodeIds.iterator();
-            post = parent.unFavoredNodeIds.iterator();
+            post = Stream.concat(parent.unFavoredNodeIds.stream(), parent.greyListedSupervisorIds.stream())
+                            .collect(Collectors.toList())
+                            .iterator();
             skip = parent.skippedNodeIds;
         }
 
@@ -305,15 +310,20 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
         private final TopologyDetails td;
         private final List<String> favoredNodeIds;
         private final List<String> unFavoredNodeIds;
+        private final List<String> greyListedSupervisorIds;
         private final Set<String> skippedNodeIds = new HashSet<>();
 
         LazyNodeSorting(TopologyDetails td, ExecutorDetails exec,
                                List<String> favoredNodeIds, List<String> unFavoredNodeIds) {
             this.favoredNodeIds = favoredNodeIds;
             this.unFavoredNodeIds = unFavoredNodeIds;
+            this.greyListedSupervisorIds = cluster.getGreyListedSupervisors();
             this.unFavoredNodeIds.removeAll(favoredNodeIds);
+            this.favoredNodeIds.removeAll(greyListedSupervisorIds);
+            this.unFavoredNodeIds.removeAll(greyListedSupervisorIds);
             skippedNodeIds.addAll(favoredNodeIds);
             skippedNodeIds.addAll(unFavoredNodeIds);
+            skippedNodeIds.addAll(greyListedSupervisorIds);
 
             this.td = td;
             this.exec = exec;
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
index 8ac0e13..aad9d98 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
@@ -30,6 +30,8 @@ import org.apache.storm.scheduler.SchedulerAssignmentImpl;
 import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
 import org.apache.storm.utils.Utils;
 import org.junit.After;
 import org.junit.Assert;
@@ -223,6 +225,55 @@ public class TestBlacklistScheduler {
     }
 
     @Test
+    public void TestGreylist() {
+        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+
+        Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(2, 3);
+
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 0.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0);
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, DefaultResourceAwareStrategy.class.getName());
+        config.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, true);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+
+        TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 1, 1, 1, 1, currentTime - 2, true);
+        TopologyDetails topo2 = TestUtilsForBlacklistScheduler.getTopology("topo-2", config, 1, 1, 1, 1, currentTime - 8, true);
+        Topologies topologies = new Topologies(topoMap);
+
+        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+        ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
+        Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        scheduler = new BlacklistScheduler(new ResourceAwareScheduler(), metricsRegistry);
+
+        scheduler.prepare(config);
+        scheduler.schedule(topologies, cluster);
+        cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+        scheduler.schedule(topologies, cluster);
+        cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+        scheduler.schedule(topologies, cluster);
+        cluster = new Cluster(iNimbus, resourceMetrics, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+        scheduler.schedule(topologies, cluster);
+        Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts());
+
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo2.getId(), topo2);
+        topologies = new Topologies(topoMap);
+        cluster = new Cluster(iNimbus, resourceMetrics, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+        scheduler.schedule(topologies, cluster);
+        Assert.assertEquals("blacklist", Collections.emptySet(), cluster.getBlacklistedHosts());
+        Assert.assertEquals("greylist", Collections.singletonList("sup-0"), cluster.getGreyListedSupervisors());
+        LOG.debug("Now only these slots remain available: {}", cluster.getAvailableSlots());
+        Assert.assertTrue(cluster.getAvailableSlots(supMap.get("sup-0")).containsAll(cluster.getAvailableSlots()));
+    }
+
+    @Test
     public void TestList() {
         INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
         Config config = new Config();