Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2018/04/10 22:04:14 UTC
[GitHub] storm pull request #2630: STORM-3024: Allow for scheduling to happen in the ...
GitHub user revans2 opened a pull request:
https://github.com/apache/storm/pull/2630
STORM-3024: Allow for scheduling to happen in the background.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/revans2/incubator-storm STORM-3024
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/2630.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2630
----
commit 4a9d0e2a5399647bf541ec16956472d2d795b4b1
Author: Robert (Bobby) Evans <ev...@...>
Date: 2018-04-10T18:34:20Z
STORM-3024: Allow for scheduling to happen in the background.
----
---
Posted by revans2 <gi...@git.apache.org>.
GitHub user revans2 closed the pull request at:
https://github.com/apache/storm/pull/2630
---
Posted by agresch <gi...@git.apache.org>.
GitHub user agresch commented on a diff in the pull request:
https://github.com/apache/storm/pull/2630#discussion_r180804408
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
@@ -100,49 +180,88 @@ private static void markFailedTopology(User u, Cluster c, TopologyDetails td, St
u.markTopoUnsuccess(td);
}
+ private void cancelAllPendingClusterStateChanged() {
+ if (!schedulingInBackground.isEmpty()) {
+ LOG.warn("Canceling scheduling of {} cluster state changed", schedulingInBackground.keySet());
+ for (SchedulingPending sp : schedulingInBackground.values()) {
+ sp.cancel();
+ }
+ schedulingInBackground.clear();
+ }
+ }
+
private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
List<TopologyDetails> orderedTopologies) {
+ LOG.debug("Scheduling {}", td.getId());
+ SchedulingPending sp = schedulingInBackground.get(td.getId());
--- End diff --
This is a lot of code; I would move it into a method, e.g. sp = getOrCreateSp() or something similar.
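A minimal sketch of the suggested extraction, assuming simplified stand-in types: Strategy, Pending, PendingRegistry, getOrCreatePending and remember are hypothetical names, not Storm's API, and the strategy factory is passed in as a Function instead of calling ReflectionUtils. As in the diff, a newly created entry starts at attempt 0 and legacy backtype.storm class names are rewritten to org.apache.storm; in this sketch a null return tells the caller to mark the topology as failed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for IStrategy.
interface Strategy {
    String name();
}

// Hypothetical stand-in for SchedulingPending: the strategy plus the next attempt number.
final class Pending {
    final Strategy strategy;
    final int attempt;

    Pending(Strategy strategy, int attempt) {
        this.strategy = strategy;
        this.attempt = attempt;
    }
}

final class PendingRegistry {
    private final Map<String, Pending> schedulingInBackground = new HashMap<>();

    // Hypothetical helper so an entry can already exist for a topology.
    void remember(String topoId, Pending pending) {
        schedulingInBackground.put(topoId, pending);
    }

    /** Maps legacy backtype.storm class names onto org.apache.storm, as the diff does. */
    static String normalizeStrategyName(String strategy) {
        return strategy.startsWith("backtype.storm")
            ? strategy.replace("backtype.storm", "org.apache.storm")
            : strategy;
    }

    /**
     * Returns the existing entry for topoId, or a new one at attempt 0.
     * Returns null if the strategy cannot be created; the caller then marks the topology failed.
     */
    Pending getOrCreatePending(String topoId, String strategyConf, Function<String, Strategy> factory) {
        Pending existing = schedulingInBackground.get(topoId);
        if (existing != null) {
            return existing;
        }
        try {
            return new Pending(factory.apply(normalizeStrategyName(strategyConf)), 0);
        } catch (RuntimeException e) {
            // In the diff, this is where markFailedTopology(...) is called.
            return null;
        }
    }
}
```

With the creation logic in one helper, scheduleTopology shrinks to a null check on the returned entry followed by the scheduling loop.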
---
Posted by agresch <gi...@git.apache.org>.
GitHub user agresch commented on a diff in the pull request:
https://github.com/apache/storm/pull/2630#discussion_r180801771
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
@@ -100,49 +180,88 @@ private static void markFailedTopology(User u, Cluster c, TopologyDetails td, St
u.markTopoUnsuccess(td);
}
+ private void cancelAllPendingClusterStateChanged() {
+ if (!schedulingInBackground.isEmpty()) {
+ LOG.warn("Canceling scheduling of {} cluster state changed", schedulingInBackground.keySet());
+ for (SchedulingPending sp : schedulingInBackground.values()) {
+ sp.cancel();
+ }
+ schedulingInBackground.clear();
+ }
+ }
+
private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
List<TopologyDetails> orderedTopologies) {
+ LOG.debug("Scheduling {}", td.getId());
+ SchedulingPending sp = schedulingInBackground.get(td.getId());
+ if (sp == null) {
+ IStrategy rasStrategy;
+ String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
+ try {
+ String strategy = strategyConf;
+ if (strategy.startsWith("backtype.storm")) {
+ // Storm supports to launch workers of older version.
+ // If the config of TOPOLOGY_SCHEDULER_STRATEGY comes from the older version, replace the package name.
+ strategy = strategy.replace("backtype.storm", "org.apache.storm");
+ LOG.debug("Replace backtype.storm with org.apache.storm for Config.TOPOLOGY_SCHEDULER_STRATEGY");
+ }
+ rasStrategy = ReflectionUtils.newSchedulerStrategyInstance(strategy, conf);
+ rasStrategy.prepare(conf);
+ } catch (DisallowedStrategyException e) {
+ markFailedTopology(topologySubmitter, cluster, td,
+ "Unsuccessful in scheduling - " + e.getAttemptedClass()
+ + " is not an allowed strategy. Please make sure your "
+ + Config.TOPOLOGY_SCHEDULER_STRATEGY
+ + " config is one of the allowed strategies: "
+ + e.getAllowedStrategies(), e);
+ return;
+ } catch (RuntimeException e) {
+ markFailedTopology(topologySubmitter, cluster, td,
+ "Unsuccessful in scheduling - failed to create instance of topology strategy "
+ + strategyConf
+ + ". Please check logs for details", e);
+ return;
+ }
+
+ sp = new SchedulingPending(rasStrategy, 0);
+ }
+
//A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds
Cluster workingState = new Cluster(cluster);
RAS_Nodes nodes = new RAS_Nodes(workingState);
- IStrategy rasStrategy = null;
- String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
- try {
- String strategy = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
- if (strategy.startsWith("backtype.storm")) {
- // Storm supports to launch workers of older version.
- // If the config of TOPOLOGY_SCHEDULER_STRATEGY comes from the older version, replace the package name.
- strategy = strategy.replace("backtype.storm", "org.apache.storm");
- LOG.debug("Replace backtype.storm with org.apache.storm for Config.TOPOLOGY_SCHEDULER_STRATEGY");
- }
- rasStrategy = ReflectionUtils.newSchedulerStrategyInstance(strategy, conf);
- rasStrategy.prepare(conf);
- } catch (DisallowedStrategyException e) {
- markFailedTopology(topologySubmitter, cluster, td,
- "Unsuccessful in scheduling - " + e.getAttemptedClass()
- + " is not an allowed strategy. Please make sure your "
- + Config.TOPOLOGY_SCHEDULER_STRATEGY
- + " config is one of the allowed strategies: "
- + e.getAllowedStrategies(), e);
- return;
- } catch (RuntimeException e) {
- markFailedTopology(topologySubmitter, cluster, td,
- "Unsuccessful in scheduling - failed to create instance of topology strategy "
- + strategyConf
- + ". Please check logs for details", e);
- return;
- }
- for (int i = 0; i < maxSchedulingAttempts; i++) {
+ for (int i = sp.getAttempt(); i < maxSchedulingAttempts; i++) {
SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
try {
- SchedulingResult result = rasStrategy.schedule(toSchedule, td);
+ SchedulingResult result;
+ Future<SchedulingResult> schedulingFuture = sp.scheduleIfNeeded(background, toSchedule, td);
+ try {
+ result = schedulingFuture.get(schedulingForegroundTimeoutSeconds, TimeUnit.SECONDS);
+ sp.resetFuture();
+ } catch (TimeoutException te) {
+ long elapsedTimeSecs = (Time.currentTimeMillis() - sp.getStartTime())/1000;
+ if (elapsedTimeSecs >= schedulingBackgroundTimeoutSeconds) {
--- End diff --
What would be wrong with just letting this keep running?
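A minimal sketch of the foreground/background timeout pattern in the diff, using only java.util.concurrent. BackgroundScheduling, Status, poll and result are hypothetical names; timeouts are in milliseconds rather than seconds so the behaviour is easy to exercise, and making the RAS_WORKER_ threads daemons is an assumption. The caller waits up to the foreground timeout; on a TimeoutException the task is left running and polled again on a later round, and it is cancelled only once it has been running at least as long as the background limit. Without that upper bound, a strategy that never returns would occupy a worker thread indefinitely.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class BackgroundScheduling {
    enum Status { DONE, PENDING, TIMED_OUT, FAILED }

    private static final AtomicInteger THREAD_COUNTER = new AtomicInteger();

    // Cached pool with named threads, similar to the diff's RAS_WORKER_ pool.
    private final ExecutorService background = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "RAS_WORKER_" + THREAD_COUNTER.getAndIncrement());
        t.setDaemon(true); // assumption: worker threads should not keep the JVM alive
        return t;
    });

    private Future<String> future;
    private long startMillis;
    private String result;

    String result() {
        return result;
    }

    /** Starts the task if nothing is in flight, then waits up to foregroundMillis for it. */
    Status poll(Callable<String> task, long foregroundMillis, long backgroundMillis) {
        if (future == null) {
            startMillis = System.currentTimeMillis();
            future = background.submit(task);
        }
        try {
            result = future.get(foregroundMillis, TimeUnit.MILLISECONDS);
            future = null; // comparable to sp.resetFuture() in the diff
            return Status.DONE;
        } catch (TimeoutException te) {
            if (System.currentTimeMillis() - startMillis >= backgroundMillis) {
                future.cancel(true); // interrupts the worker thread
                future = null;
                return Status.TIMED_OUT;
            }
            return Status.PENDING; // keep running in the background, check again next round
        } catch (ExecutionException ee) {
            future = null; // the task itself threw
            return Status.FAILED;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return Status.PENDING;
        }
    }
}
```

Note that Future.cancel(true) only interrupts the worker thread; code that never checks for interruption keeps running, which is one reason a strategy may also need its own stop flag.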
---
Posted by agresch <gi...@git.apache.org>.
GitHub user agresch commented on a diff in the pull request:
https://github.com/apache/storm/pull/2630#discussion_r180803721
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
@@ -60,15 +122,33 @@ public void prepare(Map<String, Object> conf) {
configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
maxSchedulingAttempts = ObjectReader.getInt(
conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS), 5);
+ schedulingBackgroundTimeoutSeconds = ObjectReader.getInt(
+ conf.get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY), 600);
+ schedulingForegroundTimeoutSeconds = ObjectReader.getInt(
+ conf.get(DaemonConfig.SCHEDULING_FOREGROUND_TIMEOUT_SECONDS_PER_TOPOLOGY), 10);
+ background =
+ Executors.newCachedThreadPool(r -> new Thread(r, "RAS_WORKER_" + THREAD_COUNTER.getAndIncrement()));
}
@Override
public Map<String, Map<String, Double>> config() {
- return (Map) getUserResourcePools();
+ return getUserResourcePools();
}
@Override
public void schedule(Topologies topologies, Cluster cluster) {
+ //Cancel any pending scheduling attempts for a topology that is no longer running.
--- End diff --
Could this call cancelAllPendingClusterStateChanged()? Maybe rename that to cancelAllPending() or something similar?
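If the intent is to reuse one method for both cases, a sketch with a single predicate-based helper might look like the following. PendingCanceller, PendingWork, cancelPendingIf and cancelPendingNotIn are hypothetical names; cancelAllPending is the name suggested in the comment. PendingWork only records that cancel() ran, standing in for SchedulingPending. Removing entries through entrySet().removeIf avoids a ConcurrentModificationException while iterating the map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

final class PendingCanceller {
    // Hypothetical stand-in for SchedulingPending; it only records whether cancel() ran.
    static final class PendingWork {
        boolean cancelled;

        void cancel() {
            cancelled = true;
        }
    }

    final Map<String, PendingWork> schedulingInBackground = new HashMap<>();

    /** Cancels and removes every pending entry whose topology id matches shouldCancel. */
    void cancelPendingIf(Predicate<String> shouldCancel) {
        schedulingInBackground.entrySet().removeIf(e -> {
            if (!shouldCancel.test(e.getKey())) {
                return false;
            }
            e.getValue().cancel();
            return true;
        });
    }

    /** Cancels pending work for topologies that are no longer in the running set. */
    void cancelPendingNotIn(Set<String> runningIds) {
        cancelPendingIf(id -> !runningIds.contains(id));
    }

    /** Cancels everything, e.g. when the cluster state changed. */
    void cancelAllPending() {
        cancelPendingIf(id -> true);
    }
}
```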
---
Posted by agresch <gi...@git.apache.org>.
GitHub user agresch commented on a diff in the pull request:
https://github.com/apache/storm/pull/2630#discussion_r180805730
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java ---
@@ -65,6 +65,9 @@ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
final List<ObjectResources> sortedNodes = sortAllNodes(td, null, favoredNodes, unFavoredNodes);
for (ExecutorDetails exec : orderedExecutors) {
+ if (!running) {
--- End diff --
Is this just to force scheduling to be cancelled more quickly?
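The check appears to be cooperative cancellation: Future.cancel(true) only interrupts the worker thread, and a CPU-bound loop that never checks Thread.interrupted() keeps running regardless. A minimal sketch, assuming a simplified strategy (CancellableStrategy and its String executors are hypothetical, not DefaultResourceAwareStrategy): the volatile flag is written by the cancelling thread and read by the worker, and volatile guarantees the worker sees the update without further synchronization.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified strategy that assigns executors one at a time.
class CancellableStrategy {
    // Written by the thread that cancels, read by the worker thread running schedule().
    protected volatile boolean running = true;

    void cancel() {
        running = false;
    }

    /** Assigns executors in order, stopping early if cancel() has been called. */
    List<String> schedule(List<String> orderedExecutors) {
        List<String> assigned = new ArrayList<>();
        for (String exec : orderedExecutors) {
            if (!running) {
                // Stop at the next executor boundary; the caller discards the partial result.
                return assigned;
            }
            assigned.add(exec); // placeholder for the real per-executor placement work
        }
        return assigned;
    }
}
```

Checking once per executor bounds the cancellation latency to the time it takes to place a single executor.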
---
Posted by Ethanlm <gi...@git.apache.org>.
GitHub user Ethanlm commented on a diff in the pull request:
https://github.com/apache/storm/pull/2630#discussion_r180615319
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
@@ -100,49 +180,88 @@ private static void markFailedTopology(User u, Cluster c, TopologyDetails td, St
u.markTopoUnsuccess(td);
}
+ private void cancelAllPendingClusterStateChanged() {
+ if (!schedulingInBackground.isEmpty()) {
+ LOG.warn("Canceling scheduling of {} cluster state changed", schedulingInBackground.keySet());
--- End diff --
Should this be "Canceling scheduling of {} because of cluster state changed"?
---
Posted by agresch <gi...@git.apache.org>.
GitHub user agresch commented on a diff in the pull request:
https://github.com/apache/storm/pull/2630#discussion_r180805036
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java ---
@@ -49,20 +49,30 @@
protected Cluster cluster;
private Map<String, List<String>> networkTopography;
protected RAS_Nodes nodes;
+ /**
+ * Should be used by subclasses to know if they have been asked to stop scheduling.
+ */
+ protected volatile boolean running;
--- End diff --
Would scheduling be a more descriptive name than running?
---