You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by "Rick Kellogg (JIRA)" <ji...@apache.org> on 2015/10/09 02:27:26 UTC

[jira] [Updated] (STORM-98) .stateQuery twice halts tuple execution?

     [ https://issues.apache.org/jira/browse/STORM-98?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Kellogg updated STORM-98:
------------------------------
    Component/s: storm-core

> .stateQuery twice halts tuple execution?
> ----------------------------------------
>
>                 Key: STORM-98
>                 URL: https://issues.apache.org/jira/browse/STORM-98
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: James Xu
>            Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/310
> Having the following example, it will never execute the .aggregate()
> FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
>         new Values("cow"),
>         new Values("candy"),
>         new Values("year"));
> spout.setCycle(true);
> TridentTopology topology = new TridentTopology();
> TridentState urlToTweeters =
>         topology.newStaticState(
>                 new TridentReach.StaticSingleKeyMapState.Factory(TridentReach.TWEETERS_DB));
> Stream wordStream = topology.newStream("spout1", spout)
>         .each(new Fields("sentence"), new Split(), new Fields("word"))
>         .stateQuery(urlToTweeters, new Fields("word"), new MapGet(), new Fields("output1"))
>         .groupBy(new Fields("word"))
>         .stateQuery(urlToTweeters, new Fields("word"), new MapGet(), new Fields("output2"))
>         .aggregate(new Fields("word"), new PrintAggregator(), new Fields("count"));
> PrintAggregator:
> public static class PrintAggregator extends BaseAggregator<PrintAggregator.State> {
>     static class State {
>         int counter = 0;
>     }
>     @Override
>     public State init(Object o, TridentCollector collector) {
>         return new State();
>     }
>     @Override
>     public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
>         state.counter++;
>         System.out.println(tuple.getString(0) + " is on: " + state.counter);
>     }
>     @Override
>     public void complete(State state, TridentCollector collector) {
>         collector.emit(new Values(state.counter));
>     }
> }
> ----------
> nathanmarz: Trident currently doesn't support recursive topologies. In this case you have the output of a state feeding back into a query on the same state. You can workaround this by making two separate static state instances for urlToTweeters.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)