Posted to dev@storm.apache.org by "Shyam Rajendran (JIRA)" <ji...@apache.org> on 2015/07/10 16:23:05 UTC

[jira] [Comment Edited] (STORM-67) Provide API for spouts to know how many pending messages there are

    [ https://issues.apache.org/jira/browse/STORM-67?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14622345#comment-14622345 ] 

Shyam Rajendran edited comment on STORM-67 at 7/10/15 2:22 PM:
---------------------------------------------------------------

Waiting on +1s


was (Author: shyamrajendran):
The jira has not yet been through the review. 

> Provide API for spouts to know how many pending messages there are
> ------------------------------------------------------------------
>
>                 Key: STORM-67
>                 URL: https://issues.apache.org/jira/browse/STORM-67
>             Project: Apache Storm
>          Issue Type: New Feature
>            Reporter: James Xu
>            Assignee: Shyam Rajendran
>              Labels: newbie
>
> https://github.com/nathanmarz/storm/issues/343
> This would be useful in case you want to take special action in the spout like drop messages
> -----------------
> Discmt: Hi, I'd like to try and take a crack at this if it's still relevant. I'm not exactly sure what it's asking for though. It seems to me an implementation for knowing how many pending messages there are for a spout depends on where the spout is getting its information from, which makes me sure I'm missing something.
> -----------------
> revans2: The spout code in backtype/storm/daemon/executor.clj is already keeping track of the pending tuples if acking is enabled. If acking is disabled nothing is pending.
> defmethod mk-threads :spout [executor-data task-datas]
> defines pending as a RotatingMap which maps all of the storm internal tuple ids to the message id objects passed in by the spout when it first emitted the tuple. The hardest part should be getting pending to a place where the ISpoutOutputCollector implementation, or wherever the API is, can get access to it.
> -----------------
> ptgoetz: @Discmt Yes, this is still relevant and would be nice to have.
> The Storm framework asks spouts for tuples by calling the nextTuple() method and keeps track of the tuple tree internally. The underlying data source does not come into play.
> As implied by @revans2, one approach would be to add a method to ISpoutOutputCollector such as getPendingCount() that would allow spout implementations to query for the pending count (probably returning -1 if acking is disabled). The tricky part will likely be bridging the gap between executor.clj and the ISpoutOutputCollector implementation(s). I haven't dug very deeply into the code, so off-hand I don't know how hard that would be. A quick search of the code for TOPOLOGY_MAX_PENDING should point you to some of the touch points.
> Also keep in mind the dual meaning of TOPOLOGY_MAX_PENDING. In a standard storm topology it represents the maximum number of outstanding tuples. In a trident topology it represents the maximum number of outstanding batches.
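The getPendingCount() idea above could look roughly like the following Java sketch. It is only an illustration under stated assumptions: PendingAwareCollector and InMemoryCollector are hypothetical stand-ins, not Storm classes, a plain HashMap takes the place of the RotatingMap in executor.clj, and the model assumes that only tuples emitted with a message id are tracked. The -1 return value when acking is disabled follows the suggestion in the comment.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PendingCountSketch {
    // Hypothetical stand-in for ISpoutOutputCollector; getPendingCount() is the proposed addition.
    interface PendingAwareCollector {
        List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
        int getPendingCount();
    }

    // Toy collector that tracks pending tuples in a map (internal tuple id -> spout message id).
    static class InMemoryCollector implements PendingAwareCollector {
        private final boolean ackingEnabled;
        private final Map<Long, Object> pending = new HashMap<>();
        private long nextTupleId = 0;

        InMemoryCollector(boolean ackingEnabled) {
            this.ackingEnabled = ackingEnabled;
        }

        @Override
        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            // Assumption of this model: a tuple is tracked only when acking is on and it has a message id.
            if (ackingEnabled && messageId != null) {
                pending.put(nextTupleId++, messageId);
            }
            return Collections.emptyList(); // no real downstream tasks in this sketch
        }

        // Simulates the framework acking a tuple; tuple ids are assigned sequentially from 0.
        void ack(long tupleId) {
            pending.remove(tupleId);
        }

        @Override
        public int getPendingCount() {
            return ackingEnabled ? pending.size() : -1;
        }
    }

    public static void main(String[] args) {
        InMemoryCollector collector = new InMemoryCollector(true);
        collector.emit("default", Arrays.<Object>asList("a"), "msg-1");
        collector.emit("default", Arrays.<Object>asList("b"), "msg-2");
        System.out.println(collector.getPendingCount()); // 2
        collector.ack(0);
        System.out.println(collector.getPendingCount()); // 1
        System.out.println(new InMemoryCollector(false).getPendingCount()); // -1
    }
}
```

A spout could compare this count against a threshold of its own choosing inside nextTuple() and drop or delay messages when it is exceeded, which is the use case named in the issue description.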
> -----------------
> Discmt: Hey guys. I've been taking time to look into it, and I feel like I might have an understanding of what exactly it is I need to do. If what @revans2 said is true, and all pending messages are kept within that RotatingMap, then this should be somewhat straightforward. I am trying to compile my own storm.jar file right now but I haven't figured out how. I tried using build_release.sh in the bin directory, but I had no luck. I also tried using lein jar.
> -----------------
> xumingming: try the following:
> lein sub install
> lein install
> after these commands are executed, there should be a jar file named storm-xxx.jar in $STORM_HOME/target/.
> -----------------
> Discmt: @xumingming Thanks for the advice. I found that I had Leiningen 1, but the minimum is Leiningen 2.
> -----------------
> xumingming: yeah, storm requires lein 2 to build: https://github.com/nathanmarz/storm/blob/master/project.clj#L14
> -----------------
> Discmt: Hi guys. I got my development environment squared away and I can properly build releases now. I use the build_release.sh script. I tried making a change the way @ptgoetz and @revans2 had suggested by adding a method to the output collector to return the pending count. I have some questions about it.
> I noticed most of the collector implementations rely on a delegate, or mediator, which I'm assuming is defined here: https://github.com/nathanmarz/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/executor.clj#L504-515. So if I add a method to get the size of pending, defined here https://github.com/nathanmarz/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/executor.clj#L408-414, like so:
>                  (SpoutOutputCollector.
>                   (reify ISpoutOutputCollector
>                     ;; proposed addition: number of tuples still waiting to be acked
>                     (^int getPendingCount [this]
>                       (.size pending))
>                     (^List emit [this ^String stream-id ^List tuple ^Object message-id]
>                       (send-spout-msg stream-id tuple message-id nil))
>                     (^void emitDirect [this ^int out-task-id ^String stream-id
>                                        ^List tuple ^Object message-id]
>                       (send-spout-msg stream-id tuple message-id out-task-id))
>                     (reportError [this error]
>                       (report-error error))))
> I should be good right? Aside from the collectors of the trident spouts which may take more research.
> Just so I'm clear, messages are considered pending if they have left the spout and are waiting to be "fully processed", as defined here: https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing ?
> My last question is: does anyone have any suggestions on what would be a good way to test this? i.e. "What kind of topology/scenario should I run".
> -----------------
> revans2: The code you pasted above is creating a new SpoutOutputCollector instance wrapping an ISpoutOutputCollector instance. In order for any java spout to actually get access to the getPendingCount method, you will need to modify ISpoutOutputCollector to have that method defined in it, and also update SpoutOutputCollector, etc. to also implement that method, delegating to the ISpoutOutputCollector instance.
> You are correct about the definition of pending.
> Testing is a bit more difficult. You could have a spout that sends out messages to a bolt that does not process them. You could then verify that each time you send out a message the pending count goes up by 1. But to fully test it you would need to have some coordination between the spout and the bolt. This is not impossible but you may need to use global values to do so.
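The delegation and the simple test described above can be sketched in plain Java as follows. All names here are hypothetical stand-ins rather than Storm's classes: CollectorDelegate plays the role of ISpoutOutputCollector, DelegatingCollector the role of the SpoutOutputCollector wrapper, and NeverAckingExecutor models a topology whose bolt never acks, so every emitted tuple stays pending.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class PendingDelegationSketch {
    // Hypothetical stand-in for ISpoutOutputCollector with the proposed method added.
    interface CollectorDelegate {
        List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
        int getPendingCount();
    }

    // Hypothetical stand-in for the wrapper handed to spouts: every call is forwarded.
    static class DelegatingCollector implements CollectorDelegate {
        private final CollectorDelegate delegate;

        DelegatingCollector(CollectorDelegate delegate) {
            this.delegate = delegate;
        }

        @Override
        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            return delegate.emit(streamId, tuple, messageId);
        }

        @Override
        public int getPendingCount() {
            return delegate.getPendingCount();
        }
    }

    // Fake executor side: records every emit and never acks, like a bolt that ignores its input.
    static class NeverAckingExecutor implements CollectorDelegate {
        private final List<Object> pending = new ArrayList<>();

        @Override
        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            pending.add(messageId);
            return Collections.emptyList();
        }

        @Override
        public int getPendingCount() {
            return pending.size();
        }
    }

    // Emits n tuples through the wrapper and records the pending count seen after each emit.
    static List<Integer> observeCounts(int n) {
        CollectorDelegate collector = new DelegatingCollector(new NeverAckingExecutor());
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            collector.emit("default", Arrays.<Object>asList(i), "msg-" + i);
            counts.add(collector.getPendingCount());
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(observeCounts(3)); // [1, 2, 3]
    }
}
```

This only covers the "count goes up by 1 per emit" half of the test. Checking that the count drops again after acks needs the spout and bolt coordination mentioned above, which this sketch does not attempt.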
> -----------------
> Discmt: @revans2 I got the interface implementation covered. Thank you for the testing suggestion, and hints on how to perform it. It's incredibly helpful. I think that's a good idea, and I'm going to try and do that to test it out.
> -----------------
> nathanmarz: There's testing infrastructure already built that can do this kind of tracking. The name in the testing code is "tracked topologies". I'm traveling right now so can't give a link but you should be able to find it. It's used quite a bit in the tests of the acking system.
> -----------------
> Discmt: @nathanmarz This is also good news. I'll take a look and use what I can there to speed up the process for me.
> -----------------
> Discmt: Hi, I am asking if someone can explain to me how these tracked topologies work.
> Particularly I'm confused about these lines here:
> https://github.com/nathanmarz/storm/blob/master/storm-core/test/clj/backtype/storm/integration_test.clj#L224-245
> Looking in integration_test.clj under the function test-acking on line 219, one of the spouts is told to feed a tuple: (.feed feeder1 [1]). Then tracked-wait is called on the tracked topology to wait for one tuple to be emitted from the spout. Afterwards the checker checks whether 0 tuples have been acknowledged, whereas what I expected is that 1 tuple would have been acked because it was emitted from the spout. However, this is not the case. Then later on checker1 is called again expecting there to be one tuple, but another one was not fed. Furthermore, feeder2, a second spout, was told to feed a tuple, and as expected one was already there when checker2 checked for one ack, but this behavior is not the same as for the first checker.
> -----------------
> nathanmarz: tracked-wait waits until the entire tree of processing of the tuple has finished. The "checker1" function checks how many tuples on the spout have been acked since last time it was called. The topology is set up so that acknowledgement of tuple trees is delayed until bolts have received multiple tuples. That's why nothing is acked on the spout after the first tuple is emitted, but it is when the second tuple is emitted. If you look at the logic of branching-bolt and agg-bolt you'll see why this is the case.
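The two behaviours described in this reply, a checker that reports acks since its previous call and a bolt that delays acking until it has received several tuples, can be modelled in a few lines of Java. This is a simplified single-feeder model with hypothetical names (AckChecker, BatchingBolt) and an assumed batch size of 2. It is not the actual test-acking topology, nor the logic of branching-bolt or agg-bolt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DelayedAckSketch {
    // Reports how many acks arrived since the previous call, like the checker described above.
    static class AckChecker {
        private int totalAcked = 0;
        private int seenAtLastCheck = 0;

        void recordAck() {
            totalAcked++;
        }

        int ackedSinceLastCheck() {
            int delta = totalAcked - seenAtLastCheck;
            seenAtLastCheck = totalAcked;
            return delta;
        }
    }

    // Holds tuples and acks them all at once when batchSize tuples have arrived.
    static class BatchingBolt {
        private final int batchSize;
        private final AckChecker checker;
        private final List<Object> held = new ArrayList<>();

        BatchingBolt(int batchSize, AckChecker checker) {
            this.batchSize = batchSize;
            this.checker = checker;
        }

        void execute(Object tuple) {
            held.add(tuple);
            if (held.size() >= batchSize) {
                for (int i = 0; i < held.size(); i++) {
                    checker.recordAck(); // one ack per held tuple
                }
                held.clear();
            }
        }
    }

    // Feeds two tuples and returns the checker result after each, plus one extra check.
    static int[] run() {
        AckChecker checker = new AckChecker();
        BatchingBolt bolt = new BatchingBolt(2, checker);
        bolt.execute("first");
        int afterFirst = checker.ackedSinceLastCheck();
        bolt.execute("second");
        int afterSecond = checker.ackedSinceLastCheck();
        int again = checker.ackedSinceLastCheck();
        return new int[] {afterFirst, afterSecond, again};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [0, 2, 0]
    }
}
```

The first check returns 0 because the bolt is still holding the only tuple it has seen. The acks show up only after the second tuple arrives, and a repeated check returns 0 because the checker reports a delta rather than a running total.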



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)