Posted to commits@storm.apache.org by ka...@apache.org on 2016/03/27 13:26:23 UTC

[1/2] storm git commit: STORM-1659:Add documents for external projects

Repository: storm
Updated Branches:
  refs/heads/master f118060dc -> a48fae243


STORM-1659:Add documents for external projects


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0a00c671
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0a00c671
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0a00c671

Branch: refs/heads/master
Commit: 0a00c67170db83da3f5c29dcecf6f185fd066b9b
Parents: f118060
Author: Xin Wang <be...@163.com>
Authored: Sun Mar 27 16:04:38 2016 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sun Mar 27 16:10:53 2016 +0800

----------------------------------------------------------------------
 docs/Kestrel-and-Storm.md   |   2 +-
 docs/index.md               |  14 +-
 docs/storm-cassandra.md     | 255 ++++++++++++++++++++++++++
 docs/storm-elasticsearch.md | 105 +++++++++++
 docs/storm-mongodb.md       | 199 ++++++++++++++++++++
 docs/storm-mqtt.md          | 379 +++++++++++++++++++++++++++++++++++++++
 6 files changed, 948 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/Kestrel-and-Storm.md
----------------------------------------------------------------------
diff --git a/docs/Kestrel-and-Storm.md b/docs/Kestrel-and-Storm.md
index cd584ff..ff48995 100644
--- a/docs/Kestrel-and-Storm.md
+++ b/docs/Kestrel-and-Storm.md
@@ -3,7 +3,7 @@ title: Storm and Kestrel
 layout: documentation
 documentation: true
 ---
-This page explains how to use to Storm to consume items from a Kestrel cluster.
+This page explains how to use Storm to consume items from a Kestrel cluster.
 
 ## Preliminaries
 ### Storm

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 8c6859e..bfd68db 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -64,15 +64,19 @@ Trident is an alternative interface to Storm. It provides exactly-once processin
 * [Worker Profiling](dynamic-worker-profiling.html)
 
 ### Integration With External Systems, and Other Libraries
-* [Event Hubs Intergration](storm-eventhubs.html)
+* [Apache Kafka Integration](storm-kafka.html)
 * [Apache HBase Integration](storm-hbase.html)
 * [Apache HDFS Integration](storm-hdfs.html)
 * [Apache Hive Integration](storm-hive.html)
+* [Apache Solr Integration](storm-solr.html)
 * [JDBC Integration](storm-jdbc.html)
-* [Apache Kafka Integration](storm-kafka.html)
-* [REDIS Integration](storm-redis.html) 
-* [Kestrel and Storm](Kestrel-and-Storm.html)
-* [Solr Integration](storm-solr.html)
+* [Redis Integration](storm-redis.html) 
+* [Cassandra Integration](storm-cassandra.html)
+* [Event Hubs Integration](storm-eventhubs.html)
+* [Elasticsearch Integration](storm-elasticsearch.html)
+* [MQTT Integration](storm-mqtt.html)
+* [MongoDB Integration](storm-mongodb.html)
+* [Kestrel Integration](Kestrel-and-Storm.html)
 
 ### Advanced
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-cassandra.md
----------------------------------------------------------------------
diff --git a/docs/storm-cassandra.md b/docs/storm-cassandra.md
new file mode 100644
index 0000000..c674fbc
--- /dev/null
+++ b/docs/storm-cassandra.md
@@ -0,0 +1,255 @@
+---
+title: Storm Cassandra Integration
+layout: documentation
+documentation: true
+---
+
+### Bolt API implementation for Apache Cassandra
+
+This library provides a core Storm bolt on top of Apache Cassandra.
+It provides a simple DSL to map a Storm *Tuple* to a Cassandra Query Language *Statement*.
+
+
+### Configuration
+The following properties may be passed to the Storm configuration (see the example after the table).
+
+| **Property name**                            | **Description** | **Default**         |
+| ---------------------------------------------| ----------------| --------------------|
+| **cassandra.keyspace**                       | -               |                     |
+| **cassandra.nodes**                          | -               | {"localhost"}       |
+| **cassandra.username**                       | -               | -                   |
+| **cassandra.password**                       | -               | -                   |
+| **cassandra.port**                           | -               | 9042                |
+| **cassandra.output.consistencyLevel**        | -               | ONE                 |
+| **cassandra.batch.size.rows**                | -               | 100                 |
+| **cassandra.retryPolicy**                    | -               | DefaultRetryPolicy  |
+| **cassandra.reconnectionPolicy.baseDelayMs** | -               | 100 (ms)            |
+| **cassandra.reconnectionPolicy.maxDelayMs**  | -               | 60000 (ms)          |
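+
+As an illustration, here is a minimal sketch of passing these properties through the topology configuration. The keyspace, contact point, and port values are placeholders, `builder` is an already-configured `TopologyBuilder`, and `cassandra.nodes` may also accept a list of hosts:
+
+```java
+Config conf = new Config();
+conf.put("cassandra.keyspace", "my_keyspace");            // placeholder keyspace name
+conf.put("cassandra.nodes", "cassandra1.example.com");    // placeholder contact point(s)
+conf.put("cassandra.port", 9042);
+
+StormSubmitter.submitTopology("cassandra-topology", conf, builder.createTopology());
+```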
+
+### CassandraWriterBolt
+
+#### Static import
+```java
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+
+```
+
+#### Insert Query Builder
+##### Insert query including only the specified tuple fields.
+```java
+
+    new CassandraWriterBolt(
+        async(
+            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .with(
+                    fields("title", "year", "performer", "genre", "tracks")
+                 )
+            )
+    );
+```
+
+##### Insert query including all tuple fields.
+```java
+
+    new CassandraWriterBolt(
+        async(
+            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .with( all() )
+            )
+    );
+```
+
+##### Insert multiple queries from one input tuple.
+```java
+
+    new CassandraWriterBolt(
+        async(
+            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
+            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
+        )
+    );
+```
+
+##### Insert query using QueryBuilder
+```java
+
+    new CassandraWriterBolt(
+        async(
+            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .with(all())
+        )
+    );
+```
+
+##### Insert query with static bound query
+```java
+
+    new CassandraWriterBolt(
+         async(
+            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .bind(all())
+         )
+    );
+```
+
+##### Insert query with static bound query using named setters and aliases
+```java
+
+    new CassandraWriterBolt(
+         async(
+            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);")
+                .bind(
+                    field("ti").as("title"),
+                    field("ye").as("year"),
+                    field("pe").as("performer"),
+                    field("ge").as("genre"),
+                    field("tr").as("tracks")
+                ).byNamedSetters()
+         )
+    );
+```
+
+##### Insert query with bound statement load from storm configuration
+```java
+
+    new CassandraWriterBolt(
+         boundQuery(named("insertIntoAlbum"))
+            .bind(all())
+    );
+```
+
+##### Insert query with bound statement load from tuple field
+```java
+
+    new CassandraWriterBolt(
+         boundQuery(namedByField("cql"))
+            .bind(all())
+    );
+```
+
+##### Insert query with batch statement
+```java
+
+    // Logged
+    new CassandraWriterBolt(loggedBatch(
+            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
+            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
+        )
+    );
+
+    // UnLogged
+    new CassandraWriterBolt(unLoggedBatch(
+            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
+            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
+        )
+    );
+```
+
+### How to handle query execution results
+
+The interface *ExecutionResultHandler* can be used to customize how an execution result should be handled.
+
+```java
+public interface ExecutionResultHandler extends Serializable {
+    void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple);
+
+    void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple);
+
+    void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple);
+
+    void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple);
+
+    void onQuerySuccess(OutputCollector collector, Tuple tuple);
+}
+```
+
+By default, the CassandraBolt fails a tuple on all Cassandra exceptions (see [BaseExecutionResultHandler](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java)).
+
+```java
+    new CassandraWriterBolt(insertInto("album").values(with(all())).build())
+            .withResultHandler(new MyCustomResultHandler());
+```
+
+### Declare Output fields
+
+A CassandraBolt can declare output fields / stream output fields.
+For instance, this may be used to emit a new tuple on error, or to chain queries.
+
+```java
+    new CassandraWriterBolt(insertInto("album").values(withFields(all())).build())
+            .withResultHandler(new EmitOnDriverExceptionResultHandler())
+            .withStreamOutputFields("stream_error", new Fields("message"));
+
+    public static class EmitOnDriverExceptionResultHandler extends BaseExecutionResultHandler {
+        @Override
+        protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) {
+            LOG.error("An error occurred while executing cassandra statement", e);
+            collector.emit("stream_error", new Values(e.getMessage()));
+            collector.ack(tuple);
+        }
+    }
+```
+
+### Murmur3FieldGrouping
+
+[Murmur3StreamGrouping](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java) can be used to optimise Cassandra writes.
+The stream is partitioned among the bolt's tasks based on the specified row partition keys.
+
+```java
+CassandraWriterBolt bolt = new CassandraWriterBolt(
+    insertInto("album")
+        .values(
+            with(fields("title", "year", "performer", "genre", "tracks"))
+        ).build());
+
+builder.setBolt("BOLT_WRITER", bolt, 4)
+        .customGrouping("spout", new Murmur3StreamGrouping("title"));
+```
+
+### Trident API support
+storm-cassandra supports the Trident `state` API for inserting data into Cassandra.
+```java
+        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+        CQLStatementTupleMapper insertTemperatureValues = boundQuery(
+                "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
+                .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
+        options.withCQLStatementTupleMapper(insertTemperatureValues);
+        CassandraStateFactory insertValuesStateFactory =  new CassandraStateFactory(options);
+        // selectWeatherStationStateFactory is created as shown in the query example below
+        TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
+        stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+        stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
+        stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
+```
+
+Below is the `state` API for querying data from Cassandra.
+```java
+        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+        CQLStatementTupleMapper selectWeatherStationName = boundQuery("SELECT name FROM weather.station WHERE id = ?")
+                 .bind(with(field("weather_station_id").as("id")));
+        options.withCQLStatementTupleMapper(selectWeatherStationName);
+        options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
+        CassandraStateFactory selectWeatherStationStateFactory = new CassandraStateFactory(options);
+        TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
+        stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));         
+```
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/storm-elasticsearch.md b/docs/storm-elasticsearch.md
new file mode 100644
index 0000000..fff0f51
--- /dev/null
+++ b/docs/storm-elasticsearch.md
@@ -0,0 +1,105 @@
+---
+title: Storm Elasticsearch Integration
+layout: documentation
+documentation: true
+---
+
+# Storm Elasticsearch Bolt & Trident State
+
+  EsIndexBolt, EsPercolateBolt and EsState allow users to stream data from Storm directly into Elasticsearch.
+  For a detailed description, please refer to the following sections.
+
+## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
+
+EsIndexBolt streams tuples directly into Elasticsearch. Tuples are indexed in the specified index & type combination.
+Users should make sure that ```EsTupleMapper``` can extract "source", "index", "type", and "id" from the input tuple.
+"index" and "type" are used to identify the target index and type.
+"source" is a document as a JSON-format string that will be indexed in Elasticsearch.
+
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
+```
+
+## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
+
+EsPercolateBolt streams tuples directly into Elasticsearch. Tuples are used to send a percolate request to the specified index & type combination.
+Users should make sure ```EsTupleMapper``` can extract "source", "index", and "type" from the input tuple.
+"index" and "type" are used to identify the target index and type.
+"source" is a document as a JSON-format string that will be sent in a percolate request to Elasticsearch.
+
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper);
+```
+
+If the percolate response is non-empty, EsPercolateBolt will emit a tuple with the original source and the Percolate.Match
+for each Percolate.Match in the PercolateResponse.
+
+## EsState (org.apache.storm.elasticsearch.trident.EsState)
+
+Elasticsearch Trident state also follows a pattern similar to the EsBolts. It takes an EsConfig and an EsTupleMapper as constructor args.
+
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+
+StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
+TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+ ```
+
+## EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt)
+
+EsLookupBolt performs a get request to Elasticsearch.
+In order to do that, three dependencies need to be satisfied. Apart from the usual EsConfig, two other dependencies must be provided:
+
+* ElasticsearchGetRequest is used to convert the incoming Tuple to the GetRequest that will be executed against Elasticsearch.
+* EsLookupResultOutput is used to declare the output fields and convert the GetResponse to values that are emitted by the bolt.
+
+The incoming tuple is passed to the provided GetRequest creator and the result of that execution is passed to the Elasticsearch client.
+The bolt then uses the provided output adapter (EsLookupResultOutput) to convert the GetResponse to Values to emit.
+The output fields are also specified by the user of the bolt via the output adapter (EsLookupResultOutput).
+
+```java
+EsConfig esConfig = createEsConfig();
+ElasticsearchGetRequest getRequestAdapter = createElasticsearchGetRequest();
+EsLookupResultOutput output = createOutput();
+EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
+```
+
+## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
+  
+The provided components (Bolt, State) take an EsConfig as a constructor arg.
+
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+```
+
+or
+
+```java
+Map<String, String> additionalParameters = new HashMap<>();
+additionalParameters.put("client.transport.sniff", "true");
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, additionalParameters);
+```
+
+### EsConfig params
+
+|Arg  |Description | Type |
+|---  |--- |--- |
+|clusterName | Elasticsearch cluster name | String (required) |
+|nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
+|additionalParameters | Additional Elasticsearch Transport Client configuration parameters | Map<String, String> (optional) |
+
+## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)
+
+For storing a tuple to Elasticsearch or percolating a tuple from Elasticsearch, we need to define which tuple fields are used for which purpose.
+Users need to define their own mapper by implementing ```EsTupleMapper```.
+Storm-elasticsearch provides a default mapper, ```org.apache.storm.elasticsearch.common.DefaultEsTupleMapper```, which extracts the source, index, type, and id values from identically named fields.
+You can refer to the implementation of DefaultEsTupleMapper to see how to implement your own.
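+
+As an illustration, here is a minimal sketch of a custom mapper that pulls these values from differently named tuple fields. The field names and the fixed index/type are hypothetical, and the accessor signatures assume the same shape as ```DefaultEsTupleMapper```:
+
+```java
+public class PayloadEsTupleMapper implements EsTupleMapper {
+    @Override
+    public String getSource(ITuple tuple) {
+        return tuple.getStringByField("payload");   // hypothetical field holding the JSON document
+    }
+
+    @Override
+    public String getIndex(ITuple tuple) {
+        return "logs";                              // fixed target index for this example
+    }
+
+    @Override
+    public String getType(ITuple tuple) {
+        return "log_entry";                         // fixed target type for this example
+    }
+
+    @Override
+    public String getId(ITuple tuple) {
+        return tuple.getStringByField("eventId");   // hypothetical unique id field
+    }
+}
+```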
+  
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([@harshach](https://github.com/harshach))
+ * Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-mongodb.md
----------------------------------------------------------------------
diff --git a/docs/storm-mongodb.md b/docs/storm-mongodb.md
new file mode 100644
index 0000000..90994bd
--- /dev/null
+++ b/docs/storm-mongodb.md
@@ -0,0 +1,199 @@
+---
+title: Storm MongoDB Integration
+layout: documentation
+documentation: true
+---
+
+Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allow a storm topology to either insert storm tuples into a database collection or to execute update queries against a database collection.
+
+## Insert into Database
+This package includes a bolt and a trident state for inserting data into a database collection.
+
+### MongoMapper
+The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
+
+```java
+public interface MongoMapper extends Serializable {
+    Document toDocument(ITuple tuple);
+}
+```
+
+### SimpleMongoMapper
+`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that can map a Storm tuple to a database document. `SimpleMongoMapper` assumes that the storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
+
+```java
+public class SimpleMongoMapper implements MongoMapper {
+    private String[] fields;
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for(String field : fields){
+            document.append(field, tuple.getValueByField(field));
+        }
+        return document;
+    }
+
+    public SimpleMongoMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
+```
+
+### MongoInsertBolt
+To use the `MongoInsertBolt`, you construct an instance of it by specifying the url, the collectionName and a `MongoMapper` implementation that converts a storm tuple to a DB document. The following is the standard URI connection scheme:
+ `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
+
+For more information about Mongo URI options (e.g. Write Concern options), see https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
+
+ ```java
+String url = "mongodb://127.0.0.1:27017/test";
+String collectionName = "wordcount";
+
+MongoMapper mapper = new SimpleMongoMapper()
+        .withFields("word", "count");
+
+MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
+ ```
+
+### MongoTridentState
+We also support a trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, the collectionName and a `MongoMapper` instance. See the example below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        MongoState.Options options = new MongoState.Options()
+                .withUrl(url)
+                .withCollectionName(collectionName)
+                .withMapper(mapper);
+
+        StateFactory factory = new MongoStateFactory(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory, fields,  new MongoStateUpdater(), new Fields());
+ ```
+ **NOTE**:
+ >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
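+
+For example, a unique index on the key field can be created up front with the MongoDB Java driver. This is a sketch only; the connection URL, database, collection and the choice of "word" as the unique key simply follow the word-count example above:
+
+ ```java
+MongoClient client = new MongoClient(new MongoClientURI("mongodb://127.0.0.1:27017"));
+MongoCollection<Document> collection = client.getDatabase("test").getCollection("wordcount");
+
+// Enforce uniqueness on "word" so replayed inserts cannot create duplicate documents.
+collection.createIndex(Indexes.ascending("word"), new IndexOptions().unique(true));
+ ```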
+
+## Update from Database
+This package includes a bolt for updating data in a database collection.
+
+### SimpleMongoUpdateMapper
+`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoUpdateMapper` that can map a Storm tuple to a database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
+`SimpleMongoUpdateMapper` uses the `$set` operator to set the value of a field in a document. For more information about update operators, see
+https://docs.mongodb.org/manual/reference/operator/update/
+
+```java
+public class SimpleMongoUpdateMapper implements MongoMapper {
+    private String[] fields;
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for(String field : fields){
+            document.append(field, tuple.getValueByField(field));
+        }
+        return new Document("$set", document);
+    }
+
+    public SimpleMongoUpdateMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
+```
+
+
+ 
+### QueryFilterCreator
+The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
+
+ ```java
+public interface QueryFilterCreator extends Serializable {
+    Bson createFilter(ITuple tuple);
+}
+ ```
+
+### SimpleQueryFilterCreator
+`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that can create a MongoDB query filter from a given tuple. `SimpleQueryFilterCreator` uses the `$eq` operator to match values that are equal to a specified value. For more information about query operators, see
+https://docs.mongodb.org/manual/reference/operator/query/
+
+ ```java
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+    private String field;
+    
+    @Override
+    public Bson createFilter(ITuple tuple) {
+        return Filters.eq(field, tuple.getValueByField(field));
+    }
+
+    public SimpleQueryFilterCreator withField(String field) {
+        this.field = field;
+        return this;
+    }
+
+}
+ ```
+
+### MongoUpdateBolt
+To use the `MongoUpdateBolt`, you construct an instance of it by specifying the Mongo url, the collectionName, a `QueryFilterCreator` implementation and a `MongoMapper` implementation that converts a storm tuple to a DB document.
+
+ ```java
+        MongoMapper mapper = new SimpleMongoUpdateMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+        
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+        //if a new document should be inserted if there are no matches to the query filter
+        //updateBolt.withUpsert(true);
+ ```
+ 
+ Or use an anonymous inner class implementation of `QueryFilterCreator`:
+ 
+  ```java
+        MongoMapper mapper = new SimpleMongoUpdateMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
+            @Override
+            public Bson createFilter(ITuple tuple) {
+                return Filters.gt("count", 3);
+            }
+        };
+        
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+        //if a new document should be inserted if there are no matches to the query filter
+        //updateBolt.withUpsert(true);
+ ```
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ 

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-mqtt.md
----------------------------------------------------------------------
diff --git a/docs/storm-mqtt.md b/docs/storm-mqtt.md
new file mode 100644
index 0000000..b730242
--- /dev/null
+++ b/docs/storm-mqtt.md
@@ -0,0 +1,379 @@
+---
+title: Storm MQTT Integration
+layout: documentation
+documentation: true
+---
+
+## About
+
+MQTT is a lightweight publish/subscribe protocol frequently used in IoT applications.
+
+Further information can be found at http://mqtt.org. The HiveMQ website has a great series on 
+[MQTT Essentials](http://www.hivemq.com/mqtt-essentials/).
+
+Features include:
+
+* Full MQTT support (e.g. last will, QoS 0-2, retain, etc.)
+* Spout implementation(s) for subscribing to MQTT topics
+* A bolt implementation for publishing MQTT messages
+* A trident function implementation for publishing MQTT messages
+* Authentication and TLS/SSL support
+* User-defined "mappers" for converting MQTT messages to tuples (subscribers)
+* User-defined "mappers" for converting tuples to MQTT messages (publishers)
+
+
+## Quick Start
+To quickly see MQTT integration in action, follow the instructions below.
+
+**Start an MQTT broker and publisher**
+
+The command below will create an MQTT broker on port 1883, and start a publisher that will publish random
+temperature/humidity values to an MQTT topic.
+
+Open a terminal and execute the following command (change the path as necessary):
+
+```bash
+java -cp examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.mqtt.examples.MqttBrokerPublisher
+```
+
+**Run the example topology**
+
+Run the sample topology using Flux. This will start a local mode cluster and topology that consists of the MQTT Spout
+publishing to a bolt that simply logs the information it receives.
+
+In a separate terminal, run the following command (Note that the `storm` executable must be on your PATH):
+
+```bash
+storm jar ./examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.flux.Flux ./examples/src/main/flux/sample.yaml --local
+```
+
+You should see data from MQTT being logged by the bolt:
+
+```
+27020 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=67.0, humidity=65.0}
+27030 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=47.0, humidity=85.0}
+27040 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=69.0, humidity=94.0}
+27049 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=4.0, humidity=98.0}
+27059 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=51.0, humidity=12.0}
+27069 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=27.0, humidity=65.0}
+```
+
+Either allow the local cluster to exit, or stop it by typing Ctrl-C.
+
+**MQTT Fault Tolerance In Action**
+
+After the topology has been shut down, the MQTT subscription created by the MQTT spout will persist with the broker,
+and it will continue to receive and queue messages (as long as the broker is running).
+
+If you run the topology again (while the broker is still running), when the spout initially connects to the MQTT broker,
+it will receive all the messages it missed while it was down. You should see this as a burst of messages, followed by a
+rate of about two messages per second.
+
+This happens because, by default, the MQTT Spout creates a *session* when it subscribes -- that means it requests that
+the broker hold onto and redeliver any messages it missed while offline. Another important factor is that the
+`MqttBrokerPublisher` publishes messages with an MQTT QoS of `1`, meaning *at least once delivery*.
+
+For more information about MQTT fault tolerance, see the **Delivery Guarantees** section below.
+
+
+
+## Delivery Guarantees
+In Storm terms, ***the MQTT Spout provides at least once delivery***, depending on the configuration of the publisher as
+well as the MQTT spout.
+
+The MQTT protocol defines the following QoS levels:
+
+* `0` - At Most Once (AKA "Fire and Forget")
+* `1` - At Least Once
+* `2` - Exactly Once
+
+This can be a little confusing as the MQTT protocol specification does not really address the concept of a node being 
+completely incinerated by a catastrophic event. This is in stark contrast with Storm's reliability model, which expects
+and embraces the concept of node failure.
+
+So resiliency is ultimately dependent on the underlying MQTT implementation and infrastructure.
+
+### Recommendations
+
+*You will never get exactly once processing with this spout. It can be used with Trident, but it won't provide
+transactional semantics. You will only get at least once guarantees.*
+
+If you need reliability guarantees (i.e. *at least once processing*):
+
+1. For MQTT publishers (outside of Storm), publish messages with a QoS of `1` so the broker saves messages if/when the 
+spout is offline.
+2. Use the spout defaults (`cleanSession = false` and `qos = 1`); see the sketch after this list.
+3. If you can, make sure any result of receiving an MQTT message is idempotent.
+4. Make sure your MQTT brokers don't die or get isolated due to a network partition. Be prepared for natural and 
+man-made disasters and network partitions. Incineration and destruction happen.
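+
+For illustration, here is a minimal sketch of configuring the spout along those lines. The setter names for the broker URL and QoS are assumptions; check the `MqttOptions` JavaDoc for the exact property names:
+
+```java
+MqttOptions options = new MqttOptions();
+options.setUrl("tcp://localhost:1883");               // assumed setter, matching the Flux "url" property
+options.setTopics(Arrays.asList("/users/tgoetz/#"));
+
+// The defaults are shown explicitly for clarity: keep the broker-side session
+// and subscribe with QoS 1 (at least once).
+options.setCleanConnection(false);
+options.setQos(1);                                    // assumed setter; verify against MqttOptions
+
+MqttSpout spout = new MqttSpout(new StringMessageMapper(), options);
+```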
+
+
+
+
+
+## Configuration
+For the full range of configuration options, see the JavaDoc for `org.apache.storm.mqtt.common.MqttOptions`.
+
+### Message Mappers
+To define how MQTT messages are mapped to Storm tuples, you configure the MQTT spout with an implementation of the 
+`org.apache.storm.mqtt.MqttMessageMapper` interface, which looks like this:
+
+```java
+public interface MqttMessageMapper extends Serializable {
+
+    Values toValues(MqttMessage message);
+
+    Fields outputFields();
+}
+```
+
+The `MqttMessage` class contains the topic to which the message was published (`String`) and the message payload 
+(`byte[]`). For example, here is a `MqttMessageMapper` implementation that produces tuples based on the content of both
+the message topic and payload:
+
+```java
+/**
+ * Given a topic name: "users/{user}/{location}/{deviceId}"
+ * and a payload of "{temperature}/{humidity}"
+ * emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float)
+ *
+ */
+public class CustomMessageMapper implements MqttMessageMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(CustomMessageMapper.class);
+
+
+    public Values toValues(MqttMessage message) {
+        String topic = message.getTopic();
+        String[] topicElements = topic.split("/");
+        String[] payloadElements = new String(message.getMessage()).split("/");
+
+        return new Values(topicElements[2], topicElements[4], topicElements[3], Float.parseFloat(payloadElements[0]), 
+                Float.parseFloat(payloadElements[1]));
+    }
+
+    public Fields outputFields() {
+        return new Fields("user", "deviceId", "location", "temperature", "humidity");
+    }
+}
+```
+
+### Tuple Mappers
+When publishing MQTT messages with the MQTT bolt or Trident function, you need to map Storm tuple data to MQTT messages 
+(topic/payload). This is done by implementing the `org.apache.storm.mqtt.MqttTupleMapper` interface:
+
+```java
+public interface MqttTupleMapper extends Serializable{
+
+    MqttMessage toMessage(ITuple tuple);
+
+}
+```
+
+For example, a simple `MqttTupleMapper` implementation might look like this:
+
+```java
+public class MyTupleMapper implements MqttTupleMapper {
+    public MqttMessage toMessage(ITuple tuple) {
+        String topic = "users/" + tuple.getStringByField("userId") + "/" + tuple.getStringByField("device");
+        byte[] payload = tuple.getStringByField("message").getBytes();
+        return new MqttMessage(topic, payload);
+    }
+}
+```
+
+### MQTT Spout Parallelism
+It's recommended that you use a parallelism of 1 for the MQTT spout, otherwise you will end up with multiple instances
+of the spout subscribed to the same topic(s), resulting in duplicate messages.
+
+If you want to parallelize the spout, it's recommended that you use multiple instances of the spout in your topology
+and use MQTT topic selectors to partition the data. How you implement the partitioning strategy is ultimately determined
+by your MQTT topic structure. As an example, if you had topics partitioned by region (e.g. east/west) you could do 
+something like the following:
+
+```java
+String spout1Topic = "users/east/#";
+String spout2Topic = "users/west/#";
+```
+
+and then join the resulting streams together by subscribing a bolt to each stream, as sketched below.
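+
+A rough sketch of that wiring follows; it reuses the mapper, options and spout classes shown elsewhere in this document, while `RegionJoinBolt` is a hypothetical bolt that consumes and merges both streams:
+
+```java
+MqttOptions eastOptions = new MqttOptions();
+eastOptions.setTopics(Arrays.asList("users/east/#"));
+
+MqttOptions westOptions = new MqttOptions();
+westOptions.setTopics(Arrays.asList("users/west/#"));
+
+TopologyBuilder builder = new TopologyBuilder();
+builder.setSpout("east-spout", new MqttSpout(new StringMessageMapper(), eastOptions), 1);
+builder.setSpout("west-spout", new MqttSpout(new StringMessageMapper(), westOptions), 1);
+
+// RegionJoinBolt is a hypothetical bolt that subscribes to both regional streams.
+builder.setBolt("join-bolt", new RegionJoinBolt())
+       .shuffleGrouping("east-spout")
+       .shuffleGrouping("west-spout");
+```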
+
+
+### Using Flux
+
+The following Flux YAML configuration creates the topology used in the example:
+
+```yaml
+name: "mqtt-topology"
+
+components:
+   ########## MQTT Spout Config ############
+  - id: "mqtt-type"
+    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+  - id: "mqtt-options"
+    className: "org.apache.storm.mqtt.common.MqttOptions"
+    properties:
+      - name: "url"
+        value: "tcp://localhost:1883"
+      - name: "topics"
+        value:
+          - "/users/tgoetz/#"
+
+# topology configuration
+config:
+  topology.workers: 1
+  topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+  - id: "mqtt-spout"
+    className: "org.apache.storm.mqtt.spout.MqttSpout"
+    constructorArgs:
+      - ref: "mqtt-type"
+      - ref: "mqtt-options"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+
+streams:
+  - from: "mqtt-spout"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+
+```
+
+
+### Using Java
+
+Similarly, you can create the same topology using the Storm Core Java API:
+
+```java
+TopologyBuilder builder = new TopologyBuilder();
+MqttOptions options = new MqttOptions();
+options.setTopics(Arrays.asList("/users/tgoetz/#"));
+options.setCleanConnection(false);
+MqttSpout spout = new MqttSpout(new StringMessageMapper(), options);
+
+LogInfoBolt bolt = new LogInfoBolt();
+
+builder.setSpout("mqtt-spout", spout);
+builder.setBolt("log-bolt", bolt).shuffleGrouping("mqtt-spout");
+
+return builder.createTopology();
+```
+
+## SSL/TLS
+If the MQTT broker you are connecting to requires SSL or SSL client authentication, you need to configure the spout 
+with an appropriate URI, and the location of keystore/truststore files containing the necessary certificates.
+
+### SSL/TLS URIs
+To connect over SSL/TLS use a URI with a prefix of `ssl://` or `tls://` instead of `tcp://`. For further control over
+the algorithm, you can specify a specific protocol:
+
+ * `ssl://` Use the JVM default version of the SSL protocol.
+ * `sslv*://` Use a specific version of the SSL protocol, where `*` is replaced by the version (e.g. `sslv3://`)
+ * `tls://` Use the JVM default version of the TLS protocol.
+ * `tlsv*://` Use a specific version of the TLS protocol, where `*` is replaced by the version (e.g. `tlsv1.1://`)
+ 
+ 
+### Specifying Keystore/Truststore Locations
+ 
+ The `MqttSpout`, `MqttBolt` and `MqttPublishFunction` all have constructors that take a `KeyStoreLoader` instance that
+ is used to load the certificates required for TLS/SSL connections. For example:
+ 
+```java
+ public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader)
+```
+ 
+The `DefaultKeyStoreLoader` class can be used to load certificates from the local filesystem. Note that the 
+keystore/truststore need to be available on all worker nodes where the spout/bolt might be executed. To use 
+`DefaultKeyStoreLoader` you specify the location of the keystore/truststore file(s), and set the necessary passwords:
+
+```java
+DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks", "/path/to/truststore.jks");
+ksl.setKeyStorePassword("password");
+ksl.setTrustStorePassword("password");
+//...
+```
+
+If your keystore/truststore certificates are stored in a single file, you can use the one-argument constructor:
+
+```java
+DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks");
+ksl.setKeyStorePassword("password");
+//...
+```
+
+SSL/TLS can also be configured using Flux:
+
+```yaml
+name: "mqtt-topology"
+
+components:
+   ########## MQTT Spout Config ############
+  - id: "mqtt-type"
+    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+  - id: "keystore-loader"
+    className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
+    constructorArgs:
+      - "keystore.jks"
+      - "truststore.jks"
+    properties:
+      - name: "keyPassword"
+        value: "password"
+      - name: "keyStorePassword"
+        value: "password"
+      - name: "trustStorePassword"
+        value: "password"
+
+  - id: "mqtt-options"
+    className: "org.apache.storm.mqtt.common.MqttOptions"
+    properties:
+      - name: "url"
+        value: "ssl://raspberrypi.local:8883"
+      - name: "topics"
+        value:
+          - "/users/tgoetz/#"
+
+# topology configuration
+config:
+  topology.workers: 1
+  topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+  - id: "mqtt-spout"
+    className: "org.apache.storm.mqtt.spout.MqttSpout"
+    constructorArgs:
+      - ref: "mqtt-type"
+      - ref: "mqtt-options"
+      - ref: "keystore-loader"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+
+streams:
+
+  - from: "mqtt-spout"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+
+```
+
+## Committer Sponsors
+
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
\ No newline at end of file


[2/2] storm git commit: Merge branch 'STORM-1659-DOCS' of https://github.com/vesense/storm into STORM-1659

Posted by ka...@apache.org.
Merge branch 'STORM-1659-DOCS' of https://github.com/vesense/storm into STORM-1659


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a48fae24
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a48fae24
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a48fae24

Branch: refs/heads/master
Commit: a48fae243d7bf4d374a6a3de8e9d3e46d9bd3b64
Parents: f118060 0a00c67
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Mar 27 20:24:38 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Mar 27 20:24:38 2016 +0900

----------------------------------------------------------------------
 docs/Kestrel-and-Storm.md   |   2 +-
 docs/index.md               |  14 +-
 docs/storm-cassandra.md     | 255 ++++++++++++++++++++++++++
 docs/storm-elasticsearch.md | 105 +++++++++++
 docs/storm-mongodb.md       | 199 ++++++++++++++++++++
 docs/storm-mqtt.md          | 379 +++++++++++++++++++++++++++++++++++++++
 6 files changed, 948 insertions(+), 6 deletions(-)
----------------------------------------------------------------------