You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/29 14:08:47 UTC

[3/4] storm git commit: STORM-2083 Blacklist scheduler

STORM-2083 Blacklist scheduler

* addressed review comments from @revans2
* also fixed failing test


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/74cc7e2d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/74cc7e2d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/74cc7e2d

Branch: refs/heads/master
Commit: 74cc7e2d8283dffbb2cd6f97e4b3d410f87c40bb
Parents: a7b86c3
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Sep 27 10:18:15 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 27 14:09:31 2017 +0900

----------------------------------------------------------------------
 .../test/clj/org/apache/storm/nimbus_test.clj   |  4 +-
 storm-server/pom.xml                            |  2 +-
 .../java/org/apache/storm/DaemonConfig.java     | 50 +++++++-----
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 85 +++++++++++---------
 .../scheduler/blacklist/BlacklistScheduler.java | 70 +++++++++-------
 .../apache/storm/scheduler/blacklist/Sets.java  | 50 +++++++++++-
 .../blacklist/reporters/IReporter.java          |  3 +-
 .../blacklist/reporters/LogReporter.java        |  7 +-
 .../strategies/DefaultBlacklistStrategy.java    | 45 ++++++-----
 .../strategies/IBlacklistStrategy.java          | 11 +--
 10 files changed, 208 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 38b5da0..1f45f9b 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1619,13 +1619,12 @@
                     {STORM-ZOOKEEPER-AUTH-SCHEME scheme
                      STORM-ZOOKEEPER-AUTH-PAYLOAD digest
                      STORM-PRINCIPAL-TO-LOCAL-PLUGIN "org.apache.storm.security.auth.DefaultPrincipalToLocal"
+                     NIMBUS-MONITOR-FREQ-SECS 10
                      NIMBUS-THRIFT-PORT 6666})
           expected-acls Nimbus/ZK_ACLS
           fake-inimbus (reify INimbus (getForcedScheduler [this] nil) (prepare [this conf dir] nil))
           fake-cu (proxy [ServerConfigUtils] []
                     (nimbusTopoHistoryStateImpl [conf] nil))
-          fake-ru (proxy [ReflectionUtils] []
-                    (newInstanceImpl [_]))
           fake-utils (proxy [Utils] []
                        (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
                                                 (upTime [] 0))))
@@ -1633,7 +1632,6 @@
 	  fake-common (proxy [StormCommon] []
                              (mkAuthorizationHandler [_] nil))]
       (with-open [_ (ServerConfigUtilsInstaller. fake-cu)
-                  _ (ReflectionUtilsInstaller. fake-ru)
                   _ (UtilsInstaller. fake-utils)
                   - (StormCommonInstaller. fake-common)
                   zk-le (MockedZookeeper. (proxy [Zookeeper] []

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index b57151c..7449257 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -136,7 +136,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>4000</maxAllowedViolations>
+                    <maxAllowedViolations>3590</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index c5ee27a..00674c7 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -18,15 +18,25 @@
 
 package org.apache.storm;
 
-import org.apache.storm.scheduler.blacklist.reporters.IReporter;
-import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
-import org.apache.storm.container.ResourceIsolationInterface;
-import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
-import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
-import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
-import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isInteger;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isString;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isStringList;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isStringOrStringList;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isType;
+import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isBoolean;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isNumber;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isImplementationOfClass;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryType;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isNoDuplicateInList;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryCustom;
+
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
 import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
 import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
@@ -36,8 +46,6 @@ import org.apache.storm.validation.Validated;
 import java.util.ArrayList;
 import java.util.Map;
 
-import static org.apache.storm.validation.ConfigValidationAnnotations.*;
-
 /**
  * Storm configs are specified as a plain old map. This class provides constants for
  * all the configurations possible on a Storm cluster. Each constant is paired with an annotation
@@ -110,32 +118,32 @@ public class DaemonConfig implements Validated {
     public static final String STORM_SCHEDULER = "storm.scheduler";
 
     /**
-     * The number of seconds that the blacklist scheduler will concern of bad slots or supervisors
+     * The number of seconds that the blacklist scheduler will concern of bad slots or supervisors.
      */
-    @isInteger
+    @isPositiveNumber
     public static final String BLACKLIST_SCHEDULER_TOLERANCE_TIME = "blacklist.scheduler.tolerance.time.secs";
 
     /**
-     * The number of hit count that will trigger blacklist in tolerance time
+     * The number of hit count that will trigger blacklist in tolerance time.
      */
-    @isInteger
+    @isPositiveNumber
     public static final String BLACKLIST_SCHEDULER_TOLERANCE_COUNT = "blacklist.scheduler.tolerance.count";
 
     /**
-     * The number of seconds that the blacklisted slots or supervisor will be resumed
+     * The number of seconds that the blacklisted slots or supervisor will be resumed.
      */
-    @isInteger
+    @isPositiveNumber
     public static final String BLACKLIST_SCHEDULER_RESUME_TIME = "blacklist.scheduler.resume.time.secs";
 
     /**
-     * The class that the blacklist scheduler will report the blacklist
+     * The class that the blacklist scheduler will report the blacklist.
      */
     @NotNull
     @isImplementationOfClass(implementsClass = IReporter.class)
     public static final String BLACKLIST_SCHEDULER_REPORTER = "blacklist.scheduler.reporter";
 
     /**
-     * The class that specifies the eviction strategy to use in blacklist scheduler
+     * The class that specifies the eviction strategy to use in blacklist scheduler.
      */
     @NotNull
     @isImplementationOfClass(implementsClass = IBlacklistStrategy.class)
@@ -144,7 +152,7 @@ public class DaemonConfig implements Validated {
     /**
      * Whether we want to display all the resource capacity and scheduled usage on the UI page.
      * You MUST have this variable set if you are using any kind of resource-related scheduler.
-     *
+     * <p/>
      * If this is not set, we will not display resource capacity and usage on the UI.
      */
     @isBoolean
@@ -155,7 +163,7 @@ public class DaemonConfig implements Validated {
      * Provides a way for a @link{STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN}
      * implementation to access optional settings.
      */
-    @isType(type=Map.class)
+    @isType(type = Map.class)
     public static final String STORM_GROUP_MAPPING_SERVICE_PARAMS = "storm.group.mapping.service.params";
 
     /**
@@ -873,7 +881,9 @@ public class DaemonConfig implements Validated {
      * A map of users to another map of the resource guarantees of the user. Used by Resource Aware Scheduler to ensure
      * per user resource guarantees.
      */
-    @isMapEntryCustom(keyValidatorClasses = {ConfigValidation.StringValidator.class}, valueValidatorClasses = {ConfigValidation.UserResourcePoolEntryValidator.class})
+    @isMapEntryCustom(
+            keyValidatorClasses = {ConfigValidation.StringValidator.class},
+            valueValidatorClasses = {ConfigValidation.UserResourcePoolEntryValidator.class})
     public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools";
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 7331906..e54aaeb 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -134,6 +134,7 @@ import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
 import org.apache.storm.nimbus.ITopologyValidator;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Cluster.SupervisorResources;
 import org.apache.storm.scheduler.DefaultScheduler;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.INimbus;
@@ -144,10 +145,9 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
 import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
 import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
-import org.apache.storm.scheduler.Cluster.SupervisorResources;
-import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
 import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.storm.security.INimbusCredentialPlugin;
 import org.apache.storm.security.auth.AuthUtils;
@@ -184,7 +184,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Nimbus implements Iface, Shutdownable, DaemonCommon {
-    private final static Logger LOG = LoggerFactory.getLogger(Nimbus.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Nimbus.class);
     
     //    Metrics
     private static final Meter submitTopologyWithOptsCalls = StormMetricsRegistry.registerMeter("nimbus:num-submitTopologyWithOpts-calls");
@@ -196,7 +196,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private static final Meter deactivateCalls = StormMetricsRegistry.registerMeter("nimbus:num-deactivate-calls");
     private static final Meter debugCalls = StormMetricsRegistry.registerMeter("nimbus:num-debug-calls");
     private static final Meter setWorkerProfilerCalls = StormMetricsRegistry.registerMeter("nimbus:num-setWorkerProfiler-calls");
-    private static final Meter getComponentPendingProfileActionsCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPendingProfileActions-calls");
+    private static final Meter getComponentPendingProfileActionsCalls = StormMetricsRegistry.registerMeter(
+            "nimbus:num-getComponentPendingProfileActions-calls");
     private static final Meter setLogConfigCalls = StormMetricsRegistry.registerMeter("nimbus:num-setLogConfig-calls");
     private static final Meter uploadNewCredentialsCalls = StormMetricsRegistry.registerMeter("nimbus:num-uploadNewCredentials-calls");
     private static final Meter beginFileUploadCalls = StormMetricsRegistry.registerMeter("nimbus:num-beginFileUpload-calls");
@@ -212,21 +213,27 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private static final Meter getClusterInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getClusterInfo-calls");
     private static final Meter getLeaderCalls = StormMetricsRegistry.registerMeter("nimbus:num-getLeader-calls");
     private static final Meter isTopologyNameAllowedCalls = StormMetricsRegistry.registerMeter("nimbus:num-isTopologyNameAllowed-calls");
-    private static final Meter getTopologyInfoWithOptsCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfoWithOpts-calls");
+    private static final Meter getTopologyInfoWithOptsCalls = StormMetricsRegistry.registerMeter(
+            "nimbus:num-getTopologyInfoWithOpts-calls");
     private static final Meter getTopologyInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfo-calls");
     private static final Meter getTopologyPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls");
     private static final Meter getSupervisorPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls");
     private static final Meter getComponentPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls");
-    private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter("nimbus:num-getOwnerResourceSummaries-calls");
-    private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms", new ExponentiallyDecayingReservoir());
+    private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms",
+            new ExponentiallyDecayingReservoir());
+    private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter(
+            "nimbus:num-getOwnerResourceSummaries-calls");
     private static final Meter shutdownCalls = StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls");
     // END Metrics
     
     private static final String STORM_VERSION = VersionInfo.getVersion();
+
     @VisibleForTesting
     public static final List<ACL> ZK_ACLS = Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0),
             new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+
     private static final Subject NIMBUS_SUBJECT = new Subject();
+
     static {
         NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
         NIMBUS_SUBJECT.setReadOnly();
@@ -442,13 +449,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     @SuppressWarnings("deprecation")
     private static <T extends AutoCloseable> TimeCacheMap<String, T> fileCacheMap(Map<String, Object> conf) {
         return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_FILE_COPY_EXPIRATION_SECS), 600),
-                (id, stream) -> {
-                    try {
-                        stream.close();
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                });
+            (id, stream) -> {
+                try {
+                    stream.close();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
     }
 
     private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> second) {
@@ -488,26 +495,26 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     @SuppressWarnings("deprecation")
     private static <T extends AutoCloseable> TimeCacheMap<String, T> makeBlobCacheMap(Map<String, Object> conf) {
         return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 600),
-                (id, stream) -> {
-                    try {
-                        if (stream instanceof AtomicOutputStream) {
-                            ((AtomicOutputStream) stream).cancel();
-                        } else {
-                            stream.close();
-                        }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
+            (id, stream) -> {
+                try {
+                    if (stream instanceof AtomicOutputStream) {
+                        ((AtomicOutputStream) stream).cancel();
+                    } else {
+                        stream.close();
                     }
-                });
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
     }
     
     /**
      * Constructs a TimeCacheMap instance with a blobstore timeout and no callback function.
-     * @param conf
-     * @return
+     * @param conf the config to use
+     * @return the newly created TimeCacheMap
      */
     @SuppressWarnings("deprecation")
-    private static TimeCacheMap<String, Iterator<String>> makeBlobListCachMap(Map<String, Object> conf) {
+    private static TimeCacheMap<String, Iterator<String>> makeBlobListCacheMap(Map<String, Object> conf) {
         return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 600));
     }
     
@@ -528,7 +535,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     
     @SuppressWarnings("unchecked")
     private static List<ClusterMetricsConsumerExecutor> makeClusterMetricsConsumerExecutors(Map<String, Object> conf) {
-        Collection<Map<String, Object>> consumers = (Collection<Map<String, Object>>) conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_REGISTER);
+        Collection<Map<String, Object>> consumers = (Collection<Map<String, Object>>) conf.get(
+                DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_REGISTER);
         List<ClusterMetricsConsumerExecutor> ret = new ArrayList<>();
         if (consumers != null) {
             for (Map<String, Object> consumer : consumers) {
@@ -561,15 +569,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return kseq.getKeySequenceNumber(conf);
     }
     
-    private static StormTopology readStormTopology(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+    private static StormTopology readStormTopology(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException,
+            IOException {
         return tc.readTopology(topoId, getSubject());
     }
     
-    private static Map<String, Object> readTopoConfAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+    private static Map<String, Object> readTopoConfAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException,
+            AuthorizationException, IOException {
         return tc.readTopoConf(topoId, NIMBUS_SUBJECT);
     }
     
-    private static StormTopology readStormTopologyAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+    private static StormTopology readStormTopologyAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException,
+            AuthorizationException, IOException {
         return tc.readTopology(topoId, NIMBUS_SUBJECT);
     }
     
@@ -624,8 +635,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return ret;
     }
 
-    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments,
-            Map<String, Assignment> existingAssignments) {
+    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
+            Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
         Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
         // Print some useful information
         if (existingAssignments != null && !existingAssignments.isEmpty()) {
@@ -672,7 +683,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
             slotAssigned.put(key, value);
         }
-        HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() : Utils.reverseMap(newExecToNodePort);
+        HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() :
+                Utils.reverseMap(newExecToNodePort);
         HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<>();
         for (Entry<List<Object>, List<List<Long>>> entry: tmpNewSlotAssigned.entrySet()) {
             List<List<Long>> value = new ArrayList<>(entry.getValue());
@@ -713,7 +725,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return state.getTopoId(topoName).isPresent();
     }
     
-    private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException, IOException {
+    private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException,
+            IOException {
         try {
             return readTopoConfAsNimbus(topoId, tc);
             //Was a try-cause but I looked at the code around this and key not found is not wrapped in runtime,
@@ -1090,7 +1103,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         this.topoCache = topoCache;
         this.blobDownloaders = makeBlobCacheMap(conf);
         this.blobUploaders = makeBlobCacheMap(conf);
-        this.blobListers = makeBlobListCachMap(conf);
+        this.blobListers = makeBlobListCacheMap(conf);
         this.uptime = Utils.makeUptimeComputer();
         this.validator = ReflectionUtils.newInstance((String) conf.getOrDefault(DaemonConfig.NIMBUS_TOPOLOGY_VALIDATOR, DefaultTopologyValidator.class.getName()));
         this.timer = new StormTimer(null, (t, e) -> {

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index a05d814..8083e01 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -15,9 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler.blacklist;
 
 import com.google.common.collect.EvictingQueue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.Cluster;
@@ -30,15 +39,10 @@ import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
 import org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy;
 import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
 
 public class BlacklistScheduler implements IScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
@@ -49,7 +53,7 @@ public class BlacklistScheduler implements IScheduler {
 
     private final IScheduler underlyingScheduler;
     @SuppressWarnings("rawtypes")
-    private Map _conf;
+    private Map conf;
 
     protected int toleranceTime;
     protected int toleranceCount;
@@ -74,22 +78,25 @@ public class BlacklistScheduler implements IScheduler {
     public void prepare(Map conf) {
         LOG.info("Preparing black list scheduler");
         underlyingScheduler.prepare(conf);
-        _conf = conf;
+        this.conf = conf;
 
-        toleranceTime = ObjectReader.getInt(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
-        toleranceCount = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
-        resumeTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+        toleranceTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME),
+                DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
+        toleranceCount = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT),
+                DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+        resumeTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME),
+                DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
 
-        String reporterClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
+        String reporterClassName = ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
                 LogReporter.class.getName());
         reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
 
-        String strategyClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY),
+        String strategyClassName = ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY),
                 DefaultBlacklistStrategy.class.getName());
         blacklistStrategy = (IBlacklistStrategy) initializeInstance(strategyClassName, "blacklist strategy");
 
-        nimbusMonitorFreqSecs = ObjectReader.getInt( _conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
-        blacklistStrategy.prepare(_conf);
+        nimbusMonitorFreqSecs = ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
+        blacklistStrategy.prepare(this.conf);
 
         windowSize = toleranceTime / nimbusMonitorFreqSecs;
         badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize);
@@ -108,11 +115,11 @@ public class BlacklistScheduler implements IScheduler {
     @Override
     public void schedule(Topologies topologies, Cluster cluster) {
         LOG.debug("running Black List scheduler");
-        Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
         LOG.debug("AssignableSlots: {}", cluster.getAssignableSlots());
         LOG.debug("AvailableSlots: {}", cluster.getAvailableSlots());
         LOG.debug("UsedSlots: {}", cluster.getUsedSlots());
 
+        Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
         blacklistStrategy.resumeFromBlacklist();
         badSupervisors(supervisors);
         Set<String> blacklistHosts = getBlacklistHosts(cluster, topologies);
@@ -169,7 +176,8 @@ public class BlacklistScheduler implements IScheduler {
     }
 
     private Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) {
-        Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies);
+        Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow),
+                cluster, topologies);
         Set<String> blacklistHostSet = new HashSet<>();
         for (String supervisor : blacklistSet) {
             String host = cluster.getHost(supervisor);
@@ -226,23 +234,29 @@ public class BlacklistScheduler implements IScheduler {
                     slots.remove(slot);
                     cachedSupervisors.put(supervisorKey, slots);
                 }
-                LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.", workerSlot);
+                LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.",
+                        workerSlot);
             }
         }
     }
 
     private Object initializeInstance(String className, String representation) {
         try {
-            return Class.forName(className).newInstance();
-        } catch (ClassNotFoundException e) {
-            LOG.error("Can't find {} for name {}", representation, className);
-            throw new RuntimeException(e);
-        } catch (InstantiationException e) {
-            LOG.error("Throw InstantiationException {} for name {}", representation, className);
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            LOG.error("Throw IllegalAccessException {} for name {}", representation, className);
-            throw new RuntimeException(e);
+            return ReflectionUtils.newInstance(className);
+        } catch (RuntimeException e) {
+            Throwable cause = e.getCause();
+
+            if (cause instanceof ClassNotFoundException) {
+                LOG.error("Can't find {} for name {}", representation, className);
+            } else if (cause instanceof InstantiationException) {
+                LOG.error("Throw InstantiationException {} for name {}", representation, className);
+            } else if (cause instanceof IllegalAccessException) {
+                LOG.error("Throw IllegalAccessException {} for name {}", representation, className);
+            } else {
+                LOG.error("Throw unexpected exception {} {} for name {}", cause, representation, className);
+            }
+
+            throw e;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
index 93c38cb..57344e6 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler.blacklist;
 
 import java.util.HashSet;
@@ -22,35 +23,82 @@ import java.util.Set;
 
 public class Sets {
 
+    /**
+     * Calculate union of both sets.
+     *
+     * @param setA parameter 1
+     * @param setB parameter 2
+     * @param <T> generic type of Set elements.
+     * @return the Set which is union of both Sets.
+     */
     public static <T> Set<T> union(Set<T> setA, Set<T> setB) {
         Set<T> result = new HashSet<T>(setA);
         result.addAll(setB);
         return result;
     }
 
+    /**
+     * Calculate intersection of both sets.
+     *
+     * @param setA parameter 1
+     * @param setB parameter 2
+     * @param <T> generic type of Set elements.
+     * @return the Set which is intersection of both Sets.
+     */
     public static <T> Set<T> intersection(Set<T> setA, Set<T> setB) {
         Set<T> result = new HashSet<T>(setA);
         result.retainAll(setB);
         return result;
     }
 
+    /**
+     * Calculate difference of difference of two sets.
+     *
+     * @param setA parameter 1
+     * @param setB parameter 2
+     * @param <T> generic type of Set elements.
+     * @return the Set which is difference of two sets.
+     */
     public static <T> Set<T> difference(Set<T> setA, Set<T> setB) {
         Set<T> result = new HashSet<T>(setA);
         result.removeAll(setB);
         return result;
     }
 
+    /**
+     * Calculate symmetric difference of two sets.
+     *
+     * @param setA parameter 1
+     * @param setB parameter 2
+     * @param <T> generic type of Set elements.
+     * @return the Set which is symmetric difference of two sets.
+     */
     public static <T> Set<T> symDifference(Set<T> setA, Set<T> setB) {
         Set<T> union = union(setA, setB);
         Set<T> intersection = intersection(setA, setB);
         return difference(union, intersection);
     }
 
+    /**
+     * Check whether a set is a subset of another set.
+     *
+     * @param setA parameter 1
+     * @param setB parameter 2
+     * @param <T> generic type of Set elements.
+     * @return true when setB is a subset of setA, false otherwise.
+     */
     public static <T> boolean isSubset(Set<T> setA, Set<T> setB) {
         return setB.containsAll(setA);
     }
 
-
+    /**
+     * Check whether a set is a superset of another set.
+     *
+     * @param setA parameter 1
+     * @param setB parameter 2
+     * @param <T> generic type of Set elements.
+     * @return true when setA is a superset of setB, false otherwise.
+     */
     public static <T> boolean isSuperset(Set<T> setA, Set<T> setB) {
         return setA.containsAll(setB);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
index 781c37a..153829c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler.blacklist.reporters;
 
 import java.util.List;
@@ -22,7 +23,7 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * report blacklist to alert system
+ * report blacklist to alert system.
  */
 public interface IReporter {
     void report(String message);

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
index 94cfebd..3255c9d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
@@ -15,15 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.scheduler.blacklist.reporters;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.storm.scheduler.blacklist.reporters;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class LogReporter implements IReporter {
     private static Logger LOG = LoggerFactory.getLogger(LogReporter.class);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index cc7f403..00cf25a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -15,8 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler.blacklist.strategies;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.SupervisorDetails;
@@ -29,13 +37,6 @@ import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
 public class DefaultBlacklistStrategy implements IBlacklistStrategy {
 
     private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
@@ -43,24 +44,25 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
     public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
     public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
 
-    private IReporter _reporter;
+    private IReporter reporter;
 
-    private int _toleranceCount;
-    private int _resumeTime;
-    private int _nimbusMonitorFreqSecs;
+    private int toleranceCount;
+    private int resumeTime;
+    private int nimbusMonitorFreqSecs;
 
     private TreeMap<String, Integer> blacklist;
 
     @Override
-    public void prepare(Map conf){
-        _toleranceCount = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
-        _resumeTime = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+    public void prepare(Map conf) {
+        toleranceCount = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT),
+                DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+        resumeTime = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
 
         String reporterClassName = ObjectReader.getString(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
                 LogReporter.class.getName());
-        _reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
+        reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
 
-        _nimbusMonitorFreqSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
+        nimbusMonitorFreqSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
         blacklist = new TreeMap<>();
     }
 
@@ -78,12 +80,12 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
         for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
             String supervisor = entry.getKey();
             int count = entry.getValue();
-            if (count >= _toleranceCount) {
+            if (count >= toleranceCount) {
                 if (!blacklist.containsKey(supervisor)) { // if not in blacklist then add it and set the resume time according to config
                     LOG.debug("add supervisor {} to blacklist", supervisor);
                     LOG.debug("supervisorsWithFailures : {}", supervisorsWithFailures);
-                    _reporter.reportBlacklist(supervisor, supervisorsWithFailures);
-                    blacklist.put(supervisor, _resumeTime / _nimbusMonitorFreqSecs);
+                    reporter.reportBlacklist(supervisor, supervisorsWithFailures);
+                    blacklist.put(supervisor, resumeTime / nimbusMonitorFreqSecs);
                 }
             }
         }
@@ -130,8 +132,9 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
             int shortage = totalNeedNumWorkers - availableSlotsNotInBlacklistCount;
 
             if (shortage > 0) {
-                LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{},num blacklist :{}, will release some blacklist."
-                        , totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size());
+                LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{}, num blacklist :{}, " +
+                        "will release some blacklist.", totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size());
+
                 //release earliest blacklist
                 Set<String> readyToRemove = new HashSet<>();
                 for (String supervisor : blacklist.keySet()) { //blacklist is treeMap sorted by value, minimum value means earliest

http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
index a35a1d2..f050006 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
@@ -15,22 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.scheduler.blacklist.strategies;
 
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Topologies;
+package org.apache.storm.scheduler.blacklist.strategies;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Topologies;
+
 public interface IBlacklistStrategy {
 
     void prepare(Map conf);
 
     /**
-     * Get blacklist by blacklist strategy
+     * Get blacklist by blacklist strategy.
+     *
      * @param badSupervisorsToleranceSlidingWindow bad supervisors buffered in sliding window
      * @param cluster the cluster these topologies are running in. `cluster` contains everything user
      *       need to develop a new scheduling logic. e.g. supervisors information, available slots, current