Posted to dev@apex.apache.org by Shubham Pathak <sh...@datatorrent.com> on 2016/02/21 18:49:48 UTC

Running Storm topology on Apex

Hello,

As part of integrating with other open source streaming platforms, the
ability to run a Storm topology would be a good addition to Apex.

The following JIRA is open to track the idea:
https://issues.apache.org/jira/browse/APEXMALHAR-1996

We could make progress in phases.

Phase 1 :
Embedding Storm Spout/Bolt as operators in Apex.

The user would implement the populateDAG method and embed a Storm Spout/Bolt
in the operator.
Here is an example. For a word count topology in Storm:

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new WordSpout(), 5);
    builder.setBolt("split", new BoltTokenizer(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new BoltCounter(), 8).fieldsGrouping("split", new Fields("word"));

A corresponding Apex application would look like this:

    SpoutWrapper input = dag.addOperator("input", new SpoutWrapper(new WordSpout(), "spout"));
    BoltWrapper tokens = dag.addOperator("tokenizer", new BoltWrapper(new BoltTokenizer(), "split"));
    BoltWrapper counter = dag.addOperator("counter", new BoltWrapper(new BoltCounter(), "count"));
    dag.addStream("input", input.output, tokens.input);
    dag.addStream("word-count", tokens.output, counter.input);

For stream groupings, we could provide concrete implementations of
StreamCodec.

In the above case, to express fieldsGrouping("split", new Fields("word")),
we could have:

    dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC, new StormTupleStreamCodec(new int[] { 0 }));
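
To illustrate the idea behind a fields grouping expressed as a codec, here is a
minimal sketch in plain Java with no Apex or Storm dependency. It assumes the
codec derives a partition key by hashing the tuple fields at the configured
indices. The class and method names (FieldsGroupingSketch, partitionKey,
partitionFor) are hypothetical, and the modulo step is only a stand-in for
however the engine actually maps keys to partitions.

```java
import java.util.Arrays;
import java.util.List;

class FieldsGroupingSketch {
    // Hypothetical helper: hash only the fields at keyIndices, so tuples that
    // agree on those fields always produce the same key.
    public static int partitionKey(List<Object> tuple, int[] keyIndices) {
        Object[] keys = new Object[keyIndices.length];
        for (int i = 0; i < keyIndices.length; i++) {
            keys[i] = tuple.get(keyIndices[i]);
        }
        return Arrays.hashCode(keys);
    }

    // Illustrative mapping of a key to one of partitionCount partitions;
    // floorMod keeps the result non-negative for negative hash codes.
    public static int partitionFor(int key, int partitionCount) {
        return Math.floorMod(key, partitionCount);
    }

    public static void main(String[] args) {
        int[] wordField = {0}; // same index array as in the example above
        List<Object> first = Arrays.<Object>asList("apex", 1);
        List<Object> second = Arrays.<Object>asList("apex", 7);

        int p1 = partitionFor(partitionKey(first, wordField), 8);
        int p2 = partitionFor(partitionKey(second, wordField), 8);
        // Both tuples carry the same word, so they land on the same partition.
        System.out.println(p1 == p2);
    }
}
```

The point of hashing only the selected fields is that every tuple with the same
word reaches the same counter partition, which is what fieldsGrouping
guarantees in Storm.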

Phase 2: Ability to execute a whole Storm topology in Apex.
Here, the user could submit the Storm topology as-is, and we would provide a
utility or higher-level API to generate the Apex DAG from the Storm topology.
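
As a rough illustration of what such a utility would have to do, the sketch
below walks a simplified description of a topology and lists the streams that
would need to be created. It uses plain Java only. The map-based topology
model and the names TopologyToDagSketch and toStreams are assumptions made for
this example, not part of Storm or Apex.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopologyToDagSketch {
    // Input: component id -> ids of the upstream components it subscribes to.
    // Output: one "upstream->downstream" entry per subscription. In a real
    // translation each component would become an operator and each entry a
    // stream added to the DAG.
    public static List<String> toStreams(Map<String, List<String>> inputsByComponent) {
        List<String> streams = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : inputsByComponent.entrySet()) {
            for (String upstream : entry.getValue()) {
                streams.add(upstream + "->" + entry.getKey());
            }
        }
        return streams;
    }

    public static void main(String[] args) {
        // The word count topology from the example above, in insertion order.
        Map<String, List<String>> topology = new LinkedHashMap<>();
        topology.put("spout", Collections.<String>emptyList());
        topology.put("split", Arrays.asList("spout"));
        topology.put("count", Arrays.asList("split"));

        System.out.println(toStreams(topology)); // prints [spout->split, split->count]
    }
}
```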

To validate the first phase, with a quick implementation of SpoutWrapper,
BoltWrapper and StormTupleStreamCodec, we were able to run the word count
application with partitioning.
You can have a look at the code here:
https://github.com/shubham-pathak22/incubator-apex-malhar/tree/storm-apex/demos/storm-demo/src/main/java/com/datatorrent/demos/storm


The word count application involved only a single stream between operators
and hence was fairly easy to implement.
A Storm spout or bolt can emit more than one stream. To do so, the user
declares multiple streams using the declareStream method of
OutputFieldsDeclarer and specifies the stream to emit to when calling the
emit method on OutputCollector.
Example: https://gist.github.com/Xorlev/8058947

To embed such bolts/spouts within an operator, we would need some support to
be added in Apex.
One such feature could be the ability to declare input/output ports
dynamically. There was a mail thread about supporting collection-based ports;
that would help in our case too.
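
To make the requirement concrete, here is a small plain-Java sketch of the
routing a wrapper would need if ports could be created per declared stream.
It is only a model: MultiStreamRoutingSketch and its methods are hypothetical
names, and the in-memory lists stand in for output ports.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MultiStreamRoutingSketch {
    // Stand-in for output ports created at runtime, one per declared stream id.
    private final Map<String, List<List<Object>>> portsByStream = new LinkedHashMap<>();

    // Plays the role of declaring a stream: creates the "port" if it is new.
    public void declareStream(String streamId) {
        portsByStream.putIfAbsent(streamId, new ArrayList<List<Object>>());
    }

    // Plays the role of emitting to a named stream: routes the tuple to the
    // port for that stream and rejects streams that were never declared.
    public void emit(String streamId, List<Object> tuple) {
        List<List<Object>> port = portsByStream.get(streamId);
        if (port == null) {
            throw new IllegalArgumentException("Undeclared stream: " + streamId);
        }
        port.add(tuple);
    }

    // Tuples routed to the given stream so far (empty if none or undeclared).
    public List<List<Object>> emitted(String streamId) {
        List<List<Object>> port = portsByStream.get(streamId);
        return port == null ? new ArrayList<List<Object>>() : port;
    }

    public static void main(String[] args) {
        MultiStreamRoutingSketch wrapper = new MultiStreamRoutingSketch();
        wrapper.declareStream("words");
        wrapper.declareStream("errors");

        wrapper.emit("words", Arrays.<Object>asList("apex"));
        wrapper.emit("words", Arrays.<Object>asList("storm"));
        wrapper.emit("errors", Arrays.<Object>asList("bad input"));

        System.out.println(wrapper.emitted("words").size());  // prints 2
        System.out.println(wrapper.emitted("errors").size()); // prints 1
    }
}
```

With statically declared ports, the set of streams has to be known when the
operator class is written, which is why the wrapper cannot support arbitrary
bolts today.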

Please let me know your thoughts.

Thanks,
Shubham

Re: Running Storm topology on Apex

Posted by Shubham Pathak <sh...@datatorrent.com>.
Hello,

I would like to have input on this proposal from the community, particularly
from folks who have used Storm before.
For phase 1, apart from the challenge of embedding a bolt/spout with multiple
streams, do we see any other technical challenges in implementing the
feature?


Thanks,
Shubham
