You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2019/08/22 16:28:57 UTC

[storm] branch master updated: [STORM-3492] Add config to prevent good supervisor with bad workers from going to blacklist when necessary

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 46d11da  [STORM-3492] Add config to prevent good supervisor with bad workers from going to blacklist when necessary
     new 7ea9235  Merge pull request #3108 from RuiLi8080/STORM-3492
46d11da is described below

commit 46d11daebba8bb52edfd8628037a0f4ec7bb86ca
Author: ruili8080 <ru...@verizonmedia.com>
AuthorDate: Tue Aug 20 14:00:22 2019 -0500

    [STORM-3492] Add config to prevent good supervisor with bad workers from going to blacklist when necessary
---
 conf/defaults.yaml                                        |  1 +
 .../src/main/java/org/apache/storm/DaemonConfig.java      | 11 +++++++++++
 .../storm/scheduler/blacklist/BlacklistScheduler.java     | 15 +++++++++------
 3 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e3c33bd..1a17e52 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -343,6 +343,7 @@ blacklist.scheduler.tolerance.count: 3
 blacklist.scheduler.resume.time.secs: 1800
 blacklist.scheduler.reporter: "org.apache.storm.scheduler.blacklist.reporters.LogReporter"
 blacklist.scheduler.strategy: "org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy"
+blacklist.scheduler.assume.supervisor.bad.based.on.bad.slot: true
 
 dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
 
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 26cacff..820d80f 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -138,6 +138,17 @@ public class DaemonConfig implements Validated {
     public static final String BLACKLIST_SCHEDULER_STRATEGY = "blacklist.scheduler.strategy";
 
     /**
+     * Whether {@link org.apache.storm.scheduler.blacklist.BlacklistScheduler} will assume the supervisor is bad
+     * based on bad slots or not.
+     * A bad slot indicates the situation where nimbus doesn't receive a heartbeat from the worker in time;
+     * in that case it is hard to tell whether the cause is the supervisor node or the worker itself.
+     * If this is set to true, the scheduler will consider a supervisor bad when it sees bad slots on it.
+     * Otherwise, the scheduler will assume a supervisor is bad only when it does not receive the supervisor's heartbeat in time.
+     */
+    @IsBoolean
+    public static final String BLACKLIST_SCHEDULER_ASSUME_SUPERVISOR_BAD_BASED_ON_BAD_SLOT = "blacklist.scheduler.assume.supervisor.bad.based.on.bad.slot";
+
+    /**
      * Whether we want to display all the resource capacity and scheduled usage on the UI page. You MUST have this variable set if you are
      * using any kind of resource-related scheduler.
      * <p/>
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index a2bedcb..33c0e04 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -55,6 +55,7 @@ public class BlacklistScheduler implements IScheduler {
     protected EvictingQueue<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow;
     protected int windowSize;
     protected volatile Set<String> blacklistedSupervisorIds;     // supervisor ids
+    private boolean blacklistOnBadSlots;
     private Map<String, Object> conf;
 
     public BlacklistScheduler(IScheduler underlyingScheduler, StormMetricsRegistry metricsRegistry) {
@@ -90,6 +91,7 @@ public class BlacklistScheduler implements IScheduler {
         badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize);
         cachedSupervisors = new HashMap<>();
         blacklistedSupervisorIds = new HashSet<>();
+        blacklistOnBadSlots = ObjectReader.getBoolean(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_ASSUME_SUPERVISOR_BAD_BASED_ON_BAD_SLOT), true);
 
         //nimbus:num-blacklisted-supervisor + non-blacklisted supervisor = nimbus:num-supervisors
         metricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", () -> blacklistedSupervisorIds.size());
@@ -137,15 +139,16 @@ public class BlacklistScheduler implements IScheduler {
             String key = entry.getKey();
             SupervisorDetails supervisorDetails = entry.getValue();
             if (cachedSupervisors.containsKey(key)) {
-                Set<Integer> badSlots = badSlots(supervisorDetails, key);
-                if (badSlots.size() > 0) { //supervisor contains bad slots
-                    badSupervisors.put(key, badSlots);
+                if (blacklistOnBadSlots) {
+                    Set<Integer> badSlots = badSlots(supervisorDetails, key);
+                    if (badSlots.size() > 0) { //supervisor contains bad slots
+                        badSupervisors.put(key, badSlots);
+                    }
                 }
             } else {
                 cachedSupervisors.put(key, supervisorDetails.getAllPorts()); //new supervisor to cache
             }
         }
-
         badSupervisorsToleranceSlidingWindow.add(badSupervisors);
     }
 
@@ -160,7 +163,6 @@ public class BlacklistScheduler implements IScheduler {
             allPorts.addAll(cachedSupervisorPorts);
             cachedSupervisors.put(supervisorKey, allPorts);
         }
-
         Set<Integer> badSlots = Sets.difference(cachedSupervisorPorts, supervisorPorts);
         return badSlots;
     }
@@ -198,7 +200,8 @@ public class BlacklistScheduler implements IScheduler {
             for (String supervisor : supervisors) {
                 int supervisorCount = supervisorCountMap.getOrDefault(supervisor, 0);
                 Set<Integer> slots = item.get(supervisor);
-                if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all of its slots matched the cached supervisor
+                // treat supervisor as bad only if all of its slots matched the cached supervisor
+                if (slots.equals(cachedSupervisors.get(supervisor))) {
                     // track how many times a cached supervisor has been marked bad
                     supervisorCountMap.put(supervisor, supervisorCount + 1);
                 }