Posted to issues@storm.apache.org by "P. Taylor Goetz (JIRA)" <ji...@apache.org> on 2017/02/24 18:05:44 UTC

[jira] [Reopened] (STORM-2334) Bolt for Joining streams

     [ https://issues.apache.org/jira/browse/STORM-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

P. Taylor Goetz reopened STORM-2334:
------------------------------------

> Bolt for Joining streams
> ------------------------
>
>                 Key: STORM-2334
>                 URL: https://issues.apache.org/jira/browse/STORM-2334
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 1.x
>            Reporter: Roshan Naik
>            Assignee: Roshan Naik
>             Fix For: 2.0.0, 1.1.0
>
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Create a general purpose windowed bolt that performs Joins on multiple data streams.
> Because, depending on the topology configuration, the bolt may receive data on either 'default' streams or named streams, the join bolt should be able to differentiate incoming data by the names of upstream components as well as by stream names.
> *Example:*
> The following SQL-style join involving 4 tables:
> {code}
> select  userId, key4, key2, key3
> from stream1 
> join       stream2  on stream2.userId =  stream1.key1
> join       stream3  on stream3.key3   =  stream2.userId
> left join  stream4  on stream4.key4   =  stream3.key3
> {code}
> could be expressed using the Join Bolt over 4 named streams as:
> {code}
> new JoinBolt(STREAM, "stream1", "key1") //'STREAM' arg indicates that stream1/2/3/4 are names of streams. 'key1' is the key on which 
>      .join     ("stream2", "userId",  "stream1") //join stream2 on stream2.userId=stream1.key1
>      .join     ("stream3", "key3",    "stream2") //join stream3 on stream3.key3=stream2.userId   
>      .leftjoin ("stream4", "key4",    "stream3") //left join stream4 on stream4.key4=stream3.key3
>      .select("userId, key4, key2, key3")         // choose output fields
>      .withWindowLength(..)
>      .withSlidingInterval(..);
> {code}
> Or, based on named source components:
> {code}
> new JoinBolt(SOURCE, "kafkaSpout1", "key1") //'SOURCE' arg indicates that kafkaSpout1, hdfsSpout3 etc are names of upstream components 
>      .join     ("kafkaSpout2", "userId",    "kafkaSpout1" )    
>      .join     ("hdfsSpout3",  "key3",      "kafkaSpout2")
>      .leftjoin ("mqttSpout1",  "key4",      "hdfsSpout3")
>      .select ("userId, key4, key2, key3")
>      .withWindowLength(..)
>      .withSlidingInterval(..);
> {code}
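To make the intended semantics of the chained joins concrete, here is a minimal sketch of an inner join and a left join over one window's worth of tuples. It is only an illustration under assumptions: tuples are modelled as plain maps, the class `WindowJoinSketch` and its `join` method are hypothetical names, and the simple nested-loop strategy is not claimed to be how the proposed bolt would work internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of join semantics; not the JoinBolt implementation. */
final class WindowJoinSketch {

    /**
     * Joins the rows accumulated so far (left) with the tuples of another
     * stream in the same window (right) where left[leftKey] equals right[rightKey].
     * With leftJoin = true, left rows without a match are kept unchanged.
     */
    static List<Map<String, Object>> join(List<Map<String, Object>> left, String leftKey,
                                          List<Map<String, Object>> right, String rightKey,
                                          boolean leftJoin) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            Object leftValue = l.get(leftKey);
            boolean matched = false;
            for (Map<String, Object> r : right) {
                if (leftValue != null && leftValue.equals(r.get(rightKey))) {
                    // Merge both tuples; on a field-name clash the left value wins.
                    Map<String, Object> merged = new LinkedHashMap<>(l);
                    r.forEach(merged::putIfAbsent);
                    out.add(merged);
                    matched = true;
                }
            }
            if (!matched && leftJoin) {
                out.add(new LinkedHashMap<>(l));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up tuples standing in for one window of each stream.
        List<Map<String, Object>> stream1 = List.of(
                Map.of("key1", "u1", "key2", "a"),
                Map.of("key1", "u2", "key2", "b"));
        List<Map<String, Object>> stream2 = List.of(Map.of("userId", "u1"));

        System.out.println("inner: " + join(stream1, "key1", stream2, "userId", false));
        System.out.println("left:  " + join(stream1, "key1", stream2, "userId", true));
    }
}
```

A chain such as the 4-stream example would feed the output of one `join` call in as the `left` argument of the next.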
> For the tuples to be joined correctly, 'fields grouping' should be employed on the incoming streams. Each stream should be grouped on the same key that is used to join it against other streams. This is a restriction compared to SQL, which allows joining a table with others on any key and any number of keys.
> *For example:* If 'Stream1' is fields-grouped on 'key1', we cannot use a different key, 'key2', on 'Stream1' to join it with other streams. However, 'Stream1' can be joined with multiple other streams using the same key. The following SQL shows the unsupported case, in which 'stream1' is joined on both 'key1' and 'key2':
> {code}
> select ....
> from stream1 
> join  stream2  on stream2.userId =  stream1.key1
> join  stream3  on stream3.key3   =  stream1.key2  // not supportable in Join Bolt 
> {code}
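The grouping restriction can be pictured with a small sketch of hash-based routing. This assumes that fields grouping behaves like "hash the grouping field's value, take it modulo the number of tasks"; the class `FieldsGroupingSketch` and the method `taskFor` are hypothetical and do not reproduce Storm's actual hashing. The point is only that two tuples meet on the same bolt task when each stream is grouped on the field it is joined on.

```java
import java.util.Objects;

/** Hypothetical routing sketch; not Storm's actual fields-grouping code. */
final class FieldsGroupingSketch {

    /** Picks a task index from the value of the grouping field. */
    static int taskFor(Object groupingValue, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(Objects.hashCode(groupingValue), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;

        // stream1 is grouped on key1, stream2 on userId. Tuples whose
        // join-key values are equal are routed to the same task.
        int taskOfStream1Tuple = taskFor("user-42", numTasks); // key1 = "user-42"
        int taskOfStream2Tuple = taskFor("user-42", numTasks); // userId = "user-42"
        System.out.println("same task: " + (taskOfStream1Tuple == taskOfStream2Tuple));

        // A stream1 tuple was routed by its key1 value only. Its key2 value
        // played no part in routing, so a join partner matched on key2 may
        // have been sent to a different task.
        int taskByKey2 = taskFor("some-key2-value", numTasks);
        System.out.println("task chosen by key1: " + taskOfStream1Tuple
                + ", task a key2 partner would use: " + taskByKey2);
    }
}
```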
> Consequently, the join bolt's syntax is somewhat simplified compared to SQL. The key name for any given stream appears only once, when the stream is first introduced in the join. Thereafter, that key is implicitly used for joining. See the case of 'stream3' being joined with both 'stream2' and 'stream4' in the first example.
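
The "key appears only once" rule can be sketched as a small fluent builder that records each stream's key when the stream is introduced and reuses it for later joins. Everything here is an assumption made for illustration: `JoinSpecSketch`, its methods, and the textual form of the conditions are hypothetical and are not the proposed bolt's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical builder showing implicit key reuse; not the JoinBolt API. */
final class JoinSpecSketch {
    // Join key of each stream, recorded once when the stream is introduced.
    private final Map<String, String> keyOf = new LinkedHashMap<>();
    private final List<String> conditions = new ArrayList<>();

    JoinSpecSketch(String firstStream, String key) {
        keyOf.put(firstStream, key);
    }

    JoinSpecSketch join(String newStream, String key, String priorStream) {
        return add("join", newStream, key, priorStream);
    }

    JoinSpecSketch leftJoin(String newStream, String key, String priorStream) {
        return add("left join", newStream, key, priorStream);
    }

    private JoinSpecSketch add(String type, String newStream, String key, String priorStream) {
        // The prior stream's key is looked up, never restated by the caller.
        String priorKey = keyOf.get(priorStream);
        if (priorKey == null) {
            throw new IllegalArgumentException("unknown stream: " + priorStream);
        }
        if (keyOf.putIfAbsent(newStream, key) != null) {
            throw new IllegalArgumentException("stream already introduced: " + newStream);
        }
        conditions.add(type + " " + newStream + " on "
                + newStream + "." + key + " = " + priorStream + "." + priorKey);
        return this;
    }

    List<String> conditions() {
        return conditions;
    }

    public static void main(String[] args) {
        new JoinSpecSketch("stream1", "key1")
                .join("stream2", "userId", "stream1")
                .join("stream3", "key3", "stream2")
                .leftJoin("stream4", "key4", "stream3")
                .conditions()
                .forEach(System.out::println);
    }
}
```

Because a stream's key is fixed at introduction, the unsupported case of joining 'stream1' on both 'key1' and 'key2' cannot even be written with this shape of API.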


