Posted to commits@storm.apache.org by ar...@apache.org on 2017/02/24 08:01:45 UTC

[1/2] storm git commit: STORM-2334 Join Bolt implementation with unit tests

Repository: storm
Updated Branches:
  refs/heads/1.x-branch d36205289 -> e01f22827


STORM-2334 Join Bolt implementation with unit tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f4ded162
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f4ded162
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f4ded162

Branch: refs/heads/1.x-branch
Commit: f4ded1622b4ee3e9c9f6960ffa6ffb8305916d06
Parents: d362052
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Tue Jan 31 17:21:46 2017 -0800
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Fri Feb 24 12:52:08 2017 +0530

----------------------------------------------------------------------
 docs/Common-patterns.md                         |  28 +-
 docs/Joins.md                                   | 125 ++++
 docs/Windowing.md                               |   2 +-
 docs/index.md                                   |   1 +
 .../apache/storm/starter/JoinBoltExample.java   |  87 +++
 .../apache/storm/starter/SingleJoinExample.java |   4 +
 .../storm/starter/bolt/SingleJoinBolt.java      |   4 +
 .../src/jvm/org/apache/storm/bolt/JoinBolt.java | 593 +++++++++++++++++++
 .../jvm/org/apache/storm/bolt/TestJoinBolt.java | 356 +++++++++++
 9 files changed, 1177 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f4ded162/docs/Common-patterns.md
----------------------------------------------------------------------
diff --git a/docs/Common-patterns.md b/docs/Common-patterns.md
index b98fe90..e0e8c1f 100644
--- a/docs/Common-patterns.md
+++ b/docs/Common-patterns.md
@@ -6,28 +6,12 @@ documentation: true
 
 This page lists a variety of common patterns in Storm topologies.
 
-1. Streaming joins
-2. Batching
-3. BasicBolt
-4. In-memory caching + fields grouping combo
-5. Streaming top N
-6. TimeCacheMap for efficiently keeping a cache of things that have been recently updated
-7. CoordinatedBolt and KeyedFairBolt for Distributed RPC
-
-### Joins
-
-A streaming join combines two or more data streams together based on some common field. Whereas a normal database join has finite input and clear semantics for a join, a streaming join has infinite input and unclear semantics for what a join should be.
-
-The join type you need will vary per application. Some applications join all tuples for two streams over a finite window of time, whereas other applications expect exactly one tuple for each side of the join for each join field. Other applications may do the join completely differently. The common pattern among all these join types is partitioning multiple input streams in the same way. This is easily accomplished in Storm by using a fields grouping on the same fields for many input streams to the joiner bolt. For example:
-
-```java
-builder.setBolt("join", new MyJoiner(), parallelism)
-  .fieldsGrouping("1", new Fields("joinfield1", "joinfield2"))
-  .fieldsGrouping("2", new Fields("joinfield1", "joinfield2"))
-  .fieldsGrouping("3", new Fields("joinfield1", "joinfield2"));
-```
-
-The different streams don't have to have the same field names, of course.
+1. Batching
+2. BasicBolt
+3. In-memory caching + fields grouping combo
+4. Streaming top N
+5. TimeCacheMap for efficiently keeping a cache of things that have been recently updated
+6. CoordinatedBolt and KeyedFairBolt for Distributed RPC
 
 
 ### Batching

http://git-wip-us.apache.org/repos/asf/storm/blob/f4ded162/docs/Joins.md
----------------------------------------------------------------------
diff --git a/docs/Joins.md b/docs/Joins.md
new file mode 100644
index 0000000..d2b072d
--- /dev/null
+++ b/docs/Joins.md
@@ -0,0 +1,125 @@
+---
+title: Joining Streams in Storm Core
+layout: documentation
+documentation: true
+---
+
+Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
+`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
+tuples among the streams being joined. This helps align the streams within a Window boundary.
+
+Each of `JoinBolt`'s incoming data streams must be Fields Grouped on a single field. A stream 
+should only be joined with the other streams using the field on which it has been FieldsGrouped.  
+Knowing this will help understand the join syntax described below.  
+
+## Performing Joins
+Consider the following SQL join involving 4 tables:
+
+```sql
+select  userId, key4, key2, key3
+from        table1
+inner join  table2  on table2.userId =  table1.key1
+inner join  table3  on table3.key3   =  table2.userId
+left join   table4  on table4.key4   =  table3.key3
+```
+
+Similar joins could be expressed on tuples generated by 4 spouts using `JoinBolt`:
+
+```java
+JoinBolt jbolt =  new JoinBolt("spout1", "key1")                   // from        spout1  
+                    .join     ("spout2", "userId",  "spout1")      // inner join  spout2  on spout2.userId = spout1.key1
+                    .join     ("spout3", "key3",    "spout2")      // inner join  spout3  on spout3.key3   = spout2.userId   
+                    .leftJoin ("spout4", "key4",    "spout3")      // left join   spout4  on spout4.key4   = spout3.key3
+                    .select  ("userId, key4, key2, spout3:key3")   // choose output fields
+                    .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
+
+topoBuilder.setBolt("joiner", jbolt, 1)
+            .fieldsGrouping("spout1", new Fields("key1") )
+            .fieldsGrouping("spout2", new Fields("userId") )
+            .fieldsGrouping("spout3", new Fields("key3") )
+            .fieldsGrouping("spout4", new Fields("key4") );
+```
+
+The bolt constructor takes two arguments. The 1st argument introduces the data from `spout1`
+as the first stream, and the 2nd specifies that field `key1` will always be used when joining it with the other streams.
+The component name specified must refer to the spout or bolt that is directly connected to the Join bolt.
+Here, data received from `spout1` must be fields grouped on `key1`. Similarly, each `leftJoin()` and `join()` method
+call introduces a new stream along with the field to use for the join. As seen in the above example, the same FieldsGrouping
+requirement applies to these streams as well. The 3rd argument to the join methods refers to another stream with which
+to join.
+
+The `select()` method is used to specify the output fields. The argument to `select` is a comma separated list of fields.
+Individual field names can be prefixed with a stream name to disambiguate between the same field name occurring in
+multiple streams as follows:  `.select("spout3:key3, spout4:key3")`. Nested tuple types are supported if the
+nesting has been done using `Map`s. For example  `outer.inner.innermost` refers to a field that is nested three levels
+deep where `outer` and `inner` are of type `Map`.   
+
+Stream name prefix is not allowed for the fields in any of the join() arguments, but nested fields are supported. 
+
+The call to `withTumblingWindow()` above configures the join window to be a 10 minute tumbling window. Since `JoinBolt`
+is a Windowed Bolt, we can also use the `withWindow` method to configure it as a sliding window (see tips section below).
+
+## Stream names and Join order
+* Stream names must be introduced (in constructor or as 1st arg to various join methods) before being referred
+to (in the 3rd argument of the join methods). Forward referencing of stream names, as shown below, is not allowed:
+
+```java
+new JoinBolt( "spout1", "key1")                 
+  .join     ( "spout2", "userId",  "spout3") //not allowed. spout3 not yet introduced
+  .join     ( "spout3", "key3",    "spout1")
+```
+* Internally, the joins will be performed in the order expressed by the user.
+
+## Joining based on Stream names
+
+For simplicity, Storm topologies often use the `default` stream. Topologies can also use named streams 
+instead of `default` streams.  To support such topologies, `JoinBolt` can be configured to use stream
+names,  instead of source component (spout/bolt) names, via the constructor's first argument:
+
+```java
+new JoinBolt(JoinBolt.Selector.STREAM,  "stream1", "key1")
+                                  .join("stream2", "key2", "stream1")
+    ...
+```
+The first argument `JoinBolt.Selector.STREAM` informs the bolt that `stream1/2/3/4` refer to named streams
+(as opposed to names of upstream spouts/bolts).
+
+
+The example below joins two named streams emitted by four bolts:
+
+```java
+JoinBolt jbolt = new JoinBolt(JoinBolt.Selector.STREAM,  "stream1", "key1")
+                             .join     ("stream2", "userId",  "stream1" )
+                             .select ("userId, key1, key2")
+                             .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
+                             
+topoBuilder.setBolt("joiner", jbolt, 1)
+            .fieldsGrouping("bolt1", "stream1", new Fields("key1") )
+            .fieldsGrouping("bolt2", "stream1", new Fields("key1") )
+            .fieldsGrouping("bolt3", "stream2", new Fields("userId") )
+            .fieldsGrouping("bolt4", "stream1", new Fields("key1") );
+```
+
+In the above example, it is possible that `bolt1`, for example, is emitting other streams also. But the join bolt 
+is only subscribing to `stream1` & `stream2` from the different bolts. `stream1` from `bolt1`, `bolt2` and `bolt4` 
+is treated as a single stream and joined against `stream2` from `bolt3`.
+
+## Limitations: 
+1. Currently only INNER and LEFT joins are supported. 
+
+2. Unlike SQL, which allows joining the same table on different keys to different tables, here a stream must use the same
+   single field in all of its joins. Fields Grouping ensures the right tuples are routed to the right instances of a Join Bolt.
+   Consequently, the FieldsGrouping field must be the same as the join field for correct results. To perform joins on multiple
+   fields, the fields can be combined into one field and then sent to the Join bolt.
+
+
+## Tips:
+
+1. Joins can be CPU and memory intensive. The larger the data accumulated in the current window (proportional to window
+   length), the longer it takes to do the join. Having a short sliding interval (a few seconds, for example) triggers frequent
+   joins. Consequently, performance can suffer when using large window lengths, small sliding intervals, or both.
+
+2. Duplication of joined records across windows is possible when using Sliding Windows. This is because the tuples continue to exist
+   across multiple windows when using Sliding Windows.
+
+
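
The Joins.md text above states that nested fields such as `outer.inner.innermost` are resolved through `Map`s. A minimal plain-Java sketch of such a dot-notation lookup, independent of Storm, might look like the following. The class `NestedFieldLookup` and its `lookup` method are hypothetical names used only for illustration; they are not part of `JoinBolt`, whose actual lookup code may differ.

```java
import java.util.HashMap;
import java.util.Map;

public class NestedFieldLookup {

    // Walks a dot-separated path such as "outer.inner.innermost" through nested Maps.
    // Returns null if any level is missing or is not a Map.
    public static Object lookup(Map<String, Object> record, String dottedPath) {
        String[] parts = dottedPath.trim().split("\\.");
        Object current = record;
        for (String part : parts) {
            if (!(current instanceof Map)) {
                return null; // cannot descend into a non-Map (or missing) value
            }
            current = ((Map<?, ?>) current).get(part);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new HashMap<>();
        inner.put("innermost", 42);
        Map<String, Object> outer = new HashMap<>();
        outer.put("inner", inner);
        Map<String, Object> record = new HashMap<>();
        record.put("outer", outer);
        record.put("userId", "u1");

        System.out.println(lookup(record, "outer.inner.innermost")); // prints 42
        System.out.println(lookup(record, "userId"));                // prints u1
        System.out.println(lookup(record, "outer.missing.x"));       // prints null
    }
}
```

Returning `null` for a missing level, rather than throwing, is a choice made for this sketch; it is not a statement about how `JoinBolt` treats missing fields.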

http://git-wip-us.apache.org/repos/asf/storm/blob/f4ded162/docs/Windowing.md
----------------------------------------------------------------------
diff --git a/docs/Windowing.md b/docs/Windowing.md
index 18f6525..cc4dfe4 100644
--- a/docs/Windowing.md
+++ b/docs/Windowing.md
@@ -250,7 +250,7 @@ e10 is not evaluated since the tuple ts `8:00:39` is beyond the watermark time `
 
 The window calculation considers the time gaps and computes the windows based on the tuple timestamp.
 
-## Guarentees
+## Guarantees
 The windowing functionality in storm core currently provides at-least once guarentee. The values emitted from the bolts
 `execute(TupleWindow inputWindow)` method are automatically anchored to all the tuples in the inputWindow. The downstream
 bolts are expected to ack the received tuple (i.e the tuple emitted from the windowed bolt) to complete the tuple tree. 

http://git-wip-us.apache.org/repos/asf/storm/blob/f4ded162/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 7f7f8fe..44f1c5a 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -83,6 +83,7 @@ But small change will not affect the user experience. We will notify the user wh
 * [Metrics](Metrics.html)
 * [State Checkpointing](State-checkpointing.html)
 * [Windowing](Windowing.html)
+* [Joining Streams](Joins.html)
 * [Blobstore(Distcahce)](distcache-blobstore.html)
 
 ### Debugging

http://git-wip-us.apache.org/repos/asf/storm/blob/f4ded162/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
new file mode 100644
index 0000000..ec9b009
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.bolt.JoinBolt;
+import org.apache.storm.starter.bolt.PrinterBolt;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.LocalCluster;
+
+import java.util.concurrent.TimeUnit;
+
+public class JoinBoltExample {
+    public static void main(String[] args) {
+
+        FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
+        FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("genderSpout", genderSpout);
+        builder.setSpout("ageSpout", ageSpout);
+
+        // inner join of 'age' and 'gender' records on 'id' field
+        JoinBolt joiner = new JoinBolt("genderSpout", "id")
+                                 .join("ageSpout",    "id", "genderSpout")
+                                 .select ("genderSpout:id,ageSpout:id,gender,age")
+                .withTumblingWindow( new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS) );
+
+        builder.setBolt("joiner", joiner)
+                .fieldsGrouping("genderSpout", new Fields("id"))
+                .fieldsGrouping("ageSpout", new Fields("id"))         ;
+
+        builder.setBolt("printer", new PrinterBolt() ).shuffleGrouping("joiner");
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("join-example", conf, builder.createTopology());
+
+        generateGenderData(genderSpout);
+
+        generateAgeData(ageSpout);
+
+        Utils.sleep(30000);
+        cluster.shutdown();
+    }
+
+    private static void generateAgeData(FeederSpout ageSpout) {
+        for (int i = 9; i >= 0; i--) {
+            ageSpout.feed(new Values(i, i + 20));
+        }
+    }
+
+    private static void generateGenderData(FeederSpout genderSpout) {
+        for (int i = 0; i < 10; i++) {
+            String gender;
+            if (i % 2 == 0) {
+                gender = "male";
+            }
+            else {
+                gender = "female";
+            }
+            genderSpout.feed(new Values(i, gender));
+        }
+    }
+}
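
For intuition about what the example topology above computes, the build/probe hash join that `JoinBolt.hashJoin` applies to each window can be sketched without Storm. The class `HashJoinSketch`, its `join` method, and the `Object[]` record layout (`[id, value]`) are assumptions made for this illustration and are not Storm API; windowing, stream selection and tuple anchoring are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {

    // Joins two record lists on element 0 (the "id").
    // Each input record is {id, value}; each output row is {leftId, rightId, leftValue, rightValue}.
    // With leftJoin == true, unmatched left records are kept with nulls for the right side.
    public static List<Object[]> join(List<Object[]> left, List<Object[]> right, boolean leftJoin) {
        // Build phase: index the right side by its join key.
        Map<Object, List<Object[]>> build = new HashMap<>();
        for (Object[] r : right) {
            build.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r);
        }
        // Probe phase: look up each left record in the index.
        List<Object[]> result = new ArrayList<>();
        for (Object[] l : left) {
            List<Object[]> matches = build.get(l[0]);
            if (matches != null) {
                for (Object[] r : matches) {
                    result.add(new Object[] { l[0], r[0], l[1], r[1] });
                }
            } else if (leftJoin) {
                result.add(new Object[] { l[0], null, l[1], null });
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same data as generateGenderData() and generateAgeData() in the example.
        List<Object[]> genders = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            genders.add(new Object[] { i, (i % 2 == 0) ? "male" : "female" });
        }
        List<Object[]> ages = new ArrayList<>();
        for (int i = 9; i >= 0; i--) {
            ages.add(new Object[] { i, i + 20 });
        }
        // Inner join on id; the first row printed is [0, 0, male, 20].
        for (Object[] row : join(genders, ages, false)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

The output column order follows the example's `select("genderSpout:id,ageSpout:id,gender,age")`. The sketch assumes all twenty records fall into the same window, which the 10 second tumbling window in the example makes likely but does not guarantee.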

http://git-wip-us.apache.org/repos/asf/storm/blob/f4ded162/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
index b153372..bba4ae8 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
@@ -26,6 +26,10 @@ import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.starter.bolt.SingleJoinBolt;
 
+/** Example of using a simple custom join bolt
+ *  NOTE: Prefer to use the built-in JoinBolt wherever applicable
+ */
+
 public class SingleJoinExample {
   public static void main(String[] args) {
     FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));

http://git-wip-us.apache.org/repos/asf/storm/blob/f4ded162/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java
index 163c0f2..edc742a 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/SingleJoinBolt.java
@@ -29,6 +29,10 @@ import org.apache.storm.utils.TimeCacheMap;
 
 import java.util.*;
 
+/** Example of a simple custom bolt for joining two streams
+ *  NOTE: Prefer to use the built-in JoinBolt wherever applicable
+ */
+
 public class SingleJoinBolt extends BaseRichBolt {
   OutputCollector _collector;
   Fields _idFields;

http://git-wip-us.apache.org/repos/asf/storm/blob/f4ded162/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java b/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java
new file mode 100644
index 0000000..b9f4164
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/bolt/JoinBolt.java
@@ -0,0 +1,593 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.bolt;
+
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.TimestampExtractor;
+import org.apache.storm.windowing.TupleWindow;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class JoinBolt extends BaseWindowedBolt {
+
+    private OutputCollector collector;
+
+    // Map[StreamName -> Map[Key -> List<Tuple>]  ]
+    HashMap<String, HashMap<Object, ArrayList<Tuple> >> hashedInputs = new HashMap<>(); // holds remaining streams
+
+    // Map[StreamName -> JoinInfo]
+    protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
+    protected FieldSelector[] outputFields;  // specified via bolt.select() ... used in declaring Output fields
+//    protected String[] dotSeparatedOutputFieldNames; // fieldNames in x.y.z format w/o stream name, used for naming output fields
+    protected String outputStreamName;
+
+    // Use streamId, source component name OR field in tuple to distinguish incoming tuple streams
+    public enum Selector { STREAM, SOURCE }
+    protected final Selector selectorType;
+
+
+    /**
+     * Calls  JoinBolt(Selector.SOURCE, sourceId, fieldName)
+     * @param sourceId   Id of source component (spout/bolt) from which this bolt is receiving data
+     * @param fieldName  the field to use for joining the stream (x.y.z format)
+     */
+    public JoinBolt(String sourceId, String fieldName) {
+        this(Selector.SOURCE, sourceId, fieldName);
+    }
+    /**
+     *
+     * Introduces the first stream to start the join with. Equivalent SQL ...
+     *       select .... from srcOrStreamId ...
+     * @param type Specifies whether 'srcOrStreamId' refers to stream name/source component
+     * @param srcOrStreamId name of stream OR source component
+     * @param fieldName the field to use for joining the stream (x.y.z format)
+     */
+    public JoinBolt(Selector type, String srcOrStreamId, String fieldName) {
+        selectorType = type;
+
+        joinCriteria.put(srcOrStreamId, new JoinInfo(  new FieldSelector( srcOrStreamId, fieldName) ) );
+    }
+
+    /**
+     * Optional. Allows naming the output stream of this bolt. If not specified, the emits will happen on
+     * 'default' stream.
+     */
+    public JoinBolt withOutputStream(String streamName) {
+        this.outputStreamName = streamName;
+        return this;
+    }
+
+    /**
+     * Performs inner Join with the newStream.
+     *  SQL    :   from priorStream inner join newStream on newStream.field = priorStream.field1
+     *  same as:   new JoinBolt(priorStream,field1). join(newStream, field, priorStream);
+     *
+     *  Note: priorStream must be previously joined.
+     *    Valid ex:    new JoinBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2);
+     *    Invalid ex:  new JoinBolt(s1,k1). join(s3,k3, s2). join(s2,k2, s1);
+     *
+     *    @param newStream  Either stream name or name of upstream component
+     *    @param field the field on which to perform the join
+     */
+    public JoinBolt join(String newStream, String field, String priorStream) {
+        return joinCommon(newStream, field, priorStream, JoinType.INNER);
+    }
+
+    /**
+     * Performs left Join with the newStream.
+     *  SQL    :   from stream1  left join stream2  on stream2.field = stream1.field1
+     *  same as:   new JoinBolt(stream1, field1). leftJoin(stream2, field, stream1);
+     *
+     *  Note: priorStream must be previously joined
+     *    Valid ex:    new JoinBolt(s1,k1). leftJoin(s2,k2, s1). leftJoin(s3,k3, s2);
+     *    Invalid ex:  new JoinBolt(s1,k1). leftJoin(s3,k3, s2). leftJoin(s2,k2, s1);
+     *
+     *    @param newStream  Either a name of a stream or an upstream component
+     *    @param field the field on which to perform the join
+     */
+    public JoinBolt leftJoin(String newStream, String field, String priorStream) {
+        return joinCommon(newStream, field, priorStream, JoinType.LEFT);
+    }
+
+    private JoinBolt joinCommon(String newStream, String fieldDescriptor, String priorStream, JoinType joinType) {
+        if (hashedInputs.containsKey(newStream)) {
+            throw new IllegalArgumentException("'" + newStream + "' is already part of join. Cannot join with it more than once.");
+        }
+        hashedInputs.put(newStream, new HashMap<Object, ArrayList<Tuple>>());
+        JoinInfo joinInfo = joinCriteria.get(priorStream);
+        if( joinInfo==null )
+            throw new IllegalArgumentException("Stream '" + priorStream + "' was not previously declared");
+
+        FieldSelector field = new FieldSelector(newStream, fieldDescriptor);
+        joinCriteria.put(newStream, new JoinInfo(field, priorStream, joinInfo, joinType) );
+        return this;
+    }
+
+    /**
+     * Specifies the projection, i.e. the fields to include in the output.
+     *      e.g: .select("field1, stream2:field2, field3")
+     * Nested key names are supported for nested types:
+     *      e.g: .select("outerKey1.innerKey1, outerKey1.innerKey2, stream3:outerKey2.innerKey3")
+     * Inner types (non leaf) must be Map<> in order to support nested lookup using this dot notation.
+     * The selected fields implicitly declare the output field names of the bolt.
+     * @param commaSeparatedKeys comma separated list of output fields, each optionally prefixed with "streamName:"
+     * @return this JoinBolt, to allow method chaining
+     */
+    public JoinBolt select(String commaSeparatedKeys) {
+        String[] fieldNames = commaSeparatedKeys.split(",");
+
+        outputFields = new FieldSelector[fieldNames.length];
+        for (int i = 0; i < fieldNames.length; i++) {
+            outputFields[i] = new FieldSelector(fieldNames[i]);
+        }
+        return this;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        String[] outputFieldNames = new String[outputFields.length];
+        for( int i=0; i<outputFields.length; ++i ) {
+            outputFieldNames[i] = outputFields[i].getOutputName() ;
+        }
+        if (outputStreamName!=null) {
+            declarer.declareStream(outputStreamName, new Fields(outputFieldNames));
+        } else {
+            declarer.declare(new Fields(outputFieldNames));
+        }
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        // initialize the hashedInputs data structure
+        int i=0;
+        for ( String stream : joinCriteria.keySet() ) {
+            if(i>0) {
+                hashedInputs.put(stream, new HashMap<Object, ArrayList<Tuple>>());
+            }
+            ++i;
+        }
+        if(outputFields ==null) {
+            throw new IllegalArgumentException("Must specify output fields via .select() method.");
+        }
+    }
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+        // 1) Perform Join
+        List<Tuple> currentWindow = inputWindow.get();
+        JoinAccumulator joinResult = hashJoin(currentWindow);
+
+        // 2) Emit results
+        for (ResultRecord resultRecord : joinResult.getRecords()) {
+            ArrayList<Object> outputTuple = resultRecord.getOutputFields();
+            if ( outputStreamName==null )
+                collector.emit( outputTuple );
+            else
+                collector.emit( outputStreamName, outputTuple );
+        }
+    }
+
+    private void clearHashedInputs() {
+        for (HashMap<Object, ArrayList<Tuple>> mappings : hashedInputs.values()) {
+            mappings.clear();
+        }
+    }
+
+    protected JoinAccumulator hashJoin(List<Tuple> tuples) {
+        clearHashedInputs();
+
+        JoinAccumulator probe = new JoinAccumulator();
+
+        // 1) Build phase - Segregate tuples in the Window into streams.
+        //    First stream's tuples go into probe, rest into HashMaps in hashedInputs
+        String firstStream = joinCriteria.keySet().iterator().next();
+        for (Tuple tuple : tuples) {
+            String streamId = getStreamSelector(tuple);
+            if ( ! streamId.equals(firstStream) ) {
+                Object field = getJoinField(streamId, tuple);
+                ArrayList<Tuple> recs = hashedInputs.get(streamId).get(field);
+                if(recs == null) {
+                    recs = new ArrayList<Tuple>();
+                    hashedInputs.get(streamId).put(field, recs);
+                }
+                recs.add(tuple);
+
+            }  else {
+                ResultRecord probeRecord = new ResultRecord(tuple, joinCriteria.size() == 1);
+                probe.insert( probeRecord );  // first stream's data goes into the probe
+            }
+        }
+
+        // 2) Join the streams in the order in which they were added to joinCriteria
+        int i=0;
+        for (String streamName : joinCriteria.keySet() ) {
+            boolean finalJoin = (i==joinCriteria.size()-1);
+            if(i>0) {
+                probe = doJoin(probe, hashedInputs.get(streamName), joinCriteria.get(streamName), finalJoin);
+            }
+            ++i;
+        }
+
+
+        return probe;
+    }
+
+    // Dispatches to the right join method (inner/left/right/outer) based on the joinInfo.joinType
+    protected JoinAccumulator doJoin(JoinAccumulator probe, HashMap<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
+        final JoinType joinType = joinInfo.getJoinType();
+        switch ( joinType ) {
+            case INNER:
+                return doInnerJoin(probe, buildInput, joinInfo, finalJoin);
+            case LEFT:
+                return doLeftJoin(probe, buildInput, joinInfo, finalJoin);
+            case RIGHT:
+            case OUTER:
+            default:
+                throw new RuntimeException("Unsupported join type : " + joinType.name() );
+        }
+    }
+
+    // inner join - core implementation
+    protected JoinAccumulator doInnerJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
+        String[] probeKeyName = joinInfo.getOtherField();
+        JoinAccumulator result = new JoinAccumulator();
+        FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
+        for (ResultRecord rec : probe.getRecords()) {
+            Object probeKey = rec.getField(fieldSelector);
+            if (probeKey!=null) {
+                ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey);
+                if(matchingBuildRecs!=null) {
+                    for (Tuple matchingRec : matchingBuildRecs) {
+                        ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
+                        result.insert(mergedRecord);
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    // left join - core implementation
+    protected JoinAccumulator doLeftJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
+        String[] probeKeyName = joinInfo.getOtherField();
+        JoinAccumulator result = new JoinAccumulator();
+        FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
+        for (ResultRecord rec : probe.getRecords()) {
+            Object probeKey = rec.getField(fieldSelector);
+            if (probeKey!=null) {
+                ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey); // ok if it returns null
+                if (matchingBuildRecs!=null && !matchingBuildRecs.isEmpty() ) {
+                    for (Tuple matchingRec : matchingBuildRecs) {
+                        ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
+                        result.insert(mergedRecord);
+                    }
+                } else {
+                    ResultRecord mergedRecord = new ResultRecord(rec, null, finalJoin);
+                    result.insert(mergedRecord);
+                }
+
+            }
+        }
+        return result;
+    }
+
+
+    // Identify the join field for the stream, and look it up in 'tuple'. field can be nested field:  outerKey.innerKey
+    private Object getJoinField(String streamId, Tuple tuple) {
+        JoinInfo ji = joinCriteria.get(streamId);
+        if(ji==null) {
+            throw new RuntimeException("Join information for '" + streamId + "' not found. Check the join clauses.");
+        }
+        return lookupField(ji.getJoinField(), tuple);
+    }
+
+    // Returns either the source component name or the stream name for the tuple
+    private String getStreamSelector(Tuple ti) {
+        switch (selectorType) {
+            case STREAM:
+                return ti.getSourceStreamId();
+            case SOURCE:
+                return ti.getSourceComponent();
+            default:
+                throw new RuntimeException(selectorType + " stream selector type not yet supported");
+        }
+    }
+
+
+    protected enum JoinType {INNER, LEFT, RIGHT, OUTER}
+
+    /** Describes how to join the other stream with the current stream */
+    protected static class JoinInfo implements Serializable {
+        final static long serialVersionUID = 1L;
+
+        private JoinType joinType;        // nature of join
+        private FieldSelector field;           // field for the current stream
+        private FieldSelector other;      // field for the other (2nd) stream
+
+
+        public JoinInfo(FieldSelector field) {
+            this.joinType = null;
+            this.field = field;
+            this.other = null;
+        }
+        public JoinInfo(FieldSelector field, String otherStream, JoinInfo otherStreamJoinInfo,  JoinType joinType) {
+            this.joinType = joinType;
+            this.field = field;
+            this.other = new FieldSelector(otherStream, otherStreamJoinInfo.field.getOutputName() );
+        }
+
+        public FieldSelector getJoinField() {
+            return field;
+        }
+
+        public String getOtherStream() {
+            return other.getStreamName();
+        }
+
+        public String[] getOtherField() {
+            return other.getField();
+        }
+
+        public JoinType getJoinType() {
+            return joinType;
+        }
+
+    } // class JoinInfo
+
+    // Join helper to concat fields to the record
+    protected class ResultRecord {
+
+        ArrayList<Tuple> tupleList = new ArrayList<>(); // contains one Tuple per Stream being joined
+        ArrayList<Object> outFields = null; // refs to fields that will be part of output fields
+
+        // 'generateOutputFields' enables us to avoid projection unless it is the final stream being joined
+        public ResultRecord(Tuple tuple, boolean generateOutputFields) {
+            tupleList.add(tuple);
+            if(generateOutputFields) {
+                outFields = doProjection(tupleList, outputFields);
+            }
+        }
+
+        public ResultRecord(ResultRecord lhs, Tuple rhs, boolean generateOutputFields) {
+            if(lhs!=null)
+                tupleList.addAll(lhs.tupleList);
+            if(rhs!=null)
+                tupleList.add(rhs);
+            if(generateOutputFields) {
+                outFields = doProjection(tupleList, outputFields);
+            }
+        }
+
+        public ArrayList<Object> getOutputFields() {
+            return outFields;
+        }
+
+
+        // 'fieldSelector' cannot be null
+        public Object getField(FieldSelector fieldSelector) {
+            for (Tuple tuple : tupleList) {
+                Object result = lookupField(fieldSelector, tuple);
+                if (result!=null)
+                    return result;
+            }
+            return null;
+        }
+    }
+
+    protected class JoinAccumulator {
+        ArrayList<ResultRecord> records = new ArrayList<>();
+
+        public void insert(ResultRecord tuple) {
+            records.add( tuple );
+        }
+
+        public Collection<ResultRecord> getRecords() {
+            return records;
+        }
+    }
+
+    // Performs projection on the tuples based on 'projectionFields'
+    protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, FieldSelector[] projectionFields) {
+        ArrayList<Object> result = new ArrayList<>(projectionFields.length);
+        // Todo: optimize this computation... perhaps inner loop can be outside to avoid rescanning tuples
+        for ( int i = 0; i < projectionFields.length; i++ ) {
+            boolean missingField = true;
+            for ( Tuple tuple : tuples ) {
+                Object field = lookupField(projectionFields[i], tuple ) ;
+                if (field != null) {
+                    result.add(field);
+                    missingField=false;
+                    break;
+                }
+            }
+            if(missingField) { // add a null for missing fields (usually in case of outer joins)
+                result.add(null);
+            }
+        }
+        return result;
+    }
+
+    protected static class FieldSelector implements Serializable {
+        String streamName;    // can be null;
+        String[] field;       // nested field "x.y.z"  becomes => String["x","y","z"]
+        String outputName;    // either "stream1:x.y.z" or "x.y.z" depending on whether stream name is present.
+
+        public FieldSelector(String fieldDescriptor)  {  // sample fieldDescriptor = "stream1:x.y.z"
+            int pos = fieldDescriptor.indexOf(':');
+
+            if (pos>0) {  // stream name is specified
+                streamName = fieldDescriptor.substring(0,pos).trim();
+                outputName = fieldDescriptor.trim();
+                field =  fieldDescriptor.substring(pos+1, fieldDescriptor.length()).split("\\.");
+                return;
+            }
+
+            // stream name unspecified
+            streamName = null;
+            if(pos==0) {
+                outputName = fieldDescriptor.substring(1, fieldDescriptor.length() ).trim();
+
+            } else if (pos<0) {
+                outputName = fieldDescriptor.trim();
+            }
+            field =  outputName.split("\\.");
+        }
+
+        /**
+         * @param stream name of stream
+         * @param fieldDescriptor  Simple fieldDescriptor like "x.y.z" and w/o a 'stream1:' stream qualifier.
+         */
+        public FieldSelector(String stream, String fieldDescriptor)  {
+            this(fieldDescriptor);
+            if(fieldDescriptor.indexOf(":")>=0) {
+                throw new IllegalArgumentException("Not expecting stream qualifier ':' in '" + fieldDescriptor
+                        + "'. Stream name '" + stream +  "' is implicit in this context");
+            }
+            this.streamName = stream;
+        }
+
+        public FieldSelector(String stream, String[] field)  {
+            this( stream, String.join(".", field) );
+        }
+
+
+        public String getStreamName() {
+            return streamName;
+        }
+
+        public String[] getField() {
+            return field;
+        }
+
+        public String getOutputName() {
+            return toString();
+        }
+
+        @Override
+        public String toString() {
+            return outputName;
+        }
+    }
+
+    // Extract the field from tuple. Field may be nested field (x.y.z)
+    protected Object lookupField(FieldSelector fieldSelector, Tuple tuple) {
+
+        // verify stream name matches, if stream name was specified
+        if ( fieldSelector.streamName!=null &&
+                !fieldSelector.streamName.equalsIgnoreCase( getStreamSelector(tuple) ) ) {
+            return null;
+        }
+
+        Object curr = null;
+        for (int i=0; i < fieldSelector.field.length; i++) {
+            if (i==0) {
+                if (tuple.contains(fieldSelector.field[i]) )
+                    curr = tuple.getValueByField(fieldSelector.field[i]);
+                else
+                    return null;
+            }  else  {
+                curr = ((Map) curr).get(fieldSelector.field[i]);
+                if (curr==null)
+                    return null;
+            }
+        }
+        return curr;
+    }
+
+    // Boilerplate overrides to cast result from base type to JoinBolt, so user doesn't have to
+    // down cast when invoking these methods
+
+    @Override
+    public JoinBolt withWindow(Count windowLength, Count slidingInterval) {
+        return (JoinBolt) super.withWindow(windowLength, slidingInterval);
+    }
+
+    @Override
+    public JoinBolt withWindow(Count windowLength, Duration slidingInterval) {
+        return (JoinBolt) super.withWindow(windowLength, slidingInterval);
+    }
+
+    @Override
+    public JoinBolt withWindow(Duration windowLength, Count slidingInterval) {
+        return (JoinBolt) super.withWindow(windowLength, slidingInterval);
+    }
+
+    @Override
+    public JoinBolt withWindow(Duration windowLength, Duration slidingInterval) {
+        return (JoinBolt) super.withWindow(windowLength, slidingInterval);
+    }
+
+    @Override
+    public JoinBolt withWindow(Count windowLength) {
+        return (JoinBolt) super.withWindow(windowLength);
+    }
+
+    @Override
+    public JoinBolt withWindow(Duration windowLength) {
+        return (JoinBolt) super.withWindow(windowLength);
+    }
+
+    @Override
+    public JoinBolt withTumblingWindow(Count count) {
+        return (JoinBolt) super.withTumblingWindow(count);
+    }
+
+    @Override
+    public JoinBolt withTumblingWindow(Duration duration) {
+        return (JoinBolt) super.withTumblingWindow(duration);
+    }
+
+    @Override
+    public JoinBolt withTimestampField(String fieldName) {
+        return (JoinBolt) super.withTimestampField(fieldName);
+    }
+
+    @Override
+    public JoinBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
+        return (JoinBolt) super.withTimestampExtractor(timestampExtractor);
+    }
+
+    @Override
+    public JoinBolt withLateTupleStream(String streamId) {
+        return (JoinBolt) super.withLateTupleStream(streamId);
+    }
+
+    @Override
+    public JoinBolt withLag(Duration duration) {
+        return (JoinBolt) super.withLag(duration);
+    }
+
+    @Override
+    public JoinBolt withWatermarkInterval(Duration interval) {
+        return (JoinBolt) super.withWatermarkInterval(interval);
+    }
+}
\ No newline at end of file
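The doLeftJoin method above follows a build/probe hash-join shape: one side is indexed by join key, and each record on the other side looks up its matches. The sketch below illustrates that shape on plain Object[] rows rather than Storm tuples. The class name LeftHashJoinSketch, its helper methods, and the row layout are hypothetical and are not part of JoinBolt. Like the code in the diff, it skips probe rows whose key is null and pads unmatched probe rows with nulls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the JoinBolt implementation.
public class LeftHashJoinSketch {

    // Build phase: index the build-side rows by the value at keyIdx.
    public static Map<Object, List<Object[]>> buildIndex(List<Object[]> rows, int keyIdx) {
        Map<Object, List<Object[]>> index = new HashMap<>();
        for (Object[] row : rows) {
            index.computeIfAbsent(row[keyIdx], k -> new ArrayList<>()).add(row);
        }
        return index;
    }

    // Probe phase: emit one merged row per match, or one null-padded row when
    // nothing matches. Probe rows with a null key are skipped, as in the diff.
    public static List<Object[]> leftJoin(List<Object[]> probe, int probeKeyIdx,
                                          Map<Object, List<Object[]>> index, int buildWidth) {
        List<Object[]> result = new ArrayList<>();
        for (Object[] left : probe) {
            Object key = left[probeKeyIdx];
            if (key == null) {
                continue;
            }
            List<Object[]> matches = index.get(key); // may be null when no match exists
            if (matches != null && !matches.isEmpty()) {
                for (Object[] right : matches) {
                    result.add(concat(left, right));
                }
            } else {
                result.add(concat(left, new Object[buildWidth]));
            }
        }
        return result;
    }

    // Concatenate two rows into a new array.
    private static Object[] concat(Object[] a, Object[] b) {
        Object[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        List<Object[]> users = Arrays.asList(
                new Object[]{1, "roshan"}, new Object[]{2, "harsha"}, new Object[]{3, "siva"});
        List<Object[]> orders = Arrays.asList(
                new Object[]{11, 2}, new Object[]{12, 2}, new Object[]{13, 3});
        // Join users.userId (column 0) with orders.userId (column 1).
        for (Object[] row : leftJoin(users, 0, buildIndex(orders, 1), 2)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

An inner join in this style would simply drop the else branch, which is the only difference between the inner-join loop and doLeftJoin in the diff above.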

http://git-wip-us.apache.org/repos/asf/storm/blob/f4ded162/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java b/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
new file mode 100644
index 0000000..63c4a7f
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.bolt;
+
+import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.windowing.TupleWindow;
+import org.apache.storm.windowing.TupleWindowImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+public class TestJoinBolt {
+    String[] userFields = {"userId", "name", "city"};
+    Object[][] users = {
+            {1, "roshan", "san jose" },
+            {2, "harsha", "santa clara" },
+            {3, "siva",   "dublin" },
+            {4, "hugo",   "san mateo" },
+            {5, "suresh", "sunnyvale" },
+            {6, "guru",   "palo alto" },
+            {7, "arun",   "bengaluru"},
+            {8, "satish", "mumbai" },
+            {9, "mani",   "bengaluru" },
+            {10,"priyank","seattle" }
+    };
+
+    String[] orderFields = {"orderId", "userId", "itemId", "price"};
+
+    Object[][] orders = {
+            {11, 2, 21, 7},
+            {12, 2, 22, 3},
+            {13, 3, 23, 4},
+            {14, 4, 24, 5},
+            {15, 5, 25, 2},
+            {16, 6, 26, 7},
+            {17, 6, 27, 4},
+            {18, 7, 28, 2},
+            {19, 8, 29, 9}
+    };
+
+    String[] storeFields = {"storeId", "storeName", "city"};
+    Object[][] stores = {
+            {1, "store1",  "san jose"},
+            {2, "store2",  "santa clara"},
+            {3, "store3",  "dublin" },
+            {4, "store4",  "san mateo" },
+            {5, "store5",  "bengaluru" },
+    };
+
+    String [] cityFields = {"cityId","cityName","country"};
+    Object[][] cities = {
+            {1, "san jose", "US"},
+            {2, "santa clara", "US"},
+            {3, "dublin", "US" },
+            {4, "san mateo", "US" },
+            {5, "sunnyvale", "US" },
+            {6, "palo alto", "US" },
+            {7, "bengaluru", "India"},
+            {8, "mumbai", "India"},
+            {9, "chennai", "India"}
+    };
+
+
+    @Test
+    public void testTrivial() throws Exception {
+        ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders);
+        TupleWindow window = makeTupleWindow(orderStream);
+
+        JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "orders", orderFields[0])
+                .select("orderId,userId,itemId,price");
+        MockCollector collector = new MockCollector();
+        bolt.prepare(null, null, collector);
+        bolt.execute(window);
+        printResults(collector);
+        Assert.assertEquals( orderStream.size(), collector.actualResults.size() );
+    }
+
+    @Test
+    public void testNestedKeys() throws Exception {
+        ArrayList<Tuple> userStream = makeNestedEventsStream("users", userFields, users);
+        TupleWindow window = makeTupleWindow(userStream);
+        JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", "outer.userId")
+                .select("outer.name, outer.city");
+        MockCollector collector = new MockCollector();
+        bolt.prepare(null, null, collector);
+        bolt.execute(window);
+        printResults(collector);
+        Assert.assertEquals( userStream.size(), collector.actualResults.size() );
+    }
+
+
+    @Test
+    public void testProjection_FieldsWithStreamName() throws Exception {
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
+        ArrayList<Tuple> storeStream = makeStream("stores", storeFields, stores);
+
+        TupleWindow window = makeTupleWindow(storeStream, userStream);
+
+        // join users and stores on city name
+        JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", userFields[2])
+                .join("stores", "city", "users")
+                .select("userId,name,storeName,users:city,stores:city");
+
+        MockCollector collector = new MockCollector();
+        bolt.prepare(null, null, collector);
+        bolt.execute(window);
+        printResults(collector);
+        Assert.assertEquals( storeStream.size()+1, collector.actualResults.size() );
+        // ensure 5 fields per tuple and no null fields
+        for (List<Object> tuple : collector.actualResults) {
+            Assert.assertEquals(5, tuple.size());
+            for (Object o : tuple) {
+                Assert.assertNotNull(o);
+            }
+        }
+    }
+
+    @Test
+    public void testInnerJoin() throws Exception {
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
+        ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders);
+        TupleWindow window = makeTupleWindow(orderStream, userStream);
+
+        JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", userFields[0])
+                .join("orders", "userId", "users")
+                .select("userId,name,price");
+
+        MockCollector collector = new MockCollector();
+        bolt.prepare(null, null, collector);
+        bolt.execute(window);
+        printResults(collector);
+        Assert.assertEquals( orders.length, collector.actualResults.size() );
+    }
+
+    @Test
+    public void testLeftJoin() throws Exception {
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
+        ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders);
+        TupleWindow window = makeTupleWindow(orderStream, userStream);
+
+        JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", userFields[0])
+                .leftJoin("orders", "userId", "users")
+                .select("userId,name,price");
+
+        MockCollector collector = new MockCollector();
+        bolt.prepare(null, null, collector);
+        bolt.execute(window);
+        printResults(collector);
+        Assert.assertEquals(12, collector.actualResults.size() );
+    }
+
+    @Test
+    public void testThreeStreamInnerJoin() throws Exception {
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
+        ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores);
+        ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities);
+
+        TupleWindow window = makeTupleWindow(userStream, storesStream, cityStream);
+
+        JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", userFields[2])
+                .join("stores", "city", "users")
+                .join("cities", "cityName", "stores")
+                .select("name,storeName,city,country");
+
+        MockCollector collector = new MockCollector();
+        bolt.prepare(null, null, collector);
+        bolt.execute(window);
+        printResults(collector);
+        Assert.assertEquals(6, collector.actualResults.size() );
+
+    }
+
+    @Test
+    public void testThreeStreamLeftJoin_1() throws Exception {
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
+        ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores);
+        ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities);
+
+        TupleWindow window = makeTupleWindow(userStream,  cityStream, storesStream);
+
+        JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", userFields[2])
+                .leftJoin("stores", "city", "users")
+                .leftJoin("cities", "cityName", "users")
+                .select("name,storeName,city,country");
+
+        MockCollector collector = new MockCollector();
+        bolt.prepare(null, null, collector);
+        bolt.execute(window);
+        printResults(collector);
+        Assert.assertEquals(users.length, collector.actualResults.size() );
+    }
+
+    @Test
+    public void testThreeStreamLeftJoin_2() throws Exception {
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
+        ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores);
+        ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities);
+
+        TupleWindow window = makeTupleWindow(userStream, cityStream, storesStream);
+
+        JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", "city")
+                .leftJoin("stores", "city", "users")
+                .leftJoin("cities", "cityName", "stores")  // join against a different stream compared to testThreeStreamLeftJoin_1
+                .select("name,storeName,city,country");
+
+        MockCollector collector = new MockCollector();
+        bolt.prepare(null, null, collector);
+        bolt.execute(window);
+        printResults(collector);
+        Assert.assertEquals(stores.length+1, collector.actualResults.size() ); // stores.length+1 as 2 users in Bengaluru
+    }
+
+
+
+    @Test
+    public void testThreeStreamMixedJoin() throws Exception {
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
+        ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores);
+        ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities);
+
+        TupleWindow window = makeTupleWindow(userStream,  cityStream, storesStream);
+
+        JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", userFields[2])
+                .join("stores", "city", "users")
+                .leftJoin("cities", "cityName", "users")
+                .select("name,storeName,city,country");
+
+        MockCollector collector = new MockCollector();
+        bolt.prepare(null, null, collector);
+        bolt.execute(window);
+        printResults(collector);
+        Assert.assertEquals(stores.length+1, collector.actualResults.size() ); // stores.length+1 as 2 users in Bengaluru
+    }
+
+    private static void printResults(MockCollector collector) {
+        int counter=0;
+        for (List<Object> rec : collector.actualResults) {
+            System.out.print(++counter +  ") ");
+            for (Object field : rec) {
+                System.out.print(field + ", ");
+            }
+            System.out.println("");
+        }
+    }
+
+
+    private static TupleWindow makeTupleWindow(ArrayList<Tuple> stream) {
+        return new TupleWindowImpl(stream, null, null);
+    }
+
+
+    private static TupleWindow makeTupleWindow(ArrayList<Tuple>... streams) {
+        ArrayList<Tuple> combined = null;
+        for (int i = 0; i < streams.length; i++) {
+            if(i==0) {
+                combined = new ArrayList<>(streams[0]);
+            } else {
+                combined.addAll(streams[i]);
+            }
+        }
+        return new TupleWindowImpl(combined, null, null);
+    }
+
+
+    private static ArrayList<Tuple> makeStream(String streamName, String[] fieldNames, Object[][] data) {
+        ArrayList<Tuple> result = new ArrayList<>();
+        MockContext mockContext = new MockContext(fieldNames);
+
+        for (Object[] record : data) {
+            TupleImpl rec = new TupleImpl(mockContext, Arrays.asList(record), 0, streamName);
+            result.add( rec );
+        }
+
+        return result;
+    }
+
+
+    private static ArrayList<Tuple> makeNestedEventsStream (String streamName, String[] fieldNames, Object[][] records) {
+
+        MockContext mockContext = new MockContext(new String[]{"outer"} );
+        ArrayList<Tuple> result = new ArrayList<>(records.length);
+
+        // convert each record into a HashMap using fieldNames as keys
+        for (Object[] record : records) {
+            HashMap<String,Object> recordMap = new HashMap<>( fieldNames.length );
+            for (int i = 0; i < fieldNames.length; i++) {
+                recordMap.put(fieldNames[i], record[i]);
+            }
+
+            ArrayList<Object> tupleValues = new ArrayList<>(1);
+            tupleValues.add(recordMap);
+            TupleImpl tuple = new TupleImpl(mockContext, tupleValues, 0, streamName);
+            result.add( tuple );
+        }
+
+        return result;
+    }
+
+
+    static class MockCollector extends OutputCollector {
+        public ArrayList<List<Object> > actualResults = new ArrayList<>();
+
+        public MockCollector() {
+            super(null);
+        }
+
+        @Override
+        public List<Integer> emit(List<Object> tuple) {
+            actualResults.add(tuple);
+            return null;
+        }
+
+    } // class MockCollector
+
+    static class MockContext extends GeneralTopologyContext {
+
+        private final Fields fields;
+
+        public MockContext(String[] fieldNames) {
+            super(null, null, null, null, null, null);
+            this.fields = new Fields(fieldNames);
+        }
+
+        public String getComponentId(int taskId) {
+            return "component";
+        }
+
+        public Fields getComponentOutputFields(String componentId, String streamId) {
+            return fields;
+        }
+
+    }    
+}
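The field descriptors used in the tests above (for example "users:city" and "outer.userId") combine an optional stream qualifier with a dotted path into nested maps. The sketch below shows one way such a descriptor could be parsed and resolved against a plain Map. The class FieldPathSketch and its members are hypothetical and only approximate FieldSelector and lookupField from the diff. One deliberate difference, which is an assumption of this sketch: it returns null when an intermediate value is not a Map, whereas the code in the diff casts unconditionally.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the FieldSelector class from the commit.
public class FieldPathSketch {
    public final String stream;   // null when the descriptor has no "stream:" qualifier
    public final String[] path;   // "x.y.z" becomes {"x", "y", "z"}

    public FieldPathSketch(String descriptor) {
        int pos = descriptor.indexOf(':');
        this.stream = pos > 0 ? descriptor.substring(0, pos).trim() : null;
        String rest = pos >= 0 ? descriptor.substring(pos + 1) : descriptor;
        this.path = rest.trim().split("\\.");
    }

    // Resolve the path against 'fields'. Returns null if the stream qualifier
    // does not match 'tupleStream' or if any step of the path is missing.
    public Object lookup(String tupleStream, Map<String, Object> fields) {
        if (stream != null && !stream.equalsIgnoreCase(tupleStream)) {
            return null;
        }
        Object curr = fields;
        for (String key : path) {
            if (!(curr instanceof Map)) {
                return null; // assumption of this sketch: non-map intermediates yield null
            }
            curr = ((Map<?, ?>) curr).get(key);
            if (curr == null) {
                return null;
            }
        }
        return curr;
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new HashMap<>();
        inner.put("userId", 7);
        Map<String, Object> fields = new HashMap<>();
        fields.put("outer", inner);
        System.out.println(new FieldPathSketch("users:outer.userId").lookup("users", fields));
    }
}
```

Returning null on a stream mismatch is what lets a projection try the same selector against every tuple in a joined record and keep the first non-null value, which is how doProjection in the diff is written.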


[2/2] storm git commit: Added STORM-2334 to CHANGELOG.md

Posted by ar...@apache.org.
Added STORM-2334 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e01f2282
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e01f2282
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e01f2282

Branch: refs/heads/1.x-branch
Commit: e01f228279eefaaeeda7e6fb6f274ee6beb3a3b8
Parents: f4ded16
Author: Arun Mahadevan <ar...@apache.org>
Authored: Fri Feb 24 12:55:45 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Fri Feb 24 12:55:45 2017 +0530

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e01f2282/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7be2ca2..96f0a66 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.1
+* STORM-2334: Join Bolt implementation
 * STORM-1363: TridentKafkaState should handle null values from TridentTupleToKafkaMapper.getMessageFromTuple()
 
 ## 1.1.0