You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/29 14:08:45 UTC

[1/4] storm git commit: STORM-2083 Blacklist scheduler

Repository: storm
Updated Branches:
  refs/heads/master 293643f09 -> 177cb95f4


STORM-2083 Blacklist scheduler

Resolved conflict via Jungtaek Lim <ka...@gmail.com>

This commit squashes the original commits into one; their commit messages are listed below:

1. add apache header
2. rename method
3. move config define to Config
4. code style fix
5. change debug log level to debug

remove all blacklist enable config

remove unused default value

1. storm blacklist code style, header and other bugs
2. wrap blacklist scheduler in nimbus and rebase to master

change blacklist-scheduler schedule method log level.

rename some variables and refactor badSlots args

add blacklist.scheduler to default.yaml

1. removeLongTimeDisappearFromCache bug fix
2. add unit test for removeLongTimeDisappearFromCache
3. change blacklistScheduler fields to protected so they can be accessed from subclasses and unit tests

1. remove CircularBuffer and replace it with guava EvictingQueue.
2. modify nimbus_test.clj to adapt blacklistScheduler
3. comments, Utils.getInt, DefaultBlacklistStrategy.prepare with conf


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/25c27c6d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/25c27c6d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/25c27c6d

Branch: refs/heads/master
Commit: 25c27c6d3a3680c6c3cf4d4e0a0dc0a1c18b82e8
Parents: 9d5515a
Author: howard.li <ho...@vipshop.com>
Authored: Thu Jul 7 18:32:30 2016 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 27 14:09:27 2017 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                              |   6 +
 .../src/jvm/org/apache/storm/Config.java        |   6 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |  18 +-
 storm-server/pom.xml                            |   2 +-
 .../java/org/apache/storm/DaemonConfig.java     |  39 +++
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |   8 +-
 .../scheduler/blacklist/BlacklistScheduler.java | 257 +++++++++++++++
 .../apache/storm/scheduler/blacklist/Sets.java  |  58 ++++
 .../blacklist/reporters/IReporter.java          |  31 ++
 .../blacklist/reporters/LogReporter.java        |  40 +++
 .../strategies/DefaultBlacklistStrategy.java    | 165 ++++++++++
 .../strategies/IBlacklistStrategy.java          |  53 +++
 .../scheduler/blacklist/FaultGenerateUtils.java |  74 +++++
 .../blacklist/TestBlacklistScheduler.java       | 327 +++++++++++++++++++
 .../TestUtilsForBlacklistScheduler.java         | 263 +++++++++++++++
 15 files changed, 1333 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 16df356..1478490 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -274,6 +274,12 @@ topology.scheduler.strategy: "org.apache.storm.scheduler.resource.strategies.sch
 resource.aware.scheduler.eviction.strategy: "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy"
 resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
 
+blacklist.scheduler.tolerance.time.secs: 300
+blacklist.scheduler.tolerance.count: 3
+blacklist.scheduler.resume.time.secs: 1800
+blacklist.scheduler.reporter: "org.apache.storm.scheduler.blacklist.reporters.LogReporter"
+blacklist.scheduler.strategy: "org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy"
+
 dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
 
 pacemaker.servers: []

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 7149631..e296e8f 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1515,8 +1515,8 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_EXHIBITOR_PORT = "storm.exhibitor.port";
 
     /*
- * How often to poll Exhibitor cluster in millis.
- */
+     * How often to poll Exhibitor cluster in millis.
+     */
     @isString
     public static final String STORM_EXHIBITOR_URIPATH="storm.exhibitor.poll.uripath";
 
@@ -1532,7 +1532,7 @@ public class Config extends HashMap<String, Object> {
     @isInteger
     public static final String STORM_EXHIBITOR_RETRY_TIMES="storm.exhibitor.retry.times";
 
-    /**
+    /*
      * The interval between retries of an Exhibitor operation.
      */
     @isInteger

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index dcea44e..38b5da0 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -55,8 +55,12 @@
 (def ^:dynamic *STORM-CONF* (clojurify-structure (ConfigUtils/readStormConfig)))
 
 (defn- mk-nimbus
-  [conf inimbus blob-store leader-elector group-mapper cluster-state]
-  (Nimbus. conf inimbus cluster-state nil blob-store nil leader-elector group-mapper))
+  ([conf inimbus]
+   (mk-nimbus conf inimbus nil nil nil nil))
+  ([conf inimbus blob-store leader-elector group-mapper cluster-state]
+    ;blacklist scheduler requires nimbus-monitor-freq-secs as input parameter.
+   (let [conf-with-nimbus-monitor-freq (merge {NIMBUS-MONITOR-FREQ-SECS 10} conf)]
+     (Nimbus. conf-with-nimbus-monitor-freq inimbus cluster-state nil blob-store nil leader-elector group-mapper))))
 
 (defn- from-json
        [^String str]
@@ -1635,7 +1639,7 @@
                   zk-le (MockedZookeeper. (proxy [Zookeeper] []
                           (zkLeaderElectorImpl [conf blob-store tc] nil)))
                   mocked-cluster (MockedCluster. cluster-utils)]
-          (Nimbus. auth-conf fake-inimbus)
+          (mk-nimbus auth-conf fake-inimbus)
           (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
           ))))
 
@@ -1895,7 +1899,7 @@
         hb-cache (into {}(map vector inactive-topos '(nil nil)))
         mock-state (mock-cluster-state)
         mock-blob-store (Mockito/mock BlobStore)
-        conf {}]
+        conf {NIMBUS-MONITOR-FREQ-SECS 10}]
     (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
                     (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))]
       (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))]
@@ -1940,7 +1944,7 @@
         hb-cache {"topo1" nil "topo2" nil}
         mock-state (mock-cluster-state)
         mock-blob-store (Mockito/mock BlobStore)
-        conf {}]
+        conf {NIMBUS-MONITOR-FREQ-SECS 10}]
     (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
                     (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))]
       (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))]
@@ -1975,7 +1979,7 @@
         mock-state (mock-cluster-state)
         mock-blob-store (Mockito/mock BlobStore)
         mock-tc (Mockito/mock TopoCache)
-        nimbus (Nimbus. {} nil mock-state nil mock-blob-store mock-tc (MockLeaderElector. ) nil)]
+        nimbus (Nimbus. {NIMBUS-MONITOR-FREQ-SECS 10} nil mock-state nil mock-blob-store mock-tc (MockLeaderElector. ) nil)]
     (let [supervisor1-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1"))
           user1-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor1-topologies))
           supervisor2-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super2"))
@@ -1996,7 +2000,7 @@
         mock-state (mock-cluster-state)
         mock-blob-store (Mockito/mock BlobStore)
         mock-tc (Mockito/mock TopoCache)
-        nimbus (Nimbus. {} nil mock-state nil mock-blob-store mock-tc (MockLeaderElector. ) nil)]
+        nimbus (Nimbus. {NIMBUS-MONITOR-FREQ-SECS 10} nil mock-state nil mock-blob-store mock-tc (MockLeaderElector. ) nil)]
     (.thenReturn (Mockito/when (.readTopoConf mock-tc (Mockito/eq "authorized") (Mockito/anyObject))) {TOPOLOGY-NAME "authorized"})
     (.thenReturn (Mockito/when (.readTopoConf mock-tc (Mockito/eq "topo1") (Mockito/anyObject))) {TOPOLOGY-NAME "topo1"})
     (.setAuthorizationHandler nimbus (reify IAuthorizer (permit [this context operation topo-conf] (= "authorized" (get topo-conf TOPOLOGY-NAME)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index af95261..b57151c 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -136,7 +136,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>3626</maxAllowedViolations>
+                    <maxAllowedViolations>4000</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 75a9cad..c5ee27a 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -18,6 +18,13 @@
 
 package org.apache.storm;
 
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
+import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
+import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
 import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
@@ -103,6 +110,38 @@ public class DaemonConfig implements Validated {
     public static final String STORM_SCHEDULER = "storm.scheduler";
 
     /**
+     * The length in seconds of the time window in which the blacklist scheduler tracks bad slots or supervisors.
+     */
+    @isInteger
+    public static final String BLACKLIST_SCHEDULER_TOLERANCE_TIME = "blacklist.scheduler.tolerance.time.secs";
+
+    /**
+     * The number of hits within the tolerance time that will trigger blacklisting.
+     */
+    @isInteger
+    public static final String BLACKLIST_SCHEDULER_TOLERANCE_COUNT = "blacklist.scheduler.tolerance.count";
+
+    /**
+     * The number of seconds after which blacklisted slots or supervisors will be resumed.
+     */
+    @isInteger
+    public static final String BLACKLIST_SCHEDULER_RESUME_TIME = "blacklist.scheduler.resume.time.secs";
+
+    /**
+     * The class that the blacklist scheduler uses to report the blacklist.
+     */
+    @NotNull
+    @isImplementationOfClass(implementsClass = IReporter.class)
+    public static final String BLACKLIST_SCHEDULER_REPORTER = "blacklist.scheduler.reporter";
+
+    /**
+     * The class that specifies the blacklist strategy to use in the blacklist scheduler.
+     */
+    @NotNull
+    @isImplementationOfClass(implementsClass = IBlacklistStrategy.class)
+    public static final String BLACKLIST_SCHEDULER_STRATEGY = "blacklist.scheduler.strategy";
+
+    /**
      * Whether we want to display all the resource capacity and scheduled usage on the UI page.
      * You MUST have this variable set if you are using any kind of resource-related scheduler.
      *

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index d6b6ab6..7331906 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -147,6 +147,7 @@ import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
 import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
 import org.apache.storm.scheduler.Cluster.SupervisorResources;
+import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
 import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.storm.security.INimbusCredentialPlugin;
 import org.apache.storm.security.auth.AuthUtils;
@@ -472,8 +473,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             LOG.info("Using default scheduler");
             scheduler = new DefaultScheduler();
         }
-        scheduler.prepare(conf);
-        return scheduler;
+        BlacklistScheduler blacklistWrappedScheduler = new BlacklistScheduler(scheduler);
+        blacklistWrappedScheduler.prepare(conf);
+        return blacklistWrappedScheduler;
     }
 
     /**
@@ -3867,7 +3869,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             throw new RuntimeException(e);
         }
     }
-    
+
     @Override
     public NimbusSummary getLeader() throws AuthorizationException, TException {
         getLeaderCalls.mark();

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
new file mode 100644
index 0000000..0080134
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import com.google.common.collect.EvictingQueue;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class BlacklistScheduler implements IScheduler {
+    private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
+    private final IScheduler underlyingScheduler;
+    @SuppressWarnings("rawtypes")
+    private Map _conf;
+
+    protected int toleranceTime;
+    protected int toleranceCount;
+    protected int resumeTime;
+    protected IReporter reporter;
+    protected IBlacklistStrategy blacklistStrategy;
+
+    protected int nimbusMonitorFreqSecs;
+
+    protected Map<String, Set<Integer>> cachedSupervisors;
+
+    //key is supervisor key ,value is supervisor ports
+    protected EvictingQueue<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow;
+    protected int windowSize;
+    protected Set<String> blacklistHost;
+
+    public BlacklistScheduler(IScheduler underlyingScheduler) {
+        this.underlyingScheduler = underlyingScheduler;
+    }
+
+    @Override
+    public void prepare(Map conf) {
+        LOG.info("prepare black list scheduler");
+        underlyingScheduler.prepare(conf);
+        _conf = conf;
+        if (_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) {
+            toleranceTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME));
+        }
+        if (_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) {
+            toleranceCount = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT));
+        }
+        if (_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME)) {
+            resumeTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME));
+        }
+        String reporterClassName = _conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) ? (String) _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) : "org.apache.storm.scheduler.blacklist.reporters.LogReporter" ;
+        try {
+            reporter = (IReporter) Class.forName(reporterClassName).newInstance();
+        } catch (ClassNotFoundException e) {
+            LOG.error("Can't find blacklist reporter for name {}", reporterClassName);
+            throw new RuntimeException(e);
+        } catch (InstantiationException e) {
+            LOG.error("Throw InstantiationException blacklist reporter for name {}", reporterClassName);
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            LOG.error("Throw illegalAccessException blacklist reporter for name {}", reporterClassName);
+            throw new RuntimeException(e);
+        }
+
+        String strategyClassName = _conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY) ? (String) _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY) : "org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy";
+        try {
+            blacklistStrategy = (IBlacklistStrategy) Class.forName(strategyClassName).newInstance();
+        } catch (ClassNotFoundException e) {
+            LOG.error("Can't find blacklist strategy for name {}", strategyClassName);
+            throw new RuntimeException(e);
+        } catch (InstantiationException e) {
+            LOG.error("Throw InstantiationException blacklist strategy for name {}", strategyClassName);
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            LOG.error("Throw illegalAccessException blacklist strategy for name {}", strategyClassName);
+            throw new RuntimeException(e);
+        }
+
+        nimbusMonitorFreqSecs = ObjectReader.getInt( _conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
+        blacklistStrategy.prepare(_conf);
+
+        windowSize=toleranceTime / nimbusMonitorFreqSecs;
+        badSupervisorsToleranceSlidingWindow =EvictingQueue.create(windowSize);
+        cachedSupervisors = new HashMap<>();
+        blacklistHost = new HashSet<>();
+
+        StormMetricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", new Callable<Integer>() {
+            @Override
+            public Integer call() throws Exception {
+                //nimbus:num-blacklisted-supervisor + non-blacklisted supervisors = nimbus:num-supervisors
+                return blacklistHost.size();
+            }
+        });
+    }
+
+    @Override
+    public void schedule(Topologies topologies, Cluster cluster) {
+        LOG.debug("running Black List scheduler");
+        Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
+        LOG.debug("AssignableSlots: {}", cluster.getAssignableSlots());
+        LOG.debug("AvailableSlots: {}", cluster.getAvailableSlots());
+        LOG.debug("UsedSlots: {}", cluster.getUsedSlots());
+
+        blacklistStrategy.resumeFromBlacklist();
+        badSupervisors(supervisors);
+        cluster.setBlacklistedHosts(getBlacklistHosts(cluster, topologies));
+        removeLongTimeDisappearFromCache();
+
+        underlyingScheduler.schedule(topologies, cluster);
+    }
+
+    @Override
+    public Map<String, Object> config() {
+        return underlyingScheduler.config();
+    }
+
+    private void badSupervisors(Map<String, SupervisorDetails> supervisors) {
+        Set<String> cachedSupervisorsKeySet = cachedSupervisors.keySet();
+        Set<String> supervisorsKeySet = supervisors.keySet();
+
+        Set<String> badSupervisorKeys = Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet);//cached supervisor doesn't show up
+        HashMap<String, Set<Integer>> badSupervisors = new HashMap<>();
+        for (String key : badSupervisorKeys) {
+            badSupervisors.put(key, cachedSupervisors.get(key));
+        }
+
+        for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
+            String key = entry.getKey();
+            SupervisorDetails supervisorDetails = entry.getValue();
+            if (cachedSupervisors.containsKey(key)) {
+                Set<Integer> badSlots = badSlots(supervisorDetails, key);
+                if (badSlots.size() > 0) {//supervisor contains bad slots
+                    badSupervisors.put(key, badSlots);
+                }
+            } else {
+                cachedSupervisors.put(key, supervisorDetails.getAllPorts());//new supervisor to cache
+            }
+        }
+
+        badSupervisorsToleranceSlidingWindow.add(badSupervisors);
+    }
+
+    private Set<Integer> badSlots(SupervisorDetails supervisor, String supervisorKey) {
+        Set<Integer> cachedSupervisorPorts = cachedSupervisors.get(supervisorKey);
+        Set<Integer> supervisorPorts = supervisor.getAllPorts();
+
+        Set<Integer> newPorts = Sets.difference(supervisorPorts, cachedSupervisorPorts);
+        if (newPorts.size() > 0) {
+            //add new ports to cached supervisor
+            cachedSupervisors.put(supervisorKey, Sets.union(newPorts, cachedSupervisorPorts));
+        }
+
+        Set<Integer> badSlots = Sets.difference(cachedSupervisorPorts, supervisorPorts);
+        return badSlots;
+    }
+
+    public Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) {
+        Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies);
+        Set<String> blacklistHostSet = new HashSet<>();
+        for (String supervisor : blacklistSet) {
+            String host = cluster.getHost(supervisor);
+            if (host != null) {
+                blacklistHostSet.add(host);
+            } else {
+                LOG.info("supervisor {} is not alive know, do not need to add to blacklist.", supervisor);
+            }
+        }
+        this.blacklistHost = blacklistHostSet;
+        return blacklistHostSet;
+    }
+
+    /**
+     * A supervisor or port that has been missing for the whole tolerance time window is removed from the cache.
+     */
+    private void removeLongTimeDisappearFromCache() {
+
+        Map<String, Integer> supervisorCountMap = new HashMap<String, Integer>();
+        Map<WorkerSlot, Integer> slotCountMap = new HashMap<WorkerSlot, Integer>();
+
+        for (Map<String, Set<Integer>> item : badSupervisorsToleranceSlidingWindow) {
+            Set<String> supervisors = item.keySet();
+            for (String supervisor : supervisors) {
+                int supervisorCount = 0;
+                if (supervisorCountMap.containsKey(supervisor)) {
+                    supervisorCount = supervisorCountMap.get(supervisor);
+                }
+                Set<Integer> slots = item.get(supervisor);
+                if(slots.equals(cachedSupervisors.get(supervisor))){//only all slots are bad means supervisor is bad
+                    supervisorCountMap.put(supervisor, supervisorCount + 1);
+                }
+                for (Integer slot : slots) {
+                    int slotCount = 0;
+                    WorkerSlot workerSlot = new WorkerSlot(supervisor, slot);
+                    if (slotCountMap.containsKey(workerSlot)) {
+                        slotCount = slotCountMap.get(workerSlot);
+                    }
+                    slotCountMap.put(workerSlot, slotCount + 1);
+                }
+            }
+        }
+
+        for (Map.Entry<String, Integer> entry : supervisorCountMap.entrySet()) {
+            String key = entry.getKey();
+            int value = entry.getValue();
+            if (value == windowSize) {//supervisor never exits once in tolerance time will be removed from cache
+                cachedSupervisors.remove(key);
+                LOG.info("supervisor {} has never exited once during tolerance time, proberbly be dead forever, removed from cache.", key);
+            }
+        }
+
+        for (Map.Entry<WorkerSlot, Integer> entry : slotCountMap.entrySet()) {
+            WorkerSlot workerSlot = entry.getKey();
+            String supervisorKey = workerSlot.getNodeId();
+            Integer slot = workerSlot.getPort();
+            int value = entry.getValue();
+            if (value == windowSize) {//port never exits once in tolerance time will be removed from cache
+                Set<Integer> slots = cachedSupervisors.get(supervisorKey);
+                if (slots != null) {//slots will be null while supervisor has been removed from cached supervisors
+                    slots.remove(slot);
+                    cachedSupervisors.put(supervisorKey, slots);
+                }
+                LOG.info("slot {} has never exited once during tolerance time, proberbly be dead forever, removed from cache.", workerSlot);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
new file mode 100644
index 0000000..93c38cb
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class Sets {
+
+    public static <T> Set<T> union(Set<T> setA, Set<T> setB) {
+        Set<T> result = new HashSet<T>(setA);
+        result.addAll(setB);
+        return result;
+    }
+
+    public static <T> Set<T> intersection(Set<T> setA, Set<T> setB) {
+        Set<T> result = new HashSet<T>(setA);
+        result.retainAll(setB);
+        return result;
+    }
+
+    public static <T> Set<T> difference(Set<T> setA, Set<T> setB) {
+        Set<T> result = new HashSet<T>(setA);
+        result.removeAll(setB);
+        return result;
+    }
+
+    public static <T> Set<T> symDifference(Set<T> setA, Set<T> setB) {
+        Set<T> union = union(setA, setB);
+        Set<T> intersection = intersection(setA, setB);
+        return difference(union, intersection);
+    }
+
+    public static <T> boolean isSubset(Set<T> setA, Set<T> setB) {
+        return setB.containsAll(setA);
+    }
+
+
+    public static <T> boolean isSuperset(Set<T> setA, Set<T> setB) {
+        return setA.containsAll(setB);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
new file mode 100644
index 0000000..00dcaa4
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist.reporters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Reports the blacklist to an alert system.
+ */
+public interface IReporter {
+    void report(String message);
+
+    void reportBlacklist(String supervisor, List<HashMap<String, Set<Integer>>> toleranceBuffer);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
new file mode 100644
index 0000000..d8b7679
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist.reporters;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link IReporter} implementation that writes blacklist events to the SLF4J log at WARN level.
+ */
+public class LogReporter implements IReporter {
+    // The logger is never reassigned, so it is declared final.
+    private static final Logger LOG = LoggerFactory.getLogger(LogReporter.class);
+
+    /**
+     * Logs the message at WARN level.
+     *
+     * @param message text to log
+     */
+    @Override
+    public void report(String message) {
+        LOG.warn(message);
+    }
+
+    /**
+     * Logs that a supervisor was added to the blacklist, followed by the failure history.
+     *
+     * @param supervisor id of the supervisor that was blacklisted
+     * @param toleranceBuffer bad slot history that is appended to the message
+     */
+    @Override
+    public void reportBlacklist(String supervisor, List<HashMap<String, Set<Integer>>> toleranceBuffer) {
+        report("add supervisor " + supervisor + " to blacklist. The bad slot history of supervisors is :" + toleranceBuffer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
new file mode 100644
index 0000000..9b2e9b0
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist.strategies;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Default {@link IBlacklistStrategy}. A supervisor is added to the blacklist once it appears in at
+ * least {@code _toleranceCount} entries of the failure window. It leaves the blacklist again either
+ * when its countdown, driven by {@link #resumeFromBlacklist()}, runs out, or early when the cluster
+ * does not have enough free slots outside the blacklist.
+ */
+public class DefaultBlacklistStrategy implements IBlacklistStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
+
+    private IReporter _reporter;
+
+    private int _toleranceCount;
+    private int _resumeTime;
+    private int _nimbusMonitorFreqSecs;
+
+    // supervisor id -> number of resumeFromBlacklist() calls left before the supervisor is released
+    private TreeMap<String, Integer> blacklist;
+
+    /**
+     * Reads the strategy settings from the configuration and instantiates the reporter.
+     *
+     * <p>The tolerance count and the resume time keep the Java default of 0 when their keys are
+     * absent. The reporter class defaults to
+     * {@code org.apache.storm.scheduler.blacklist.reporters.LogReporter}.
+     *
+     * @param conf configuration map, read through the {@link DaemonConfig} keys
+     * @throws RuntimeException if the reporter class cannot be found, instantiated or accessed
+     */
+    @Override
+    public void prepare(Map conf) {
+        if (conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) {
+            _toleranceCount = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT));
+        }
+        if (conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME)) {
+            _resumeTime = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME));
+        }
+        String reporterClassName = "org.apache.storm.scheduler.blacklist.reporters.LogReporter";
+        if (conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER)) {
+            reporterClassName = (String) conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER);
+        }
+        try {
+            _reporter = (IReporter) Class.forName(reporterClassName).newInstance();
+        } catch (ClassNotFoundException e) {
+            LOG.error("Can't find blacklist reporter for name {}", reporterClassName);
+            throw new RuntimeException(e);
+        } catch (InstantiationException e) {
+            LOG.error("Throw InstantiationException blacklist reporter for name {}", reporterClassName);
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            LOG.error("Throw illegalAccessException blacklist reporter for name {}", reporterClassName);
+            throw new RuntimeException(e);
+        }
+
+        _nimbusMonitorFreqSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
+        blacklist = new TreeMap<>();
+    }
+
+    /**
+     * Updates the blacklist from the failure window and returns the blacklisted supervisor ids.
+     *
+     * <p>A supervisor is counted once for every window entry that contains it as a key. When the
+     * count reaches the tolerance count and the supervisor is not blacklisted yet, it is logged,
+     * reported and added with a countdown of {@code _resumeTime / _nimbusMonitorFreqSecs}
+     * (integer division). Afterwards {@link #releaseBlacklistWhenNeeded(Cluster, Topologies)} runs.
+     *
+     * @param supervisorsWithFailures failure window; each entry maps supervisor ids to port sets
+     * @param cluster cluster state used to decide whether blacklisted supervisors must be released
+     * @param topologies passed through to the release check
+     * @return the key set of the internal blacklist map (a live view, not a copy)
+     */
+    @Override
+    public Set<String> getBlacklist(List<HashMap<String, Set<Integer>>> supervisorsWithFailures, Cluster cluster, Topologies topologies) {
+        Map<String, Integer> countMap = new HashMap<>();
+        for (Map<String, Set<Integer>> item : supervisorsWithFailures) {
+            for (String supervisor : item.keySet()) {
+                Integer seen = countMap.get(supervisor);
+                countMap.put(supervisor, seen == null ? 1 : seen + 1);
+            }
+        }
+        for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
+            String supervisor = entry.getKey();
+            // Only supervisors that are not blacklisted yet get a fresh countdown.
+            if (entry.getValue() >= _toleranceCount && !blacklist.containsKey(supervisor)) {
+                LOG.info("add supervisor {} to blacklist", supervisor);
+                LOG.info("supervisorsWithFailures : {}", supervisorsWithFailures);
+                _reporter.reportBlacklist(supervisor, supervisorsWithFailures);
+                blacklist.put(supervisor, _resumeTime / _nimbusMonitorFreqSecs);
+            }
+        }
+        releaseBlacklistWhenNeeded(cluster, topologies);
+        return blacklist.keySet();
+    }
+
+    /**
+     * Decrements the countdown of every blacklisted supervisor and removes those that have run out.
+     *
+     * <p>The check is {@code <= 0} rather than {@code == 0}: the countdown starts at 0 when the
+     * resume time is unset or smaller than the monitor frequency, and an exact-zero test would then
+     * step past zero and keep the supervisor blacklisted indefinitely.
+     */
+    @Override
+    public void resumeFromBlacklist() {
+        Set<String> readyToRemove = new HashSet<>();
+        for (Map.Entry<String, Integer> entry : blacklist.entrySet()) {
+            int remaining = entry.getValue() - 1;
+            if (remaining <= 0) {
+                readyToRemove.add(entry.getKey());
+            } else {
+                // Updating through the entry changes the value without touching the map structure.
+                entry.setValue(remaining);
+            }
+        }
+        for (String key : readyToRemove) {
+            blacklist.remove(key);
+            LOG.info("supervisor {} reach the resume time ,removed from blacklist", key);
+        }
+    }
+
+    /**
+     * Releases blacklisted supervisors early when the free slots outside the blacklist cannot cover
+     * the workers still needed by the topologies that require scheduling.
+     *
+     * @param cluster source of the topologies needing scheduling, the supervisors and the free slots
+     * @param topologies not used by this implementation
+     */
+    public void releaseBlacklistWhenNeeded(Cluster cluster, Topologies topologies) {
+        if (blacklist.isEmpty()) {
+            return;
+        }
+        int totalNeedNumWorkers = 0;
+        for (TopologyDetails topologyDetails : cluster.needsSchedulingTopologies()) {
+            totalNeedNumWorkers += topologyDetails.getNumWorkers() - cluster.getAssignedNumWorkers(topologyDetails);
+        }
+        Map<String, SupervisorDetails> availableSupervisors = cluster.getSupervisors();
+        int availableSlotsNotInBlacklistCount = 0;
+        for (WorkerSlot slot : cluster.getAvailableSlots()) {
+            if (!blacklist.containsKey(slot.getNodeId())) {
+                availableSlotsNotInBlacklistCount += 1;
+            }
+        }
+        int shortage = totalNeedNumWorkers - availableSlotsNotInBlacklistCount;
+        if (shortage <= 0) {
+            return;
+        }
+
+        LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{},num blacklist :{}, will release some blacklist."
+                , totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size());
+        Set<String> readyToRemove = new HashSet<>();
+        // The TreeMap iterates in ascending key order, so supervisors are released in
+        // supervisor-id order, not in the order in which they were blacklisted.
+        for (String supervisor : blacklist.keySet()) {
+            if (availableSupervisors.containsKey(supervisor)) {
+                Set<Integer> ports = cluster.getAvailablePorts(availableSupervisors.get(supervisor));
+                readyToRemove.add(supervisor);
+                shortage -= ports.size();
+                if (shortage <= 0) { // released enough supervisors
+                    break;
+                }
+            }
+        }
+        for (String key : readyToRemove) {
+            blacklist.remove(key);
+            LOG.info("release supervisor {} for shortage of worker slots.", key);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
new file mode 100644
index 0000000..df79b8c
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist.strategies;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Topologies;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public interface IBlacklistStrategy {
+
+    void prepare(Map conf); // hands the configuration map to the strategy
+
+    /**
+     * Gets the blacklist as computed by this blacklist strategy.
+     * @param badSupervisorsToleranceSlidingWindow bad supervisors buffered in the sliding window
+     * @param cluster the cluster these topologies are running in. `cluster` contains everything a user
+     *       needs to develop new scheduling logic, e.g. supervisor information, available slots, current
+     *       assignments for all the topologies, etc. Users can set the new assignment for topologies using
+     *       `cluster.setAssignmentById()`.
+     * @param topologies all the topologies in the cluster, some of which need scheduling. The Topologies object
+     *       here only contains static information about topologies. Information like assignments and slots is
+     *       all in the `cluster` object.
+     * @return the set of blacklisted supervisor ids
+     */
+    Set<String> getBlacklist(List<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow, Cluster cluster, Topologies topologies);
+
+    /**
+     * Resumes supervisors from the blacklist. The blacklist is only a temporary list of supervisors;
+     * otherwise there would be fewer and fewer available resources.
+     * This is called every time before getBlacklist() and schedule.
+     */
+    void resumeFromBlacklist();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
new file mode 100644
index 0000000..5b8bbe7
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.SchedulerAssignmentImpl;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test helpers that build one supervisor map per scheduling round with faults injected, and that
+ * chain {@link Cluster} snapshots from one round to the next.
+ */
+public class FaultGenerateUtils {
+
+    /**
+     * Builds one supervisor map per element of {@code faults}.
+     *
+     * <p>A round whose fault is the single entry {@code {{-1, ...}}} gets a map without "sup-0";
+     * every other round gets the full set of generated supervisors.
+     *
+     * @param supervisorCount number of supervisors generated for each round
+     * @param slotCount number of slots generated per supervisor
+     * @param faults fault description, one element per round
+     * @return the supervisor maps, in round order
+     */
+    public static List<Map<String, SupervisorDetails>> getSupervisorsList(int supervisorCount, int slotCount, int[][][] faults) {
+        List<Map<String, SupervisorDetails>> supervisorsList = new ArrayList<>(faults.length);
+        for (int[][] fault : faults) {
+            Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(supervisorCount, slotCount);
+            // The inner length check avoids an ArrayIndexOutOfBoundsException on an empty entry.
+            if (fault.length == 1 && fault[0].length > 0 && fault[0][0] == -1) {
+                // Use the returned map, as the other call sites do, instead of discarding it.
+                supervisors = TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, "sup-0");
+            }
+            // Each round's map must be collected, otherwise the result is always empty.
+            supervisorsList.add(supervisors);
+        }
+        return supervisorsList;
+    }
+
+    /**
+     * Builds one supervisor map per element of {@code faultList}.
+     *
+     * <p>Within a round, each entry maps a supervisor index to a list of slots. An empty list
+     * removes the whole supervisor {@code "sup-" + index}; otherwise each listed slot is removed
+     * from that supervisor.
+     *
+     * @param supervisorCount number of supervisors generated for each round
+     * @param slotCount number of slots generated per supervisor
+     * @param faultList fault description, one map per round
+     * @return the supervisor maps, in round order
+     */
+    public static List<Map<String, SupervisorDetails>> getSupervisorsList(int supervisorCount, int slotCount, List<Map<Integer, List<Integer>>> faultList) {
+        List<Map<String, SupervisorDetails>> supervisorsList = new ArrayList<>(faultList.size());
+        for (Map<Integer, List<Integer>> faults : faultList) {
+            Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(supervisorCount, slotCount);
+            for (Map.Entry<Integer, List<Integer>> fault : faults.entrySet()) {
+                String supervisorId = "sup-" + fault.getKey();
+                List<Integer> slots = fault.getValue();
+                if (slots.isEmpty()) {
+                    supervisors = TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, supervisorId);
+                } else {
+                    for (int slot : slots) {
+                        supervisors = TestUtilsForBlacklistScheduler.removePortFromSupervisors(supervisors, supervisorId, slot);
+                    }
+                }
+            }
+            supervisorsList.add(supervisors);
+        }
+        return supervisorsList;
+    }
+
+    /**
+     * Creates the cluster for the next round, carrying over the assignments of the previous one.
+     *
+     * @param cluster cluster of the previous round, or {@code null} to start without assignments
+     * @param supervisors supervisor map for the new cluster
+     * @param iNimbus nimbus instance passed to the new cluster
+     * @param config configuration passed to the new cluster
+     * @param topologies topologies passed to the new cluster
+     * @return a new cluster built from the given arguments and the carried-over assignments
+     */
+    public static Cluster nextCluster(Cluster cluster, Map<String, SupervisorDetails> supervisors, INimbus iNimbus, Map config,
+                                      Topologies topologies) {
+        Map<String, SchedulerAssignmentImpl> assignment;
+        if (cluster == null) {
+            assignment = new HashMap<>();
+        } else {
+            assignment = TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments());
+        }
+        return new Cluster(iNimbus, supervisors, assignment, topologies, config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
new file mode 100644
index 0000000..049ef68
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.DefaultScheduler;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.SchedulerAssignmentImpl;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+
+public class TestBlacklistScheduler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestBlacklistScheduler.class);
+
+    private static int currentTime = 1468216504;
+
+    @Test // sup-0 is missing from the supervisor map for two consecutive scheduling rounds; host-0 is then expected in the blacklist
+    public void TestBadSupervisor() {
+        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+
+        Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); // 3 supervisors with 4 slots each
+
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+
+        TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true);
+        topoMap.put(topo1.getId(), topo1);
+
+        Topologies topologies = new Topologies(topoMap);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
+        bs.prepare(config);
+        bs.schedule(topologies, cluster); // first round: all supervisors present
+        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); // sup-0 missing, assignments carried over
+        bs.schedule(topologies, cluster);
+        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); // sup-0 missing a second time
+        bs.schedule(topologies, cluster);
+        cluster = new Cluster(iNimbus, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); // full supervisor map again
+        bs.schedule(topologies, cluster);
+        Set<String> hosts = new HashSet<>();
+        hosts.add("host-0");
+        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+    }
+
+    @Test // port 0 of sup-0 is missing for two consecutive scheduling rounds; host-0 is then expected in the blacklist
+    public void TestBadSlot() {
+        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+
+        Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); // 3 supervisors with 4 slots each
+
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+
+        TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true);
+        topoMap.put(topo1.getId(), topo1);
+
+        Topologies topologies = new Topologies(topoMap);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
+        bs.prepare(config);
+        bs.schedule(topologies, cluster); // first round: all supervisors and ports present
+
+        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap, "sup-0", 0), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); // sup-0 without port 0, assignments carried over
+        bs.schedule(topologies, cluster);
+        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap, "sup-0", 0), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); // sup-0 without port 0 a second time
+        bs.schedule(topologies, cluster);
+        cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config); // full supervisor map again; assignments start from an empty map here
+        bs.schedule(topologies, cluster);
+        Set<String> hosts = new HashSet<>();
+        hosts.add("host-0");
+        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+    }
+
+    @Test // blacklists host-0, then keeps scheduling and expects host-0 to leave the blacklist after further rounds
+    public void TestResumeBlacklist() {
+        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+
+        Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); // 3 supervisors with 4 slots each
+
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+
+        TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true);
+        topoMap.put(topo1.getId(), topo1);
+
+        Topologies topologies = new Topologies(topoMap);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
+        bs.prepare(config);
+        bs.schedule(topologies, cluster); // first round: all supervisors present
+        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); // sup-0 missing, assignments carried over
+        bs.schedule(topologies, cluster);
+        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); // sup-0 missing a second time
+        bs.schedule(topologies, cluster);
+        cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config); // full supervisor map again; assignments start from an empty map here
+        bs.schedule(topologies, cluster);
+        Set<String> hosts = new HashSet<>();
+        hosts.add("host-0");
+        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        for (int i = 0; i < 300 / 10 - 2; i++) { // 300 is the resume time set above; 10 is presumably the default nimbus monitor frequency in seconds - TODO confirm
+            bs.schedule(topologies, cluster);
+        }
+        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts()); // still blacklisted after these rounds
+        bs.schedule(topologies, cluster); // one more round is expected to release host-0
+        hosts.clear();
+        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+    }
+
+    @Test // blacklists host-0, then adds three more topologies and expects the blacklist to become empty
+    public void TestReleaseBlacklist() {
+        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+
+        Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); // 3 supervisors with 4 slots each
+
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+
+        TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true);
+        TopologyDetails topo2 = TestUtilsForBlacklistScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, true);
+        TopologyDetails topo3 = TestUtilsForBlacklistScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, true);
+        TopologyDetails topo4 = TestUtilsForBlacklistScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 32, true);
+        topoMap.put(topo1.getId(), topo1); // only topo-1 is scheduled at first; topo-2..4 are added later
+
+        Topologies topologies = new Topologies(topoMap);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
+        bs.prepare(config);
+        bs.schedule(topologies, cluster); // first round: all supervisors present
+        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); // sup-0 missing, assignments carried over
+        bs.schedule(topologies, cluster);
+        cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); // sup-0 missing a second time
+        bs.schedule(topologies, cluster);
+        cluster = new Cluster(iNimbus, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); // full supervisor map again
+        bs.schedule(topologies, cluster);
+        Set<String> hosts = new HashSet<>();
+        hosts.add("host-0");
+        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        topoMap.put(topo2.getId(), topo2);
+        topoMap.put(topo3.getId(), topo3);
+        topoMap.put(topo4.getId(), topo4);
+        topologies = new Topologies(topoMap); // now four topologies; presumably this creates the worker-slot shortage that triggers the release - TODO confirm
+        cluster = new Cluster(iNimbus, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+        bs.schedule(topologies, cluster);
+        hosts.clear();
+        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+    }
+
+    @Test // replays a scripted list of per-round faults and checks the blacklisted hosts at selected rounds
+    public void TestList() {
+        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true);
+        TopologyDetails topo2 = TestUtilsForBlacklistScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 2, true);
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo2.getId(), topo2);
+        Topologies topologies = new Topologies(topoMap);
+        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
+        bs.prepare(config);
+
+        List<Map<Integer, List<Integer>>> faultList = new ArrayList<>(); // one map per round: supervisor index -> ports to remove
+
+        faultList.add(new HashMap<Integer, List<Integer>>()); // round 0: no faults
+        faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(0, 1))); // round 1: sup-0 loses ports 0 and 1
+        faultList.add((Map) ImmutableMap.of(0, new ArrayList<>())); // round 2: empty port list, so sup-0 is removed entirely
+        for (int i = 0; i < 17; i++) { // rounds 3-19: no faults
+            faultList.add(new HashMap<Integer, List<Integer>>());
+        }
+        faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(0, 1))); // round 20: sup-0 loses ports 0 and 1
+        faultList.add((Map) ImmutableMap.of(1, ImmutableList.of(1))); // round 21: sup-1 loses port 1
+        for (int i = 0; i < 8; i++) { // rounds 22-29: no faults
+            faultList.add(new HashMap<Integer, List<Integer>>());
+        }
+        faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(1))); // round 30: sup-0 loses port 1
+        faultList.add((Map) ImmutableMap.of(1, ImmutableList.of(1))); // round 31: sup-1 loses port 1
+        for (int i = 0; i < 30; i++) { // rounds 32-61: no faults
+            faultList.add(new HashMap<Integer, List<Integer>>());
+        }
+
+        List<Map<String, SupervisorDetails>> supervisorsList = FaultGenerateUtils.getSupervisorsList(3, 4, faultList); // 3 supervisors with 4 slots each, one map per round
+        Cluster cluster = null;
+        int count = 0; // index of the current round
+        for (Map<String, SupervisorDetails> supervisors : supervisorsList) {
+            cluster = FaultGenerateUtils.nextCluster(cluster, supervisors, iNimbus, config, topologies); // carries the previous round's assignments over
+            bs.schedule(topologies, cluster);
+            if (count == 0) {
+                Set<String> hosts = new HashSet<>();
+                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+            } else if (count == 2) {
+                Set<String> hosts = new HashSet<>();
+                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+            } else if (count == 3) {
+                Set<String> hosts = new HashSet<>();
+                hosts.add("host-0");
+                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+            } else if (count == 30) {
+                Set<String> hosts = new HashSet<>();
+                hosts.add("host-0");
+                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+            } else if (count == 31) {
+                Set<String> hosts = new HashSet<>();
+                hosts.add("host-0");
+                hosts.add("host-1");
+                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+            } else if (count == 32) {
+                Set<String> hosts = new HashSet<>();
+                hosts.add("host-0");
+                hosts.add("host-1");
+                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+            } else if (count == 60) {
+                Set<String> hosts = new HashSet<>();
+                hosts.add("host-0");
+                hosts.add("host-1");
+                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+            } else if (count == 61) {
+                Set<String> hosts = new HashSet<>();
+                hosts.add("host-0");
+                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+            } else if (count == 62) { // NOTE(review): faultList holds 62 entries (rounds 0-61), so this branch is never reached
+                Set<String> hosts = new HashSet<>();
+                Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+            }
+            count++;
+        }
+
+    }
+
+    @Test
+    public void removeLongTimeDisappearFromCache() {
+        INimbus nimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+        // Three supervisors (sup-0 .. sup-2), each generated with ports 0 .. 3.
+        Map<String, SupervisorDetails> allSupervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
+        Config conf = new Config();
+        conf.putAll(Utils.readDefaultConfig());
+        conf.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
+        conf.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
+        conf.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);
+        Map<String, TopologyDetails> topologyById = new HashMap<>();
+        TopologyDetails topology = TestUtilsForBlacklistScheduler.getTopology("topo-1", conf, 5, 15, 1, 1, currentTime - 2, true);
+        topologyById.put(topology.getId(), topology);
+        Topologies topologies = new Topologies(topologyById);
+        Cluster cluster = new Cluster(nimbus, allSupervisors, new HashMap<String, SchedulerAssignmentImpl>(), topologies, conf);
+        BlacklistScheduler scheduler = new BlacklistScheduler(new DefaultScheduler());
+        scheduler.prepare(conf);
+        scheduler.schedule(topologies, cluster);
+        // Phase 1: sup-0 disappears; after 20 scheduling rounds only sup-1 and sup-2 may remain cached.
+        cluster = new Cluster(nimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(allSupervisors, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, conf);
+        for (int round = 0; round < 20; round++) {
+            scheduler.schedule(topologies, cluster);
+        }
+        Set<String> expectedSupervisors = new HashSet<>();
+        expectedSupervisors.add("sup-1");
+        expectedSupervisors.add("sup-2");
+        Assert.assertEquals(expectedSupervisors, scheduler.cachedSupervisors.keySet());
+
+        // Phase 2: sup-0 returns with all ports, then loses port 0; only ports 1 .. 3 may remain cached.
+        cluster = new Cluster(nimbus, allSupervisors, new HashMap<String, SchedulerAssignmentImpl>(), topologies, conf);
+        scheduler.schedule(topologies, cluster);
+        cluster = new Cluster(nimbus, TestUtilsForBlacklistScheduler.removePortFromSupervisors(allSupervisors, "sup-0", 0), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, conf);
+        for (int round = 0; round < 20; round++) {
+            scheduler.schedule(topologies, cluster);
+        }
+        Set<Integer> expectedPorts = new HashSet<>();
+        expectedPorts.add(1);
+        expectedPorts.add(2);
+        expectedPorts.add(3);
+        Assert.assertEquals(expectedPorts, scheduler.cachedSupervisors.get("sup-0"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/25c27c6d/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
new file mode 100644
index 0000000..71055b0
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SchedulerAssignmentImpl;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.SpoutDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+
+public class TestUtilsForBlacklistScheduler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForBlacklistScheduler.class);
+
+    public static Map<String, SupervisorDetails> removeSupervisorFromSupervisors(Map<String, SupervisorDetails> supervisorDetailsMap, String supervisor) {
+        // Work on a copy so the caller's map is left untouched.
+        Map<String, SupervisorDetails> remaining = new HashMap<String, SupervisorDetails>(supervisorDetailsMap);
+        remaining.remove(supervisor);
+        return remaining;
+    }
+
+    public static Map<String, SupervisorDetails> removePortFromSupervisors(Map<String, SupervisorDetails> supervisorDetailsMap, String supervisor, int port) {
+        // Every supervisor is rebuilt from its id, host and port set (the two remaining constructor
+        // arguments are passed as null); only the entry keyed by `supervisor` loses `port`.
+        Map<String, SupervisorDetails> rebuilt = new HashMap<String, SupervisorDetails>();
+        for (Map.Entry<String, SupervisorDetails> entry : supervisorDetailsMap.entrySet()) {
+            SupervisorDetails source = entry.getValue();
+            Set<Integer> remainingPorts = new HashSet<>(source.getAllPorts());
+            if (entry.getKey().equals(supervisor)) {
+                remainingPorts.remove(port);
+            }
+            SupervisorDetails copy = new SupervisorDetails(source.getId(), source.getHost(), null, (HashSet) remainingPorts, null);
+            rebuilt.put(copy.getId(), copy);
+        }
+        return rebuilt;
+    }
+
+    public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts) {
+        Map<String, SupervisorDetails> supervisorsById = new HashMap<>();
+        for (int supIndex = 0; supIndex < numSup; supIndex++) {
+            List<Number> portNumbers = new LinkedList<>();  // filled with 0 .. numPorts - 1
+            for (int portNumber = 0; portNumber < numPorts; portNumber++) {
+                portNumbers.add(portNumber);
+            }
+            SupervisorDetails generated = new SupervisorDetails("sup-" + supIndex, "host-" + supIndex, null, portNumbers, null);
+            supervisorsById.put(generated.getId(), generated);
+        }
+        return supervisorsById;
+    }
+
+
+    public static TopologyDetails getTopology(String name, Map config, int numSpout, int numBolt,
+                                              int spoutParallelism, int boltParallelism, int launchTime, boolean blacklistEnable) {
+        // The caller's config is copied before the topology name is added; blacklistEnable is not read here.
+        Config topologyConf = new Config();
+        topologyConf.putAll(config);
+        topologyConf.put(Config.TOPOLOGY_NAME, name);
+        StormTopology stormTopology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
+        String topologyId = name + "-" + launchTime;
+        Map<ExecutorDetails, String> executorToComponent = genExecsAndComps(stormTopology, spoutParallelism, boltParallelism);
+        return new TopologyDetails(topologyId, topologyConf, stormTopology, 3, executorToComponent, launchTime, "user");
+    }
+
+    public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) {
+        // Each executor is created as ExecutorDetails(next, next + 1) and the counter then advances
+        // by one; all spout executors are generated first, bolt executors afterwards.
+        Map<ExecutorDetails, String> executorToComponent = new HashMap<ExecutorDetails, String>();
+        int nextTask = 0;
+
+        for (String spoutId : topology.get_spouts().keySet()) {
+            for (int n = 0; n < spoutParallelism; n++) {
+                executorToComponent.put(new ExecutorDetails(nextTask, nextTask + 1), spoutId);
+                nextTask++;
+            }
+        }
+
+        for (String boltId : topology.get_bolts().keySet()) {
+            for (int n = 0; n < boltParallelism; n++) {
+                executorToComponent.put(new ExecutorDetails(nextTask, nextTask + 1), boltId);
+                nextTask++;
+            }
+        }
+        return executorToComponent;
+    }
+
+    /**
+     * Builds a topology with {@code numSpout} spouts ("spout-0", "spout-1", ...) and {@code numBolt}
+     * bolts ("bolt-0", ...); bolt {@code i} is shuffle-grouped onto spout {@code i % numSpout}.
+     */
+    public static StormTopology buildTopology(int numSpout, int numBolt,
+                                              int spoutParallelism, int boltParallelism) {
+        LOG.debug("buildTopology with -> numSpout: {} spoutParallelism: {} numBolt: {} boltParallelism: {}",
+                numSpout, spoutParallelism, numBolt, boltParallelism);
+        TopologyBuilder builder = new TopologyBuilder();
+
+        for (int i = 0; i < numSpout; i++) {
+            builder.setSpout("spout-" + i, new TestSpout(), spoutParallelism);
+        }
+        for (int i = 0; i < numBolt; i++) {
+            // Round-robin over the declared spouts so the bolts are spread across all of them.
+            // When no spout is declared the index stays 0, i.e. the grouping names "spout-0".
+            int spoutIndex = numSpout > 0 ? i % numSpout : 0;
+            builder.setBolt("bolt-" + i, new TestBolt(), boltParallelism).shuffleGrouping("spout-" + spoutIndex);
+        }
+
+        return builder.createTopology();
+    }
+
+    /** Test spout: each nextTuple call sleeps via Utils.sleep(100), then emits one randomly picked word. */
+    public static class TestSpout extends BaseRichSpout {
+        boolean _isDistributed;
+        SpoutOutputCollector _collector;
+
+        public TestSpout() {
+            this(true);
+        }
+
+        public TestSpout(boolean isDistributed) {
+            this._isDistributed = isDistributed;
+        }
+
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            this._collector = collector;
+        }
+
+        public void close() {
+        }
+
+        public void nextTuple() {
+            Utils.sleep(100);
+            final String[] vocabulary = {"nathan", "mike", "jackson", "golda", "bertels"};
+            final Random picker = new Random();
+            // Choose a random index into the vocabulary and emit that word as a one-value tuple.
+            _collector.emit(new Values(vocabulary[picker.nextInt(vocabulary.length)]));
+        }
+
+        public void ack(Object msgId) {
+        }
+
+        public void fail(Object msgId) {
+        }
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+        @Override
+        public Map<String, Object> getComponentConfiguration() {
+            if (_isDistributed) {
+                return null;
+            }
+            Map<String, Object> componentConf = new HashMap<String, Object>();
+            componentConf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
+            return componentConf;
+        }
+    }
+
+    public static class TestBolt extends BaseRichBolt {
+        OutputCollector _collector;
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+            this._collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            String exclaimed = tuple.getString(0) + "!!!";  // first field plus a "!!!" suffix
+            _collector.emit(tuple, new Values(exclaimed));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+    }
+
+    /** Minimal INimbus stub: every hook is empty or returns null, except the host-name lookup. */
+    public static class INimbusTest implements INimbus {
+        @Override
+        public void prepare(Map stormConf, String schedulerLocalDir) {
+            // Nothing to set up.
+        }
+
+        @Override
+        public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies, Set<String> topologiesMissingAssignments) {
+            return null;
+        }
+
+        @Override
+        public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
+            // Intentionally empty.
+        }
+
+        @Override
+        public String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId) {
+            // Resolve the host through the supervisor map; a node id that is not a key yields null.
+            boolean known = existingSupervisors.containsKey(nodeId);
+            return known ? existingSupervisors.get(nodeId).getHost() : null;
+        }
+
+        @Override
+        public IScheduler getForcedScheduler() {
+            return null;
+        }
+    }
+
+    public static Map<String, SchedulerAssignmentImpl> assignmentMapToImpl(Map<String, SchedulerAssignment> assignmentMap) {
+        Map<String, SchedulerAssignmentImpl> downcast = new HashMap<>();  // values are cast, not copied
+        for (String key : assignmentMap.keySet()) {
+            downcast.put(key, (SchedulerAssignmentImpl) assignmentMap.get(key));
+        }
+        return downcast;
+    }
+}


[3/4] storm git commit: STORM-2083 Blacklist scheduler

Posted by bo...@apache.org.
STORM-2083 Blacklist scheduler

* addressed review comments from @revans2
* also fixed failing test


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/74cc7e2d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/74cc7e2d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/74cc7e2d

Branch: refs/heads/master
Commit: 74cc7e2d8283dffbb2cd6f97e4b3d410f87c40bb
Parents: a7b86c3
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Sep 27 10:18:15 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 27 14:09:31 2017 +0900

----------------------------------------------------------------------
 .../test/clj/org/apache/storm/nimbus_test.clj   |  4 +-
 storm-server/pom.xml                            |  2 +-
 .../java/org/apache/storm/DaemonConfig.java     | 50 +++++++-----
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 85 +++++++++++---------
 .../scheduler/blacklist/BlacklistScheduler.java | 70 +++++++++-------
 .../apache/storm/scheduler/blacklist/Sets.java  | 50 +++++++++++-
 .../blacklist/reporters/IReporter.java          |  3 +-
 .../blacklist/reporters/LogReporter.java        |  7 +-
 .../strategies/DefaultBlacklistStrategy.java    | 45 ++++++-----
 .../strategies/IBlacklistStrategy.java          | 11 +--
 10 files changed, 208 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 38b5da0..1f45f9b 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1619,13 +1619,12 @@
                     {STORM-ZOOKEEPER-AUTH-SCHEME scheme
                      STORM-ZOOKEEPER-AUTH-PAYLOAD digest
                      STORM-PRINCIPAL-TO-LOCAL-PLUGIN "org.apache.storm.security.auth.DefaultPrincipalToLocal"
+                     NIMBUS-MONITOR-FREQ-SECS 10
                      NIMBUS-THRIFT-PORT 6666})
           expected-acls Nimbus/ZK_ACLS
           fake-inimbus (reify INimbus (getForcedScheduler [this] nil) (prepare [this conf dir] nil))
           fake-cu (proxy [ServerConfigUtils] []
                     (nimbusTopoHistoryStateImpl [conf] nil))
-          fake-ru (proxy [ReflectionUtils] []
-                    (newInstanceImpl [_]))
           fake-utils (proxy [Utils] []
                        (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
                                                 (upTime [] 0))))
@@ -1633,7 +1632,6 @@
 	  fake-common (proxy [StormCommon] []
                              (mkAuthorizationHandler [_] nil))]
       (with-open [_ (ServerConfigUtilsInstaller. fake-cu)
-                  _ (ReflectionUtilsInstaller. fake-ru)
                   _ (UtilsInstaller. fake-utils)
                   - (StormCommonInstaller. fake-common)
                   zk-le (MockedZookeeper. (proxy [Zookeeper] []

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index b57151c..7449257 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -136,7 +136,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>4000</maxAllowedViolations>
+                    <maxAllowedViolations>3590</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index c5ee27a..00674c7 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -18,15 +18,25 @@
 
 package org.apache.storm;
 
-import org.apache.storm.scheduler.blacklist.reporters.IReporter;
-import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
-import org.apache.storm.container.ResourceIsolationInterface;
-import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
-import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
-import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
-import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isInteger;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isString;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isStringList;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isStringOrStringList;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isType;
+import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isBoolean;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isNumber;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isImplementationOfClass;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryType;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isNoDuplicateInList;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryCustom;
+
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
 import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
 import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
@@ -36,8 +46,6 @@ import org.apache.storm.validation.Validated;
 import java.util.ArrayList;
 import java.util.Map;
 
-import static org.apache.storm.validation.ConfigValidationAnnotations.*;
-
 /**
  * Storm configs are specified as a plain old map. This class provides constants for
  * all the configurations possible on a Storm cluster. Each constant is paired with an annotation
@@ -110,32 +118,32 @@ public class DaemonConfig implements Validated {
     public static final String STORM_SCHEDULER = "storm.scheduler";
 
     /**
-     * The number of seconds that the blacklist scheduler will concern of bad slots or supervisors
+     * The length of the time window, in seconds, over which the blacklist scheduler tracks bad slots or supervisors.
      */
-    @isInteger
+    @isPositiveNumber
     public static final String BLACKLIST_SCHEDULER_TOLERANCE_TIME = "blacklist.scheduler.tolerance.time.secs";
 
     /**
-     * The number of hit count that will trigger blacklist in tolerance time
+     * The number of hits within the tolerance time that will trigger blacklisting.
      */
-    @isInteger
+    @isPositiveNumber
     public static final String BLACKLIST_SCHEDULER_TOLERANCE_COUNT = "blacklist.scheduler.tolerance.count";
 
     /**
-     * The number of seconds that the blacklisted slots or supervisor will be resumed
+     * The number of seconds after which blacklisted slots or supervisors will be resumed.
      */
-    @isInteger
+    @isPositiveNumber
     public static final String BLACKLIST_SCHEDULER_RESUME_TIME = "blacklist.scheduler.resume.time.secs";
 
     /**
-     * The class that the blacklist scheduler will report the blacklist
+     * The reporter class that the blacklist scheduler uses to report the blacklist.
      */
     @NotNull
     @isImplementationOfClass(implementsClass = IReporter.class)
     public static final String BLACKLIST_SCHEDULER_REPORTER = "blacklist.scheduler.reporter";
 
     /**
-     * The class that specifies the eviction strategy to use in blacklist scheduler
+     * The class that specifies the eviction strategy to use in blacklist scheduler.
      */
     @NotNull
     @isImplementationOfClass(implementsClass = IBlacklistStrategy.class)
@@ -144,7 +152,7 @@ public class DaemonConfig implements Validated {
     /**
      * Whether we want to display all the resource capacity and scheduled usage on the UI page.
      * You MUST have this variable set if you are using any kind of resource-related scheduler.
-     *
+     * <p/>
      * If this is not set, we will not display resource capacity and usage on the UI.
      */
     @isBoolean
@@ -155,7 +163,7 @@ public class DaemonConfig implements Validated {
      * Provides a way for a @link{STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN}
      * implementation to access optional settings.
      */
-    @isType(type=Map.class)
+    @isType(type = Map.class)
     public static final String STORM_GROUP_MAPPING_SERVICE_PARAMS = "storm.group.mapping.service.params";
 
     /**
@@ -873,7 +881,9 @@ public class DaemonConfig implements Validated {
      * A map of users to another map of the resource guarantees of the user. Used by Resource Aware Scheduler to ensure
      * per user resource guarantees.
      */
-    @isMapEntryCustom(keyValidatorClasses = {ConfigValidation.StringValidator.class}, valueValidatorClasses = {ConfigValidation.UserResourcePoolEntryValidator.class})
+    @isMapEntryCustom(
+            keyValidatorClasses = {ConfigValidation.StringValidator.class},
+            valueValidatorClasses = {ConfigValidation.UserResourcePoolEntryValidator.class})
     public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools";
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 7331906..e54aaeb 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -134,6 +134,7 @@ import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
 import org.apache.storm.nimbus.ITopologyValidator;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Cluster.SupervisorResources;
 import org.apache.storm.scheduler.DefaultScheduler;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.INimbus;
@@ -144,10 +145,9 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
 import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
 import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
-import org.apache.storm.scheduler.Cluster.SupervisorResources;
-import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
 import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.storm.security.INimbusCredentialPlugin;
 import org.apache.storm.security.auth.AuthUtils;
@@ -184,7 +184,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Nimbus implements Iface, Shutdownable, DaemonCommon {
-    private final static Logger LOG = LoggerFactory.getLogger(Nimbus.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Nimbus.class);
     
     //    Metrics
     private static final Meter submitTopologyWithOptsCalls = StormMetricsRegistry.registerMeter("nimbus:num-submitTopologyWithOpts-calls");
@@ -196,7 +196,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private static final Meter deactivateCalls = StormMetricsRegistry.registerMeter("nimbus:num-deactivate-calls");
     private static final Meter debugCalls = StormMetricsRegistry.registerMeter("nimbus:num-debug-calls");
     private static final Meter setWorkerProfilerCalls = StormMetricsRegistry.registerMeter("nimbus:num-setWorkerProfiler-calls");
-    private static final Meter getComponentPendingProfileActionsCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPendingProfileActions-calls");
+    private static final Meter getComponentPendingProfileActionsCalls = StormMetricsRegistry.registerMeter(
+            "nimbus:num-getComponentPendingProfileActions-calls");
     private static final Meter setLogConfigCalls = StormMetricsRegistry.registerMeter("nimbus:num-setLogConfig-calls");
     private static final Meter uploadNewCredentialsCalls = StormMetricsRegistry.registerMeter("nimbus:num-uploadNewCredentials-calls");
     private static final Meter beginFileUploadCalls = StormMetricsRegistry.registerMeter("nimbus:num-beginFileUpload-calls");
@@ -212,21 +213,27 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private static final Meter getClusterInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getClusterInfo-calls");
     private static final Meter getLeaderCalls = StormMetricsRegistry.registerMeter("nimbus:num-getLeader-calls");
     private static final Meter isTopologyNameAllowedCalls = StormMetricsRegistry.registerMeter("nimbus:num-isTopologyNameAllowed-calls");
-    private static final Meter getTopologyInfoWithOptsCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfoWithOpts-calls");
+    private static final Meter getTopologyInfoWithOptsCalls = StormMetricsRegistry.registerMeter(
+            "nimbus:num-getTopologyInfoWithOpts-calls");
     private static final Meter getTopologyInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfo-calls");
     private static final Meter getTopologyPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls");
     private static final Meter getSupervisorPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls");
     private static final Meter getComponentPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls");
-    private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter("nimbus:num-getOwnerResourceSummaries-calls");
-    private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms", new ExponentiallyDecayingReservoir());
+    private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms",
+            new ExponentiallyDecayingReservoir());
+    private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter(
+            "nimbus:num-getOwnerResourceSummaries-calls");
     private static final Meter shutdownCalls = StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls");
     // END Metrics
     
     private static final String STORM_VERSION = VersionInfo.getVersion();
+
     @VisibleForTesting
     public static final List<ACL> ZK_ACLS = Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0),
             new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+
     private static final Subject NIMBUS_SUBJECT = new Subject();
+
     static {
         NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
         NIMBUS_SUBJECT.setReadOnly();
@@ -442,13 +449,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     @SuppressWarnings("deprecation")
     private static <T extends AutoCloseable> TimeCacheMap<String, T> fileCacheMap(Map<String, Object> conf) {
         return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_FILE_COPY_EXPIRATION_SECS), 600),
-                (id, stream) -> {
-                    try {
-                        stream.close();
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                });
+            (id, stream) -> {
+                try {
+                    stream.close();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
     }
 
     private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> second) {
@@ -488,26 +495,26 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     @SuppressWarnings("deprecation")
     private static <T extends AutoCloseable> TimeCacheMap<String, T> makeBlobCacheMap(Map<String, Object> conf) {
         return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 600),
-                (id, stream) -> {
-                    try {
-                        if (stream instanceof AtomicOutputStream) {
-                            ((AtomicOutputStream) stream).cancel();
-                        } else {
-                            stream.close();
-                        }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
+            (id, stream) -> {
+                try {
+                    if (stream instanceof AtomicOutputStream) {
+                        ((AtomicOutputStream) stream).cancel();
+                    } else {
+                        stream.close();
                     }
-                });
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
     }
     
     /**
      * Constructs a TimeCacheMap instance with a blobstore timeout and no callback function.
-     * @param conf
-     * @return
+     * @param conf the config to use
+     * @return the newly created TimeCacheMap
      */
     @SuppressWarnings("deprecation")
-    private static TimeCacheMap<String, Iterator<String>> makeBlobListCachMap(Map<String, Object> conf) {
+    private static TimeCacheMap<String, Iterator<String>> makeBlobListCacheMap(Map<String, Object> conf) {
         return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 600));
     }
     
@@ -528,7 +535,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     
     @SuppressWarnings("unchecked")
     private static List<ClusterMetricsConsumerExecutor> makeClusterMetricsConsumerExecutors(Map<String, Object> conf) {
-        Collection<Map<String, Object>> consumers = (Collection<Map<String, Object>>) conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_REGISTER);
+        Collection<Map<String, Object>> consumers = (Collection<Map<String, Object>>) conf.get(
+                DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_REGISTER);
         List<ClusterMetricsConsumerExecutor> ret = new ArrayList<>();
         if (consumers != null) {
             for (Map<String, Object> consumer : consumers) {
@@ -561,15 +569,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return kseq.getKeySequenceNumber(conf);
     }
     
-    private static StormTopology readStormTopology(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+    private static StormTopology readStormTopology(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException,
+            IOException {
         return tc.readTopology(topoId, getSubject());
     }
     
-    private static Map<String, Object> readTopoConfAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+    private static Map<String, Object> readTopoConfAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException,
+            AuthorizationException, IOException {
         return tc.readTopoConf(topoId, NIMBUS_SUBJECT);
     }
     
-    private static StormTopology readStormTopologyAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+    private static StormTopology readStormTopologyAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException,
+            AuthorizationException, IOException {
         return tc.readTopology(topoId, NIMBUS_SUBJECT);
     }
     
@@ -624,8 +635,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return ret;
     }
 
-    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments,
-            Map<String, Assignment> existingAssignments) {
+    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
+            Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
         Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
         // Print some useful information
         if (existingAssignments != null && !existingAssignments.isEmpty()) {
@@ -672,7 +683,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
             slotAssigned.put(key, value);
         }
-        HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() : Utils.reverseMap(newExecToNodePort);
+        HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() :
+                Utils.reverseMap(newExecToNodePort);
         HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<>();
         for (Entry<List<Object>, List<List<Long>>> entry: tmpNewSlotAssigned.entrySet()) {
             List<List<Long>> value = new ArrayList<>(entry.getValue());
@@ -713,7 +725,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return state.getTopoId(topoName).isPresent();
     }
     
-    private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException, IOException {
+    private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException,
+            IOException {
         try {
             return readTopoConfAsNimbus(topoId, tc);
             //Was a try-cause but I looked at the code around this and key not found is not wrapped in runtime,
@@ -1090,7 +1103,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         this.topoCache = topoCache;
         this.blobDownloaders = makeBlobCacheMap(conf);
         this.blobUploaders = makeBlobCacheMap(conf);
-        this.blobListers = makeBlobListCachMap(conf);
+        this.blobListers = makeBlobListCacheMap(conf);
         this.uptime = Utils.makeUptimeComputer();
         this.validator = ReflectionUtils.newInstance((String) conf.getOrDefault(DaemonConfig.NIMBUS_TOPOLOGY_VALIDATOR, DefaultTopologyValidator.class.getName()));
         this.timer = new StormTimer(null, (t, e) -> {

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index a05d814..8083e01 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -15,9 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler.blacklist;
 
 import com.google.common.collect.EvictingQueue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.Cluster;
@@ -30,15 +39,10 @@ import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
 import org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy;
 import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
 
 public class BlacklistScheduler implements IScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
@@ -49,7 +53,7 @@ public class BlacklistScheduler implements IScheduler {
 
     private final IScheduler underlyingScheduler;
     @SuppressWarnings("rawtypes")
-    private Map _conf;
+    private Map conf;
 
     protected int toleranceTime;
     protected int toleranceCount;
@@ -74,22 +78,25 @@ public class BlacklistScheduler implements IScheduler {
     public void prepare(Map conf) {
         LOG.info("Preparing black list scheduler");
         underlyingScheduler.prepare(conf);
-        _conf = conf;
+        this.conf = conf;
 
-        toleranceTime = ObjectReader.getInt(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
-        toleranceCount = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
-        resumeTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+        toleranceTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME),
+                DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
+        toleranceCount = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT),
+                DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+        resumeTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME),
+                DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
 
-        String reporterClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
+        String reporterClassName = ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
                 LogReporter.class.getName());
         reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
 
-        String strategyClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY),
+        String strategyClassName = ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY),
                 DefaultBlacklistStrategy.class.getName());
         blacklistStrategy = (IBlacklistStrategy) initializeInstance(strategyClassName, "blacklist strategy");
 
-        nimbusMonitorFreqSecs = ObjectReader.getInt( _conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
-        blacklistStrategy.prepare(_conf);
+        nimbusMonitorFreqSecs = ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
+        blacklistStrategy.prepare(this.conf);
 
         windowSize = toleranceTime / nimbusMonitorFreqSecs;
         badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize);
@@ -108,11 +115,11 @@ public class BlacklistScheduler implements IScheduler {
     @Override
     public void schedule(Topologies topologies, Cluster cluster) {
         LOG.debug("running Black List scheduler");
-        Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
         LOG.debug("AssignableSlots: {}", cluster.getAssignableSlots());
         LOG.debug("AvailableSlots: {}", cluster.getAvailableSlots());
         LOG.debug("UsedSlots: {}", cluster.getUsedSlots());
 
+        Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
         blacklistStrategy.resumeFromBlacklist();
         badSupervisors(supervisors);
         Set<String> blacklistHosts = getBlacklistHosts(cluster, topologies);
@@ -169,7 +176,8 @@ public class BlacklistScheduler implements IScheduler {
     }
 
     private Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) {
-        Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies);
+        Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow),
+                cluster, topologies);
         Set<String> blacklistHostSet = new HashSet<>();
         for (String supervisor : blacklistSet) {
             String host = cluster.getHost(supervisor);
@@ -226,23 +234,29 @@ public class BlacklistScheduler implements IScheduler {
                     slots.remove(slot);
                     cachedSupervisors.put(supervisorKey, slots);
                 }
-                LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.", workerSlot);
+                LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.",
+                        workerSlot);
             }
         }
     }
 
     private Object initializeInstance(String className, String representation) {
         try {
-            return Class.forName(className).newInstance();
-        } catch (ClassNotFoundException e) {
-            LOG.error("Can't find {} for name {}", representation, className);
-            throw new RuntimeException(e);
-        } catch (InstantiationException e) {
-            LOG.error("Throw InstantiationException {} for name {}", representation, className);
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            LOG.error("Throw IllegalAccessException {} for name {}", representation, className);
-            throw new RuntimeException(e);
+            return ReflectionUtils.newInstance(className);
+        } catch (RuntimeException e) {
+            Throwable cause = e.getCause();
+
+            if (cause instanceof ClassNotFoundException) {
+                LOG.error("Can't find {} for name {}", representation, className);
+            } else if (cause instanceof InstantiationException) {
+                LOG.error("Throw InstantiationException {} for name {}", representation, className);
+            } else if (cause instanceof IllegalAccessException) {
+                LOG.error("Throw IllegalAccessException {} for name {}", representation, className);
+            } else {
+                LOG.error("Throw unexpected exception {} {} for name {}", cause, representation, className);
+            }
+
+            throw e;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
index 93c38cb..57344e6 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler.blacklist;
 
 import java.util.HashSet;
@@ -22,35 +23,82 @@ import java.util.Set;
 
 public class Sets {
 
+    /**
+     * Calculate union of both sets.
+     *
+     * @param setA parameter 1
+     * @param setB parameter 2
+     * @param <T> generic type of Set elements.
+     * @return the Set which is union of both Sets.
+     */
     public static <T> Set<T> union(Set<T> setA, Set<T> setB) {
         Set<T> result = new HashSet<T>(setA);
         result.addAll(setB);
         return result;
     }
 
+    /**
+     * Calculate intersection of both sets.
+     *
+     * @param setA parameter 1
+     * @param setB parameter 2
+     * @param <T> generic type of Set elements.
+     * @return the Set which is intersection of both Sets.
+     */
     public static <T> Set<T> intersection(Set<T> setA, Set<T> setB) {
         Set<T> result = new HashSet<T>(setA);
         result.retainAll(setB);
         return result;
     }
 
+    /**
+     * Calculate difference of two sets (the elements of setA that are not in setB).
+     *
+     * @param setA parameter 1
+     * @param setB parameter 2
+     * @param <T> generic type of Set elements.
+     * @return the Set which is difference of two sets.
+     */
     public static <T> Set<T> difference(Set<T> setA, Set<T> setB) {
         Set<T> result = new HashSet<T>(setA);
         result.removeAll(setB);
         return result;
     }
 
+    /**
+     * Calculate symmetric difference of two sets.
+     *
+     * @param setA parameter 1
+     * @param setB parameter 2
+     * @param <T> generic type of Set elements.
+     * @return the Set which is symmetric difference of two sets.
+     */
     public static <T> Set<T> symDifference(Set<T> setA, Set<T> setB) {
         Set<T> union = union(setA, setB);
         Set<T> intersection = intersection(setA, setB);
         return difference(union, intersection);
     }
 
+    /**
+     * Check whether a set is a subset of another set.
+     *
+     * @param setA parameter 1
+     * @param setB parameter 2
+     * @param <T> generic type of Set elements.
+     * @return true when setA is a subset of setB, false otherwise.
+     */
     public static <T> boolean isSubset(Set<T> setA, Set<T> setB) {
         return setB.containsAll(setA);
     }
 
-
+    /**
+     * Check whether a set is a superset of another set.
+     *
+     * @param setA parameter 1
+     * @param setB parameter 2
+     * @param <T> generic type of Set elements.
+     * @return true when setA is a superset of setB, false otherwise.
+     */
     public static <T> boolean isSuperset(Set<T> setA, Set<T> setB) {
         return setA.containsAll(setB);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
index 781c37a..153829c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler.blacklist.reporters;
 
 import java.util.List;
@@ -22,7 +23,7 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * report blacklist to alert system
+ * Report blacklist to alert system.
  */
 public interface IReporter {
     void report(String message);

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
index 94cfebd..3255c9d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
@@ -15,15 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.scheduler.blacklist.reporters;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.storm.scheduler.blacklist.reporters;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class LogReporter implements IReporter {
     private static Logger LOG = LoggerFactory.getLogger(LogReporter.class);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index cc7f403..00cf25a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -15,8 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler.blacklist.strategies;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.SupervisorDetails;
@@ -29,13 +37,6 @@ import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
 public class DefaultBlacklistStrategy implements IBlacklistStrategy {
 
     private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
@@ -43,24 +44,25 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
     public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
     public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
 
-    private IReporter _reporter;
+    private IReporter reporter;
 
-    private int _toleranceCount;
-    private int _resumeTime;
-    private int _nimbusMonitorFreqSecs;
+    private int toleranceCount;
+    private int resumeTime;
+    private int nimbusMonitorFreqSecs;
 
     private TreeMap<String, Integer> blacklist;
 
     @Override
-    public void prepare(Map conf){
-        _toleranceCount = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
-        _resumeTime = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+    public void prepare(Map conf) {
+        toleranceCount = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT),
+                DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+        resumeTime = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
 
         String reporterClassName = ObjectReader.getString(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
                 LogReporter.class.getName());
-        _reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
+        reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
 
-        _nimbusMonitorFreqSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
+        nimbusMonitorFreqSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
         blacklist = new TreeMap<>();
     }
 
@@ -78,12 +80,12 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
         for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
             String supervisor = entry.getKey();
             int count = entry.getValue();
-            if (count >= _toleranceCount) {
+            if (count >= toleranceCount) {
                 if (!blacklist.containsKey(supervisor)) { // if not in blacklist then add it and set the resume time according to config
                     LOG.debug("add supervisor {} to blacklist", supervisor);
                     LOG.debug("supervisorsWithFailures : {}", supervisorsWithFailures);
-                    _reporter.reportBlacklist(supervisor, supervisorsWithFailures);
-                    blacklist.put(supervisor, _resumeTime / _nimbusMonitorFreqSecs);
+                    reporter.reportBlacklist(supervisor, supervisorsWithFailures);
+                    blacklist.put(supervisor, resumeTime / nimbusMonitorFreqSecs);
                 }
             }
         }
@@ -130,8 +132,9 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
             int shortage = totalNeedNumWorkers - availableSlotsNotInBlacklistCount;
 
             if (shortage > 0) {
-                LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{},num blacklist :{}, will release some blacklist."
-                        , totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size());
+                LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{}, num blacklist :{}, " +
+                        "will release some blacklist.", totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size());
+
                 //release earliest blacklist
                 Set<String> readyToRemove = new HashSet<>();
                 for (String supervisor : blacklist.keySet()) { //blacklist is treeMap sorted by value, minimum value means earliest

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
index a35a1d2..f050006 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
@@ -15,22 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.scheduler.blacklist.strategies;
 
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Topologies;
+package org.apache.storm.scheduler.blacklist.strategies;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Topologies;
+
 public interface IBlacklistStrategy {
 
     void prepare(Map conf);
 
     /**
-     * Get blacklist by blacklist strategy
+     * Get blacklist by blacklist strategy.
+     *
      * @param badSupervisorsToleranceSlidingWindow bad supervisors buffered in sliding window
      * @param cluster the cluster these topologies are running in. `cluster` contains everything user
      *       need to develop a new scheduling logic. e.g. supervisors information, available slots, current


[4/4] storm git commit: Merge branch 'STORM-2083' of https://github.com/HeartSaVioR/storm into STORM-2083

Posted by bo...@apache.org.
Merge branch 'STORM-2083' of https://github.com/HeartSaVioR/storm into STORM-2083

STORM-2083: Blacklist scheduler

This closes #2343
This closes #1674


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/177cb95f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/177cb95f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/177cb95f

Branch: refs/heads/master
Commit: 177cb95f4fbcb3011f73b88d8755b577671ed527
Parents: 293643f 74cc7e2
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 29 08:43:52 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Sep 29 08:43:52 2017 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |   6 +
 .../src/jvm/org/apache/storm/Config.java        |   6 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |  22 +-
 storm-server/pom.xml                            |   2 +-
 .../java/org/apache/storm/DaemonConfig.java     |  59 +++-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  91 +++---
 .../scheduler/blacklist/BlacklistScheduler.java | 262 +++++++++++++++
 .../apache/storm/scheduler/blacklist/Sets.java  | 106 +++++++
 .../blacklist/reporters/IReporter.java          |  32 ++
 .../blacklist/reporters/LogReporter.java        |  41 +++
 .../strategies/DefaultBlacklistStrategy.java    | 172 ++++++++++
 .../strategies/IBlacklistStrategy.java          |  54 ++++
 .../scheduler/blacklist/FaultGenerateUtils.java |  63 ++++
 .../blacklist/TestBlacklistScheduler.java       | 316 +++++++++++++++++++
 .../TestUtilsForBlacklistScheduler.java         | 263 +++++++++++++++
 15 files changed, 1438 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/177cb95f/conf/defaults.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/177cb95f/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------


[2/4] storm git commit: STORM-2083 Blacklist scheduler

Posted by bo...@apache.org.
STORM-2083 Blacklist scheduler

* address review comments from @HeartSaVioR and @revans2


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7b86c32
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7b86c32
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7b86c32

Branch: refs/heads/master
Commit: a7b86c328ad56adf7e7259aeb2a9bcc1ece88b4f
Parents: 25c27c6
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Sep 25 22:16:48 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 27 14:09:31 2017 +0900

----------------------------------------------------------------------
 .../scheduler/blacklist/BlacklistScheduler.java | 113 +++++++++----------
 .../blacklist/reporters/IReporter.java          |   4 +-
 .../blacklist/reporters/LogReporter.java        |   8 +-
 .../strategies/DefaultBlacklistStrategy.java    |  68 +++++------
 .../strategies/IBlacklistStrategy.java          |   2 +-
 .../scheduler/blacklist/FaultGenerateUtils.java |  11 --
 .../blacklist/TestBlacklistScheduler.java       |  67 +++++------
 7 files changed, 123 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index 0080134..a05d814 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -18,7 +18,6 @@
 package org.apache.storm.scheduler.blacklist;
 
 import com.google.common.collect.EvictingQueue;
-import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.Cluster;
@@ -27,9 +26,10 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
+import org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy;
 import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
 import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +42,11 @@ import java.util.concurrent.Callable;
 
 public class BlacklistScheduler implements IScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
+
+    public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
+    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
+    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300;
+
     private final IScheduler underlyingScheduler;
     @SuppressWarnings("rawtypes")
     private Map _conf;
@@ -67,51 +72,27 @@ public class BlacklistScheduler implements IScheduler {
 
     @Override
     public void prepare(Map conf) {
-        LOG.info("prepare black list scheduler");
+        LOG.info("Preparing black list scheduler");
         underlyingScheduler.prepare(conf);
         _conf = conf;
-        if (_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) {
-            toleranceTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME));
-        }
-        if (_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) {
-            toleranceCount = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT));
-        }
-        if (_conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME)) {
-            resumeTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME));
-        }
-        String reporterClassName = _conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) ? (String) _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) : "org.apache.storm.scheduler.blacklist.reporters.LogReporter" ;
-        try {
-            reporter = (IReporter) Class.forName(reporterClassName).newInstance();
-        } catch (ClassNotFoundException e) {
-            LOG.error("Can't find blacklist reporter for name {}", reporterClassName);
-            throw new RuntimeException(e);
-        } catch (InstantiationException e) {
-            LOG.error("Throw InstantiationException blacklist reporter for name {}", reporterClassName);
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            LOG.error("Throw illegalAccessException blacklist reporter for name {}", reporterClassName);
-            throw new RuntimeException(e);
-        }
 
-        String strategyClassName = _conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY) ? (String) _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY) : "org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy";
-        try {
-            blacklistStrategy = (IBlacklistStrategy) Class.forName(strategyClassName).newInstance();
-        } catch (ClassNotFoundException e) {
-            LOG.error("Can't find blacklist strategy for name {}", strategyClassName);
-            throw new RuntimeException(e);
-        } catch (InstantiationException e) {
-            LOG.error("Throw InstantiationException blacklist strategy for name {}", strategyClassName);
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            LOG.error("Throw illegalAccessException blacklist strategy for name {}", strategyClassName);
-            throw new RuntimeException(e);
-        }
+        toleranceTime = ObjectReader.getInt(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
+        toleranceCount = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+        resumeTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+
+        String reporterClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
+                LogReporter.class.getName());
+        reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
+
+        String strategyClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY),
+                DefaultBlacklistStrategy.class.getName());
+        blacklistStrategy = (IBlacklistStrategy) initializeInstance(strategyClassName, "blacklist strategy");
 
         nimbusMonitorFreqSecs = ObjectReader.getInt( _conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
         blacklistStrategy.prepare(_conf);
 
-        windowSize=toleranceTime / nimbusMonitorFreqSecs;
-        badSupervisorsToleranceSlidingWindow =EvictingQueue.create(windowSize);
+        windowSize = toleranceTime / nimbusMonitorFreqSecs;
+        badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize);
         cachedSupervisors = new HashMap<>();
         blacklistHost = new HashSet<>();
 
@@ -134,7 +115,9 @@ public class BlacklistScheduler implements IScheduler {
 
         blacklistStrategy.resumeFromBlacklist();
         badSupervisors(supervisors);
-        cluster.setBlacklistedHosts(getBlacklistHosts(cluster, topologies));
+        Set<String> blacklistHosts = getBlacklistHosts(cluster, topologies);
+        this.blacklistHost = blacklistHosts;
+        cluster.setBlacklistedHosts(blacklistHosts);
         removeLongTimeDisappearFromCache();
 
         underlyingScheduler.schedule(topologies, cluster);
@@ -149,7 +132,7 @@ public class BlacklistScheduler implements IScheduler {
         Set<String> cachedSupervisorsKeySet = cachedSupervisors.keySet();
         Set<String> supervisorsKeySet = supervisors.keySet();
 
-        Set<String> badSupervisorKeys = Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet);//cached supervisor doesn't show up
+        Set<String> badSupervisorKeys = Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet); //cached supervisor doesn't show up
         HashMap<String, Set<Integer>> badSupervisors = new HashMap<>();
         for (String key : badSupervisorKeys) {
             badSupervisors.put(key, cachedSupervisors.get(key));
@@ -160,11 +143,11 @@ public class BlacklistScheduler implements IScheduler {
             SupervisorDetails supervisorDetails = entry.getValue();
             if (cachedSupervisors.containsKey(key)) {
                 Set<Integer> badSlots = badSlots(supervisorDetails, key);
-                if (badSlots.size() > 0) {//supervisor contains bad slots
+                if (badSlots.size() > 0) { //supervisor contains bad slots
                     badSupervisors.put(key, badSlots);
                 }
             } else {
-                cachedSupervisors.put(key, supervisorDetails.getAllPorts());//new supervisor to cache
+                cachedSupervisors.put(key, supervisorDetails.getAllPorts()); //new supervisor to cache
             }
         }
 
@@ -185,7 +168,7 @@ public class BlacklistScheduler implements IScheduler {
         return badSlots;
     }
 
-    public Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) {
+    private Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) {
         Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies);
         Set<String> blacklistHostSet = new HashSet<>();
         for (String supervisor : blacklistSet) {
@@ -193,10 +176,9 @@ public class BlacklistScheduler implements IScheduler {
             if (host != null) {
                 blacklistHostSet.add(host);
             } else {
-                LOG.info("supervisor {} is not alive know, do not need to add to blacklist.", supervisor);
+                LOG.info("supervisor {} is not alive, do not need to add to blacklist.", supervisor);
             }
         }
-        this.blacklistHost = blacklistHostSet;
         return blacklistHostSet;
     }
 
@@ -211,20 +193,14 @@ public class BlacklistScheduler implements IScheduler {
         for (Map<String, Set<Integer>> item : badSupervisorsToleranceSlidingWindow) {
             Set<String> supervisors = item.keySet();
             for (String supervisor : supervisors) {
-                int supervisorCount = 0;
-                if (supervisorCountMap.containsKey(supervisor)) {
-                    supervisorCount = supervisorCountMap.get(supervisor);
-                }
+                int supervisorCount = supervisorCountMap.getOrDefault(supervisor, 0);
                 Set<Integer> slots = item.get(supervisor);
-                if(slots.equals(cachedSupervisors.get(supervisor))){//only all slots are bad means supervisor is bad
+                if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all slots are bad
                     supervisorCountMap.put(supervisor, supervisorCount + 1);
                 }
                 for (Integer slot : slots) {
-                    int slotCount = 0;
                     WorkerSlot workerSlot = new WorkerSlot(supervisor, slot);
-                    if (slotCountMap.containsKey(workerSlot)) {
-                        slotCount = slotCountMap.get(workerSlot);
-                    }
+                    int slotCount = slotCountMap.getOrDefault(workerSlot, 0);
                     slotCountMap.put(workerSlot, slotCount + 1);
                 }
             }
@@ -233,9 +209,9 @@ public class BlacklistScheduler implements IScheduler {
         for (Map.Entry<String, Integer> entry : supervisorCountMap.entrySet()) {
             String key = entry.getKey();
             int value = entry.getValue();
-            if (value == windowSize) {//supervisor never exits once in tolerance time will be removed from cache
+            if (value == windowSize) { // supervisor which was never back to normal in tolerance period will be removed from cache
                 cachedSupervisors.remove(key);
-                LOG.info("supervisor {} has never exited once during tolerance time, proberbly be dead forever, removed from cache.", key);
+                LOG.info("Supervisor {} was never back to normal during tolerance period, probably dead. Will remove from cache.", key);
             }
         }
 
@@ -244,14 +220,29 @@ public class BlacklistScheduler implements IScheduler {
             String supervisorKey = workerSlot.getNodeId();
             Integer slot = workerSlot.getPort();
             int value = entry.getValue();
-            if (value == windowSize) {//port never exits once in tolerance time will be removed from cache
+            if (value == windowSize) { // worker slot which was never back to normal in tolerance period will be removed from cache
                 Set<Integer> slots = cachedSupervisors.get(supervisorKey);
-                if (slots != null) {//slots will be null while supervisor has been removed from cached supervisors
+                if (slots != null) { // slots will be null while supervisor has been removed from cached supervisors
                     slots.remove(slot);
                     cachedSupervisors.put(supervisorKey, slots);
                 }
-                LOG.info("slot {} has never exited once during tolerance time, proberbly be dead forever, removed from cache.", workerSlot);
+                LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.", workerSlot);
             }
         }
     }
+
+    private Object initializeInstance(String className, String representation) {
+        try {
+            return Class.forName(className).newInstance();
+        } catch (ClassNotFoundException e) {
+            LOG.error("Can't find {} for name {}", representation, className);
+            throw new RuntimeException(e);
+        } catch (InstantiationException e) {
+            LOG.error("Throw InstantiationException {} for name {}", representation, className);
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            LOG.error("Throw IllegalAccessException {} for name {}", representation, className);
+            throw new RuntimeException(e);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
index 00dcaa4..781c37a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
@@ -17,8 +17,8 @@
  */
 package org.apache.storm.scheduler.blacklist.reporters;
 
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -27,5 +27,5 @@ import java.util.Set;
 public interface IReporter {
     void report(String message);
 
-    void reportBlacklist(String supervisor, List<HashMap<String, Set<Integer>>> toleranceBuffer);
+    void reportBlacklist(String supervisor, List<Map<String, Set<Integer>>> toleranceBuffer);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
index d8b7679..94cfebd 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
@@ -20,8 +20,8 @@ package org.apache.storm.scheduler.blacklist.reporters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class LogReporter implements IReporter {
@@ -33,8 +33,8 @@ public class LogReporter implements IReporter {
     }
 
     @Override
-    public void reportBlacklist(String supervisor, List<HashMap<String, Set<Integer>>> toleranceBuffer) {
-        String message = "add supervisor " + supervisor + " to blacklist. The bad slot history of supervisors is :" + toleranceBuffer;
-        report(message);
+    public void reportBlacklist(String supervisor, List<Map<String, Set<Integer>>> toleranceBuffer) {
+        LOG.warn("add supervisor {}  to blacklist. The bad slot history of supervisors is : {}",
+                supervisor, toleranceBuffer);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index 9b2e9b0..cc7f403 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -24,6 +24,7 @@ import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
 import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,9 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
 
     private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
 
+    public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
+    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
+
     private IReporter _reporter;
 
     private int _toleranceCount;
@@ -49,41 +53,25 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
 
     @Override
     public void prepare(Map conf){
-        if (conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) {
-            _toleranceCount = ObjectReader.getInt( conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT));
-        }
-        if (conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME)) {
-            _resumeTime = ObjectReader.getInt( conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME));
-        }
-        String reporterClassName = conf.containsKey(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) ? (String) conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER) : "org.apache.storm.scheduler.blacklist.reporters.LogReporter" ;
-        try {
-            _reporter = (IReporter) Class.forName(reporterClassName).newInstance();
-        } catch (ClassNotFoundException e) {
-            LOG.error("Can't find blacklist reporter for name {}", reporterClassName);
-            throw new RuntimeException(e);
-        } catch (InstantiationException e) {
-            LOG.error("Throw InstantiationException blacklist reporter for name {}", reporterClassName);
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            LOG.error("Throw illegalAccessException blacklist reporter for name {}", reporterClassName);
-            throw new RuntimeException(e);
-        }
+        _toleranceCount = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+        _resumeTime = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+
+        String reporterClassName = ObjectReader.getString(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
+                LogReporter.class.getName());
+        _reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
 
-        _nimbusMonitorFreqSecs = ObjectReader.getInt( conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
+        _nimbusMonitorFreqSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
         blacklist = new TreeMap<>();
     }
 
     @Override
-    public Set<String> getBlacklist(List<HashMap<String, Set<Integer>>> supervisorsWithFailures, Cluster cluster, Topologies topologies) {
+    public Set<String> getBlacklist(List<Map<String, Set<Integer>>> supervisorsWithFailures, Cluster cluster, Topologies topologies) {
         Map<String, Integer> countMap = new HashMap<String, Integer>();
 
         for (Map<String, Set<Integer>> item : supervisorsWithFailures) {
             Set<String> supervisors = item.keySet();
             for (String supervisor : supervisors) {
-                int supervisorCount = 0;
-                if (countMap.containsKey(supervisor)) {
-                    supervisorCount = countMap.get(supervisor);
-                }
+                int supervisorCount = countMap.getOrDefault(supervisor, 0);
                 countMap.put(supervisor, supervisorCount + 1);
             }
         }
@@ -91,9 +79,9 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
             String supervisor = entry.getKey();
             int count = entry.getValue();
             if (count >= _toleranceCount) {
-                if (!blacklist.containsKey(supervisor)) {// if not in blacklist then add it and set the resume time according to config
-                    LOG.info("add supervisor {} to blacklist", supervisor);
-                    LOG.info("supervisorsWithFailures : {}", supervisorsWithFailures);
+                if (!blacklist.containsKey(supervisor)) { // if not in blacklist then add it and set the resume time according to config
+                    LOG.debug("add supervisor {} to blacklist", supervisor);
+                    LOG.debug("supervisorsWithFailures : {}", supervisorsWithFailures);
                     _reporter.reportBlacklist(supervisor, supervisorsWithFailures);
                     blacklist.put(supervisor, _resumeTime / _nimbusMonitorFreqSecs);
                 }
@@ -103,6 +91,7 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
         return blacklist.keySet();
     }
 
+    @Override
     public void resumeFromBlacklist() {
         Set<String> readyToRemove = new HashSet<String>();
         for (Map.Entry<String, Integer> entry : blacklist.entrySet()) {
@@ -116,11 +105,11 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
         }
         for (String key : readyToRemove) {
             blacklist.remove(key);
-            LOG.info("supervisor {} reach the resume time ,removed from blacklist", key);
+            LOG.info("Supervisor {} has been blacklisted more than resume period. Removed from blacklist.", key);
         }
     }
 
-    public void releaseBlacklistWhenNeeded(Cluster cluster, Topologies topologies) {
+    private void releaseBlacklistWhenNeeded(Cluster cluster, Topologies topologies) {
         if (blacklist.size() > 0) {
             int totalNeedNumWorkers = 0;
             List<TopologyDetails> needSchedulingTopologies = cluster.needsSchedulingTopologies();
@@ -145,12 +134,12 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
                         , totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size());
                 //release earliest blacklist
                 Set<String> readyToRemove = new HashSet<>();
-                for (String supervisor : blacklist.keySet()) {//blacklist is treeMap sorted by value, value minimum meas earliest
+                for (String supervisor : blacklist.keySet()) { //blacklist is treeMap sorted by value, minimum value means earliest
                     if (availableSupervisors.containsKey(supervisor)) {
                         Set<Integer> ports = cluster.getAvailablePorts(availableSupervisors.get(supervisor));
                         readyToRemove.add(supervisor);
                         shortage -= ports.size();
-                        if (shortage <= 0) {//released enough supervisor
+                        if (shortage <= 0) { //released enough supervisor
                             break;
                         }
                     }
@@ -162,4 +151,19 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
             }
         }
     }
+
+    private Object initializeInstance(String className, String representation) {
+        try {
+            return Class.forName(className).newInstance();
+        } catch (ClassNotFoundException e) {
+            LOG.error("Can't find {} for name {}", representation, className);
+            throw new RuntimeException(e);
+        } catch (InstantiationException e) {
+            LOG.error("Throw InstantiationException {} for name {}", representation, className);
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            LOG.error("Throw IllegalAccessException {} for name {}", representation, className);
+            throw new RuntimeException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
index df79b8c..a35a1d2 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
@@ -41,7 +41,7 @@ public interface IBlacklistStrategy {
      *       the `cluster` object.
      * @return blacklisted supervisors' id set
      */
-    Set<String> getBlacklist(List<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow, Cluster cluster, Topologies topologies);
+    Set<String> getBlacklist(List<Map<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow, Cluster cluster, Topologies topologies);
 
     /**
      * resume supervisors form blacklist. Blacklist is just a temporary list for supervisors,

http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
index 5b8bbe7..ece6357 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
@@ -30,17 +30,6 @@ import java.util.Map;
 
 public class FaultGenerateUtils {
 
-    public static List<Map<String, SupervisorDetails>> getSupervisorsList(int supervisorCount, int slotCount, int[][][] faults) {
-        List<Map<String, SupervisorDetails>> supervisorsList = new ArrayList<>(faults.length);
-        for (int[][] fault : faults) {
-            Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(supervisorCount, slotCount);
-            if (fault.length == 1 && fault[0][0] == -1) {
-                TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, "sup-0");
-            }
-        }
-        return supervisorsList;
-    }
-
     public static List<Map<String, SupervisorDetails>> getSupervisorsList(int supervisorCount, int slotCount, List<Map<Integer, List<Integer>>> faultList) {
         List<Map<String, SupervisorDetails>> supervisorsList = new ArrayList<>(faultList.size());
         for (Map<Integer, List<Integer>> faults : faultList) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a7b86c32/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
index 049ef68..6cf8a0e 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
@@ -19,6 +19,7 @@ package org.apache.storm.scheduler.blacklist;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.scheduler.Cluster;
@@ -34,6 +35,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -75,9 +77,7 @@ public class TestBlacklistScheduler {
         bs.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
         bs.schedule(topologies, cluster);
-        Set<String> hosts = new HashSet<>();
-        hosts.add("host-0");
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts());
     }
 
     @Test
@@ -109,9 +109,7 @@ public class TestBlacklistScheduler {
         bs.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
         bs.schedule(topologies, cluster);
-        Set<String> hosts = new HashSet<>();
-        hosts.add("host-0");
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts());
     }
 
     @Test
@@ -142,16 +140,13 @@ public class TestBlacklistScheduler {
         bs.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
         bs.schedule(topologies, cluster);
-        Set<String> hosts = new HashSet<>();
-        hosts.add("host-0");
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts());
         for (int i = 0; i < 300 / 10 - 2; i++) {
             bs.schedule(topologies, cluster);
         }
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts());
         bs.schedule(topologies, cluster);
-        hosts.clear();
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.emptySet(), cluster.getBlacklistedHosts());
     }
 
     @Test
@@ -185,17 +180,14 @@ public class TestBlacklistScheduler {
         bs.schedule(topologies, cluster);
         cluster = new Cluster(iNimbus, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
         bs.schedule(topologies, cluster);
-        Set<String> hosts = new HashSet<>();
-        hosts.add("host-0");
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts());
         topoMap.put(topo2.getId(), topo2);
         topoMap.put(topo3.getId(), topo3);
         topoMap.put(topo4.getId(), topo4);
         topologies = new Topologies(topoMap);
         cluster = new Cluster(iNimbus, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
         bs.schedule(topologies, cluster);
-        hosts.clear();
-        Assert.assertEquals("blacklist", hosts, cluster.getBlacklistedHosts());
+        Assert.assertEquals("blacklist", Collections.emptySet(), cluster.getBlacklistedHosts());
     }
 
     @Test
@@ -218,21 +210,21 @@ public class TestBlacklistScheduler {
 
         List<Map<Integer, List<Integer>>> faultList = new ArrayList<>();
 
-        faultList.add(new HashMap<Integer, List<Integer>>());
-        faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(0, 1)));
-        faultList.add((Map) ImmutableMap.of(0, new ArrayList<>()));
+        faultList.add(new HashMap<>());
+        faultList.add(ImmutableMap.of(0, ImmutableList.of(0, 1)));
+        faultList.add(ImmutableMap.of(0, new ArrayList<>()));
         for (int i = 0; i < 17; i++) {
-            faultList.add(new HashMap<Integer, List<Integer>>());
+            faultList.add(new HashMap<>());
         }
-        faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(0, 1)));
-        faultList.add((Map) ImmutableMap.of(1, ImmutableList.of(1)));
+        faultList.add(ImmutableMap.of(0, ImmutableList.of(0, 1)));
+        faultList.add(ImmutableMap.of(1, ImmutableList.of(1)));
         for (int i = 0; i < 8; i++) {
-            faultList.add(new HashMap<Integer, List<Integer>>());
+            faultList.add(new HashMap<>());
         }
-        faultList.add((Map) ImmutableMap.of(0, ImmutableList.of(1)));
-        faultList.add((Map) ImmutableMap.of(1, ImmutableList.of(1)));
+        faultList.add(ImmutableMap.of(0, ImmutableList.of(1)));
+        faultList.add(ImmutableMap.of(1, ImmutableList.of(1)));
         for (int i = 0; i < 30; i++) {
-            faultList.add(new HashMap<Integer, List<Integer>>());
+            faultList.add(new HashMap<>());
         }
 
         List<Map<String, SupervisorDetails>> supervisorsList = FaultGenerateUtils.getSupervisorsList(3, 4, faultList);
@@ -285,9 +277,9 @@ public class TestBlacklistScheduler {
 
     @Test
     public void removeLongTimeDisappearFromCache(){
-        INimbus iNimbus=new TestUtilsForBlacklistScheduler.INimbusTest();
+        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
 
-        Map<String, SupervisorDetails> supMap=TestUtilsForBlacklistScheduler.genSupervisors(3,4);
+        Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3,4);
 
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
@@ -301,27 +293,24 @@ public class TestBlacklistScheduler {
 
         Topologies topologies = new Topologies(topoMap);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
-        BlacklistScheduler bs=new BlacklistScheduler(new DefaultScheduler());
+        BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler());
         bs.prepare(config);
         bs.schedule(topologies,cluster);
         cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap,"sup-0"),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
-        for(int i=0;i<20;i++){
+        for (int i = 0 ; i < 20 ; i++){
             bs.schedule(topologies,cluster);
         }
-        Set<String> cached=new HashSet<>();
+        Set<String> cached = new HashSet<>();
         cached.add("sup-1");
         cached.add("sup-2");
         Assert.assertEquals(cached,bs.cachedSupervisors.keySet());
         cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
         bs.schedule(topologies,cluster);
         cluster = new Cluster(iNimbus, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap,"sup-0",0),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
-        for(int i=0;i<20;i++){
-            bs.schedule(topologies,cluster);
+        for (int i = 0 ;i < 20 ; i++){
+            bs.schedule(topologies, cluster);
         }
-        Set<Integer> cachedPorts=new HashSet<>();
-        cachedPorts.add(1);
-        cachedPorts.add(2);
-        cachedPorts.add(3);
-        Assert.assertEquals(cachedPorts,bs.cachedSupervisors.get("sup-0"));
+        Set<Integer> cachedPorts = Sets.newHashSet(1, 2, 3);
+        Assert.assertEquals(cachedPorts, bs.cachedSupervisors.get("sup-0"));
     }
 }