Posted to dev@storm.apache.org by Ethanlm <gi...@git.apache.org> on 2017/09/21 18:43:42 UTC

[GitHub] storm pull request #2338: [STORM-2744] add in restart timeout for backpressu...

GitHub user Ethanlm opened a pull request:

    https://github.com/apache/storm/pull/2338

    [STORM-2744] add in restart timeout for backpressure

    [JIRA-STORM-2744](https://issues.apache.org/jira/browse/STORM-2744)
    
    The backpressure mechanism relies on ZooKeeper to transfer signals.
    
    If the topology wants to turn off the throttle but ZooKeeper fails to delete the backpressure znode, the throttle stays on. In that case the znode will very likely never be deleted, so the throttle stays on permanently and the spouts stop indefinitely.
    
    We want to add a restart timeout so that even if this happens, the spouts can continue to work.
     

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Ethanlm/storm STORM-2744

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2338.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2338
    
----
commit bebc09cfb744f760bf14395513145165cd214e77
Author: Ethan Li <et...@gmail.com>
Date:   2017-09-20T14:33:29Z

    [STORM-2744] add in restart timeout for backpressure

----


---

[GitHub] storm pull request #2338: [STORM-2744] add in restart timeout for backpressu...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2338#discussion_r140341190
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
    @@ -423,32 +424,35 @@ public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
         }
     
         /**
    -     * if znode exists and to be not on?, delete; if exists and on?, do nothing; if not exists and to be on?, create; if not exists and not on?, do nothing;
    +     * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing;
    +     * if not exists and timestamp is larger than 0?, create the node and set the timestamp; if not exists and timestamp is 0?, do nothing;
          * 
          * @param stormId
          * @param node
          * @param port
    -     * @param on
    +     * @param timestamp
          */
         @Override
    -    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
    +    public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
             String path = ClusterUtils.backpressurePath(stormId, node, port);
             boolean existed = stateStorage.node_exists(path, false);
             if (existed) {
    -            if (on == false)
    +            if (timestamp == 0) {
                     stateStorage.delete_node(path);
    -
    +            }
    --- End diff --
    
    Shouldn't we update the timestamp if timestamp != 0 here?
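    
    For illustration, the four cases from the javadoc, plus the update this comment asks about, can be sketched against an in-memory map instead of ZooKeeper. `BackpressureStoreSketch`, its `znodes` map, and the `encode`/`decode` helpers are hypothetical stand-ins for `stateStorage`; only the branching is meant to mirror the diff.
    
    ```java
    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical in-memory stand-in for the ZooKeeper-backed state storage.
    class BackpressureStoreSketch {
        final Map<String, byte[]> znodes = new HashMap<>();

        // Encode the timestamp as 8 big-endian bytes, as the diff does with ByteBuffer.
        static byte[] encode(long timestamp) {
            return ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
        }

        static long decode(byte[] data) {
            return ByteBuffer.wrap(data).getLong();
        }

        void workerBackpressure(String path, long timestamp) {
            boolean existed = znodes.containsKey(path);
            if (existed) {
                if (timestamp == 0) {
                    znodes.remove(path);                 // exists, timestamp 0: delete
                } else {
                    znodes.put(path, encode(timestamp)); // exists, otherwise: refresh the stored timestamp
                }
            } else if (timestamp > 0) {
                znodes.put(path, encode(timestamp));     // missing, timestamp > 0: create
            }
            // missing, timestamp 0: nothing to do
        }
    }
    ```
    
    Without the refresh branch, a znode that already exists would keep its original timestamp and could expire while the worker is still under backpressure.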


---

[GitHub] storm issue #2338: [STORM-2744] add in restart timeout for backpressure

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2338
  
    @govind-menon @srdo are you OK with the changes?


---

[GitHub] storm issue #2338: [STORM-2744] add in restart timeout for backpressure

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:

    https://github.com/apache/storm/pull/2338
  
    I ran a simulation test.
    
    The command I ran is:
    ```
    storm jar /tmp/storm-loadgen-2.0.0-SNAPSHOT.jar org.apache.storm.loadgen.ThroughputVsLatency --rate 20000 --counters  12 --splitters 6 --spouts 3 -t 60 -c topology.workers=3
    ```
    
    **The behavior before the code change:**
    
    At about timestamp 241, I manually created a znode under backpressure/topology-id. The rate (tuples/s) quickly dropped to 0. At about timestamp 391, I manually deleted the znode. The rate increased and the spouts started catching up. This is the expected behavior of the original Storm.
    
    ![image](https://user-images.githubusercontent.com/14900612/30712889-234b3bbe-9ed3-11e7-97ab-01c5dffc1f2a.png)
    
    
    
    **The behavior after the code change:**
    
    At about timestamp 211, I created a znode (with its timestamp set to `System.currentTimeMillis()`) under backpressure/topology-id. The rate dropped quickly. Then, after 30s, the spouts started catching up. Even though the znode I created still exists in ZooKeeper, it's no longer valid because its timestamp is out of date.
    
    ![image](https://user-images.githubusercontent.com/14900612/30713002-8dcabbe0-9ed3-11e7-89c7-c88c5dc055c5.png)
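    
    The expiry behavior described above can be sketched as a pure function. The 30-second window comes from the test description; `BackpressureExpirySketch`, `isThrottleOn`, and the parameter names are hypothetical, and the clock value is passed in so the check can be exercised without waiting. The explicit `> 0` guard is an assumption added for readability; it is not taken from the patch.
    
    ```java
    // Hypothetical helper: a znode timestamp only counts as "throttle on" while it is fresh.
    class BackpressureExpirySketch {
        static boolean isThrottleOn(long nowMs, long znodeTimestampMs, long timeoutMs) {
            // A timestamp of 0 stands for "no backpressure znode found".
            return znodeTimestampMs > 0 && (nowMs - znodeTimestampMs) < timeoutMs;
        }
    }
    ```
    
    With a znode stamped at 1,000 ms and a 30,000 ms timeout, the check is true at 11,000 ms and false from 31,000 ms onward, even though the znode itself is still present.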


---

[GitHub] storm pull request #2338: [STORM-2744] add in restart timeout for backpressu...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2338#discussion_r140339271
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
    @@ -423,32 +424,35 @@ public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
         }
     
         /**
    -     * if znode exists and to be not on?, delete; if exists and on?, do nothing; if not exists and to be on?, create; if not exists and not on?, do nothing;
    +     * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing;
    --- End diff --
    
    Nit: What are the question marks after 0 for?


---

[GitHub] storm pull request #2338: [STORM-2744] add in restart timeout for backpressu...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2338


---

[GitHub] storm issue #2338: [STORM-2744] add in restart timeout for backpressure

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:

    https://github.com/apache/storm/pull/2338
  
    @govind-menon @srdo I think I have addressed all your comments. I tested it with different configs. Could you please take another look when you get a chance? Thanks.


---

[GitHub] storm pull request #2338: [STORM-2744] add in restart timeout for backpressu...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2338#discussion_r140345783
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
    @@ -423,32 +424,35 @@ public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
         }
     
         /**
    -     * if znode exists and to be not on?, delete; if exists and on?, do nothing; if not exists and to be on?, create; if not exists and not on?, do nothing;
    +     * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing;
    --- End diff --
    
    The question mark on the `on` in the old comment appears to be left over from the Clojure translation, where the `on` argument was spelled `on?`.
    
    It should go away in any new comment.


---

[GitHub] storm pull request #2338: [STORM-2744] add in restart timeout for backpressu...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2338#discussion_r140339922
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
    @@ -459,14 +463,15 @@ public boolean topologyBackpressure(String stormId, Runnable callback) {
                 backPressureCallback.put(stormId, callback);
             }
             String path = ClusterUtils.backpressureStormRoot(stormId);
    -        List<String> childrens = null;
    +        long mostRecentTimestamp = 0;
             if(stateStorage.node_exists(path, false)) {
    -            childrens = stateStorage.get_children(path, callback != null);
    -        } else {
    -            childrens = new ArrayList<>();
    +            List<String> children = stateStorage.get_children(path, callback != null);
    +            mostRecentTimestamp = children.stream().map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
    --- End diff --
    
    Please put each part of the stream pipeline on its own line; it gets a little hard to read in the second line here.
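    
    As a rough sketch of that layout, here is the timestamp-decoding part of the pipeline with one stage per line. `MostRecentTimestampSketch` and its list-of-byte-arrays input are hypothetical; the real code reads the data from `stateStorage`. It assumes every non-null payload holds at least 8 bytes, otherwise `getLong()` throws `BufferUnderflowException`.
    
    ```java
    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.Objects;

    // Hypothetical helper operating on already-fetched znode payloads.
    class MostRecentTimestampSketch {
        static long mostRecentTimestamp(List<byte[]> znodeData) {
            return znodeData.stream()
                    .filter(Objects::nonNull)                           // skip znodes without data
                    .mapToLong(data -> ByteBuffer.wrap(data).getLong()) // decode the 8-byte timestamp
                    .max()
                    .orElse(0L);                                        // no usable znodes: report 0
        }
    }
    ```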


---

[GitHub] storm pull request #2338: [STORM-2744] add in restart timeout for backpressu...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2338#discussion_r140519078
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
    @@ -423,50 +424,63 @@ public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
         }
     
         /**
    -     * if znode exists and to be not on?, delete; if exists and on?, do nothing; if not exists and to be on?, create; if not exists and not on?, do nothing;
    -     * 
    -     * @param stormId
    -     * @param node
    -     * @param port
    -     * @param on
    +     * If znode exists and timestamp is 0, delete;
    +     * if exists and timestamp is larger than 0, update the timestamp;
    +     * if not exists and timestamp is larger than 0, create the znode and set the timestamp;
    +     * if not exists and timestamp is 0, do nothing.
    +     * @param stormId The topology Id
    +     * @param node The node id
    +     * @param port The port number
    +     * @param timestamp The backpressure timestamp. 0 means turning off the worker backpressure
          */
         @Override
    -    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
    +    public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
             String path = ClusterUtils.backpressurePath(stormId, node, port);
             boolean existed = stateStorage.node_exists(path, false);
             if (existed) {
    -            if (on == false)
    +            if (timestamp == 0) {
    --- End diff --
    
    nit: Can we make this `<= 0` just as defensive programming?


---

[GitHub] storm issue #2338: [STORM-2744] add in restart timeout for backpressure

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2338
  
    +1, thanks @Ethanlm.


---

[GitHub] storm pull request #2338: [STORM-2744] add in restart timeout for backpressu...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2338#discussion_r140341935
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
    @@ -423,32 +424,35 @@ public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
         }
     
         /**
    -     * if znode exists and to be not on?, delete; if exists and on?, do nothing; if not exists and to be on?, create; if not exists and not on?, do nothing;
    +     * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing;
    +     * if not exists and timestamp is larger than 0?, create the node and set the timestamp; if not exists and timestamp is 0?, do nothing;
          * 
          * @param stormId
          * @param node
          * @param port
    -     * @param on
    +     * @param timestamp
    --- End diff --
    
    It might be nicer to split this method into create and delete variants, but we should at least state in the javadoc that timestamp 0 means "turn off".
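    
    A minimal sketch of what such a split could look like, using an in-memory map in place of ZooKeeper. The class and method names (`SplitBackpressureSketch`, `reportBackpressure`, `clearBackpressure`) are hypothetical and are not part of the patch.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical split of the single method into two intention-revealing variants.
    class SplitBackpressureSketch {
        final Map<String, Long> znodes = new HashMap<>();

        // Create or refresh the backpressure marker; rejects non-positive timestamps.
        void reportBackpressure(String path, long timestampMs) {
            if (timestampMs <= 0) {
                throw new IllegalArgumentException("timestamp must be positive: " + timestampMs);
            }
            znodes.put(path, timestampMs);
        }

        // Remove the marker; a missing entry is not an error.
        void clearBackpressure(String path) {
            znodes.remove(path);
        }
    }
    ```
    
    With two methods, the "0 means turn off" convention no longer needs to be documented, because a caller can no longer express "off" through the timestamp argument.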


---

[GitHub] storm pull request #2338: [STORM-2744] add in restart timeout for backpressu...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2338#discussion_r140340286
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
    @@ -459,14 +463,15 @@ public boolean topologyBackpressure(String stormId, Runnable callback) {
                 backPressureCallback.put(stormId, callback);
             }
             String path = ClusterUtils.backpressureStormRoot(stormId);
    -        List<String> childrens = null;
    +        long mostRecentTimestamp = 0;
             if(stateStorage.node_exists(path, false)) {
    -            childrens = stateStorage.get_children(path, callback != null);
    -        } else {
    -            childrens = new ArrayList<>();
    +            List<String> children = stateStorage.get_children(path, callback != null);
    +            mostRecentTimestamp = children.stream().map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
    +                    .filter(data -> data != null).mapToLong(data -> ByteBuffer.wrap(data).getLong()).max().orElse(0);
             }
    -        return childrens.size() > 0;
    -
    +        boolean ret =  ((System.currentTimeMillis() - mostRecentTimestamp) < 30000);
    --- End diff --
    
    I agree with @govind-menon, it would be good to make some constants (or config if necessary) for these.


---

[GitHub] storm pull request #2338: [STORM-2744] add in restart timeout for backpressu...

Posted by govind-menon <gi...@git.apache.org>.
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2338#discussion_r140337615
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -406,25 +406,36 @@ private WorkerBackpressureCallback mkBackpressureHandler() {
             final List<IRunningExecutor> executors = executorsAtom.get();
             return new WorkerBackpressureCallback() {
                 @Override public void onEvent(Object obj) {
    -                String topologyId = workerState.topologyId;
    -                String assignmentId = workerState.assignmentId;
    -                int port = workerState.port;
    -                IStormClusterState stormClusterState = workerState.stormClusterState;
    -                boolean prevBackpressureFlag = workerState.backpressure.get();
    -                boolean currBackpressureFlag = prevBackpressureFlag;
                     if (null != executors) {
    -                    currBackpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
    -                        .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
    -                }
    +                    String topologyId = workerState.topologyId;
    +                    String assignmentId = workerState.assignmentId;
    +                    int port = workerState.port;
    +                    IStormClusterState stormClusterState = workerState.stormClusterState;
    +                    long prevBackpressureTimestamp = workerState.backpressure.get();
    +                    long currTimestamp = System.currentTimeMillis();
    +                    long currBackpressureTimestamp = 0;
    +                    // the backpressure flag is true if at least one of the disruptor queues has throttle-on
    +                    boolean backpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
    +                            .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
    +
    +                    if (backpressureFlag) {
    +                        // update the backpressure timestamp every 15 seconds
    +                        if ((currTimestamp - prevBackpressureTimestamp) > 15000) {
    --- End diff --
    
    Could we make this configurable? If not, a constant somewhere.
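    
    For example, the literal could be pulled into a named constant along these lines. The diff above is cut off after the 15-second check, so the remaining branches here are an assumption about the intent, and `BackpressureRefreshSketch`, `BACKPRESSURE_REFRESH_INTERVAL_MS`, and `nextBackpressureTimestamp` are hypothetical names.
    
    ```java
    class BackpressureRefreshSketch {
        // Hypothetical constant replacing the literal 15000 in the diff.
        static final long BACKPRESSURE_REFRESH_INTERVAL_MS = 15_000L;

        // Returns the timestamp to publish: 0 when the throttle is off, the previous
        // timestamp while it is still fresh, or the current time once a refresh is due.
        static long nextBackpressureTimestamp(boolean backpressureFlag, long prevTimestampMs, long nowMs) {
            if (!backpressureFlag) {
                return 0L;
            }
            if ((nowMs - prevTimestampMs) > BACKPRESSURE_REFRESH_INTERVAL_MS) {
                return nowMs;
            }
            return prevTimestampMs;
        }
    }
    ```
    
    Keeping the refresh interval shorter than the read-side timeout (15s against 30s in this patch) means a worker that is still under backpressure rewrites its timestamp before readers consider it stale.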


---

[GitHub] storm pull request #2338: [STORM-2744] add in restart timeout for backpressu...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2338#discussion_r140519416
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
    @@ -423,50 +424,63 @@ public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
         }
     
         /**
    -     * if znode exists and to be not on?, delete; if exists and on?, do nothing; if not exists and to be on?, create; if not exists and not on?, do nothing;
    -     * 
    -     * @param stormId
    -     * @param node
    -     * @param port
    -     * @param on
    +     * If znode exists and timestamp is 0, delete;
    +     * if exists and timestamp is larger than 0, update the timestamp;
    +     * if not exists and timestamp is larger than 0, create the znode and set the timestamp;
    +     * if not exists and timestamp is 0, do nothing.
    +     * @param stormId The topology Id
    +     * @param node The node id
    +     * @param port The port number
    +     * @param timestamp The backpressure timestamp. 0 means turning off the worker backpressure
          */
         @Override
    -    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
    +    public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
             String path = ClusterUtils.backpressurePath(stormId, node, port);
             boolean existed = stateStorage.node_exists(path, false);
             if (existed) {
    -            if (on == false)
    +            if (timestamp == 0) {
                     stateStorage.delete_node(path);
    -
    +            } else {
    +                byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
    +                stateStorage.set_data(path, data, acls);
    +            }
             } else {
    -            if (on == true) {
    -                stateStorage.set_ephemeral_node(path, null, acls);
    +            if (timestamp > 0) {
    +                byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
    +                stateStorage.set_ephemeral_node(path, data, acls);
                 }
             }
         }
     
         /**
          * Check whether a topology is in throttle-on status or not:
          * if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
    -     * 
    -     * @param stormId
    -     * @param callback
    -     * @return
    +     * But if the backpresure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off.
    +     * This will prevent the spouts from getting stuck indefinitely if something wrong happens.
    +     * @param stormId The topology Id
    +     * @param timeoutMs How long until the backpressure znode is invalid.
    +     * @param callback The callback function
    +     * @return True is backpresure/storm-id dir is not empty and at least one of the backpressure znodes has not timed out; false otherwise.
          */
         @Override
    -    public boolean topologyBackpressure(String stormId, Runnable callback) {
    +    public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback) {
             if (callback != null) {
                 backPressureCallback.put(stormId, callback);
             }
             String path = ClusterUtils.backpressureStormRoot(stormId);
    -        List<String> childrens = null;
    +        long mostRecentTimestamp = 0;
             if(stateStorage.node_exists(path, false)) {
    -            childrens = stateStorage.get_children(path, callback != null);
    -        } else {
    -            childrens = new ArrayList<>();
    -        }
    -        return childrens.size() > 0;
    -
    +            List<String> children = stateStorage.get_children(path, callback != null);
    +            mostRecentTimestamp = children.stream()
    +                    .map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
    +                    .filter(data -> data != null)
    +                    .mapToLong(data -> ByteBuffer.wrap(data).getLong())
    +                    .max()
    +                    .orElse(0);
    +        }
    +        boolean ret =  ((System.currentTimeMillis() - mostRecentTimestamp) < timeoutMs);
    --- End diff --
    
    nit: extra space after `=`


---

[GitHub] storm issue #2338: [STORM-2744] add in restart timeout for backpressure

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:

    https://github.com/apache/storm/pull/2338
  
    @govind-menon @srdo @knusbaum Thanks for the review. I will address the comments soon.


---