Posted to dev@storm.apache.org by Ethanlm <gi...@git.apache.org> on 2018/04/13 03:19:04 UTC

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

GitHub user Ethanlm opened a pull request:

    https://github.com/apache/storm/pull/2634

    [STORM-3021] Fix wrong usages of Config.TOPOLOGY_WORKERS on RAS cluster

    https://issues.apache.org/jira/browse/STORM-3021
    
    The Resource Aware Scheduler doesn't honor `Config.TOPOLOGY_WORKERS`, so this fixes the code that relies on `Config.TOPOLOGY_WORKERS` incorrectly. It also fixes the docs/javadocs to avoid confusion.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Ethanlm/storm STORM-3021

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2634.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2634
    
----
commit 032431cf448a589cfa016fba775921e5c06f89e4
Author: Ethan Li <et...@...>
Date:   2018-04-13T03:10:25Z

    [STORM-3021] Fix wrong usages of Config.TOPOLOGY_WORKERS on RAS cluster

----


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r182552073
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -305,8 +305,9 @@ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final
         }
     
         private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
    -        final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
    -        if (workerCount <= 1) {
    +        Set<Integer> nonLocalTasks = Sets.difference(workerState.getTaskToComponent().keySet(),
    --- End diff --
    
    Perhaps we can consolidate these set difference calls, which determine single-worker vs. multi-worker mode, into a single function?
    - https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java#L230 



---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r182549691
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -3032,9 +3019,19 @@ public void submitTopologyWithOpts(String topoName, String uploadedJarLocation,
                 // if the other config does not have it set.
                 topology = normalizeTopology(totalConf, topology);
     
    -            //set the number of acker executors;
    -            totalConfToSave.put(Config.TOPOLOGY_ACKER_EXECUTORS, getNumOfAckerExecs(totalConf, topology));
    -            LOG.debug("Config.TOPOLOGY_ACKER_EXECUTORS set to: {}", totalConfToSave.get(Config.TOPOLOGY_ACKER_EXECUTORS));
    +            // if the Resource Aware Scheduler is used,
    +            // we might need to set the number of acker executors and eventlogger executors to be the estimated number of workers.
    +            if (ServerUtils.isRAS(conf)) {
    +                int estimatedNumWorker = ServerUtils.getEstimatedWorkerCountForRASTopo(totalConf, topology);
    +                int numAckerExecs = ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_ACKER_EXECUTORS), estimatedNumWorker);
    +                int numEventLoggerExecs = ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), estimatedNumWorker);
    +
    +                totalConfToSave.put(Config.TOPOLOGY_ACKER_EXECUTORS, numAckerExecs);
    --- End diff --
    
    Is this a case where we are overriding the acker and eventlogger count settings that were set by the user?


---

[GitHub] storm issue #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_WORKERS ...

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:

    https://github.com/apache/storm/pull/2634
  
    The failed test should be unrelated.


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r182757546
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -305,8 +305,9 @@ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final
         }
     
         private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
    -        final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
    -        if (workerCount <= 1) {
    +        Set<Integer> nonLocalTasks = Sets.difference(workerState.getTaskToComponent().keySet(),
    --- End diff --
    
    About the `set difference calls`: I think these two are different. The first one, https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java#L230, checks whether any outbound task is not in the local worker. Having no such task doesn't necessarily mean the topology has only one worker.
    The second (in `setupBackPressureCheckTimer`) checks whether all the tasks are local, which means there is only one worker.
    
    



---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r182750384
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -3032,9 +3019,19 @@ public void submitTopologyWithOpts(String topoName, String uploadedJarLocation,
                 // if the other config does not have it set.
                 topology = normalizeTopology(totalConf, topology);
     
    -            //set the number of acker executors;
    -            totalConfToSave.put(Config.TOPOLOGY_ACKER_EXECUTORS, getNumOfAckerExecs(totalConf, topology));
    -            LOG.debug("Config.TOPOLOGY_ACKER_EXECUTORS set to: {}", totalConfToSave.get(Config.TOPOLOGY_ACKER_EXECUTORS));
    +            // if the Resource Aware Scheduler is used,
    +            // we might need to set the number of acker executors and eventlogger executors to be the estimated number of workers.
    +            if (ServerUtils.isRAS(conf)) {
    +                int estimatedNumWorker = ServerUtils.getEstimatedWorkerCountForRASTopo(totalConf, topology);
    +                int numAckerExecs = ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_ACKER_EXECUTORS), estimatedNumWorker);
    +                int numEventLoggerExecs = ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), estimatedNumWorker);
    +
    +                totalConfToSave.put(Config.TOPOLOGY_ACKER_EXECUTORS, numAckerExecs);
    --- End diff --
    
    No, we are not overriding the user setting. We are overriding the default value.
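
The distinction between overriding a user setting and overriding a default can be sketched as below. This is a minimal illustration, not the Nimbus code: `AckerDefaultSketch`, `getIntOrDefault` and `resolveAckerExecutors` are hypothetical names, and `getIntOrDefault` is a stand-in written for this example rather than Storm's `ObjectReader`. The only thing taken from Storm is the config key string `topology.acker.executors`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: all names here are illustrative, not Storm APIs.
final class AckerDefaultSketch {
    private AckerDefaultSketch() {}

    // Stand-in for a "read an int, or fall back to a default" helper.
    public static int getIntOrDefault(Object value, int defaultValue) {
        if (value == null) {
            return defaultValue; // nothing configured: use the fallback
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        throw new IllegalArgumentException("Not a number: " + value);
    }

    // The user's value wins when present; the estimate only fills the gap.
    public static int resolveAckerExecutors(Map<String, Object> conf, int estimatedWorkers) {
        return getIntOrDefault(conf.get("topology.acker.executors"), estimatedWorkers);
    }

    public static void main(String[] args) {
        Map<String, Object> unset = new HashMap<>();
        Map<String, Object> userSet = new HashMap<>();
        userSet.put("topology.acker.executors", 0); // user explicitly disables ackers

        System.out.println(resolveAckerExecutors(unset, 3));   // falls back to the estimate
        System.out.println(resolveAckerExecutors(userSet, 3)); // keeps the user's 0
    }
}
```

An explicit `0` from the user is kept as-is, so only the "not set" case picks up the estimated worker count.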


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r181472816
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -305,8 +305,9 @@ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final
         }
     
         private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
    --- End diff --
    
    `topologyConf` is used in this function.


---

[GitHub] storm issue #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_WORKERS ...

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:

    https://github.com/apache/storm/pull/2634
  
    @revans2 @roshannaik Could you please take a look when you get a chance? We discussed this in https://issues.apache.org/jira/browse/STORM-3021


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r182545551
  
    --- Diff: docs/Resource_Aware_Scheduler_overview.md ---
    @@ -184,6 +184,10 @@ The user can set some default configurations for the Resource Aware Scheduler in
         topology.worker.max.heap.size.mb: 768.0
     ```
     
    +### Warning
    +
    +The number of workers will be dynamically calculated by the Resource Aware Scheduler. The `Config.TOPOLOGY_WORKERS` will not be honored. 
    --- End diff --
    
    May I suggest a minor rewording:  "If Resource Aware Scheduling is enabled, it will dynamically calculate the number of workers and the `topology.workers` setting is ignored."


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by erikdw <gi...@git.apache.org>.
Github user erikdw commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r181535091
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -372,9 +374,10 @@
         /**
          * How many executors to spawn for ackers.
          *
    -     * <p>By not setting this variable or setting it as null, Storm will set the number of acker executors
    -     * to be equal to the number of workers configured for this topology. If this variable is set to 0,
    -     * then Storm will immediately ack tuples as soon as they come off the spout, effectively disabling reliability.</p>
    +     * <p>By not setting this variable or setting it as null, Storm will set the number of acker executor to be equal to
    --- End diff --
    
    nit: please restore pluralization of `executor`.  i.e., `number of acker executors`.


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r181480542
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -305,8 +305,9 @@ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final
         }
     
         private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
    --- End diff --
    
    My bad.  


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r182755817
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -3032,9 +3019,19 @@ public void submitTopologyWithOpts(String topoName, String uploadedJarLocation,
                 // if the other config does not have it set.
                 topology = normalizeTopology(totalConf, topology);
     
    -            //set the number of acker executors;
    -            totalConfToSave.put(Config.TOPOLOGY_ACKER_EXECUTORS, getNumOfAckerExecs(totalConf, topology));
    -            LOG.debug("Config.TOPOLOGY_ACKER_EXECUTORS set to: {}", totalConfToSave.get(Config.TOPOLOGY_ACKER_EXECUTORS));
    +            // if the Resource Aware Scheduler is used,
    +            // we might need to set the number of acker executors and eventlogger executors to be the estimated number of workers.
    +            if (ServerUtils.isRAS(conf)) {
    +                int estimatedNumWorker = ServerUtils.getEstimatedWorkerCountForRASTopo(totalConf, topology);
    +                int numAckerExecs = ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_ACKER_EXECUTORS), estimatedNumWorker);
    +                int numEventLoggerExecs = ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), estimatedNumWorker);
    +
    +                totalConfToSave.put(Config.TOPOLOGY_ACKER_EXECUTORS, numAckerExecs);
    --- End diff --
    
    +1


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r181427545
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -305,8 +305,9 @@ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final
         }
     
         private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
    --- End diff --
    
    remove unused topologyConf


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r183051697
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -305,8 +305,9 @@ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final
         }
     
         private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
    -        final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
    -        if (workerCount <= 1) {
    +        Set<Integer> nonLocalTasks = Sets.difference(workerState.getTaskToComponent().keySet(),
    --- End diff --
    
    `Sets.difference()` is already a named function and it's almost one line of code. I think it's OK to just use it in this way?


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r182909146
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -305,8 +305,9 @@ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final
         }
     
         private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
    -        final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
    -        if (workerCount <= 1) {
    +        Set<Integer> nonLocalTasks = Sets.difference(workerState.getTaskToComponent().keySet(),
    --- End diff --
    
    Thanks for the clarifications. IMO, that's a nice (even though small) improvement to disable interworker when there are no outbound tasks, and not just for 1-worker mode.
    
    Minor request: it would be nice to have named functions for those two set difference calls. I suspect they will start seeing more usage soon.
    
    Looks good. Thanks for taking care of this.



---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r182628537
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -305,8 +305,9 @@ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final
         }
     
         private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
    -        final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
    -        if (workerCount <= 1) {
    +        Set<Integer> nonLocalTasks = Sets.difference(workerState.getTaskToComponent().keySet(),
    --- End diff --
    
    Some other places that may need similar fixes:
    
    - https://github.com/apache/storm/blob/master/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java#L109
    
    - https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L363-L364  
    
    - https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L276 
    
    
    You may want to check all usages of that setting in the code base.


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r182758538
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -305,8 +305,9 @@ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final
         }
     
         private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
    -        final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
    -        if (workerCount <= 1) {
    +        Set<Integer> nonLocalTasks = Sets.difference(workerState.getTaskToComponent().keySet(),
    --- End diff --
    
    https://github.com/apache/storm/blob/master/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java#L109 This is fine because the code also gets the number of workers from topologyInfo.
    
    https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L363-L364 This is also fine because we override the default value of `TOPOLOGY_EVENTLOGGER_EXECUTORS` when we submit the topology if it's on RAS.
    The same applies to https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L276
    
    



---

[GitHub] storm issue #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_WORKERS ...

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:

    https://github.com/apache/storm/pull/2634
  
    @roshannaik I addressed your comments. Thanks for the review. If it looks good, I will squash the commits and wait for the merge.


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r182747281
  
    --- Diff: docs/Resource_Aware_Scheduler_overview.md ---
    @@ -184,6 +184,10 @@ The user can set some default configurations for the Resource Aware Scheduler in
         topology.worker.max.heap.size.mb: 768.0
     ```
     
    +### Warning
    +
    +The number of workers will be dynamically calculated by the Resource Aware Scheduler. The `Config.TOPOLOGY_WORKERS` will not be honored. 
    --- End diff --
    
    +1 for the comment from @roshannaik 


---

[GitHub] storm issue #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_WORKERS ...

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:

    https://github.com/apache/storm/pull/2634
  
    rebased.


---

[GitHub] storm issue #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_WORKERS ...

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:

    https://github.com/apache/storm/pull/2634
  
    Not sure why the build failed. The build succeeded on my repo: https://travis-ci.org/Ethanlm/storm/builds/372073451


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2634


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r183200719
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -305,8 +305,9 @@ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final
         }
     
         private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
    -        final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
    -        if (workerCount <= 1) {
    +        Set<Integer> nonLocalTasks = Sets.difference(workerState.getTaskToComponent().keySet(),
    --- End diff --
    
    I meant it would be nice to have functions like `isSingleWorkerMode()` and `hasOutboundTasks()` that hide the details of whatever mechanics are employed to figure it out. I suspect these functions will see more usage.
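
A minimal sketch of what such helpers might look like, using only `java.util` instead of Guava's `Sets.difference`. Everything here is an assumption for illustration: the class `WorkerTaskChecks`, the parameter names, and the idea of passing task-id sets in directly are not taken from `Worker.java`, and the semantics of `hasOutboundTasks()` follow the description given earlier in this thread (some outbound task is not in the local worker).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helpers; names and signatures are illustrative, not Storm APIs.
final class WorkerTaskChecks {
    private WorkerTaskChecks() {}

    // Returns the task ids in 'tasks' that are not hosted by the local worker.
    public static Set<Integer> nonLocal(Set<Integer> tasks, Set<Integer> localTaskIds) {
        Set<Integer> result = new HashSet<>(tasks);
        result.removeAll(localTaskIds);
        return result;
    }

    // True when every task of the topology runs in this worker.
    public static boolean isSingleWorkerMode(Set<Integer> allTaskIds, Set<Integer> localTaskIds) {
        return nonLocal(allTaskIds, localTaskIds).isEmpty();
    }

    // True when at least one outbound (downstream) task lives in another worker.
    public static boolean hasOutboundTasks(Set<Integer> outboundTaskIds, Set<Integer> localTaskIds) {
        return !nonLocal(outboundTaskIds, localTaskIds).isEmpty();
    }

    public static void main(String[] args) {
        Set<Integer> all = new HashSet<>(Arrays.asList(1, 2, 3, 4));
        Set<Integer> local = new HashSet<>(Arrays.asList(1, 2));
        Set<Integer> outbound = new HashSet<>(Arrays.asList(2));

        // Tasks 3 and 4 run elsewhere, so this is not single-worker mode...
        System.out.println(isSingleWorkerMode(all, local)); // false
        // ...yet this worker sends nothing to another worker.
        System.out.println(hasOutboundTasks(outbound, local)); // false
    }
}
```

The example in `main` shows why the two checks are not interchangeable: a worker in a multi-worker topology can still have no outbound tasks outside itself.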


---

[GitHub] storm pull request #2634: [STORM-3021] Fix wrong usages of Config.TOPOLOGY_W...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2634#discussion_r182584203
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -3032,9 +3019,19 @@ public void submitTopologyWithOpts(String topoName, String uploadedJarLocation,
                 // if the other config does not have it set.
                 topology = normalizeTopology(totalConf, topology);
     
    -            //set the number of acker executors;
    -            totalConfToSave.put(Config.TOPOLOGY_ACKER_EXECUTORS, getNumOfAckerExecs(totalConf, topology));
    -            LOG.debug("Config.TOPOLOGY_ACKER_EXECUTORS set to: {}", totalConfToSave.get(Config.TOPOLOGY_ACKER_EXECUTORS));
    +            // if the Resource Aware Scheduler is used,
    +            // we might need to set the number of acker executors and eventlogger executors to be the estimated number of workers.
    +            if (ServerUtils.isRAS(conf)) {
    +                int estimatedNumWorker = ServerUtils.getEstimatedWorkerCountForRASTopo(totalConf, topology);
    +                int numAckerExecs = ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_ACKER_EXECUTORS), estimatedNumWorker);
    +                int numEventLoggerExecs = ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), estimatedNumWorker);
    +
    +                totalConfToSave.put(Config.TOPOLOGY_ACKER_EXECUTORS, numAckerExecs);
    --- End diff --
    
    Some other places that may need similar fixes:
    
    - https://github.com/apache/storm/blob/master/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java#L109
    
    - https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L363-L364  
    
    - https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L276 
    
    
    You may want to check all usages of that setting in the code base.


---