Posted to dev@storm.apache.org by roshannaik <gi...@git.apache.org> on 2017/02/02 09:42:43 UTC

[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

GitHub user roshannaik opened a pull request:

    https://github.com/apache/storm/pull/1914

    STORM-2334 - Join Bolt implementation with unit tests

    For details see : 
    https://issues.apache.org/jira/browse/STORM-2334

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/roshannaik/storm STORM-2334

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1914.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1914
    
----
commit f193e266a5f2701c2a91fa8e73778d6e4c2fa3d7
Author: Roshan Naik <ro...@hortonworks.com>
Date:   2017-02-01T01:21:46Z

    STORM-2334 Join Bolt implementation with unit tests

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1914: STORM-2334 - Join Bolt implementation with unit tests

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/1914
  
    @arunmahadevan can you please commit this?



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r100952160
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,126 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within a Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from        table1
    +inner join  table2  on table2.userId =  table1.key1
    +inner join  table3  on table3.key3   =  table2.userId
    +left join   table4  on table4.key4   =  table3.key3
    +```
    +
    +Similar joins could be expressed on tuples generated by 4 spouts using `JoinBolt`:
    +
    +```java
    +JoinBolt jbolt =  new JoinBolt("spout1", "key1")                   // from        spout1  
    +                    .join     ("spout2", "userId",  "spout1")      // inner join  spout2  on spout2.userId = spout1.key1
    +                    .join     ("spout3", "key3",    "spout2")      // inner join  spout3  on spout3.key3   = spout2.userId   
    +                    .leftJoin ("spout4", "key4",    "spout3")      // left join   spout4  on spout4.key4   = spout3.key3
    +                    .select  ("userId, key4, key2, spout3:key3")   // chose output fields
    +                    .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
    +
    +topoBuilder.setBolt("joiner", jbolt, 1)
    +            .fieldsGrouping("spout1", new Fields("key1") )
    +            .fieldsGrouping("spout2", new Fields("userId") )
    +            .fieldsGrouping("spout3", new Fields("key3") )
    +            .fieldsGrouping("spout4", new Fields("key4") );
    +```
    +
    +The bolt constructor takes two arguments. The 1st argument introduces the data from `spout1`'
    +to be the first stream and specifies that it will always use field `key1` when joining this with the others streams.
    +The name of the component specified must refer to the spout or bolt that is directly connected to the Join bolt. 
    +Here data received from `spout1` must be fields grouped on `key1`. Similarly, each of the `leftJoin()` and `join()` method
    +calls introduce a new stream along with the field to use for the join. As seen in above example, the same FieldsGrouping
    +requirement applies to these streams as well. The 3rd argument to the join methods refers to another stream with which
    +to join.
    +
    +The `select()` method is used to specify the output fields. The argument to `select` is a comma separated list of fields.
    +Individual field names can be prefixed with a stream name to disambiguate between the same field name occurring in
    +multiple streams as follows:  `.select("spout3:key3, spout4:key3")`. Nested tuple types are supported if the
    +nesting has been done using `Map`s. For example  `outer.inner.innermost` refers to a field that is nested three levels
    +deep where `outer` and `inner` are of type `Map`.   
    +
    +Stream name prefix is not allowed for the fields in any of the join() arguments, but nested fields are supported. 
    +
    +The call to `withTumblingWindow()` above, configures the join window to be a 10 minute tumbling window. Since `JoinBolt` 
    +is a Windowed Bolt, we can also use the `withWindow` method to configure it as a sliding window (see tips section below). 
    +
    +## Stream names and Join order
    +1. Stream names must be introduced (in constructor or as 1st arg to various join methods) before being referred
    +to (in the 3rd argument of the join methods). Forward referencing of stream names, as shown below, is not allowed:
    +
    +```java
    +new JoinBolt( "spout1", "key1")                 
    +  .join     ( "spout2", "userId",  "spout3") //not allowed. spout3 not yet introduced
    +  .join     ( "spout3", "key3",    "spout1")
    +```
    +
    +2. Internally, the joins will be performed in the order expressed by the user.
    --- End diff --
    
    It's showing up as 1. in the rich diff mode.



[GitHub] storm issue #1914: STORM-2334 - Join Bolt implementation with unit tests

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/1914
  
    @roshannaik Can you add some documentation for this similar to what we have for Windowing?



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99724571
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables called: stream1, stream2, stream3 & stream4:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from    stream1 
    +join       stream2  on stream2.userId =  stream1.key1
    +join       stream3  on stream3.key3   =  stream2.userId
    +left join  stream4  on stream4.key4   =  stream3.key3
    +```
    +
    +This could be expressed using `JoinBolt` over 4 similarly named streams as: 
    +
    +```java
    +new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")               // from stream1  
    +                            .join     ("stream2", "userId",  "stream1") // join      stream2  on stream2.userId = stream1.key1
    --- End diff --
    
    Yes, good question. As of now, only grouping on a single key, and consequently joining on one key, is supported. This is noted in the doc.
    
    To get the effect of joining on multiple fields, you need to combine two or more fields into one somewhere upstream. Then you can join on that merged field. Perhaps worth mentioning this in the docs.
    
    To be more in line with SQL abilities, I initially started supporting streams grouped on multiple fields for joins, but later scaled it back to a single key. I saw a few issues with opening up grouping and joins on multiple fields. Let me try to summarize...
    
    It easily gives users the wrong impression that they can do many SQL-type things, like:
    
    Assuming S1 is grouped on both f1 & f2:
    ```
    from s1
    join s2   s2.f1 == s1.f1 && s2.f2 == s1.f2     // uses both f1 & f2 from s1
    join s3   s3.f1 == s1.f1 && s3.f2 == s2.f2     // one field from s1 and another from s2
    ```
    which cannot work correctly due to fields grouping (FG).
    
    Another example is  grouping S1 on f1 and f2 and then trying to do this...
    ```
    from s1
    join s2   s2.f1 == s1.f1    // use s1.f1 
    join s3   s3.f1 == s1.f2   // but s1.f2 here
    ```
    
    **In short:** The extent to which multi-field joins can be correctly supported is limited to whatever can be achieved by merging those fields into one before joining. Going beyond that complicated the interface and the internal checking, in addition to easily setting up wrong user expectations.
    
    The downside of the current approach is an additional step to merge the fields in such cases. That's the trade-off I settled for.
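
The merge step described in this comment could look roughly like the following. This is a minimal sketch in plain Java, assuming the two fields are combined into a `List` that then serves as the single key; `CompositeKeySketch` and `mergeFields` are hypothetical names, not part of Storm or of this pull request. In a topology, the merging would presumably happen in an upstream bolt that emits the merged value as its own field, which is not shown here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of Storm or of this pull request.
final class CompositeKeySketch {

    // Merge several field values into one value usable as a single join/grouping key.
    // A List is used because it defines equals() and hashCode() element-wise.
    static List<Object> mergeFields(Object... values) {
        return Arrays.asList(values.clone());
    }

    public static void main(String[] args) {
        List<Object> left  = mergeFields("alice", 42);   // e.g. s1.f1, s1.f2
        List<Object> right = mergeFields("alice", 42);   // e.g. s2.f1, s2.f2
        // Equal merged keys compare equal, so they would meet in a join on the merged field.
        System.out.println(left.equals(right));
        // A difference in any component makes the merged keys differ.
        System.out.println(left.equals(mergeFields("alice", 7)));
    }
}
```

Because the merged value is the only key, both the grouping and the join see exactly the same thing, which is the property the single-key restriction is meant to guarantee.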



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99227816
  
    --- Diff: storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java ---
    @@ -0,0 +1,428 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.bolt;
    +
    +
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseWindowedBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.windowing.TupleWindow;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +
    +public class JoinBolt extends BaseWindowedBolt {
    +
    +    private OutputCollector collector;
    +
    +    // Map[StreamName -> Map[Key -> List<Tuple>]  ]
    +    HashMap<String, HashMap<Object, ArrayList<Tuple> >> hashedInputs = new HashMap<>(); // holds remaining streams
    +
    +    // Map[StreamName -> JoinInfo]
    +    protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
    +    protected String[][] outputKeys;  // specified via bolt.select() ... used in declaring Output fields
    +    protected String[] dotSeparatedOutputKeyNames; // flattened (de nested) keyNames, used for naming output fields
    +    protected String outputStreamName;
    +
    +    // Use streamId, source component name OR field in tuple to distinguish incoming tuple streams
    +    public enum Selector { STREAM, SOURCE }
    +    protected final Selector selectorType;
    +
    +
    +    /**
    +     * StreamId to start the join with. Equivalent SQL ...
    +     *       select .... from streamId ...
    +     * @param type Specifies whether 'streamId' refers to stream name/source component
    +     * @param streamId name of stream/source component
    +     * @param key the fieldName to use as key for the stream (used for performing joins)
    +     */
    +    public JoinBolt(Selector type, String streamId, String key) {
    +        selectorType = type;
    +        joinCriteria.put(streamId, new JoinInfo(key) );
    +    }
    +
    +    /**
    +     * Defines the name of the output stream
    +     */
    +    public JoinBolt withOutputStream(String streamName) {
    +        this.outputStreamName = streamName;
    +        return this;
    +    }
    +
    +    /**
    +     * Performs inner Join.
    +     *  SQL    :   from priorStream inner join newStream on newStream.key = priorStream.key1
    +     *  same as:   new WindowedQueryBolt(priorStream,key1). join(newStream, key, priorStream);
    +     *
    +     *  Note: priorStream must be previously joined.
    +     *    Valid ex:    new WindowedQueryBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2);
    --- End diff --
    
    It is necessary to specify which stream s3 should be joined with (s2 or s1?), as the results will differ accordingly.



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99575982
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables called: stream1, stream2, stream3 & stream4:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from    stream1 
    +join       stream2  on stream2.userId =  stream1.key1
    +join       stream3  on stream3.key3   =  stream2.userId
    +left join  stream4  on stream4.key4   =  stream3.key3
    +```
    +
    +This could be expressed using `JoinBolt` over 4 similarly named streams as: 
    +
    +```java
    +new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")               // from stream1  
    +                            .join     ("stream2", "userId",  "stream1") // join      stream2  on stream2.userId = stream1.key1
    --- End diff --
    
    How would one join on a combination of fields (e.g. `s1.f1 == s2.f1 && s1.f2 == s2.f2`)? Similarly, on partial values of fields?
    
    The above can be done when users write custom bolts for joins (see the "Join" section of https://github.com/apache/storm/blob/master/docs/Common-patterns.md), so it would be a good idea to support it.
     



[GitHub] storm issue #1914: STORM-2334 - Join Bolt implementation with unit tests

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/1914
  
    @arunmahadevan Yes, good idea. I will add an example to storm-starter.



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r100372018
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables called: stream1, stream2, stream3 & stream4:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from    stream1 
    +join       stream2  on stream2.userId =  stream1.key1
    +join       stream3  on stream3.key3   =  stream2.userId
    +left join  stream4  on stream4.key4   =  stream3.key3
    +```
    +
    +This could be expressed using `JoinBolt` over 4 similarly named streams as: 
    +
    +```java
    +new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")               // from stream1  
    +                            .join     ("stream2", "userId",  "stream1") // join      stream2  on stream2.userId = stream1.key1
    --- End diff --
    
    right.



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99951845
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables called: stream1, stream2, stream3 & stream4:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from    stream1 
    +join       stream2  on stream2.userId =  stream1.key1
    +join       stream3  on stream3.key3   =  stream2.userId
    +left join  stream4  on stream4.key4   =  stream3.key3
    +```
    +
    +This could be expressed using `JoinBolt` over 4 similarly named streams as: 
    +
    +```java
    +new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")               // from stream1  
    +                            .join     ("stream2", "userId",  "stream1") // join      stream2  on stream2.userId = stream1.key1
    --- End diff --
    
    Yes, not supported. Joining on a subset of the grouping fields is not supported as it **will** lead to wrong results in a streaming system, unless you restrict it to a single running instance of the Join bolt.
    
    This is not a limitation of the JoinBolt per se; it is a limitation that it inherits from the way streaming data is routed. It's a bit tricky to express, but let me give it a try:
    
    
    When we join on f1, the system **must** route the tuples based on hash(f1) to the right Join bolt instance. If we want to join on both f1 & f2, then routing **must** be based on hash(f1,f2).
    Basically, if the join fields and the grouping/routing fields are not the same, the bolt instance will not receive all the right tuples for the join.
    
    So there is an inherent contradiction when you try to route on hash(f1,f2) and then try to join on anything but those two fields.
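
The routing argument in this comment can be illustrated with a toy model. The sketch below assumes a simplified router that picks a task as the hash of the grouping values modulo the number of tasks; it is a stand-in for illustration, not Storm's actual fields grouping code, and `RoutingSketch` and `route` are hypothetical names.

```java
import java.util.Objects;

// Simplified stand-in for fields-grouping routing; not Storm's actual implementation.
final class RoutingSketch {

    // Pick a task index from the hash of the grouping field values.
    static int route(int numTasks, Object... groupingValues) {
        return Math.floorMod(Objects.hash(groupingValues), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 2;
        // Two tuples that share f1 = "a" but differ in f2 (1 and 2).
        int byF1First    = route(numTasks, "a");
        int byF1Second   = route(numTasks, "a");
        int byBothFirst  = route(numTasks, "a", 1);
        int byBothSecond = route(numTasks, "a", 2);
        System.out.println("grouped on f1:    " + byF1First + ", " + byF1Second);
        System.out.println("grouped on f1,f2: " + byBothFirst + ", " + byBothSecond);
    }
}
```

In this toy model with two tasks, the two tuples that share f1 = "a" land on the same task when grouped on f1 alone, but on different tasks when grouped on (f1, f2). A join on f1 alone would therefore never see them together, which is the contradiction described above.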



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99945247
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    --- End diff --
    
    yes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by raghavgautam <gi...@git.apache.org>.
Github user raghavgautam commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99237976
  
    --- Diff: storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java ---
    @@ -0,0 +1,428 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.bolt;
    +
    +
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseWindowedBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.windowing.TupleWindow;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +
    +public class JoinBolt extends BaseWindowedBolt {
    +
    +    private OutputCollector collector;
    +
    +    // Map[StreamName -> Map[Key -> List<Tuple>]  ]
    +    HashMap<String, HashMap<Object, ArrayList<Tuple> >> hashedInputs = new HashMap<>(); // holds remaining streams
    +
    +    // Map[StreamName -> JoinInfo]
    +    protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
    +    protected String[][] outputKeys;  // specified via bolt.select() ... used in declaring Output fields
    +    protected String[] dotSeparatedOutputKeyNames; // flattened (de nested) keyNames, used for naming output fields
    +    protected String outputStreamName;
    +
    +    // Use streamId, source component name OR field in tuple to distinguish incoming tuple streams
    +    public enum Selector { STREAM, SOURCE }
    +    protected final Selector selectorType;
    +
    +
    +    /**
    +     * StreamId to start the join with. Equivalent SQL ...
    +     *       select .... from streamId ...
    +     * @param type Specifies whether 'streamId' refers to stream name/source component
    +     * @param streamId name of stream/source component
    +     * @param key the fieldName to use as key for the stream (used for performing joins)
    +     */
    +    public JoinBolt(Selector type, String streamId, String key) {
    +        selectorType = type;
    +        joinCriteria.put(streamId, new JoinInfo(key) );
    +    }
    +
    +    /**
    +     * Defines the name of the output stream
    +     */
    +    public JoinBolt withOutputStream(String streamName) {
    +        this.outputStreamName = streamName;
    +        return this;
    +    }
    +
    +    /**
    +     * Performs inner Join.
    +     *  SQL    :   from priorStream inner join newStream on newStream.key = priorStream.key1
    +     *  same as:   new WindowedQueryBolt(priorStream,key1). join(newStream, key, priorStream);
    +     *
    +     *  Note: priorStream must be previously joined.
    +     *    Valid ex:    new WindowedQueryBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2);
    --- End diff --
    
    Discussed this offline with @roshannaik; this is necessary when we have a left outer join. Consider a join of tables T1, T2, T3. If the join of T1 and T2 is a left outer join, it can lead to an intermediate table that looks like:
    
    T1.K1 | T2.K1
    ------|------
    Y     | NULL
    
    Now consider the case where we have to do an inner join with T3.K1 (with value Y): the resulting table might change depending on whether the join is on T1.K1 or T2.K1.
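
The T1/T2/T3 case in this comment can be worked through with a few in-memory rows. This is a toy sketch in plain Java, assuming rows are modelled as maps from column name to value; `JoinTargetSketch`, `innerJoinT3` and `intermediate` are hypothetical names and have nothing to do with the actual `JoinBolt` code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration with in-memory rows; not the JoinBolt implementation.
final class JoinTargetSketch {

    // Inner-join each intermediate row with the T3 keys, matching on the given column.
    static List<Map<String, String>> innerJoinT3(List<Map<String, String>> rows,
                                                 String column, List<String> t3Keys) {
        List<Map<String, String>> out = new ArrayList<>();
        for (Map<String, String> row : rows) {
            String value = row.get(column);
            for (String k : t3Keys) {
                if (value != null && value.equals(k)) {   // NULL never matches
                    Map<String, String> joined = new HashMap<>(row);
                    joined.put("T3.K1", k);
                    out.add(joined);
                }
            }
        }
        return out;
    }

    // Result of "T1 left outer join T2" when T2 has no match for Y.
    static List<Map<String, String>> intermediate() {
        Map<String, String> row = new HashMap<>();
        row.put("T1.K1", "Y");
        row.put("T2.K1", null);
        List<Map<String, String>> rows = new ArrayList<>();
        rows.add(row);
        return rows;
    }

    public static void main(String[] args) {
        List<String> t3 = Collections.singletonList("Y");
        // Joining T3 against T1.K1 keeps the row; joining against T2.K1 drops it.
        System.out.println(innerJoinT3(intermediate(), "T1.K1", t3).size());
        System.out.println(innerJoinT3(intermediate(), "T2.K1", t3).size());
    }
}
```

Joining T3 on T1.K1 produces one row, while joining on T2.K1 produces none, so the stream to join against has to be named explicitly.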



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99571544
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    --- End diff --
    
    How is it enforced?
    A RuntimeException at topology construction time?
    It would be best if the fields groupings on the join keys were handled automatically by the API rather than relying on the user to define them correctly.



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99744125
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables called: stream1, stream2, stream3 & stream4:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from    stream1 
    +join       stream2  on stream2.userId =  stream1.key1
    +join       stream3  on stream3.key3   =  stream2.userId
    +left join  stream4  on stream4.key4   =  stream3.key3
    +```
    +
    +This could be expressed using `JoinBolt` over 4 similarly named streams as: 
    +
    +```java
    +new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")               // from stream1  
    +                            .join     ("stream2", "userId",  "stream1") // join      stream2  on stream2.userId = stream1.key1
    --- End diff --
    
    The SQL-like syntax is not supported, correct? So I assume joining on a subset of the join fields (the "not ok" case in your example) is not possible with the current API. 
    It is only about allowing users to use multiple fields as the join key without the extra merge step. Maybe accepting a type like `JoinKey` rather than a string as the key parameter in the API would make this clear.


---

[GitHub] storm issue #1914: STORM-2334 - Join Bolt implementation with unit tests

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/1914
  
    @arunmahadevan The PR has been revised. Thanks.


---

[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99169659
  
    --- Diff: storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java ---
    @@ -0,0 +1,428 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.bolt;
    +
    +
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseWindowedBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.windowing.TupleWindow;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +
    +public class JoinBolt extends BaseWindowedBolt {
    +
    +    private OutputCollector collector;
    +
    +    // Map[StreamName -> Map[Key -> List<Tuple>]  ]
    +    HashMap<String, HashMap<Object, ArrayList<Tuple> >> hashedInputs = new HashMap<>(); // holds remaining streams
    +
    +    // Map[StreamName -> JoinInfo]
    +    protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
    +    protected String[][] outputKeys;  // specified via bolt.select() ... used in declaring Output fields
    +    protected String[] dotSeparatedOutputKeyNames; // flattened (de nested) keyNames, used for naming output fields
    +    protected String outputStreamName;
    +
    +    // Use streamId, source component name OR field in tuple to distinguish incoming tuple streams
    +    public enum Selector { STREAM, SOURCE }
    +    protected final Selector selectorType;
    +
    +
    +    /**
    +     * StreamId to start the join with. Equivalent SQL ...
    +     *       select .... from streamId ...
    +     * @param type Specifies whether 'streamId' refers to stream name/source component
    +     * @param streamId name of stream/source component
    +     * @param key the fieldName to use as key for the stream (used for performing joins)
    +     */
    +    public JoinBolt(Selector type, String streamId, String key) {
    +        selectorType = type;
    +        joinCriteria.put(streamId, new JoinInfo(key) );
    +    }
    +
    +    /**
    +     * Defines the name of the output stream
    +     */
    +    public JoinBolt withOutputStream(String streamName) {
    +        this.outputStreamName = streamName;
    +        return this;
    +    }
    +
    +    /**
    +     * Performs inner Join.
    +     *  SQL    :   from priorStream inner join newStream on newStream.key = priorStream.key1
    +     *  same as:   new WindowedQueryBolt(priorStream,key1). join(newStream, key, priorStream);
    +     *
    +     *  Note: priorStream must be previously joined.
    +     *    Valid ex:    new WindowedQueryBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2);
    +     *    Invalid ex:  new WindowedQueryBolt(s1,k1). join(s3,k3, s2). join(s2,k2, s1);
    +     */
    +    public JoinBolt join(String newStream, String key, String priorStream) {
    +        return join_common(newStream, key, priorStream, JoinType.INNER);
    +    }
    +
    +    /**
    +     * Performs left Join.
    +     *  SQL    :   from stream1  left join stream2  on stream2.key = stream1.key1
    +     *  same as:   new  WindowedQueryBolt(stream1, key1). leftJoin(stream2, key, stream1);
    +     *
    +     *  Note: priorStream must be previously joined
    +     *    Valid ex:    new WindowedQueryBolt(s1,k1). leftJoin(s2,k2, s1). leftJoin(s3,k3, s2);
    +     *    Invalid ex:  new WindowedQueryBolt(s1,k1). leftJoin(s3,k3, s2). leftJoin(s2,k2, s1);
    +     */
    +    public JoinBolt leftJoin(String newStream, String key, String priorStream) {
    +        return join_common(newStream, key, priorStream, JoinType.LEFT);
    +    }
    +
    +    private JoinBolt join_common(String newStream, String key, String priorStream, JoinType joinType) {
    --- End diff --
    
    Can we keep the method names in camelCase, e.g. `joinCommon`?


---

[GitHub] storm issue #1914: STORM-2334 - Join Bolt implementation with unit tests

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/1914
  
    @ptgoetz Yes, I was thinking about that myself. I am thinking of creating a Join.md under the top-level docs directory.


---

[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r100959018
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,126 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within a Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from        table1
    +inner join  table2  on table2.userId =  table1.key1
    +inner join  table3  on table3.key3   =  table2.userId
    +left join   table4  on table4.key4   =  table3.key3
    +```
    +
    +Similar joins could be expressed on tuples generated by 4 spouts using `JoinBolt`:
    +
    +```java
    +JoinBolt jbolt =  new JoinBolt("spout1", "key1")                   // from        spout1  
    +                    .join     ("spout2", "userId",  "spout1")      // inner join  spout2  on spout2.userId = spout1.key1
    +                    .join     ("spout3", "key3",    "spout2")      // inner join  spout3  on spout3.key3   = spout2.userId   
    +                    .leftJoin ("spout4", "key4",    "spout3")      // left join   spout4  on spout4.key4   = spout3.key3
    +                    .select  ("userId, key4, key2, spout3:key3")   // chose output fields
    +                    .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
    +
    +topoBuilder.setBolt("joiner", jbolt, 1)
    +            .fieldsGrouping("spout1", new Fields("key1") )
    +            .fieldsGrouping("spout2", new Fields("userId") )
    +            .fieldsGrouping("spout3", new Fields("key3") )
    +            .fieldsGrouping("spout4", new Fields("key4") );
    +```
    +
    +The bolt constructor takes two arguments. The 1st argument introduces the data from `spout1`'
    +to be the first stream and specifies that it will always use field `key1` when joining this with the others streams.
    +The name of the component specified must refer to the spout or bolt that is directly connected to the Join bolt. 
    +Here data received from `spout1` must be fields grouped on `key1`. Similarly, each of the `leftJoin()` and `join()` method
    +calls introduce a new stream along with the field to use for the join. As seen in above example, the same FieldsGrouping
    +requirement applies to these streams as well. The 3rd argument to the join methods refers to another stream with which
    +to join.
    +
    +The `select()` method is used to specify the output fields. The argument to `select` is a comma separated list of fields.
    +Individual field names can be prefixed with a stream name to disambiguate between the same field name occurring in
    +multiple streams as follows:  `.select("spout3:key3, spout4:key3")`. Nested tuple types are supported if the
    --- End diff --
    
    Yes, I started with the dot syntax, then realized that it still leaves an ambiguity if there is a field and a stream with the same name. I wanted to avoid adding code that guesses whether the first part is a stream name or a key name, so the colon is there just to avoid that corner case. 
    
    Standard SQL does not have this issue, as it doesn't allow a mix of table names and nested field names in a simple x.y.z format.
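
As a rough illustration of the point above, a selector parser that uses `:` for the stream qualifier and `.` for nesting never has to guess what the first segment means. The sketch below is an assumption about how such parsing could be written; `FieldSelector` and its members are hypothetical names and are not taken from the PR's `JoinBolt` code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical parser for selectors such as "spout3:key3" or "outer.inner.innermost".
// Illustrative only; not the parsing code used by JoinBolt.
final class FieldSelector {
    final String streamName;   // null when the selector has no stream qualifier
    final List<String> path;   // field name followed by any nested keys

    private FieldSelector(String streamName, List<String> path) {
        this.streamName = streamName;
        this.path = path;
    }

    static FieldSelector parse(String spec) {
        String trimmed = spec.trim();
        int colon = trimmed.indexOf(':');
        String stream = null;
        String fieldPart = trimmed;
        if (colon >= 0) {
            // Everything before ':' is the stream name, with no guessing needed.
            stream = trimmed.substring(0, colon).trim();
            fieldPart = trimmed.substring(colon + 1).trim();
        }
        // Dots only ever separate nesting levels inside the field part.
        return new FieldSelector(stream, Arrays.asList(fieldPart.split("\\.")));
    }

    public static void main(String[] args) {
        for (String spec : new String[] {"spout3:key3", "outer.inner.innermost", "spout4:a.b"}) {
            FieldSelector sel = parse(spec);
            System.out.println(spec + " -> stream=" + sel.streamName + ", path=" + sel.path);
        }
    }
}
```

With a dot-only syntax, `a.b` could mean either field `b` of stream `a` or nested key `b` inside field `a`; the separate qualifier character removes that corner case.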


---

[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99983167
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables called: stream1, stream2, stream3 & stream4:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from    stream1 
    +join       stream2  on stream2.userId =  stream1.key1
    +join       stream3  on stream3.key3   =  stream2.userId
    +left join  stream4  on stream4.key4   =  stream3.key3
    +```
    +
    +This could be expressed using `JoinBolt` over 4 similarly named streams as: 
    +
    +```java
    +new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")               // from stream1  
    +                            .join     ("stream2", "userId",  "stream1") // join      stream2  on stream2.userId = stream1.key1
    --- End diff --
    
    @roshannaik Maybe the earlier comment was not clear. Since the join key is also the group-by field in the API, choosing a subset of those keys in the join condition is not possible.
    
    E.g.
    ```java
    new JoinBolt(JoinBolt.Selector.STREAM, "stream1", new Fields("key1", "key2")) 
             // here it's not possible to join on stream1.key1 or stream1.key2 separately
            .join("stream2", new Fields("key3", "key4"), "stream1");
    ```
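
To make the limitation concrete, here is a small sketch under the assumption that a multi-field key is compared as a whole, position by position. `CompositeKeyDemo`, `keyOf`, and `tuple` are hypothetical names for this example, and plain maps stand in for Storm tuples.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of whole-key matching; not code from the PR.
final class CompositeKeyDemo {
    private CompositeKeyDemo() { }

    /** Builds the composite key of a tuple from the given fields, in order. */
    static List<Object> keyOf(Map<String, Object> tuple, String... fields) {
        List<Object> key = new ArrayList<>();
        for (String field : fields) {
            key.add(tuple.get(field));
        }
        return key;
    }

    /** Convenience builder: tuple("key1", 1, "key2", "a") -> {key1=1, key2=a}. */
    static Map<String, Object> tuple(Object... nameValuePairs) {
        Map<String, Object> result = new HashMap<>();
        for (int i = 0; i + 1 < nameValuePairs.length; i += 2) {
            result.put((String) nameValuePairs[i], nameValuePairs[i + 1]);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> left = tuple("key1", 1, "key2", "a");
        Map<String, Object> fullMatch = tuple("key3", 1, "key4", "a");
        Map<String, Object> partialMatch = tuple("key3", 1, "key4", "b");

        List<Object> leftKey = keyOf(left, "key1", "key2");
        // Tuples pair up only when every component of the key agrees.
        System.out.println("full match joins:    " + leftKey.equals(keyOf(fullMatch, "key3", "key4")));
        System.out.println("partial match joins: " + leftKey.equals(keyOf(partialMatch, "key3", "key4")));
    }
}
```

A tuple that agrees with `stream1` on `key1` alone is treated as a non-match, which is the "subset of join fields" case described above.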


---

[GitHub] storm issue #1914: STORM-2334 - Join Bolt implementation with unit tests

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/1914
  
    @arunmahadevan I have updated the PR with:
    - Updated docs
    - A sample topology
    - Support for qualifying keys with stream names for disambiguation
    - More unit tests
    
    I have addressed all the outstanding review comments.


---

[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99744089
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    --- End diff --
    
    Based on the next comment, checks are not done and applying the right grouping is up to the user. 
    IMO, adding as many validations as possible and handling the groupings automatically would make it less error-prone. But this may require changes in the topology builder, so for now maybe just add some relevant code to the docs that shows the fields grouping (`builder.setBolt(joinBolt).fieldsGrouping(..)`).


---

[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r100959498
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,126 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within a Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from        table1
    +inner join  table2  on table2.userId =  table1.key1
    +inner join  table3  on table3.key3   =  table2.userId
    +left join   table4  on table4.key4   =  table3.key3
    +```
    +
    +Similar joins could be expressed on tuples generated by 4 spouts using `JoinBolt`:
    +
    +```java
    +JoinBolt jbolt =  new JoinBolt("spout1", "key1")                   // from        spout1  
    +                    .join     ("spout2", "userId",  "spout1")      // inner join  spout2  on spout2.userId = spout1.key1
    +                    .join     ("spout3", "key3",    "spout2")      // inner join  spout3  on spout3.key3   = spout2.userId   
    +                    .leftJoin ("spout4", "key4",    "spout3")      // left join   spout4  on spout4.key4   = spout3.key3
    +                    .select  ("userId, key4, key2, spout3:key3")   // chose output fields
    +                    .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
    +
    +topoBuilder.setBolt("joiner", jbolt, 1)
    +            .fieldsGrouping("spout1", new Fields("key1") )
    +            .fieldsGrouping("spout2", new Fields("userId") )
    +            .fieldsGrouping("spout3", new Fields("key3") )
    +            .fieldsGrouping("spout4", new Fields("key4") );
    +```
    +
    +The bolt constructor takes two arguments. The 1st argument introduces the data from `spout1`'
    +to be the first stream and specifies that it will always use field `key1` when joining this with the others streams.
    +The name of the component specified must refer to the spout or bolt that is directly connected to the Join bolt. 
    +Here data received from `spout1` must be fields grouped on `key1`. Similarly, each of the `leftJoin()` and `join()` method
    +calls introduce a new stream along with the field to use for the join. As seen in above example, the same FieldsGrouping
    +requirement applies to these streams as well. The 3rd argument to the join methods refers to another stream with which
    +to join.
    +
    +The `select()` method is used to specify the output fields. The argument to `select` is a comma separated list of fields.
    +Individual field names can be prefixed with a stream name to disambiguate between the same field name occurring in
    +multiple streams as follows:  `.select("spout3:key3, spout4:key3")`. Nested tuple types are supported if the
    +nesting has been done using `Map`s. For example  `outer.inner.innermost` refers to a field that is nested three levels
    +deep where `outer` and `inner` are of type `Map`.   
    +
    +Stream name prefix is not allowed for the fields in any of the join() arguments, but nested fields are supported. 
    +
    +The call to `withTumblingWindow()` above, configures the join window to be a 10 minute tumbling window. Since `JoinBolt` 
    +is a Windowed Bolt, we can also use the `withWindow` method to configure it as a sliding window (see tips section below). 
    +
    +## Stream names and Join order
    +1. Stream names must be introduced (in constructor or as 1st arg to various join methods) before being referred
    +to (in the 3rd argument of the join methods). Forward referencing of stream names, as shown below, is not allowed:
    +
    +```java
    +new JoinBolt( "spout1", "key1")                 
    +  .join     ( "spout2", "userId",  "spout3") //not allowed. spout3 not yet introduced
    +  .join     ( "spout3", "key3",    "spout1")
    +```
    +
    +2. Internally, the joins will be performed in the order expressed by the user.
    --- End diff --
    
    fixed. thanks.


---

[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r100959503
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,126 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within a Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from        table1
    +inner join  table2  on table2.userId =  table1.key1
    +inner join  table3  on table3.key3   =  table2.userId
    +left join   table4  on table4.key4   =  table3.key3
    +```
    +
    +Similar joins could be expressed on tuples generated by 4 spouts using `JoinBolt`:
    +
    +```java
    +JoinBolt jbolt =  new JoinBolt("spout1", "key1")                   // from        spout1  
    +                    .join     ("spout2", "userId",  "spout1")      // inner join  spout2  on spout2.userId = spout1.key1
    +                    .join     ("spout3", "key3",    "spout2")      // inner join  spout3  on spout3.key3   = spout2.userId   
    +                    .leftJoin ("spout4", "key4",    "spout3")      // left join   spout4  on spout4.key4   = spout3.key3
    +                    .select  ("userId, key4, key2, spout3:key3")   // chose output fields
    +                    .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
    +
    +topoBuilder.setBolt("joiner", jbolt, 1)
    +            .fieldsGrouping("spout1", new Fields("key1") )
    +            .fieldsGrouping("spout2", new Fields("userId") )
    +            .fieldsGrouping("spout3", new Fields("key3") )
    +            .fieldsGrouping("spout4", new Fields("key4") );
    +```
    +
    +The bolt constructor takes two arguments. The 1st argument introduces the data from `spout1`'
    --- End diff --
    
    fixed. thanks.


---

[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99571904
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables called: stream1, stream2, stream3 & stream4:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from    stream1 
    +join       stream2  on stream2.userId =  stream1.key1
    +join       stream3  on stream3.key3   =  stream2.userId
    +left join  stream4  on stream4.key4   =  stream3.key3
    +```
    +
    +This could be expressed using `JoinBolt` over 4 similarly named streams as: 
    +
    +```java
    +new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")               // from stream1  
    +                            .join     ("stream2", "userId",  "stream1") // join      stream2  on stream2.userId = stream1.key1
    +                            .join     ("stream3", "key3",    "stream2") // join      stream3  on stream3.key3   = stream2.userId   
    +                            .leftjoin ("stream4", "key4",    "stream3") // left join stream4  on stream4.key4   = stream3.key3
    +                            .select("userId, key4, key2, key3")         // chose output fields
    --- End diff --
    
    1. Why is it not a `List<String>`?
    2. How does one select keys that have the same name in different streams (e.g. stream2.key1, stream3.key1)?


---

[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r100952035
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,126 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within a Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from        table1
    +inner join  table2  on table2.userId =  table1.key1
    +inner join  table3  on table3.key3   =  table2.userId
    +left join   table4  on table4.key4   =  table3.key3
    +```
    +
    +Similar joins could be expressed on tuples generated by 4 spouts using `JoinBolt`:
    +
    +```java
    +JoinBolt jbolt =  new JoinBolt("spout1", "key1")                   // from        spout1  
    +                    .join     ("spout2", "userId",  "spout1")      // inner join  spout2  on spout2.userId = spout1.key1
    +                    .join     ("spout3", "key3",    "spout2")      // inner join  spout3  on spout3.key3   = spout2.userId   
    +                    .leftJoin ("spout4", "key4",    "spout3")      // left join   spout4  on spout4.key4   = spout3.key3
    +                    .select  ("userId, key4, key2, spout3:key3")   // chose output fields
    +                    .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
    +
    +topoBuilder.setBolt("joiner", jbolt, 1)
    +            .fieldsGrouping("spout1", new Fields("key1") )
    +            .fieldsGrouping("spout2", new Fields("userId") )
    +            .fieldsGrouping("spout3", new Fields("key3") )
    +            .fieldsGrouping("spout4", new Fields("key4") );
    +```
    +
    +The bolt constructor takes two arguments. The 1st argument introduces the data from `spout1`'
    +to be the first stream and specifies that it will always use field `key1` when joining this with the others streams.
    +The name of the component specified must refer to the spout or bolt that is directly connected to the Join bolt. 
    +Here data received from `spout1` must be fields grouped on `key1`. Similarly, each of the `leftJoin()` and `join()` method
    +calls introduce a new stream along with the field to use for the join. As seen in above example, the same FieldsGrouping
    +requirement applies to these streams as well. The 3rd argument to the join methods refers to another stream with which
    +to join.
    +
    +The `select()` method is used to specify the output fields. The argument to `select` is a comma separated list of fields.
    +Individual field names can be prefixed with a stream name to disambiguate between the same field name occurring in
    +multiple streams as follows:  `.select("spout3:key3, spout4:key3")`. Nested tuple types are supported if the
    --- End diff --
    
    Why not use **dot** syntax, which is closer to SQL syntax, for example `select("spout3.key3, spout4.key4.nested1, key5")`? Is the syntax the same if a stream name is used instead of a component name?
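
To make the trade-off in this question concrete, here is a minimal sketch of how a single `select()` entry could be split when `:` separates the stream prefix and `.` is reserved for nested fields. The quoted `JoinBolt` source keeps a `dotSeparatedOutputKeyNames` field for flattened nested names, which suggests dots already carry that meaning, but this is an inference and not the author's stated rationale. `FieldSelector` and `parse` are hypothetical names used only for illustration; they are not part of Storm.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Storm: splits one select() entry into
// an optional stream prefix and a path of (possibly nested) field names.
final class FieldSelector {
    final String stream;      // null when no "stream:" prefix was given
    final List<String> path;  // field name followed by any nested field names

    private FieldSelector(String stream, List<String> path) {
        this.stream = stream;
        this.path = path;
    }

    static FieldSelector parse(String entry) {
        String text = entry.trim();
        String stream = null;
        int colon = text.indexOf(':');
        if (colon >= 0) {
            // Everything before the colon names the stream or component.
            stream = text.substring(0, colon).trim();
            text = text.substring(colon + 1).trim();
        }
        // Dots are used only to descend into nested fields (an assumption here).
        return new FieldSelector(stream, Arrays.asList(text.split("\\.")));
    }
}
```

With a pure dot syntax, an entry such as `spout3.key3` could mean either "field `key3` of stream `spout3`" or "nested field `key3` inside a field named `spout3`"; a distinct prefix separator avoids that ambiguity in this sketch.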



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99945134
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables called: stream1, stream2, stream3 & stream4:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from    stream1 
    +join       stream2  on stream2.userId =  stream1.key1
    +join       stream3  on stream3.key3   =  stream2.userId
    +left join  stream4  on stream4.key4   =  stream3.key3
    +```
    +
    +This could be expressed using `JoinBolt` over 4 similarly named streams as: 
    +
    +```java
    +new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")               // from stream1  
    --- End diff --
    
    sure... good idea.



[GitHub] storm issue #1914: STORM-2334 - Join Bolt implementation with unit tests

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/1914
  
    +1



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99711705
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables called: stream1, stream2, stream3 & stream4:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from    stream1 
    +join       stream2  on stream2.userId =  stream1.key1
    +join       stream3  on stream3.key3   =  stream2.userId
    +left join  stream4  on stream4.key4   =  stream3.key3
    +```
    +
    +This could be expressed using `JoinBolt` over 4 similarly named streams as: 
    +
    +```java
    +new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")               // from stream1  
    --- End diff --
    
    If no upstream component defines a given stream, then from the bolt's point of view this is equivalent to never receiving any data on that stream. Conversely, if an upstream component sends data on a stream that the bolt is not aware of, the bolt ignores that stream. The bolt doesn't scan the entire topology definition for incorrect wiring.
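
The behaviour described in this reply can be sketched in plain Java. This is an illustration under assumptions, not the `JoinBolt` implementation: `StreamBuffer`, `accept` and `count` are hypothetical names, and string values stand in for tuples.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a join buffer; it is not Storm's JoinBolt.
final class StreamBuffer {
    // stream name -> values received on that stream within the current window
    private final Map<String, List<String>> byStream = new HashMap<>();

    StreamBuffer(String... knownStreams) {
        for (String s : knownStreams) {
            byStream.put(s, new ArrayList<>());
        }
    }

    // Returns true if the value was buffered, false if the stream is unknown.
    boolean accept(String stream, String value) {
        List<String> bucket = byStream.get(stream);
        if (bucket == null) {
            return false;  // stream was never configured: ignore it, no error
        }
        bucket.add(value);
        return true;
    }

    // A configured stream that never delivers data simply stays empty.
    int count(String stream) {
        List<String> bucket = byStream.get(stream);
        return bucket == null ? 0 : bucket.size();
    }
}
```

In this sketch neither case raises an error: a configured stream with no producer looks like an empty input, and an unexpected stream is dropped, so wiring mistakes surface only as missing join results.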



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99744103
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables called: stream1, stream2, stream3 & stream4:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from    stream1 
    +join       stream2  on stream2.userId =  stream1.key1
    +join       stream3  on stream3.key3   =  stream2.userId
    +left join  stream4  on stream4.key4   =  stream3.key3
    +```
    +
    +This could be expressed using `JoinBolt` over 4 similarly named streams as: 
    +
    +```java
    +new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")               // from stream1  
    --- End diff --
    
    Maybe for now just add the relevant code to the docs showing the upstream component declaring the stream, to make it clearer.



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99227905
  
    --- Diff: storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java ---
    @@ -0,0 +1,428 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.bolt;
    +
    +
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseWindowedBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.windowing.TupleWindow;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +
    +public class JoinBolt extends BaseWindowedBolt {
    +
    +    private OutputCollector collector;
    +
    +    // Map[StreamName -> Map[Key -> List<Tuple>]  ]
    +    HashMap<String, HashMap<Object, ArrayList<Tuple> >> hashedInputs = new HashMap<>(); // holds remaining streams
    +
    +    // Map[StreamName -> JoinInfo]
    +    protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
    +    protected String[][] outputKeys;  // specified via bolt.select() ... used in declaring Output fields
    +    protected String[] dotSeparatedOutputKeyNames; // flattened (de nested) keyNames, used for naming output fields
    +    protected String outputStreamName;
    +
    +    // Use streamId, source component name OR field in tuple to distinguish incoming tuple streams
    +    public enum Selector { STREAM, SOURCE }
    +    protected final Selector selectorType;
    +
    +
    +    /**
    +     * StreamId to start the join with. Equivalent SQL ...
    +     *       select .... from streamId ...
    +     * @param type Specifies whether 'streamId' refers to stream name/source component
    +     * @param streamId name of stream/source component
    +     * @param key the fieldName to use as key for the stream (used for performing joins)
    +     */
    +    public JoinBolt(Selector type, String streamId, String key) {
    +        selectorType = type;
    +        joinCriteria.put(streamId, new JoinInfo(key) );
    +    }
    +
    +    /**
    +     * Defines the name of the output stream
    +     */
    +    public JoinBolt withOutputStream(String streamName) {
    +        this.outputStreamName = streamName;
    +        return this;
    +    }
    +
    +    /**
    +     * Performs inner Join.
    +     *  SQL    :   from priorStream inner join newStream on newStream.key = priorStream.key1
    +     *  same as:   new WindowedQueryBolt(priorStream,key1). join(newStream, key, priorStream);
    +     *
    +     *  Note: priorStream must be previously joined.
    +     *    Valid ex:    new WindowedQueryBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2);
    +     *    Invalid ex:  new WindowedQueryBolt(s1,k1). join(s3,k3, s2). join(s2,k2, s1);
    +     */
    +    public JoinBolt join(String newStream, String key, String priorStream) {
    +        return join_common(newStream, key, priorStream, JoinType.INNER);
    +    }
    +
    +    /**
    +     * Performs left Join.
    +     *  SQL    :   from stream1  left join stream2  on stream2.key = stream1.key1
    +     *  same as:   new  WindowedQueryBolt(stream1, key1). leftJoin(stream2, key, stream1);
    +     *
    +     *  Note: priorStream must be previously joined
    +     *    Valid ex:    new WindowedQueryBolt(s1,k1). leftJoin(s2,k2, s1). leftJoin(s3,k3, s2);
    +     *    Invalid ex:  new WindowedQueryBolt(s1,k1). leftJoin(s3,k3, s2). leftJoin(s2,k2, s1);
    +     */
    +    public JoinBolt leftJoin(String newStream, String key, String priorStream) {
    +        return join_common(newStream, key, priorStream, JoinType.LEFT);
    +    }
    +
    +    private JoinBolt join_common(String newStream, String key, String priorStream, JoinType joinType) {
    --- End diff --
    
    indeed.



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99712278
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    --- End diff --
    
    answered in next comment...



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1914



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by raghavgautam <gi...@git.apache.org>.
Github user raghavgautam commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99135627
  
    --- Diff: storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java ---
    @@ -0,0 +1,428 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.bolt;
    +
    +
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseWindowedBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.windowing.TupleWindow;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +
    +public class JoinBolt extends BaseWindowedBolt {
    +
    +    private OutputCollector collector;
    +
    +    // Map[StreamName -> Map[Key -> List<Tuple>]  ]
    +    HashMap<String, HashMap<Object, ArrayList<Tuple> >> hashedInputs = new HashMap<>(); // holds remaining streams
    +
    +    // Map[StreamName -> JoinInfo]
    +    protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
    +    protected String[][] outputKeys;  // specified via bolt.select() ... used in declaring Output fields
    +    protected String[] dotSeparatedOutputKeyNames; // flattened (de nested) keyNames, used for naming output fields
    +    protected String outputStreamName;
    +
    +    // Use streamId, source component name OR field in tuple to distinguish incoming tuple streams
    +    public enum Selector { STREAM, SOURCE }
    +    protected final Selector selectorType;
    +
    +
    +    /**
    +     * StreamId to start the join with. Equivalent SQL ...
    +     *       select .... from streamId ...
    +     * @param type Specifies whether 'streamId' refers to stream name/source component
    +     * @param streamId name of stream/source component
    +     * @param key the fieldName to use as key for the stream (used for performing joins)
    +     */
    +    public JoinBolt(Selector type, String streamId, String key) {
    +        selectorType = type;
    +        joinCriteria.put(streamId, new JoinInfo(key) );
    +    }
    +
    +    /**
    +     * Defines the name of the output stream
    +     */
    +    public JoinBolt withOutputStream(String streamName) {
    +        this.outputStreamName = streamName;
    +        return this;
    +    }
    +
    +    /**
    +     * Performs inner Join.
    +     *  SQL    :   from priorStream inner join newStream on newStream.key = priorStream.key1
    +     *  same as:   new WindowedQueryBolt(priorStream,key1). join(newStream, key, priorStream);
    +     *
    +     *  Note: priorStream must be previously joined.
    +     *    Valid ex:    new WindowedQueryBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2);
    --- End diff --
    
    It would probably be better if the user could specify joins by saying:
    new WindowedQueryBolt(s1,k1). join(s2,k2). join(s3,k3);
    (note the missing s1, s2 parameters in the join method)
    This would be less error-prone and more in line with SQL expectations, where the user specifies:
    table1.key1 = table2.key2
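
The two call shapes discussed here can be compared with a small sketch. It models only the bookkeeping of which stream joins against which, and it enforces the rule from the quoted Javadoc that the prior stream must already have been joined. `JoinPlan`, `priorOf` and the two-argument `join` overload are hypothetical; the overload is one possible reading of the reviewer's suggestion, not something the PR implements.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical builder that only records the join plan; it is not Storm's JoinBolt.
final class JoinPlan {
    // stream name -> {join key, prior stream}; prior stream is null for the first stream
    private final Map<String, String[]> plan = new LinkedHashMap<>();
    private String lastStream;

    JoinPlan(String firstStream, String key) {
        plan.put(firstStream, new String[] {key, null});
        lastStream = firstStream;
    }

    // Explicit form: priorStream must have been introduced by an earlier call.
    JoinPlan join(String newStream, String key, String priorStream) {
        if (!plan.containsKey(priorStream)) {
            throw new IllegalArgumentException(
                "'" + priorStream + "' must be joined before '" + newStream + "'");
        }
        plan.put(newStream, new String[] {key, priorStream});
        lastStream = newStream;
        return this;
    }

    // Shorthand (assumed semantics): join against the most recently added stream.
    JoinPlan join(String newStream, String key) {
        return join(newStream, key, lastStream);
    }

    String priorOf(String stream) {
        String[] entry = plan.get(stream);
        return entry == null ? null : entry[1];
    }
}
```

Under these assumed semantics the shorthand removes the ordering mistake shown in the "Invalid ex" line, but it can only express a chain. The explicit form can still join a third stream directly against the first one, which the shorthand alone cannot.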



[GitHub] storm issue #1914: STORM-2334 - Join Bolt implementation with unit tests

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/1914
  
    +1. I will wait a day to see if there are any comments from other reviewers and will merge this after that.



[GitHub] storm issue #1914: STORM-2334 - Join Bolt implementation with unit tests

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/1914
  
    Revised the PR with the fixes. Thanks.



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99572676
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables called: stream1, stream2, stream3 & stream4:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from    stream1 
    +join       stream2  on stream2.userId =  stream1.key1
    +join       stream3  on stream3.key3   =  stream2.userId
    +left join  stream4  on stream4.key4   =  stream3.key3
    +```
    +
    +This could be expressed using `JoinBolt` over 4 similarly named streams as: 
    +
    +```java
    +new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")               // from stream1  
    --- End diff --
    
    How do you ensure that "stream1" exists? That is, some spout or bolt should have declared "stream1", and JoinBolt should subscribe to that spout's or bolt's output via fields grouping on the join key. I assume it would throw a RuntimeException at topology construction time.



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r100951653
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,126 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within a Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from        table1
    +inner join  table2  on table2.userId =  table1.key1
    +inner join  table3  on table3.key3   =  table2.userId
    +left join   table4  on table4.key4   =  table3.key3
    +```
    +
    +Similar joins could be expressed on tuples generated by 4 spouts using `JoinBolt`:
    +
    +```java
    +JoinBolt jbolt =  new JoinBolt("spout1", "key1")                   // from        spout1  
    +                    .join     ("spout2", "userId",  "spout1")      // inner join  spout2  on spout2.userId = spout1.key1
    +                    .join     ("spout3", "key3",    "spout2")      // inner join  spout3  on spout3.key3   = spout2.userId   
    +                    .leftJoin ("spout4", "key4",    "spout3")      // left join   spout4  on spout4.key4   = spout3.key3
    +                    .select  ("userId, key4, key2, spout3:key3")   // chose output fields
    +                    .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
    +
    +topoBuilder.setBolt("joiner", jbolt, 1)
    +            .fieldsGrouping("spout1", new Fields("key1") )
    +            .fieldsGrouping("spout2", new Fields("userId") )
    +            .fieldsGrouping("spout3", new Fields("key3") )
    +            .fieldsGrouping("spout4", new Fields("key4") );
    +```
    +
    +The bolt constructor takes two arguments. The 1st argument introduces the data from `spout1`'
    --- End diff --
    
    extra char (') after `spout1`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1914: STORM-2334 - Join Bolt implementation with unit tests

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/1914
  
    Thanks @roshannaik, merged to master and 1.x-branch. You can close this PR.



[GitHub] storm pull request #1914: STORM-2334 - Join Bolt implementation with unit te...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1914#discussion_r99713201
  
    --- Diff: docs/Joins.md ---
    @@ -0,0 +1,102 @@
    +---
    +title: Joining Streams in Storm Core
    +layout: documentation
    +documentation: true
    +---
    +
    +Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
    +`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
    +tuples among the streams being joined. This helps align the streams within the Window boundary.
    +
    +Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
    +should only be joined with the other streams using the field on which it has been FieldsGrouped.  
    +Knowing this will help understand the join syntax described below.  
    +
    +## Performing Joins
    +Consider the following SQL join involving 4 tables called: stream1, stream2, stream3 & stream4:
    +
    +```sql
    +select  userId, key4, key2, key3
    +from    stream1 
    +join       stream2  on stream2.userId =  stream1.key1
    +join       stream3  on stream3.key3   =  stream2.userId
    +left join  stream4  on stream4.key4   =  stream3.key3
    +```
    +
    +This could be expressed using `JoinBolt` over 4 similarly named streams as: 
    +
    +```java
    +new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")               // from stream1  
    +                            .join     ("stream2", "userId",  "stream1") // join      stream2  on stream2.userId = stream1.key1
    +                            .join     ("stream3", "key3",    "stream2") // join      stream3  on stream3.key3   = stream2.userId   
    +                            .leftjoin ("stream4", "key4",    "stream3") // left join stream4  on stream4.key4   = stream3.key3
    +                            .select("userId, key4, key2, key3")         // chose output fields
    --- End diff --
    
    1. Conversion to a list is done internally. I chose a CSV string instead of a list to make it easier for end users (less typing), without loss of functionality.
    
    2. I think it's not supported as of now. Let me take a look and add it if it's not there already.
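
As an illustration of point 1, the comma-separated argument carries the same information as an explicit list. The sketch below shows one way such a conversion could look; `SelectArgs.toFieldList` is a hypothetical helper, and the actual parsing inside `JoinBolt.select()` may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing a CSV-to-list conversion; not Storm's code.
final class SelectArgs {
    static List<String> toFieldList(String commaSeparated) {
        List<String> fields = new ArrayList<>();
        for (String part : commaSeparated.split(",")) {
            String name = part.trim();   // tolerate spaces around commas
            if (!name.isEmpty()) {
                fields.add(name);        // skip empty entries such as a trailing comma
            }
        }
        return fields;
    }
}
```

Called with `"userId, key4, key2, spout3:key3"`, this sketch yields the four entries in their original order, with any stream prefix left intact for later processing.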

