Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2018/04/10 22:04:14 UTC

[GitHub] storm pull request #2630: STORM-3024: Allow for scheduling to happen in the ...

GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/2630

    STORM-3024: Allow for scheduling to happen in the background.

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm STORM-3024

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2630.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2630
    
----
commit 4a9d0e2a5399647bf541ec16956472d2d795b4b1
Author: Robert (Bobby) Evans <ev...@...>
Date:   2018-04-10T18:34:20Z

    STORM-3024: Allow for scheduling to happen in the background.

----


---

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 closed the pull request at:

    https://github.com/apache/storm/pull/2630


---

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2630#discussion_r180804408
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -100,49 +180,88 @@ private static void markFailedTopology(User u, Cluster c, TopologyDetails td, St
             u.markTopoUnsuccess(td);
         }
     
    +    private void cancelAllPendingClusterStateChanged() {
    +        if (!schedulingInBackground.isEmpty()) {
    +            LOG.warn("Canceling scheduling of {} cluster state changed", schedulingInBackground.keySet());
    +            for (SchedulingPending sp : schedulingInBackground.values()) {
    +                sp.cancel();
    +            }
    +            schedulingInBackground.clear();
    +        }
    +    }
    +
         private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
                                       List<TopologyDetails> orderedTopologies) {
    +        LOG.debug("Scheduling {}", td.getId());
    +        SchedulingPending sp = schedulingInBackground.get(td.getId());
    --- End diff --
    
    This is a lot of code; I would move it into a method, e.g. sp = getOrCreateSp() or something.
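
A minimal sketch of the extraction suggested above, assuming the lookup-or-create logic can be separated from the retry loop. `Pending`, `getOrCreatePending`, `normalizeStrategy`, and `inBackground` are hypothetical stand-ins, not names from the PR; only the `backtype.storm` to `org.apache.storm` rename is taken from the diff. Handling of a disallowed or unloadable strategy is left out.

```java
import java.util.HashMap;
import java.util.Map;

class GetOrCreateSketch {
    /** Hypothetical stand-in for the PR's SchedulingPending. */
    static final class Pending {
        final String strategyClass;
        final int attempt;

        Pending(String strategyClass, int attempt) {
            this.strategyClass = strategyClass;
            this.attempt = attempt;
        }
    }

    // Topology id -> scheduling work that is still in flight (assumed shape).
    final Map<String, Pending> inBackground = new HashMap<>();

    /** Mirrors the package rename in the diff for configs written by older clients. */
    static String normalizeStrategy(String strategy) {
        if (strategy.startsWith("backtype.storm")) {
            return strategy.replace("backtype.storm", "org.apache.storm");
        }
        return strategy;
    }

    /** Reuses in-flight work for the topology, or starts a fresh record at attempt 0. */
    Pending getOrCreatePending(String topologyId, String strategyConf) {
        Pending existing = inBackground.get(topologyId);
        if (existing != null) {
            return existing;
        }
        return new Pending(normalizeStrategy(strategyConf), 0);
    }

    public static void main(String[] args) {
        GetOrCreateSketch sketch = new GetOrCreateSketch();
        Pending fresh = sketch.getOrCreatePending("topo-1", "backtype.storm.SomeStrategy");
        System.out.println(fresh.strategyClass + " attempt=" + fresh.attempt);
    }
}
```

With a helper of this shape, the body of the scheduling method would start directly at the working-state copy and the retry loop.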


---

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2630#discussion_r180801771
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -100,49 +180,88 @@ private static void markFailedTopology(User u, Cluster c, TopologyDetails td, St
             u.markTopoUnsuccess(td);
         }
     
    +    private void cancelAllPendingClusterStateChanged() {
    +        if (!schedulingInBackground.isEmpty()) {
    +            LOG.warn("Canceling scheduling of {} cluster state changed", schedulingInBackground.keySet());
    +            for (SchedulingPending sp : schedulingInBackground.values()) {
    +                sp.cancel();
    +            }
    +            schedulingInBackground.clear();
    +        }
    +    }
    +
         private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter,
                                       List<TopologyDetails> orderedTopologies) {
    +        LOG.debug("Scheduling {}", td.getId());
    +        SchedulingPending sp = schedulingInBackground.get(td.getId());
    +        if (sp == null) {
    +            IStrategy rasStrategy;
    +            String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
    +            try {
    +                String strategy = strategyConf;
    +                if (strategy.startsWith("backtype.storm")) {
    +                    // Storm supports to launch workers of older version.
    +                    // If the config of TOPOLOGY_SCHEDULER_STRATEGY comes from the older version, replace the package name.
    +                    strategy = strategy.replace("backtype.storm", "org.apache.storm");
    +                    LOG.debug("Replace backtype.storm with org.apache.storm for Config.TOPOLOGY_SCHEDULER_STRATEGY");
    +                }
    +                rasStrategy = ReflectionUtils.newSchedulerStrategyInstance(strategy, conf);
    +                rasStrategy.prepare(conf);
    +            } catch (DisallowedStrategyException e) {
    +                markFailedTopology(topologySubmitter, cluster, td,
    +                    "Unsuccessful in scheduling - " + e.getAttemptedClass()
    +                        + " is not an allowed strategy. Please make sure your "
    +                        + Config.TOPOLOGY_SCHEDULER_STRATEGY
    +                        + " config is one of the allowed strategies: "
    +                        + e.getAllowedStrategies(), e);
    +                return;
    +            } catch (RuntimeException e) {
    +                markFailedTopology(topologySubmitter, cluster, td,
    +                    "Unsuccessful in scheduling - failed to create instance of topology strategy "
    +                        + strategyConf
    +                        + ". Please check logs for details", e);
    +                return;
    +            }
    +
    +            sp = new SchedulingPending(rasStrategy, 0);
    +        }
    +
             //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds
             Cluster workingState = new Cluster(cluster);
             RAS_Nodes nodes = new RAS_Nodes(workingState);
    -        IStrategy rasStrategy = null;
    -        String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
    -        try {
    -            String strategy = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
    -            if (strategy.startsWith("backtype.storm")) {
    -                // Storm supports to launch workers of older version.
    -                // If the config of TOPOLOGY_SCHEDULER_STRATEGY comes from the older version, replace the package name.
    -                strategy = strategy.replace("backtype.storm", "org.apache.storm");
    -                LOG.debug("Replace backtype.storm with org.apache.storm for Config.TOPOLOGY_SCHEDULER_STRATEGY");
    -            }
    -            rasStrategy = ReflectionUtils.newSchedulerStrategyInstance(strategy, conf);
    -            rasStrategy.prepare(conf);
    -        } catch (DisallowedStrategyException e) {
    -            markFailedTopology(topologySubmitter, cluster, td,
    -                "Unsuccessful in scheduling - " + e.getAttemptedClass()
    -                    + " is not an allowed strategy. Please make sure your "
    -                    + Config.TOPOLOGY_SCHEDULER_STRATEGY
    -                    + " config is one of the allowed strategies: "
    -                    + e.getAllowedStrategies(), e);
    -            return;
    -        } catch (RuntimeException e) {
    -            markFailedTopology(topologySubmitter, cluster, td,
    -                "Unsuccessful in scheduling - failed to create instance of topology strategy "
    -                    + strategyConf
    -                    + ". Please check logs for details", e);
    -            return;
    -        }
     
    -        for (int i = 0; i < maxSchedulingAttempts; i++) {
    +        for (int i = sp.getAttempt(); i < maxSchedulingAttempts; i++) {
                 SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
                 try {
    -                SchedulingResult result = rasStrategy.schedule(toSchedule, td);
    +                SchedulingResult result;
    +                Future<SchedulingResult> schedulingFuture = sp.scheduleIfNeeded(background, toSchedule, td);
    +                try {
    +                    result = schedulingFuture.get(schedulingForegroundTimeoutSeconds, TimeUnit.SECONDS);
    +                    sp.resetFuture();
    +                } catch (TimeoutException te) {
    +                    long elapsedTimeSecs = (Time.currentTimeMillis() - sp.getStartTime())/1000;
    +                    if (elapsedTimeSecs >= schedulingBackgroundTimeoutSeconds) {
    --- End diff --
    
    What would be wrong with just letting this remain running?
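
For context on the question above, here is a minimal sketch of the two-timeout pattern the diff appears to use: wait briefly in the foreground, and on timeout cancel only when the overall background budget is spent. `waitOrCancel` and `Outcome` are hypothetical names, milliseconds are used instead of the PR's seconds, and `System.currentTimeMillis()` stands in for Storm's `Time.currentTimeMillis()`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BackgroundWaitSketch {
    enum Outcome { DONE, STILL_RUNNING, CANCELLED }

    /**
     * Waits up to foregroundMillis for the task. On timeout, cancels the task only
     * if at least backgroundMillis have passed since startMillis; otherwise the
     * task is left running so a later call can pick up its result.
     */
    static Outcome waitOrCancel(Future<?> task, long startMillis,
                                long foregroundMillis, long backgroundMillis) {
        try {
            task.get(foregroundMillis, TimeUnit.MILLISECONDS);
            return Outcome.DONE;
        } catch (TimeoutException te) {
            long elapsed = System.currentTimeMillis() - startMillis;
            if (elapsed >= backgroundMillis) {
                task.cancel(true); // interrupts the worker thread if it is running
                return Outcome.CANCELLED;
            }
            return Outcome.STILL_RUNNING;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService background = Executors.newCachedThreadPool();
        try {
            long start = System.currentTimeMillis();
            Future<?> slow = background.submit(() -> {
                Thread.sleep(60_000);
                return null;
            });
            // Budget not yet spent: the task keeps running in the background.
            System.out.println(waitOrCancel(slow, start, 50, 10_000));
            // A budget of zero is always spent, so this call cancels the task.
            System.out.println(waitOrCancel(slow, start, 50, 0));
        } finally {
            background.shutdownNow();
        }
    }
}
```

Without the second bound, a strategy that never terminates would hold a pool thread indefinitely, which is one plausible reason for the cancellation branch.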


---

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2630#discussion_r180803721
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -60,15 +122,33 @@ public void prepare(Map<String, Object> conf) {
             configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
             maxSchedulingAttempts = ObjectReader.getInt(
                 conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS), 5);
    +        schedulingBackgroundTimeoutSeconds = ObjectReader.getInt(
    +            conf.get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY), 600);
    +        schedulingForegroundTimeoutSeconds = ObjectReader.getInt(
    +            conf.get(DaemonConfig.SCHEDULING_FOREGROUND_TIMEOUT_SECONDS_PER_TOPOLOGY), 10);
    +        background =
    +            Executors.newCachedThreadPool(r -> new Thread(r, "RAS_WORKER_" + THREAD_COUNTER.getAndIncrement()));
         }
     
         @Override
         public Map<String, Map<String, Double>> config() {
    -        return (Map) getUserResourcePools();
    +        return getUserResourcePools();
         }
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
    +        //Cancel any pending scheduling attempts for a topology that is no longer running.
    --- End diff --
    
    Call cancelAllPendingClusterStateChanged()? Rename it to cancelAllPending() or something?


---

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2630#discussion_r180805730
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java ---
    @@ -65,6 +65,9 @@ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
             final List<ObjectResources> sortedNodes = sortAllNodes(td, null, favoredNodes, unFavoredNodes);
     
             for (ExecutorDetails exec : orderedExecutors) {
    +            if (!running) {
    --- End diff --
    
    Is this just to force a quicker cancellation of scheduling?
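
The check in the loop above looks like ordinary cooperative cancellation: the worker polls a flag between units of work and stops early once another thread clears it. A minimal sketch of that pattern follows; the class and method names are hypothetical, and a list of `Runnable` steps stands in for the per-executor placement work.

```java
import java.util.Arrays;
import java.util.List;

class CooperativeCancelSketch {
    // volatile so that a write from the cancelling thread is visible to the loop
    private volatile boolean running = true;

    /** Asks the loop in processAll to stop before its next step. */
    void cancel() {
        running = false;
    }

    /** Runs steps in order and returns how many ran before completion or cancellation. */
    int processAll(List<Runnable> steps) {
        int done = 0;
        for (Runnable step : steps) {
            if (!running) {
                break; // stop promptly instead of finishing the remaining steps
            }
            step.run();
            done++;
        }
        return done;
    }

    public static void main(String[] args) {
        CooperativeCancelSketch sketch = new CooperativeCancelSketch();
        Runnable noop = () -> { };
        // The second step requests cancellation, so the third step is skipped.
        int done = sketch.processAll(Arrays.<Runnable>asList(noop, sketch::cancel, noop));
        System.out.println("steps run before stopping: " + done);
    }
}
```

A flag check of this kind complements `Future.cancel(true)`, since an interrupt alone has no effect on a loop that never blocks or checks its interrupt status.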


---

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2630#discussion_r180615319
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---
    @@ -100,49 +180,88 @@ private static void markFailedTopology(User u, Cluster c, TopologyDetails td, St
             u.markTopoUnsuccess(td);
         }
     
    +    private void cancelAllPendingClusterStateChanged() {
    +        if (!schedulingInBackground.isEmpty()) {
    +            LOG.warn("Canceling scheduling of {} cluster state changed", schedulingInBackground.keySet());
    --- End diff --
    
    Should this be "Canceling scheduling of {} _**because of**_ cluster state changed"?


---

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2630#discussion_r180805036
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java ---
    @@ -49,20 +49,30 @@
         protected Cluster cluster;
         private Map<String, List<String>> networkTopography;
         protected RAS_Nodes nodes;
    +    /**
    +     * Should be used by subclasses to know if they have been asked to stop scheduling.
    +     */
    +    protected volatile boolean running;
    --- End diff --
    
    `scheduling` might be a more descriptive name than `running`?


---