Posted to dev@storm.apache.org by zhuoliu <gi...@git.apache.org> on 2015/08/26 18:18:53 UTC

[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

GitHub user zhuoliu opened a pull request:

    https://github.com/apache/storm/pull/700

    [STORM-886] Automatic Back Pressure (ABP)

    This new feature is aimed at automatic flow control through the topology DAG, since different components may process tuples at mismatched speeds. Currently, tuples may get dropped if the downstream components cannot process them as quickly, which wastes network bandwidth and processing capability. In addition, it is difficult to tune the max.spout.pending parameter for the best backpressure performance. Therefore, an automatic back pressure scheme is highly desirable.
    
    Recently, Heron proposed a form of back pressure that does not rely on acking or max spout pending. Instead, spouts throttle not only when max.spout.pending is hit, but also if any bolt has gone over a high water mark in its input queue and has not yet gone below a low water mark again. There is a lot of room for potential improvement here around control theory and having spouts only respond to downstream bolts backing up, but a simple bang-bang controller like this is a great start.
    
    Our ABP scheme implements a light-weight yet efficient back pressure scheme. It monitors certain queues in executors and exploits the callback schemes on ZooKeeper and disruptor queue for a fast-responding (in a push manner) flow control.
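
As an illustration only (this is not code from the patch), the high/low water mark behaviour described above can be sketched as a small hysteresis flag. The class name `WatermarkThrottle` and its methods are hypothetical; the 0.4 and 0.9 values used in the usage note simply mirror the defaults proposed later in this thread.

```java
// Minimal sketch of a high/low watermark (bang-bang) throttle flag.
// All names here are hypothetical and do not come from the Storm patch.
final class WatermarkThrottle {
    private final double low;
    private final double high;
    private boolean throttled = false;

    WatermarkThrottle(double low, double high) {
        if (!(low >= 0.0 && low < high && high <= 1.0)) {
            throw new IllegalArgumentException("require 0 <= low < high <= 1");
        }
        this.low = low;
        this.high = high;
    }

    /** Feeds the current queue population; returns whether spouts should throttle. */
    boolean update(int used, int capacity) {
        double ratio = (double) used / capacity;
        if (!throttled && ratio > high) {
            throttled = true;   // went over the high watermark: start throttling
        } else if (throttled && ratio < low) {
            throttled = false;  // dropped below the low watermark: resume
        }
        return throttled;
    }
}
```

With `new WatermarkThrottle(0.4, 0.9)`, a queue that fills to 95% turns the flag on, and it stays on at 50% until the queue drains below 40%. That gap between the two thresholds is what keeps the flag from flapping.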
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhuoliu/storm 886

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/700.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #700
    
----
commit fc2fcf41b8977b833d7f7f883783263e3b422c45
Author: zhuol <zh...@yahoo-inc.com>
Date:   2015-08-18T18:47:39Z

    Initial commit for YSTORM-1949 automatic backpressure

commit 3316a70b314474c900cf319b355f13f3567febfa
Author: zhuol <zh...@yahoo-inc.com>
Date:   2015-08-21T21:15:31Z

    Temp version that can do both throttle and back-to-normal, WordCountTopology changed, may revert it later

commit 5c5f34d4f63df08d69871b1b0d395a96768283e1
Author: zhuol <zh...@yahoo-inc.com>
Date:   2015-08-21T21:24:25Z

    re-enable the recurring of throttle check to zk in worker

commit 52615833e1b88ac55b91000687a4d9123faf9662
Author: zhuol <zh...@yahoo-inc.com>
Date:   2015-08-25T18:06:31Z

    Use ephemeral-node for worker backpressure

commit 21a314b3b19f79693ce1bb7257d42961266a0afa
Author: zhuol <zh...@yahoo-inc.com>
Date:   2015-08-25T21:04:51Z

    Make backpressure configurable

commit ae194073612477636651efc2668a4623a4785020
Author: zhuol <zh...@yahoo-inc.com>
Date:   2015-08-25T21:55:44Z

    use disruptor/notify-backpressure-checker

commit 7996116072e45b13267b86045cef6d777cf3fb04
Author: zhuol <zh...@yahoo-inc.com>
Date:   2015-08-25T22:35:14Z

    Remove debug message for daemons

commit fdd75728ce51ac52f48ad091db4a623ba1fd7f0c
Author: zhuol <zh...@yahoo-inc.com>
Date:   2015-08-26T02:26:10Z

    Minor cleanup

commit ae2ed251c79c640854f336ca4e1b13e3dd7aabe5
Author: zhuol <zh...@yahoo-inc.com>
Date:   2015-08-26T02:29:07Z

    Minor

commit 8c31ea6b081a21459d9dff7a0687fd6de9bfe8bb
Author: zhuol <zh...@yahoo-inc.com>
Date:   2015-08-26T16:04:48Z

    Decomment tests

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/700



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-146874511
  
    @rsltrifork That does sound like a very interesting approach.  I would love to see some numbers on how it would perform.



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r38021586
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
    @@ -602,7 +607,15 @@
                         (log-message "Activating spout " component-id ":" (keys task-datas))
                         (fast-list-iter [^ISpout spout spouts] (.activate spout)))
                    
    -                  (fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
    +                  (if (and
    +                        ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
    +                        @(:throttle-on (:worker executor-data))
    +                        suspend-time
    +                        (not= suspend-time 0))
    +                    (do 
    --- End diff --
    
    Can we move this conditional up to be part of the overflow-buffer/max-spout-pending check?  It feels like it fits better there, and we don't want to output anything when back-pressure is on, instead of just slowing down how quickly we output.
    
    Also, instead of logging something, can we look at having a metric that shows how many times we paused for a given reason?  Logging is not really going to give us the picture we want and is going to be difficult to follow.
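
A rough sketch of the kind of per-reason counter this comment asks for, assuming three pause reasons. The enum values and the `PauseCounters` class are hypothetical and are not Storm's metrics API; the sketch is also not thread-safe.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical pause reasons; the names are illustrative only.
enum PauseReason { BACKPRESSURE, MAX_SPOUT_PENDING, INACTIVE }

// Counts how often a spout paused for each reason, instead of logging each pause.
final class PauseCounters {
    private final Map<PauseReason, Long> counts = new EnumMap<>(PauseReason.class);

    /** Records one pause for the given reason. */
    void record(PauseReason reason) {
        counts.merge(reason, 1L, Long::sum);
    }

    /** Returns the number of pauses recorded for the reason (0 if none). */
    long get(PauseReason reason) {
        return counts.getOrDefault(reason, 0L);
    }
}
```

A real metric would need to be safe to read from a reporting thread while the executor thread updates it.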



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-137588941
  
    Hi, Bobby @revans2 , all comments and concerns have been addressed. Ready for another round of review. Thanks!



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-141514572
  
    Thanks, Bobby @revans2 . I addressed all the comments. 
    Actually I have such examples already written and tested. See:
     https://github.com/zhuoliu/storm/blob/888/examples/storm-starter/src/jvm/storm/starter/WordCountTopology2.java
    I can check them in to master if needed. 



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-135359036
  
    It appears that the spouts across all the workers are throttled if an executor queue fills up. For instance, if there are multiple spouts, even the ones not causing the executor recv queue to fill would be throttled. Is this correct and, if so, is it desirable?
    
    Wouldn't it be ideal to trickle the back pressure up via the immediately preceding component(s) rather than slowing down the entire topology?



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-139678312
  
    2. Test with a congested topology. 
    WordCountTopology2 (three times as many workers and executors as the original WordCount; in every minute, during the first 30 seconds, the wordcount bolt sleeps for 50 ms before emitting each tuple: "if (time/30000 %2 == 0) Thread.sleep(50);").
    
    Interestingly, without ABP, this 12-worker (87 executors) topology cannot run successfully in Storm, since workers frequently crash because tuples overflow in the bolt executors.
    And the ZK receive workload is 20.6 to 21 packets/second.
    
    In contrast, with ABP enabled, this topology runs quite well.
    And we see about 22.5 packets/second of receive workload on the ZooKeeper nodes.
    
    This test shows the great advantage of backpressure when dealing with topologies that may have congested or slow components: (1) ABP makes sure this topology can run successfully; (2) ABP adds only a small overhead on ZooKeeper.
    
    @revans2 @d2r @knusbaum 
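
For readers who want to check the slow-window condition quoted in the test description, here it is as a standalone predicate. The class name `SlowWindow` is hypothetical; only the arithmetic comes from the comment above.

```java
// Sketch of the predicate quoted in the test description: the bolt sleeps
// during even-numbered 30-second windows and runs freely during odd ones.
final class SlowWindow {
    static final long WINDOW_MS = 30_000L;

    /** Returns true when the given millisecond timestamp falls in a "slow" window. */
    static boolean shouldSleep(long timeMs) {
        return (timeMs / WINDOW_MS) % 2 == 0;
    }
}
```

Because the division truncates, timestamps 0 through 29999 fall in a slow window, 30000 through 59999 in a fast one, and the pattern repeats every 60 seconds.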
    
    




[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r38021285
  
    --- Diff: conf/defaults.yaml ---
    @@ -140,6 +140,14 @@ task.heartbeat.frequency.secs: 3
     task.refresh.poll.secs: 10
     task.credentials.poll.secs: 30
     
    +# now should be null by default
    +topology.backpressure.enable: true
    +backpressure.worker.high.watermark: 0.9
    +backpressure.worker.low.watermark: 0.4
    +backpressure.executor.high.watermark: 0.9
    +backpressure.executor.low.watermark: 0.4
    +backpressure.spout.suspend.time.ms: 100
    --- End diff --
    
    This seems way too high.  It is not that expensive to poll at a 1ms interval in the spout waiting for back-pressure to be turned off.
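
To make the suggestion concrete, a short polling loop of the kind described might look like the following. This is a sketch under assumptions: `ThrottleWait`, `awaitRelease`, and the timeout parameter are hypothetical, and the actual spout loop in executor.clj is structured differently.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: poll a throttle flag at a short interval until it clears.
final class ThrottleWait {
    /**
     * Polls throttleOn every pollMs milliseconds.
     * Returns true once the flag is off, or false if timeoutMs elapses
     * (or the thread is interrupted) while it is still on.
     */
    static boolean awaitRelease(AtomicBoolean throttleOn, long pollMs, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (throttleOn.get()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // still throttled when the timeout expired
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}
```

Calling `ThrottleWait.awaitRelease(flag, 1, 5000)` checks the flag roughly once per millisecond, so a spout resumes within about a millisecond of back-pressure being turned off instead of waiting out a 100 ms suspend.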



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-135145296
  
    This looks very interesting.  I would really like to see some unit tests, especially around the disruptor queue to show that the callback is working.  Perhaps we can also handle the corner cases for callback in DisruptorQueue itself instead of having special case code in other locations as a backup.



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r38022357
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -114,12 +114,34 @@
       (fast-list-iter [[task tuple :as pair] tuple-batch]
         (.serialize serializer tuple)))
     
    +(defn- mk-backpressure-handler [executors]
    +  "make a handler that checks and updates worker's backpressure flag"
    +  (disruptor/backpressure-handler 
    +    (fn [worker]
    +      (let [storm-id (:storm-id worker)
    +            assignment-id (:assignment-id worker)
    +            port (:port worker)
    +            storm-cluster-state (:storm-cluster-state worker)]
    +        (if executors 
    +          (if (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))
    +            (reset! (:backpressure worker) true)   ;; at least one executor has set backpressure
    +            (reset! (:backpressure worker) false))) ;; no executor has backpressure set
    +        ;; update the worker's backpressure flag to zookeeper here
    +        (.worker-backpressure! storm-cluster-state storm-id assignment-id port @(:backpressure worker))  
    +        ))))
    +
     (defn mk-transfer-fn [worker]
       (let [local-tasks (-> worker :task-ids set)
             local-transfer (:transfer-local-fn worker)
    -        ^DisruptorQueue transfer-queue (:transfer-queue worker)
    +        transfer-queue (:transfer-queue worker)
    --- End diff --
    
    Why did we drop the type hint?



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r38438290
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -114,12 +114,38 @@
       (fast-list-iter [[task tuple :as pair] tuple-batch]
         (.serialize serializer tuple)))
     
    +(defn- mk-backpressure-handler [executors]
    +  "make a handler that checks and updates worker's backpressure flag"
    +  (disruptor/backpressure-handler 
    +    (fn [worker]
    +      (let [storm-id (:storm-id worker)
    +            assignment-id (:assignment-id worker)
    +            port (:port worker)
    +            storm-cluster-state (:storm-cluster-state worker)
    +            prev-backpressure-flag @(:backpressure worker)]
    +        (doseq [ed executors] ;; Debug, TODO: delete
    +          (log-message "zliu executor" (.get-executor-id ed) " flag is " (.get-backpressure-flag ed)))
    --- End diff --
    
    Thanks for the reminder, I will remove all those debug messages soon after the unit test commit.



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r38152708
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -114,12 +114,34 @@
       (fast-list-iter [[task tuple :as pair] tuple-batch]
         (.serialize serializer tuple)))
     
    +(defn- mk-backpressure-handler [executors]
    +  "make a handler that checks and updates worker's backpressure flag"
    +  (disruptor/backpressure-handler 
    +    (fn [worker]
    +      (let [storm-id (:storm-id worker)
    +            assignment-id (:assignment-id worker)
    +            port (:port worker)
    +            storm-cluster-state (:storm-cluster-state worker)]
    +        (if executors 
    +          (if (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))
    +            (reset! (:backpressure worker) true)   ;; at least one executor has set backpressure
    +            (reset! (:backpressure worker) false))) ;; no executor has backpressure set
    --- End diff --
    
    Can we change these two `if`s to a single `when`?



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r38023281
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -114,12 +114,34 @@
       (fast-list-iter [[task tuple :as pair] tuple-batch]
         (.serialize serializer tuple)))
     
    +(defn- mk-backpressure-handler [executors]
    +  "make a handler that checks and updates worker's backpressure flag"
    +  (disruptor/backpressure-handler 
    +    (fn [worker]
    +      (let [storm-id (:storm-id worker)
    +            assignment-id (:assignment-id worker)
    +            port (:port worker)
    +            storm-cluster-state (:storm-cluster-state worker)]
    +        (if executors 
    +          (if (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))
    +            (reset! (:backpressure worker) true)   ;; at least one executor has set backpressure
    +            (reset! (:backpressure worker) false))) ;; no executor has backpressure set
    +        ;; update the worker's backpressure flag to zookeeper here
    +        (.worker-backpressure! storm-cluster-state storm-id assignment-id port @(:backpressure worker))  
    --- End diff --
    
    I think we have an opportunity here to not put so much load on ZK.  We should be able to cache whether we think back-pressure is on or off for this worker, and only update ZK when we think it is changing, instead of reading the state each time and updating it if it is different.  This is being called at least every 100 ms, which is going to put more load on ZK than I would like.
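
One way to picture the caching suggested here, as a sketch only: keep the last published value locally and issue the remote write only on a change. `CachedFlagPublisher` is a hypothetical name, and the `Consumer<Boolean>` stands in for whatever call actually writes the flag to ZooKeeper.

```java
import java.util.function.Consumer;

// Hypothetical sketch: publish the worker's backpressure flag only when it changes.
final class CachedFlagPublisher {
    private final Consumer<Boolean> remoteWriter; // stand-in for the ZooKeeper update
    private boolean lastPublished;

    CachedFlagPublisher(boolean initial, Consumer<Boolean> remoteWriter) {
        this.lastPublished = initial;
        this.remoteWriter = remoteWriter;
    }

    /** Publishes the flag if it differs from the cached value; returns true if a write was issued. */
    synchronized boolean publish(boolean current) {
        if (current == lastPublished) {
            return false; // unchanged: skip the remote call entirely
        }
        remoteWriter.accept(current);
        lastPublished = current;
        return true;
    }
}
```

With this shape, a handler that runs every 100 ms touches the remote store only on transitions, so a worker that stays congested or stays healthy generates no writes at all.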



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r39872683
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java ---
    @@ -35,6 +35,7 @@
     import java.util.HashMap;
     import java.util.Map;
     import backtype.storm.metric.api.IStatefulObject;
    +import org.jgrapht.graph.DirectedSubgraph;
    --- End diff --
    
    Where did this come from?



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by rsltrifork <gi...@git.apache.org>.
Github user rsltrifork commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-146779690
  
    Thanks for the quick reply, revans2.
    
    > How do you know that the bolt is waiting in a controlled manner?
    
    A Bolt sending to an external system can include a circuit breaker, so it can keep waiting while the CB is tripped, while regularly retrying sends to check if the recipient is up.
    
    While doing this, the Bolt could also tell Storm that processing of the current tuple is currently blocked, and that Storm should reset the tuple timeout for it (and subsequent inflight tuples):
                `collector.resetTupleTimeout(tuple);`
    
    > Even if you do know that how do you know that if it is waiting in a controlled manner that it has not lost track of a tuple?
    
    The bolt is continuously saying that it still has the tuple.
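
For context, a circuit breaker of the sort mentioned above can be very small. The sketch below is an assumption-laden illustration: the class `CircuitBreaker`, its threshold and retry parameters, and the explicit `nowMs` argument are all hypothetical. It does not call `resetTupleTimeout`, which in this thread is a proposal rather than an API shown to exist.

```java
// Hypothetical minimal circuit breaker a bolt could wrap around sends to an
// external system. Time is passed in explicitly so the logic is easy to test.
final class CircuitBreaker {
    private final int failureThreshold;
    private final long retryAfterMs;
    private int consecutiveFailures = 0;
    private long openedAtMs = 0L;
    private boolean open = false;

    CircuitBreaker(int failureThreshold, long retryAfterMs) {
        this.failureThreshold = failureThreshold;
        this.retryAfterMs = retryAfterMs;
    }

    /** True if a send may be attempted now: breaker closed, or open long enough to probe. */
    boolean allowRequest(long nowMs) {
        return !open || nowMs - openedAtMs >= retryAfterMs;
    }

    /** A successful send closes the breaker and clears the failure count. */
    void recordSuccess() {
        consecutiveFailures = 0;
        open = false;
    }

    /** A failed send counts toward tripping; once tripped, each failure restarts the wait. */
    void recordFailure(long nowMs) {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            open = true;
            openedAtMs = nowMs;
        }
    }

    boolean isOpen() {
        return open;
    }
}
```

While `isOpen()` is true the bolt would hold its current tuple and retry only when `allowRequest` permits a probe. Whether and how the tuple timeout gets extended during that wait is exactly the open question in this exchange.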



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-141544473
  
    @zhuoliu I looked over the code, and I ran some tests with a word count program that does not sleep and does not have enough split sentence bolts.  The patch kept the workers from crashing, which I verified happened without it.  It was able to keep a throughput similar to setting a decent value for max spout pending, but with a larger latency.
    
    I am +1 for merging this in.  Great work.



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-135248186
  
    +1 Looks very exciting.



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r39871444
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
    @@ -602,6 +640,7 @@
                         (log-message "Activating spout " component-id ":" (keys task-datas))
                         (fast-list-iter [^ISpout spout spouts] (.activate spout)))
                    
    +                    ;; (log-message "Spout executor " (:executor-id executor-data) " found throttle-on, now suspends sending tuples")
    --- End diff --
    
    Can we remove this line? it looks like it is no longer needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-141499766
  
    For the most part things look really good.  I would also love to see something added to Examples, like a Word Count that just goes as fast as it can.  That way we can see that before, with acking disabled, it crashes, and after, with acking disabled, it stays up and can run really well.  I can help with code for that if you want.



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by redsanket <gi...@git.apache.org>.
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r38437006
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -114,12 +114,38 @@
       (fast-list-iter [[task tuple :as pair] tuple-batch]
         (.serialize serializer tuple)))
     
    +(defn- mk-backpressure-handler [executors]
    +  "make a handler that checks and updates worker's backpressure flag"
    +  (disruptor/backpressure-handler 
    +    (fn [worker]
    +      (let [storm-id (:storm-id worker)
    +            assignment-id (:assignment-id worker)
    +            port (:port worker)
    +            storm-cluster-state (:storm-cluster-state worker)
    +            prev-backpressure-flag @(:backpressure worker)]
    +        (doseq [ed executors] ;; Debug, TODO: delete
    +          (log-message "zliu executor" (.get-executor-id ed) " flag is " (.get-backpressure-flag ed)))
    --- End diff --
    
    zliu in log message



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-144723285
  
    @rsltrifork, No, this does not solve that issue.  The timeout is still a hard coded value.  The backpressure just means that the spout will not be outputting new values.  Old tuples can still time out and be replayed.  The problem here is the halting problem.  How do you know that the bolt is waiting in a controlled manner?  Even if you do know that, how do you know that, if it is waiting in a controlled manner, it has not lost track of a tuple?  You have to be able to predict the future to truly solve this problem.  A hard coded timeout is a simple solution.  There have been a few other proposals to adjust the timeout dynamically, but they all have potentially serious limitations compared to a static timeout.



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r39872538
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java ---
    @@ -138,7 +150,21 @@ private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
             //TODO: only set this if the consumer cursor has changed?
             _consumer.set(cursor);
         }
    -    
    +
    +    public void registerBackpressureCallback(DisruptorBackpressureCallback cb) {
    +        this._cb = cb;
    +    }
    +
    +    static public void notifyBackpressureChecker(Object trigger) {
    --- End diff --
    
    I don't think this code really belongs here.  It would make more sense to move it to the WorkerBackpressureThread itself.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-135577332
  
    Addressed comments:
    a. Removed the config of suspend time, reuse the empty emit and put the judgement together with max.spout.pending
    b. Separate functions for credentials and throttle checks in schedule-recurring thread
    c. Update worker flag to ZK only when it has changed (for reducing ZK load)
    d. Other comments like type hint and And.
    
    
    To continue work on the other three comments:
    a. Add metrics for how many times an executor suspends, per reason: throttle-on, inactive, max.spout.pending, etc.
    b. Concern on the possible corner cases since I put the backpressure checks in tuple-action-fn.
    c. Unit tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r39872212
  
    --- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
    @@ -1024,6 +1024,48 @@
     
     
         /**
    +     * Whether to enable backpressure in for a certain topology
    +     */
    +    public static final String TOPOLOGY_BACKPRESSURE_ENABLE = "topology.backpressure.enable";
    +    public static final Object TOPOLOGY_BACKPRESSURE_ENABLE_SCHEMA = Boolean.class;
    +
    +    /**
    +     * This signifies the tuple congestion in a worker's out-going queue.
    +     * When the used ratio of a worker's outgoing queue is higher than the high watermark,
    +     * the backpressure scheme, if enabled, should slow down the tuple sending speed of
    +     * the spouts until reaching the low watermark.
    +     */
    +    public static final String BACKPRESSURE_WORKER_HIGH_WATERMARK="backpressure.worker.high.watermark";
    +    public static final Object BACKPRESSURE_WORKER_HIGH_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
    +
    +    /**
    +     * This signifies a state that a worker has left the congestion.
    +     * If the used ratio of a worker's outgoing queue is lower than the low watermark,
    +     * it notifies the worker to check whether all its executors have also left congestion,
    +     * if yes, it will unset the worker's backpressure flag on the Zookeeper
    +     */
    +    public static final String BACKPRESSURE_WORKER_LOW_WATERMARK="backpressure.worker.low.watermark";
    +    public static final Object BACKPRESSURE_WORKER_LOW_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
    +
    +    /**
    +     * This signifies the tuple congestion in an executor's receiving queue.
    +     * When the used ratio of an executor's receiving queue is higher than the high watermark,
    +     * the backpressure scheme, if enabled, should slow down the tuple sending speed of
    +     * the spouts until reaching the low watermark.
    +     */
    +    public static final String BACKPRESSURE_EXECUTOR_HIGH_WATERMARK="backpressure.executor.high.watermark";
    +    public static final Object BACKPRESSURE_EXECUTOR_HIGH_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
    +
    +    /**
    +     * This signifies a state that an executor has left the congestion.
    +     * If the used ratio of an executor's receive queue is lower than the low watermark,
    +     * it may notify the worker to check whether all its executors have also left congestion,
    +     * if yes, the worker's backpressure flag will be unset on the Zookeeper
    +     */
    +    public static final String BACKPRESSURE_EXECUTOR_LOW_WATERMARK="backpressure.executor.low.watermark";
    +    public static final Object BACKPRESSURE_EXECUTOR_LOW_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
    --- End diff --
    
    Looking through the code, it looks like two of these configs are never used: BACKPRESSURE_EXECUTOR_HIGH_WATERMARK and BACKPRESSURE_EXECUTOR_LOW_WATERMARK.
    
    I don't see a reason to have different configs for different queues.  Perhaps we can just remove these configs and make the descriptions of the other configs more generic.
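
The high/low watermark behaviour described in the Javadoc above is a hysteresis: the flag is set when the queue rises above the high mark and cleared only once it falls below the low mark. A minimal sketch, assuming a single generic pair of ratios as suggested here; `WatermarkFlag` and the ratios used in the usage note are illustrative, not names or defaults from the PR.

```java
// Hypothetical sketch of high/low watermark hysteresis for one queue.
class WatermarkFlag {
    private final int highMark;
    private final int lowMark;
    private boolean congested = false;

    // Ratios are fractions of the queue capacity, e.g. 0.9 and 0.4 (assumed values).
    WatermarkFlag(double highRatio, double lowRatio, int queueCapacity) {
        if (lowRatio > highRatio) {
            throw new IllegalArgumentException("low watermark must not exceed high watermark");
        }
        this.highMark = (int) (highRatio * queueCapacity);
        this.lowMark = (int) (lowRatio * queueCapacity);
    }

    // Feeds the current queue population; returns true only when the flag flips.
    boolean update(int population) {
        if (!congested && population > highMark) {
            congested = true;
            return true;
        }
        if (congested && population < lowMark) {
            congested = false;
            return true;
        }
        return false; // between the marks the previous state is kept
    }

    boolean isCongested() {
        return congested;
    }
}
```

For example, with `new WatermarkFlag(0.9, 0.4, 1000)`, a population of 950 sets the flag, 500 leaves it set, and 300 clears it. Because the class is queue-agnostic, the same pair of settings could serve both the worker transfer queue and the executor receive queue.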



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r38126936
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
    @@ -811,9 +841,18 @@
             (setup-metrics! executor-data)
     
             (let [receive-queue (:receive-queue executor-data)
    -              event-handler (mk-task-receiver executor-data tuple-action-fn)]
    +              event-handler (mk-task-receiver executor-data tuple-action-fn)
    +              high-watermark ((:storm-conf executor-data) BACKPRESSURE-EXECUTOR-HIGH-WATERMARK)
    +              low-watermark  ((:storm-conf executor-data) BACKPRESSURE-EXECUTOR-LOW-WATERMARK)
    +              receive-queue-size ((:storm-conf executor-data) TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
    +              high-watermark (int (* high-watermark receive-queue-size))
    +              low-watermark (int (* low-watermark receive-queue-size))]
               (disruptor/consumer-started! receive-queue)
               (fn []            
    +            ;; this additional check is necessary because rec-q can be 0 while the executor backpressure flag is forever set
    +            (if (and ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE) (< (.population receive-queue) low-watermark) @(:backpressure executor-data))
    --- End diff --
    
    Can we move the `and` form's second condition to a new line?



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by acommuni <gi...@git.apache.org>.
Github user acommuni commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-195988246
  
    If an external system is down, it could be useful to pause the spout for a period of time. The OPEN state of the circuit breaker (CB) could be linked directly to the spout. I don't know whether the back pressure implementation can be driven through a public API, but it would be a nice enhancement to be able to implement the circuit breaker algorithm with bolts and spouts.
    
    Error counts and timeouts generated by requests to the external system can be detected by the bolt that uses that system and propagated to the spout. When the number of consecutive errors reaches a defined value, the bolt could pause the spout for a period of time (CLOSED to OPEN state). After sleeping, the spout is considered to be in the HALF-OPEN state. If a new error occurs, the spout sleeps for another period; otherwise it goes to the CLOSED state and continues to read new tuples.
    
    Being able to use a circuit breaker framework such as Hystrix could be a nice enhancement to the back pressure feature.
    
    https://github.com/Netflix/Hystrix
    https://github.com/Netflix/Hystrix/wiki/How-it-Works
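
The CLOSED / OPEN / HALF-OPEN cycle described above can be sketched as a small state machine. This is a hypothetical illustration only: it uses neither the Hystrix API nor any Storm API, the class and method names are invented, and the caller passes in the current time so the behaviour is easy to follow.

```java
// Minimal circuit-breaker state machine; illustrative, not Hystrix or Storm code.
class SpoutCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold; // consecutive errors before opening
    private final long pauseMillis;     // how long the spout stays paused
    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private long openedAt = 0L;

    SpoutCircuitBreaker(int failureThreshold, long pauseMillis) {
        this.failureThreshold = failureThreshold;
        this.pauseMillis = pauseMillis;
    }

    // Called by the spout before emitting; true means it may emit.
    synchronized boolean allowEmit(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAt >= pauseMillis) {
            state = State.HALF_OPEN; // pause elapsed, let a probe through
        }
        return state != State.OPEN;
    }

    // Called when a bolt's request to the external system succeeds.
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        if (state == State.HALF_OPEN) {
            state = State.CLOSED;
        }
    }

    // Called when a bolt sees an error or timeout from the external system.
    synchronized void recordFailure(long nowMillis) {
        if (state == State.OPEN) {
            return; // already paused; late failures do not extend the pause
        }
        consecutiveFailures++;
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAt = nowMillis;
        }
    }

    synchronized State state() {
        return state;
    }
}
```

How a bolt would deliver `recordFailure` to a spout in another worker is left open here, which is exactly the question the comment raises about a public API.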
    




[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r38128803
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -137,9 +159,14 @@
                             (.add remote (TaskMessage. task (.serialize serializer tuple)))
                             (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
                          ))))
    -                (local-transfer local)
    -                (disruptor/publish transfer-queue remoteMap)
    -              ))]
    +              ;; each executor itself will do the self setting for the worker's backpressure tag
    +              ;; however, when the backpressure is set, the worker still need to check whether all the executors' flag has cleared to unset worker's backpressure
    +              (if (and ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) (> (.population transfer-queue) high-watermark))
    --- End diff --
    
    Here too.



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-135422890
  
    @arunmahadevan The intention is to throttle the entire topology, all spouts.  This is what Heron does, and it is intended as a last resort: better than nothing, but not a truly final solution. The reason is that from the level of a single queue it is very difficult to know what is causing the congestion. STORM-907 is intended as a follow-on that will analyze the topology, determine where there are loops, and provide something closer to true backpressure.  But in the case of a loop, and Storm does support loops, you have no way to determine which upstream part is causing the slowness.  In fact it could be the bolt itself, which then needs to increase its parallelism.



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-139676554
  
    Initial tests on a 5-node OpenStack cluster (1 ZK, 1 Nimbus, 3 supervisors).
    1. Test a normal topology (no congestion):
    WordCountTopology3 (3 times more workers and executors than the original WordCount).
    
    With no topology running, the ZooKeeper node receives 5 packets/second;
    without BP, the ZK load is 20.1 packets/second;
    with BP, the ZK load is 20.9 packets/second.
    This makes sense: if a topology is running smoothly (no congestion), ZK will almost never be accessed by the backpressure procedures. I say "almost" because we added a recurring topology-backpressure check alongside the credentials thread (just to handle the very rare case where the ZK callback fails to fire), which reads from ZK every 30 seconds.
    So 12 workers add 12/30 = 0.4 packets/second of overhead on ZK.
    
    This shows that backpressure adds minimal overhead for non-congested topologies.
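
The overhead estimate above is a simple rate calculation: each worker issues one ZK read per polling interval. A small sketch of that arithmetic, using the figures from this comment (12 workers, 30-second interval); the class and method names are illustrative.

```java
// Back-of-the-envelope estimate of ZK reads caused by the recurring check.
class ZkPollOverhead {
    // Expected reads per second when every worker polls once per interval.
    static double readsPerSecond(int workers, int pollIntervalSecs) {
        return (double) workers / pollIntervalSecs;
    }

    public static void main(String[] args) {
        // 12 workers polling every 30 seconds
        System.out.println(readsPerSecond(12, 30)); // prints 0.4
    }
}
```

The estimate scales linearly with the worker count and inversely with the interval, so a larger topology or a shorter polling interval would raise the idle ZK load proportionally.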
    
    
    




[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-141491743
  
    @zhuoliu it looks like you missed adding a file. We also need to upmerge.



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r38023405
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -510,15 +554,21 @@
                      ))
                  )
             credentials (atom initial-credentials)
    -        check-credentials-changed (fn []
    -                                    (let [new-creds (.credentials (:storm-cluster-state worker) storm-id nil)]
    -                                      (when-not (= new-creds @credentials) ;;This does not have to be atomic, worst case we update when one is not needed
    -                                        (AuthUtils/updateSubject subject auto-creds new-creds)
    -                                        (dofor [e @executors] (.credentials-changed e new-creds))
    -                                        (reset! credentials new-creds))))
    -      ]
    -    (.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
    -    (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS) check-credentials-changed)
    +        check-credentials-throttle-changed (fn []
    +                                             (let [callback (fn cb [& ignored] 
    +                                                              (let [throttle-on (.topology-backpressure (:storm-cluster-state worker) storm-id cb)]
    +                                                                (reset! (:throttle-on worker) throttle-on)))
    +                                                   new-throttle-on (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
    +                                                                     (.topology-backpressure (:storm-cluster-state worker) storm-id callback) nil)
    +                                                   new-creds (.credentials (:storm-cluster-state worker) storm-id nil)]
    +                                               (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
    +                                                 (reset! (:throttle-on worker) new-throttle-on))
    +                                               (when-not (= new-creds @credentials) ;;This does not have to be atomic, worst case we update when one is not needed
    +                                                 (AuthUtils/updateSubject subject auto-creds new-creds)
    +                                                 (dofor [e @executors] (.credentials-changed e new-creds))
    +                                                 (reset! credentials new-creds))))]
    +    (.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-throttle-changed)))
    --- End diff --
    
    I think I would prefer to have two functions instead of just one.  Both can be scheduled on the same timer, but keeping them separate seems cleaner.
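
The suggestion of two functions sharing one timer might look like the following sketch. It is written in Java with `ScheduledExecutorService` rather than the worker's Clojure `schedule-recurring`, and the class, method names, and counters are hypothetical placeholders for the real credential and throttle checks.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Two independent recurring checks scheduled on the same single-threaded timer.
class SeparateRecurringChecks {
    final AtomicInteger credentialChecks = new AtomicInteger();
    final AtomicInteger throttleChecks = new AtomicInteger();

    // Placeholder for re-reading credentials and updating executors.
    void checkCredentialsChanged() {
        credentialChecks.incrementAndGet();
    }

    // Placeholder for re-reading the topology backpressure flag.
    void checkThrottleChanged() {
        throttleChecks.incrementAndGet();
    }

    // Starts both checks on one timer thread; the caller shuts the timer down.
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::checkCredentialsChanged, 0, periodMillis, TimeUnit.MILLISECONDS);
        timer.scheduleAtFixedRate(this::checkThrottleChanged, 0, periodMillis, TimeUnit.MILLISECONDS);
        return timer;
    }
}
```

Keeping the two tasks separate means either one can be disabled, rescheduled, or tested without touching the other, while still using a single timer thread.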



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by rsltrifork <gi...@git.apache.org>.
Github user rsltrifork commented on the pull request:

    https://github.com/apache/storm/pull/700#issuecomment-144703338
  
    Does this solve the problem of tuple timeouts when a bolt is completely stalled waiting for an external component to come back up?
    I believe waiting too long in a bolt triggers the tuple timeout, which causes the spout to re-emit. That is usually not what we want when a bolt is waiting in a controlled manner before it can resume computation.
    
    Ideally, the tuple timeout should be used as a last resort to detect that internal Storm components are not responding, and back pressure should ensure that the spout doesn't re-emit to temporarily busy or blocked bolts, regardless of the timeout.



[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/700#discussion_r38022111
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
    @@ -671,7 +689,19 @@
                                         user-context (:user-context task-data)
                                         sampler? (sampler)
                                         execute-sampler? (execute-sampler)
    -                                    now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
    +                                    now (if (or sampler? execute-sampler?) (System/currentTimeMillis))
    +                                    receive-queue (:receive-queue executor-data)]
    +                                (if (and ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE) 
    +                                         (> (.population receive-queue) high-watermark) 
    +                                         (not @(:backpressure executor-data)))
    +                                  (do (reset! (:backpressure executor-data) true)
    +                                      (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
    +                                      (disruptor/notify-backpressure-checker (:backpressure-trigger (:worker executor-data)))))
    +                                (if (and ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE) 
    +                                         (< (.population receive-queue) low-watermark) 
    +                                         @(:backpressure executor-data))
    +                                  (do (reset! (:backpressure executor-data) false)
    +                                      (disruptor/notify-backpressure-checker (:backpressure-trigger (:worker executor-data)))))
    --- End diff --
    
    I am a bit confused.  Why are we doing this as part of the metrics tick?  What does that have to do with back pressure at all?

