You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/29 14:08:47 UTC
[3/4] storm git commit: STORM-2083 Blacklist scheduler
STORM-2083 Blacklist scheduler
* addressed review comments from @revans2
* also fixed failing test
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/74cc7e2d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/74cc7e2d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/74cc7e2d
Branch: refs/heads/master
Commit: 74cc7e2d8283dffbb2cd6f97e4b3d410f87c40bb
Parents: a7b86c3
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Sep 27 10:18:15 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 27 14:09:31 2017 +0900
----------------------------------------------------------------------
.../test/clj/org/apache/storm/nimbus_test.clj | 4 +-
storm-server/pom.xml | 2 +-
.../java/org/apache/storm/DaemonConfig.java | 50 +++++++-----
.../org/apache/storm/daemon/nimbus/Nimbus.java | 85 +++++++++++---------
.../scheduler/blacklist/BlacklistScheduler.java | 70 +++++++++-------
.../apache/storm/scheduler/blacklist/Sets.java | 50 +++++++++++-
.../blacklist/reporters/IReporter.java | 3 +-
.../blacklist/reporters/LogReporter.java | 7 +-
.../strategies/DefaultBlacklistStrategy.java | 45 ++++++-----
.../strategies/IBlacklistStrategy.java | 11 +--
10 files changed, 208 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 38b5da0..1f45f9b 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1619,13 +1619,12 @@
{STORM-ZOOKEEPER-AUTH-SCHEME scheme
STORM-ZOOKEEPER-AUTH-PAYLOAD digest
STORM-PRINCIPAL-TO-LOCAL-PLUGIN "org.apache.storm.security.auth.DefaultPrincipalToLocal"
+ NIMBUS-MONITOR-FREQ-SECS 10
NIMBUS-THRIFT-PORT 6666})
expected-acls Nimbus/ZK_ACLS
fake-inimbus (reify INimbus (getForcedScheduler [this] nil) (prepare [this conf dir] nil))
fake-cu (proxy [ServerConfigUtils] []
(nimbusTopoHistoryStateImpl [conf] nil))
- fake-ru (proxy [ReflectionUtils] []
- (newInstanceImpl [_]))
fake-utils (proxy [Utils] []
(makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
(upTime [] 0))))
@@ -1633,7 +1632,6 @@
fake-common (proxy [StormCommon] []
(mkAuthorizationHandler [_] nil))]
(with-open [_ (ServerConfigUtilsInstaller. fake-cu)
- _ (ReflectionUtilsInstaller. fake-ru)
_ (UtilsInstaller. fake-utils)
- (StormCommonInstaller. fake-common)
zk-le (MockedZookeeper. (proxy [Zookeeper] []
http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index b57151c..7449257 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -136,7 +136,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>4000</maxAllowedViolations>
+ <maxAllowedViolations>3590</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index c5ee27a..00674c7 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -18,15 +18,25 @@
package org.apache.storm;
-import org.apache.storm.scheduler.blacklist.reporters.IReporter;
-import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
-import org.apache.storm.container.ResourceIsolationInterface;
-import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
-import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
-import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
-import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isInteger;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isString;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isStringList;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isStringOrStringList;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isType;
+import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isBoolean;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isNumber;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isImplementationOfClass;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryType;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isNoDuplicateInList;
+import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryCustom;
+
import org.apache.storm.container.ResourceIsolationInterface;
import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
@@ -36,8 +46,6 @@ import org.apache.storm.validation.Validated;
import java.util.ArrayList;
import java.util.Map;
-import static org.apache.storm.validation.ConfigValidationAnnotations.*;
-
/**
* Storm configs are specified as a plain old map. This class provides constants for
* all the configurations possible on a Storm cluster. Each constant is paired with an annotation
@@ -110,32 +118,32 @@ public class DaemonConfig implements Validated {
public static final String STORM_SCHEDULER = "storm.scheduler";
/**
- * The number of seconds that the blacklist scheduler will concern of bad slots or supervisors
+ * The number of seconds that the blacklist scheduler will concern of bad slots or supervisors.
*/
- @isInteger
+ @isPositiveNumber
public static final String BLACKLIST_SCHEDULER_TOLERANCE_TIME = "blacklist.scheduler.tolerance.time.secs";
/**
- * The number of hit count that will trigger blacklist in tolerance time
+ * The number of hit count that will trigger blacklist in tolerance time.
*/
- @isInteger
+ @isPositiveNumber
public static final String BLACKLIST_SCHEDULER_TOLERANCE_COUNT = "blacklist.scheduler.tolerance.count";
/**
- * The number of seconds that the blacklisted slots or supervisor will be resumed
+ * The number of seconds that the blacklisted slots or supervisor will be resumed.
*/
- @isInteger
+ @isPositiveNumber
public static final String BLACKLIST_SCHEDULER_RESUME_TIME = "blacklist.scheduler.resume.time.secs";
/**
- * The class that the blacklist scheduler will report the blacklist
+ * The class that the blacklist scheduler will report the blacklist.
*/
@NotNull
@isImplementationOfClass(implementsClass = IReporter.class)
public static final String BLACKLIST_SCHEDULER_REPORTER = "blacklist.scheduler.reporter";
/**
- * The class that specifies the eviction strategy to use in blacklist scheduler
+ * The class that specifies the eviction strategy to use in blacklist scheduler.
*/
@NotNull
@isImplementationOfClass(implementsClass = IBlacklistStrategy.class)
@@ -144,7 +152,7 @@ public class DaemonConfig implements Validated {
/**
* Whether we want to display all the resource capacity and scheduled usage on the UI page.
* You MUST have this variable set if you are using any kind of resource-related scheduler.
- *
+ * <p/>
* If this is not set, we will not display resource capacity and usage on the UI.
*/
@isBoolean
@@ -155,7 +163,7 @@ public class DaemonConfig implements Validated {
* Provides a way for a @link{STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN}
* implementation to access optional settings.
*/
- @isType(type=Map.class)
+ @isType(type = Map.class)
public static final String STORM_GROUP_MAPPING_SERVICE_PARAMS = "storm.group.mapping.service.params";
/**
@@ -873,7 +881,9 @@ public class DaemonConfig implements Validated {
* A map of users to another map of the resource guarantees of the user. Used by Resource Aware Scheduler to ensure
* per user resource guarantees.
*/
- @isMapEntryCustom(keyValidatorClasses = {ConfigValidation.StringValidator.class}, valueValidatorClasses = {ConfigValidation.UserResourcePoolEntryValidator.class})
+ @isMapEntryCustom(
+ keyValidatorClasses = {ConfigValidation.StringValidator.class},
+ valueValidatorClasses = {ConfigValidation.UserResourcePoolEntryValidator.class})
public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools";
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 7331906..e54aaeb 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -134,6 +134,7 @@ import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
import org.apache.storm.nimbus.ITopologyValidator;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Cluster.SupervisorResources;
import org.apache.storm.scheduler.DefaultScheduler;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.INimbus;
@@ -144,10 +145,9 @@ import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
-import org.apache.storm.scheduler.Cluster.SupervisorResources;
-import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.security.INimbusCredentialPlugin;
import org.apache.storm.security.auth.AuthUtils;
@@ -184,7 +184,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Nimbus implements Iface, Shutdownable, DaemonCommon {
- private final static Logger LOG = LoggerFactory.getLogger(Nimbus.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Nimbus.class);
// Metrics
private static final Meter submitTopologyWithOptsCalls = StormMetricsRegistry.registerMeter("nimbus:num-submitTopologyWithOpts-calls");
@@ -196,7 +196,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private static final Meter deactivateCalls = StormMetricsRegistry.registerMeter("nimbus:num-deactivate-calls");
private static final Meter debugCalls = StormMetricsRegistry.registerMeter("nimbus:num-debug-calls");
private static final Meter setWorkerProfilerCalls = StormMetricsRegistry.registerMeter("nimbus:num-setWorkerProfiler-calls");
- private static final Meter getComponentPendingProfileActionsCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPendingProfileActions-calls");
+ private static final Meter getComponentPendingProfileActionsCalls = StormMetricsRegistry.registerMeter(
+ "nimbus:num-getComponentPendingProfileActions-calls");
private static final Meter setLogConfigCalls = StormMetricsRegistry.registerMeter("nimbus:num-setLogConfig-calls");
private static final Meter uploadNewCredentialsCalls = StormMetricsRegistry.registerMeter("nimbus:num-uploadNewCredentials-calls");
private static final Meter beginFileUploadCalls = StormMetricsRegistry.registerMeter("nimbus:num-beginFileUpload-calls");
@@ -212,21 +213,27 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private static final Meter getClusterInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getClusterInfo-calls");
private static final Meter getLeaderCalls = StormMetricsRegistry.registerMeter("nimbus:num-getLeader-calls");
private static final Meter isTopologyNameAllowedCalls = StormMetricsRegistry.registerMeter("nimbus:num-isTopologyNameAllowed-calls");
- private static final Meter getTopologyInfoWithOptsCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfoWithOpts-calls");
+ private static final Meter getTopologyInfoWithOptsCalls = StormMetricsRegistry.registerMeter(
+ "nimbus:num-getTopologyInfoWithOpts-calls");
private static final Meter getTopologyInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfo-calls");
private static final Meter getTopologyPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls");
private static final Meter getSupervisorPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls");
private static final Meter getComponentPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls");
- private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter("nimbus:num-getOwnerResourceSummaries-calls");
- private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms", new ExponentiallyDecayingReservoir());
+ private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms",
+ new ExponentiallyDecayingReservoir());
+ private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter(
+ "nimbus:num-getOwnerResourceSummaries-calls");
private static final Meter shutdownCalls = StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls");
// END Metrics
private static final String STORM_VERSION = VersionInfo.getVersion();
+
@VisibleForTesting
public static final List<ACL> ZK_ACLS = Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0),
new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+
private static final Subject NIMBUS_SUBJECT = new Subject();
+
static {
NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
NIMBUS_SUBJECT.setReadOnly();
@@ -442,13 +449,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
@SuppressWarnings("deprecation")
private static <T extends AutoCloseable> TimeCacheMap<String, T> fileCacheMap(Map<String, Object> conf) {
return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_FILE_COPY_EXPIRATION_SECS), 600),
- (id, stream) -> {
- try {
- stream.close();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
+ (id, stream) -> {
+ try {
+ stream.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
}
private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> second) {
@@ -488,26 +495,26 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
@SuppressWarnings("deprecation")
private static <T extends AutoCloseable> TimeCacheMap<String, T> makeBlobCacheMap(Map<String, Object> conf) {
return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 600),
- (id, stream) -> {
- try {
- if (stream instanceof AtomicOutputStream) {
- ((AtomicOutputStream) stream).cancel();
- } else {
- stream.close();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ (id, stream) -> {
+ try {
+ if (stream instanceof AtomicOutputStream) {
+ ((AtomicOutputStream) stream).cancel();
+ } else {
+ stream.close();
}
- });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
}
/**
* Constructs a TimeCacheMap instance with a blobstore timeout and no callback function.
- * @param conf
- * @return
+ * @param conf the config to use
+ * @return the newly created TimeCacheMap
*/
@SuppressWarnings("deprecation")
- private static TimeCacheMap<String, Iterator<String>> makeBlobListCachMap(Map<String, Object> conf) {
+ private static TimeCacheMap<String, Iterator<String>> makeBlobListCacheMap(Map<String, Object> conf) {
return new TimeCacheMap<>(ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 600));
}
@@ -528,7 +535,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
@SuppressWarnings("unchecked")
private static List<ClusterMetricsConsumerExecutor> makeClusterMetricsConsumerExecutors(Map<String, Object> conf) {
- Collection<Map<String, Object>> consumers = (Collection<Map<String, Object>>) conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_REGISTER);
+ Collection<Map<String, Object>> consumers = (Collection<Map<String, Object>>) conf.get(
+ DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_REGISTER);
List<ClusterMetricsConsumerExecutor> ret = new ArrayList<>();
if (consumers != null) {
for (Map<String, Object> consumer : consumers) {
@@ -561,15 +569,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return kseq.getKeySequenceNumber(conf);
}
- private static StormTopology readStormTopology(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+ private static StormTopology readStormTopology(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException,
+ IOException {
return tc.readTopology(topoId, getSubject());
}
- private static Map<String, Object> readTopoConfAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+ private static Map<String, Object> readTopoConfAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException,
+ AuthorizationException, IOException {
return tc.readTopoConf(topoId, NIMBUS_SUBJECT);
}
- private static StormTopology readStormTopologyAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+ private static StormTopology readStormTopologyAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException,
+ AuthorizationException, IOException {
return tc.readTopology(topoId, NIMBUS_SUBJECT);
}
@@ -624,8 +635,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return ret;
}
- private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments,
- Map<String, Assignment> existingAssignments) {
+ private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
+ Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
// Print some useful information
if (existingAssignments != null && !existingAssignments.isEmpty()) {
@@ -672,7 +683,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
slotAssigned.put(key, value);
}
- HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() : Utils.reverseMap(newExecToNodePort);
+ HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() :
+ Utils.reverseMap(newExecToNodePort);
HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<>();
for (Entry<List<Object>, List<List<Long>>> entry: tmpNewSlotAssigned.entrySet()) {
List<List<Long>> value = new ArrayList<>(entry.getValue());
@@ -713,7 +725,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return state.getTopoId(topoName).isPresent();
}
- private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException, IOException {
+ private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException,
+ IOException {
try {
return readTopoConfAsNimbus(topoId, tc);
//Was a try-cause but I looked at the code around this and key not found is not wrapped in runtime,
@@ -1090,7 +1103,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
this.topoCache = topoCache;
this.blobDownloaders = makeBlobCacheMap(conf);
this.blobUploaders = makeBlobCacheMap(conf);
- this.blobListers = makeBlobListCachMap(conf);
+ this.blobListers = makeBlobListCacheMap(conf);
this.uptime = Utils.makeUptimeComputer();
this.validator = ReflectionUtils.newInstance((String) conf.getOrDefault(DaemonConfig.NIMBUS_TOPOLOGY_VALIDATOR, DefaultTopologyValidator.class.getName()));
this.timer = new StormTimer(null, (t, e) -> {
http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index a05d814..8083e01 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -15,9 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.scheduler.blacklist;
import com.google.common.collect.EvictingQueue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
import org.apache.storm.DaemonConfig;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.Cluster;
@@ -30,15 +39,10 @@ import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
import org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy;
import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
public class BlacklistScheduler implements IScheduler {
private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
@@ -49,7 +53,7 @@ public class BlacklistScheduler implements IScheduler {
private final IScheduler underlyingScheduler;
@SuppressWarnings("rawtypes")
- private Map _conf;
+ private Map conf;
protected int toleranceTime;
protected int toleranceCount;
@@ -74,22 +78,25 @@ public class BlacklistScheduler implements IScheduler {
public void prepare(Map conf) {
LOG.info("Preparing black list scheduler");
underlyingScheduler.prepare(conf);
- _conf = conf;
+ this.conf = conf;
- toleranceTime = ObjectReader.getInt(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
- toleranceCount = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
- resumeTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+ toleranceTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME),
+ DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
+ toleranceCount = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT),
+ DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+ resumeTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME),
+ DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
- String reporterClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
+ String reporterClassName = ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
LogReporter.class.getName());
reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
- String strategyClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY),
+ String strategyClassName = ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY),
DefaultBlacklistStrategy.class.getName());
blacklistStrategy = (IBlacklistStrategy) initializeInstance(strategyClassName, "blacklist strategy");
- nimbusMonitorFreqSecs = ObjectReader.getInt( _conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
- blacklistStrategy.prepare(_conf);
+ nimbusMonitorFreqSecs = ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
+ blacklistStrategy.prepare(this.conf);
windowSize = toleranceTime / nimbusMonitorFreqSecs;
badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize);
@@ -108,11 +115,11 @@ public class BlacklistScheduler implements IScheduler {
@Override
public void schedule(Topologies topologies, Cluster cluster) {
LOG.debug("running Black List scheduler");
- Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
LOG.debug("AssignableSlots: {}", cluster.getAssignableSlots());
LOG.debug("AvailableSlots: {}", cluster.getAvailableSlots());
LOG.debug("UsedSlots: {}", cluster.getUsedSlots());
+ Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
blacklistStrategy.resumeFromBlacklist();
badSupervisors(supervisors);
Set<String> blacklistHosts = getBlacklistHosts(cluster, topologies);
@@ -169,7 +176,8 @@ public class BlacklistScheduler implements IScheduler {
}
private Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) {
- Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies);
+ Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow),
+ cluster, topologies);
Set<String> blacklistHostSet = new HashSet<>();
for (String supervisor : blacklistSet) {
String host = cluster.getHost(supervisor);
@@ -226,23 +234,29 @@ public class BlacklistScheduler implements IScheduler {
slots.remove(slot);
cachedSupervisors.put(supervisorKey, slots);
}
- LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.", workerSlot);
+ LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.",
+ workerSlot);
}
}
}
private Object initializeInstance(String className, String representation) {
try {
- return Class.forName(className).newInstance();
- } catch (ClassNotFoundException e) {
- LOG.error("Can't find {} for name {}", representation, className);
- throw new RuntimeException(e);
- } catch (InstantiationException e) {
- LOG.error("Throw InstantiationException {} for name {}", representation, className);
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- LOG.error("Throw IllegalAccessException {} for name {}", representation, className);
- throw new RuntimeException(e);
+ return ReflectionUtils.newInstance(className);
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+
+ if (cause instanceof ClassNotFoundException) {
+ LOG.error("Can't find {} for name {}", representation, className);
+ } else if (cause instanceof InstantiationException) {
+ LOG.error("Throw InstantiationException {} for name {}", representation, className);
+ } else if (cause instanceof IllegalAccessException) {
+ LOG.error("Throw IllegalAccessException {} for name {}", representation, className);
+ } else {
+ LOG.error("Throw unexpected exception {} {} for name {}", cause, representation, className);
+ }
+
+ throw e;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
index 93c38cb..57344e6 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.scheduler.blacklist;
import java.util.HashSet;
@@ -22,35 +23,82 @@ import java.util.Set;
public class Sets {
+ /**
+ * Calculate union of both sets.
+ *
+ * @param setA parameter 1
+ * @param setB parameter 2
+ * @param <T> generic type of Set elements.
+ * @return the Set which is union of both Sets.
+ */
public static <T> Set<T> union(Set<T> setA, Set<T> setB) {
Set<T> result = new HashSet<T>(setA);
result.addAll(setB);
return result;
}
+ /**
+ * Calculate intersection of both sets.
+ *
+ * @param setA parameter 1
+ * @param setB parameter 2
+ * @param <T> generic type of Set elements.
+ * @return the Set which is intersection of both Sets.
+ */
public static <T> Set<T> intersection(Set<T> setA, Set<T> setB) {
Set<T> result = new HashSet<T>(setA);
result.retainAll(setB);
return result;
}
+ /**
+ * Calculate difference of difference of two sets.
+ *
+ * @param setA parameter 1
+ * @param setB parameter 2
+ * @param <T> generic type of Set elements.
+ * @return the Set which is difference of two sets.
+ */
public static <T> Set<T> difference(Set<T> setA, Set<T> setB) {
Set<T> result = new HashSet<T>(setA);
result.removeAll(setB);
return result;
}
+ /**
+ * Calculate symmetric difference of two sets.
+ *
+ * @param setA parameter 1
+ * @param setB parameter 2
+ * @param <T> generic type of Set elements.
+ * @return the Set which is symmetric difference of two sets.
+ */
public static <T> Set<T> symDifference(Set<T> setA, Set<T> setB) {
Set<T> union = union(setA, setB);
Set<T> intersection = intersection(setA, setB);
return difference(union, intersection);
}
+ /**
+ * Check whether a set is a subset of another set.
+ *
+ * @param setA parameter 1
+ * @param setB parameter 2
+ * @param <T> generic type of Set elements.
+ * @return true when setB is a subset of setA, false otherwise.
+ */
public static <T> boolean isSubset(Set<T> setA, Set<T> setB) {
return setB.containsAll(setA);
}
-
+ /**
+ * Check whether a set is a superset of another set.
+ *
+ * @param setA parameter 1
+ * @param setB parameter 2
+ * @param <T> generic type of Set elements.
+ * @return true when setA is a superset of setB, false otherwise.
+ */
public static <T> boolean isSuperset(Set<T> setA, Set<T> setB) {
return setA.containsAll(setB);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
index 781c37a..153829c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.scheduler.blacklist.reporters;
import java.util.List;
@@ -22,7 +23,7 @@ import java.util.Map;
import java.util.Set;
/**
- * report blacklist to alert system
+ * report blacklist to alert system.
*/
public interface IReporter {
void report(String message);
http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
index 94cfebd..3255c9d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
@@ -15,15 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.scheduler.blacklist.reporters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.storm.scheduler.blacklist.reporters;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class LogReporter implements IReporter {
private static Logger LOG = LoggerFactory.getLogger(LogReporter.class);
http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index cc7f403..00cf25a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -15,8 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.scheduler.blacklist.strategies;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
import org.apache.storm.DaemonConfig;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.SupervisorDetails;
@@ -29,13 +37,6 @@ import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
public class DefaultBlacklistStrategy implements IBlacklistStrategy {
private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
@@ -43,24 +44,25 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
- private IReporter _reporter;
+ private IReporter reporter;
- private int _toleranceCount;
- private int _resumeTime;
- private int _nimbusMonitorFreqSecs;
+ private int toleranceCount;
+ private int resumeTime;
+ private int nimbusMonitorFreqSecs;
private TreeMap<String, Integer> blacklist;
@Override
- public void prepare(Map conf){
- _toleranceCount = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
- _resumeTime = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+ public void prepare(Map conf) {
+ toleranceCount = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT),
+ DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+ resumeTime = ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
String reporterClassName = ObjectReader.getString(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
LogReporter.class.getName());
- _reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
+ reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
- _nimbusMonitorFreqSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
+ nimbusMonitorFreqSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
blacklist = new TreeMap<>();
}
@@ -78,12 +80,12 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
String supervisor = entry.getKey();
int count = entry.getValue();
- if (count >= _toleranceCount) {
+ if (count >= toleranceCount) {
if (!blacklist.containsKey(supervisor)) { // if not in blacklist then add it and set the resume time according to config
LOG.debug("add supervisor {} to blacklist", supervisor);
LOG.debug("supervisorsWithFailures : {}", supervisorsWithFailures);
- _reporter.reportBlacklist(supervisor, supervisorsWithFailures);
- blacklist.put(supervisor, _resumeTime / _nimbusMonitorFreqSecs);
+ reporter.reportBlacklist(supervisor, supervisorsWithFailures);
+ blacklist.put(supervisor, resumeTime / nimbusMonitorFreqSecs);
}
}
}
@@ -130,8 +132,9 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
int shortage = totalNeedNumWorkers - availableSlotsNotInBlacklistCount;
if (shortage > 0) {
- LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{},num blacklist :{}, will release some blacklist."
- , totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size());
+ LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{}, num blacklist :{}, " +
+ "will release some blacklist.", totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size());
+
//release earliest blacklist
Set<String> readyToRemove = new HashSet<>();
for (String supervisor : blacklist.keySet()) { //blacklist is treeMap sorted by value, minimum value means earliest
http://git-wip-us.apache.org/repos/asf/storm/blob/74cc7e2d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
index a35a1d2..f050006 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
@@ -15,22 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.scheduler.blacklist.strategies;
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Topologies;
+package org.apache.storm.scheduler.blacklist.strategies;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Topologies;
+
public interface IBlacklistStrategy {
void prepare(Map conf);
/**
- * Get blacklist by blacklist strategy
+ * Get blacklist by blacklist strategy.
+ *
* @param badSupervisorsToleranceSlidingWindow bad supervisors buffered in sliding window
* @param cluster the cluster these topologies are running in. `cluster` contains everything user
* need to develop a new scheduling logic. e.g. supervisors information, available slots, current