Posted to commits@storm.apache.org by ka...@apache.org on 2016/03/27 13:26:23 UTC
[1/2] storm git commit: STORM-1659:Add documents for external projects
Repository: storm
Updated Branches:
refs/heads/master f118060dc -> a48fae243
STORM-1659:Add documents for external projects
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0a00c671
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0a00c671
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0a00c671
Branch: refs/heads/master
Commit: 0a00c67170db83da3f5c29dcecf6f185fd066b9b
Parents: f118060
Author: Xin Wang <be...@163.com>
Authored: Sun Mar 27 16:04:38 2016 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sun Mar 27 16:10:53 2016 +0800
----------------------------------------------------------------------
docs/Kestrel-and-Storm.md | 2 +-
docs/index.md | 14 +-
docs/storm-cassandra.md | 255 ++++++++++++++++++++++++++
docs/storm-elasticsearch.md | 105 +++++++++++
docs/storm-mongodb.md | 199 ++++++++++++++++++++
docs/storm-mqtt.md | 379 +++++++++++++++++++++++++++++++++++++++
6 files changed, 948 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/Kestrel-and-Storm.md
----------------------------------------------------------------------
diff --git a/docs/Kestrel-and-Storm.md b/docs/Kestrel-and-Storm.md
index cd584ff..ff48995 100644
--- a/docs/Kestrel-and-Storm.md
+++ b/docs/Kestrel-and-Storm.md
@@ -3,7 +3,7 @@ title: Storm and Kestrel
layout: documentation
documentation: true
---
-This page explains how to use to Storm to consume items from a Kestrel cluster.
+This page explains how to use Storm to consume items from a Kestrel cluster.
## Preliminaries
### Storm
http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 8c6859e..bfd68db 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -64,15 +64,19 @@ Trident is an alternative interface to Storm. It provides exactly-once processin
* [Worker Profiling](dynamic-worker-profiling.html)
### Integration With External Systems, and Other Libraries
-* [Event Hubs Intergration](storm-eventhubs.html)
+* [Apache Kafka Integration](storm-kafka.html)
* [Apache HBase Integration](storm-hbase.html)
* [Apache HDFS Integration](storm-hdfs.html)
* [Apache Hive Integration](storm-hive.html)
+* [Apache Solr Integration](storm-solr.html)
* [JDBC Integration](storm-jdbc.html)
-* [Apache Kafka Integration](storm-kafka.html)
-* [REDIS Integration](storm-redis.html)
-* [Kestrel and Storm](Kestrel-and-Storm.html)
-* [Solr Integration](storm-solr.html)
+* [Redis Integration](storm-redis.html)
+* [Cassandra Integration](storm-cassandra.html)
+* [Event Hubs Integration](storm-eventhubs.html)
+* [Elasticsearch Integration](storm-elasticsearch.html)
+* [MQTT Integration](storm-mqtt.html)
+* [MongoDB Integration](storm-mongodb.html)
+* [Kestrel Integration](Kestrel-and-Storm.html)
### Advanced
http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-cassandra.md
----------------------------------------------------------------------
diff --git a/docs/storm-cassandra.md b/docs/storm-cassandra.md
new file mode 100644
index 0000000..c674fbc
--- /dev/null
+++ b/docs/storm-cassandra.md
@@ -0,0 +1,255 @@
+---
+title: Storm Cassandra Integration
+layout: documentation
+documentation: true
+---
+
+### Bolt API implementation for Apache Cassandra
+
+This library provides a core Storm bolt on top of Apache Cassandra.
+It provides a simple DSL to map a Storm *Tuple* to a Cassandra Query Language (CQL) *Statement*.
+
+
+### Configuration
+The following properties may be passed to the Storm configuration.
+
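+| **Property name**                            | **Description**                                  | **Default**          |
+| -------------------------------------------- | ------------------------------------------------ | -------------------- |
+| **cassandra.keyspace**                       | Keyspace to connect to                           |                      |
+| **cassandra.nodes**                          | Cassandra contact points (hosts)                 | {"localhost"}        |
+| **cassandra.username**                       | Username for authentication                      | -                    |
+| **cassandra.password**                       | Password for authentication                      | -                    |
+| **cassandra.port**                           | Port used to connect to the Cassandra nodes      | 9092                 |
+| **cassandra.output.consistencyLevel**        | Consistency level used for write statements      | ONE                  |
+| **cassandra.batch.size.rows**                | Number of rows per batch                         | 100                  |
+| **cassandra.retryPolicy**                    | Driver retry policy                              | DefaultRetryPolicy   |
+| **cassandra.reconnectionPolicy.baseDelayMs** | Base delay of the driver reconnection policy     | 100 (ms)             |
+| **cassandra.reconnectionPolicy.maxDelayMs**  | Maximum delay of the driver reconnection policy  | 60000 (ms)           |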
+
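A minimal sketch of collecting these properties into a map that can be merged into a Storm topology configuration (Storm's `Config` is a `HashMap<String, Object>`). The class `CassandraConfigSketch` and its method are hypothetical, and passing `cassandra.nodes` as a comma-separated string is an assumption. Cassandra's native protocol listens on port 9042 by default, so the sketch sets `cassandra.port` explicitly rather than relying on the default listed above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of storm-cassandra): gathers the cassandra.*
// properties from the table above into a plain map.
class CassandraConfigSketch {

    static Map<String, Object> cassandraConf(String keyspace, String nodes, int port) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("cassandra.keyspace", keyspace);
        // Assumption: contact points given as a comma-separated host list.
        conf.put("cassandra.nodes", nodes);
        // Set explicitly; Cassandra's native protocol port defaults to 9042.
        conf.put("cassandra.port", port);
        conf.put("cassandra.output.consistencyLevel", "ONE");
        conf.put("cassandra.batch.size.rows", 100);
        return conf;
    }
}
```

For example, calling `conf.putAll(CassandraConfigSketch.cassandraConf("music", "cass1,cass2", 9042))` on a Storm `Config` before submitting the topology.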
+### CassandraWriterBolt
+
+#### Static import
+```java
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+
+```
+
+#### Insert Query Builder
+##### Insert query including only the specified tuple fields.
+```java
+
+ new CassandraWriterBolt(
+ async(
+ simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+ .with(
+ fields("title", "year", "performer", "genre", "tracks")
+ )
+ )
+ );
+```
+
+##### Insert query including all tuple fields.
+```java
+
+ new CassandraWriterBolt(
+ async(
+ simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+ .with( all() )
+ )
+ );
+```
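To make the difference between `fields(...)` and `all()` concrete, here is a stdlib-only sketch, not the storm-cassandra implementation. The tuple is modelled as an ordered `Map`: `fields(...)` picks the named values in the order given, `all()` takes every field in declaration order, and the number of values must match the number of `?` markers in the statement. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Stdlib-only illustration, not storm-cassandra code. A tuple is modelled as an
// insertion-ordered Map of field name -> value (e.g. a LinkedHashMap).
class PositionalBindingSketch {

    // Counts '?' markers; for brevity it does not skip '?' inside quoted literals.
    static int countPlaceholders(String cql) {
        int n = 0;
        for (char c : cql.toCharArray()) {
            if (c == '?') {
                n++;
            }
        }
        return n;
    }

    // Like fields(...): the named tuple values, in the order given.
    static List<Object> selectFields(Map<String, Object> tuple, String... names) {
        List<Object> values = new ArrayList<>();
        for (String name : names) {
            if (!tuple.containsKey(name)) {
                throw new IllegalArgumentException("tuple has no field: " + name);
            }
            values.add(tuple.get(name));
        }
        return values;
    }

    // Like all(): every tuple value, in field declaration order.
    static List<Object> selectAll(Map<String, Object> tuple) {
        return new ArrayList<>(tuple.values());
    }

    // Checks that exactly one value is supplied per '?' marker.
    static List<Object> bind(String cql, List<Object> values) {
        int expected = countPlaceholders(cql);
        if (expected != values.size()) {
            throw new IllegalArgumentException(
                    "statement expects " + expected + " values, got " + values.size());
        }
        return values;
    }
}
```

In this sketch `all()` only lines up with positional markers when the tuple's fields are declared in the same order as the statement's columns; `fields(...)` makes that order explicit.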
+
+##### Insert multiple queries from one input tuple.
+```java
+
+    new CassandraWriterBolt(
+        async(
+            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
+            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
+        )
+    );
+```
+
+##### Insert query using QueryBuilder
+```java
+
+    new CassandraWriterBolt(
+        async(
+            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .with(all())
+        )
+    );
+```
+
+##### Insert query with static bound query
+```java
+
+    new CassandraWriterBolt(
+        async(
+            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .bind(all())
+        )
+    );
+```
+
+##### Insert query with static bound query using named setters and aliases
+```java
+
+    new CassandraWriterBolt(
+        async(
+            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);")
+                .bind(
+                    field("title").as("ti"),
+                    field("year").as("ye"),
+                    field("performer").as("pe"),
+                    field("genre").as("ge"),
+                    field("tracks").as("tr")
+                ).byNamedSetters()
+        )
+    );
+```
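A stdlib-only sketch of how named setters resolve `:name` markers, assuming each alias given to `as(...)` is the marker name and the argument to `field(...)` is the tuple field it reads. `NamedBindingSketch` is hypothetical and only illustrates the lookup; it is not the library's binding code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration: resolves ":name" markers through an
// alias (marker name) -> tuple field map.
class NamedBindingSketch {
    private static final Pattern NAMED = Pattern.compile(":(\\w+)");

    // Marker names in the order they appear in the statement.
    static List<String> placeholders(String cql) {
        List<String> names = new ArrayList<>();
        Matcher m = NAMED.matcher(cql);
        while (m.find()) {
            names.add(m.group(1));
        }
        return names;
    }

    // Returns marker name -> value; fails if a marker has no alias or field.
    static Map<String, Object> bind(String cql, Map<String, String> aliases,
                                    Map<String, Object> tuple) {
        Map<String, Object> bound = new HashMap<>();
        for (String name : placeholders(cql)) {
            String field = aliases.get(name);
            if (field == null || !tuple.containsKey(field)) {
                throw new IllegalArgumentException("no value for :" + name);
            }
            bound.put(name, tuple.get(field));
        }
        return bound;
    }
}
```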
+
+##### Insert query with bound statement loaded from the Storm configuration
+```java
+
+    new CassandraWriterBolt(
+        boundQuery(named("insertIntoAlbum"))
+            .bind(all()));
+```
+
+##### Insert query with bound statement loaded from a tuple field
+```java
+
+    new CassandraWriterBolt(
+        boundQuery(namedByField("cql"))
+            .bind(all()));
+```
+
+##### Insert query with batch statement
+```java
+
+    // Logged
+    new CassandraWriterBolt(loggedBatch(
+        simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
+        simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
+        )
+    );
+    // Unlogged
+    new CassandraWriterBolt(unLoggedBatch(
+        simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
+        simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
+        )
+    );
+```
+
+### How to handle query execution results
+
+The interface *ExecutionResultHandler* can be used to customize how an execution result should be handled.
+
+```java
+public interface ExecutionResultHandler extends Serializable {
+ void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple);
+
+ void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple);
+
+ void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple);
+
+ void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple);
+
+ void onQuerySuccess(OutputCollector collector, Tuple tuple);
+}
+```
+
+By default, the CassandraBolt fails a tuple on all Cassandra exceptions (see [BaseExecutionResultHandler](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java)).
+
+```java
+    new CassandraWriterBolt(insertInto("album").values(with(all()).build()))
+        .withResultHandler(new MyCustomResultHandler());
+```
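One reason to supply a custom handler is that failing on every error is not always useful: a statement rejected as invalid will be rejected again on every replay, while timeouts and unavailability are often transient. The sketch below models that decision with plain enums; `ResultPolicySketch` and its types are hypothetical stand-ins for the `ExecutionResultHandler` callbacks, not library classes.

```java
// Hypothetical policy sketch, not the storm-cassandra API: what a custom
// ExecutionResultHandler might do in each failure callback.
class ResultPolicySketch {
    enum Failure { QUERY_VALIDATION, READ_TIMEOUT, WRITE_TIMEOUT, UNAVAILABLE }

    enum Action { FAIL_FOR_REPLAY, ACK_AND_REPORT }

    static Action onFailure(Failure failure) {
        switch (failure) {
            case QUERY_VALIDATION:
                // An invalid statement fails again on every replay, so report
                // it (e.g. on an error stream) and ack the tuple instead.
                return Action.ACK_AND_REPORT;
            case READ_TIMEOUT:
            case WRITE_TIMEOUT:
            case UNAVAILABLE:
            default:
                // Usually transient cluster conditions: fail so the spout replays.
                return Action.FAIL_FOR_REPLAY;
        }
    }
}
```

In a real handler, `FAIL_FOR_REPLAY` would correspond to `collector.fail(tuple)` and `ACK_AND_REPORT` to emitting on an error stream and calling `collector.ack(tuple)`. Replaying after a write timeout is generally safe for plain INSERTs, which overwrite by primary key, but not for counter updates.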
+
+### Declare Output fields
+
+A CassandraBolt can declare output fields and stream output fields.
+For instance, this may be used to emit a new tuple on error, or to chain queries.
+
+```java
+    new CassandraWriterBolt(insertInto("album").values(withFields(all()).build()))
+        .withResultHandler(new EmitOnDriverExceptionResultHandler())
+        .withStreamOutputFields("stream_error", new Fields("message"));
+
+ public static class EmitOnDriverExceptionResultHandler extends BaseExecutionResultHandler {
+ @Override
+ protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) {
+ LOG.error("An error occurred while executing cassandra statement", e);
+ collector.emit("stream_error", new Values(e.getMessage()));
+ collector.ack(tuple);
+ }
+ }
+```
+
+### Murmur3StreamGrouping
+
+[Murmur3StreamGrouping](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java) can be used to optimise Cassandra writes.
+The stream is partitioned among the bolt's tasks based on the specified row partition keys, so all writes for one partition go through the same task.
+
+```java
+CassandraWriterBolt bolt = new CassandraWriterBolt(
+ insertInto("album")
+ .values(
+ with(fields("title", "year", "performer", "genre", "tracks")
+ ).build());
+builder.setBolt("BOLT_WRITER", bolt, 4)
+    .customGrouping("spout", new Murmur3StreamGrouping("title"));
+```
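A sketch of the idea behind key-based grouping: hash the partition key and map the hash onto the bolt's task indices, so every tuple for a given partition reaches the same writer task. It uses the 32-bit x86 variant of MurmurHash3 for brevity; Cassandra's `Murmur3Partitioner` uses the 128-bit x64 variant, so these hashes are not Cassandra tokens, and this is not the code inside `Murmur3StreamGrouping`. `Murmur3TaskSketch` is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Illustration only: key -> task index via 32-bit MurmurHash3 (x86 variant).
class Murmur3TaskSketch {

    static int murmur3x86_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int roundedEnd = data.length & ~3;
        // Body: process 4-byte little-endian blocks.
        for (int i = 0; i < roundedEnd; i += 4) {
            int k1 = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16) | (data[i + 3] << 24);
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        // Tail: the remaining 0-3 bytes.
        int k1 = 0;
        switch (data.length & 3) {
            case 3:
                k1 = (data[roundedEnd + 2] & 0xff) << 16;
                // fall through
            case 2:
                k1 |= (data[roundedEnd + 1] & 0xff) << 8;
                // fall through
            case 1:
                k1 |= data[roundedEnd] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
                break;
            default:
                break;
        }
        // Finalization mix.
        h1 ^= data.length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // Same key -> same task, so one partition's rows go through one writer.
    static int taskFor(String partitionKey, int numTasks) {
        int hash = murmur3x86_32(partitionKey.getBytes(StandardCharsets.UTF_8), 0);
        return Math.floorMod(hash, numTasks);
    }
}
```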
+
+### Trident API support
+storm-cassandra supports the Trident `state` API for inserting data into Cassandra.
+```java
+ CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+ CQLStatementTupleMapper insertTemperatureValues = boundQuery(
+ "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
+ .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
+ options.withCQLStatementTupleMapper(insertTemperatureValues);
+ CassandraStateFactory insertValuesStateFactory = new CassandraStateFactory(options);
+ TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
+ stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+ stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
+ stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
+```
+
+The following example uses the Trident `state` API to query data from Cassandra.
+```java
+    CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+    CQLStatementTupleMapper selectStationName = boundQuery("SELECT name FROM weather.station WHERE id = ?")
+            .bind(with(field("weather_station_id").as("id")));
+    options.withCQLStatementTupleMapper(selectStationName);
+    options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
+    CassandraStateFactory selectWeatherStationStateFactory = new CassandraStateFactory(options);
+    TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
+    stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+```
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/storm-elasticsearch.md b/docs/storm-elasticsearch.md
new file mode 100644
index 0000000..fff0f51
--- /dev/null
+++ b/docs/storm-elasticsearch.md
@@ -0,0 +1,105 @@
+---
+title: Storm Elasticsearch Integration
+layout: documentation
+documentation: true
+---
+
+# Storm Elasticsearch Bolt & Trident State
+
+ EsIndexBolt, EsPercolateBolt and EsState allow users to stream data from Storm directly into Elasticsearch.
+ For detailed descriptions, refer to the sections below.
+
+## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
+
+EsIndexBolt streams tuples directly into Elasticsearch. Tuples are indexed into the specified index and type combination.
+Users should make sure that ```EsTupleMapper``` can extract "source", "index", "type", and "id" from the input tuple.
+"index" and "type" identify the target index and type.
+"source" is a document, as a JSON-formatted string, that will be indexed in Elasticsearch.
+
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
+```
+
+## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
+
+EsPercolateBolt streams tuples directly into Elasticsearch. Tuples are used to send percolate requests to the specified index and type combination.
+Users should make sure ```EsTupleMapper``` can extract "source", "index", and "type" from the input tuple.
+"index" and "type" identify the target index and type.
+"source" is a document, as a JSON-formatted string, that will be sent in the percolate request to Elasticsearch.
+
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper);
+```
+
+If the percolate response is non-empty, EsPercolateBolt emits one tuple for each Percolate.Match in the PercolateResponse,
+containing the original source and that Percolate.Match.
+
+## EsState (org.apache.storm.elasticsearch.trident.EsState)
+
+The Elasticsearch Trident state follows a similar pattern to the Es bolts: it takes EsConfig and EsTupleMapper as arguments.
+
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+
+StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
+TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+ ```
+
+## EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt)
+
+EsLookupBolt performs a get request to Elasticsearch.
+To do that, it needs three dependencies. Apart from the usual EsConfig, two other dependencies must be provided:
+
+* ElasticsearchGetRequest is used to convert the incoming Tuple to the GetRequest that will be executed against Elasticsearch.
+* EsLookupResultOutput is used to declare the output fields and convert the GetResponse to values that are emitted by the bolt.
+
+The incoming tuple is passed to the provided GetRequest creator, and the resulting GetRequest is executed by the Elasticsearch client.
+The bolt then uses the provided output adapter (EsLookupResultOutput) to convert the GetResponse to Values to emit.
+The output fields are also specified by the user of the bolt via the output adapter (EsLookupResultOutput).
+
+```java
+EsConfig esConfig = createEsConfig();
+ElasticsearchGetRequest getRequestAdapter = createElasticsearchGetRequest();
+EsLookupResultOutput output = createOutput();
+EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
+```
+
+## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
+
+The provided components (bolts and state) take EsConfig as a constructor argument.
+
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+```
+
+or
+
+```java
+Map<String, String> additionalParameters = new HashMap<>();
+additionalParameters.put("client.transport.sniff", "true");
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, additionalParameters);
+```
+
+### EsConfig params
+
+| Arg                  | Description                                                                            | Type                           |
+| -------------------- | -------------------------------------------------------------------------------------- | ------------------------------ |
+| clusterName          | Elasticsearch cluster name                                                             | String (required)              |
+| nodes                | Elasticsearch nodes as a String array; each element must follow the {host}:{port} pattern | String array (required)     |
+| additionalParameters | Additional Elasticsearch Transport Client configuration parameters                     | Map<String, String> (optional) |
+
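Because `nodes` must follow the `{host}:{port}` pattern, a small check before constructing `EsConfig` can catch typos early. This is a hypothetical helper, not part of storm-elasticsearch. Port 9300 in the examples is the Elasticsearch transport port used by the Transport Client, not the HTTP port 9200.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validation helper, not part of storm-elasticsearch.
class EsNodesSketch {
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Parses "{host}:{port}" entries; throws IllegalArgumentException
    // (NumberFormatException is a subclass) on malformed input.
    static List<HostPort> parse(String[] nodes) {
        List<HostPort> result = new ArrayList<>();
        for (String node : nodes) {
            int colon = node.lastIndexOf(':');
            if (colon <= 0 || colon == node.length() - 1) {
                throw new IllegalArgumentException("expected {host}:{port}, got " + node);
            }
            int port = Integer.parseInt(node.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range in " + node);
            }
            result.add(new HostPort(node.substring(0, colon), port));
        }
        return result;
    }
}
```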
+## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)
+
+To store tuples in Elasticsearch or percolate tuples against Elasticsearch, we need to define which tuple fields supply each value.
+Users can define their own mapping by implementing ```EsTupleMapper```.
+Storm-elasticsearch provides a default mapper, ```org.apache.storm.elasticsearch.common.DefaultEsTupleMapper```, which extracts the source, index, type, and id values from tuple fields with the same names.
+You can refer to the implementation of DefaultEsTupleMapper to see how to implement your own.
+
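As an illustration of writing your own mapper, the sketch below derives the index name from a timestamp so that documents land in daily indices. It does not use the storm-elasticsearch types: `TupleMapperSketch` mirrors the four values an `EsTupleMapper` must supply, the tuple is a plain map, and the field names `json`, `timestamp` and `eventId` are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Hypothetical stand-in for the four values an EsTupleMapper must supply.
interface TupleMapperSketch {
    String source(Map<String, Object> tuple);
    String index(Map<String, Object> tuple);
    String type(Map<String, Object> tuple);
    String id(Map<String, Object> tuple);
}

// Routes each document into a daily index such as "logs-2016.03.27" (UTC).
class DailyIndexMapper implements TupleMapperSketch {
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy.MM.dd").withZone(ZoneOffset.UTC);

    public String source(Map<String, Object> tuple) {
        return (String) tuple.get("json"); // document as a JSON string
    }

    public String index(Map<String, Object> tuple) {
        long epochMillis = (Long) tuple.get("timestamp");
        return "logs-" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    public String type(Map<String, Object> tuple) {
        return "event";
    }

    public String id(Map<String, Object> tuple) {
        return (String) tuple.get("eventId");
    }
}
```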
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([@harshach](https://github.com/harshach))
+ * Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))
http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-mongodb.md
----------------------------------------------------------------------
diff --git a/docs/storm-mongodb.md b/docs/storm-mongodb.md
new file mode 100644
index 0000000..90994bd
--- /dev/null
+++ b/docs/storm-mongodb.md
@@ -0,0 +1,199 @@
+---
+title: Storm MongoDB Integration
+layout: documentation
+documentation: true
+---
+
+Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and Trident states that allow a Storm topology to either insert Storm tuples into a database collection or execute update queries against a database collection.
+
+## Insert into Database
+This package includes a bolt and a Trident state for inserting data into a database collection.
+
+### MongoMapper
+The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
+
+```java
+public interface MongoMapper extends Serializable {
+ Document toDocument(ITuple tuple);
+}
+```
+
+### SimpleMongoMapper
+`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that maps a Storm tuple to a database document. `SimpleMongoMapper` assumes that the Storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
+
+```java
+public class SimpleMongoMapper implements MongoMapper {
+ private String[] fields;
+
+ @Override
+ public Document toDocument(ITuple tuple) {
+ Document document = new Document();
+ for(String field : fields){
+ document.append(field, tuple.getValueByField(field));
+ }
+ return document;
+ }
+
+ public SimpleMongoMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+}
+```
+
+### MongoInsertBolt
+To use the `MongoInsertBolt`, you construct an instance of it by specifying url, collectionName and a `MongoMapper` implementation that converts storm tuple to DB document. The following is the standard URI connection scheme:
+ `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
+
+For more information about Mongo URI options (e.g. write concern options), see https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
+
+ ```java
+String url = "mongodb://127.0.0.1:27017/test";
+String collectionName = "wordcount";
+
+MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
+ ```
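In the URL above, the database is the `test` path segment. Since the `MongoInsertBolt` constructor takes no separate database argument, the database presumably comes from this segment of the URI. The hypothetical helper below shows which part of the connection string that is; it is simplified and assumes credentials are percent-encoded, so they contain no `/`.

```java
// Hypothetical helper: extracts the [/database] segment of a MongoDB URI.
class MongoUriSketch {
    static String database(String url) {
        String prefix = "mongodb://";
        if (!url.startsWith(prefix)) {
            throw new IllegalArgumentException("not a mongodb:// URI: " + url);
        }
        String rest = url.substring(prefix.length());
        int slash = rest.indexOf('/');
        if (slash < 0) {
            return ""; // no database segment
        }
        String afterSlash = rest.substring(slash + 1);
        int query = afterSlash.indexOf('?'); // options start after '?'
        return query < 0 ? afterSlash : afterSlash.substring(0, query);
    }
}
```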
+
+### MongoTridentState
+We also support a Trident persistent state that can be used with Trident topologies. To create a Mongo persistent Trident state, initialize it with the url, the collectionName, and the `MongoMapper` instance. See the example below:
+
+ ```java
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ MongoState.Options options = new MongoState.Options()
+ .withUrl(url)
+ .withCollectionName(collectionName)
+ .withMapper(mapper);
+
+ StateFactory factory = new MongoStateFactory(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields());
+ ```
+ **NOTE**:
+ >If no unique index is provided, Trident state inserts may result in duplicate documents when batches are replayed after failures.
+
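Why duplicates appear: Trident replays a batch after a failure, and a plain insert does not know the batch was already written. The in-memory sketch below (hypothetical, not storm-mongodb code) writes the same batch twice with and without a unique key. In MongoDB itself the second write would fail with a duplicate-key error rather than being silently skipped as it is here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory stand-in for a MongoDB collection (hypothetical).
class ReplaySketch {
    final List<Map<String, Object>> docs = new ArrayList<>();
    private final String uniqueField; // null means no unique index
    private final Set<Object> seenKeys = new HashSet<>();

    ReplaySketch(String uniqueField) {
        this.uniqueField = uniqueField;
    }

    // Writes a batch. With a unique index, a document whose key already exists
    // is skipped here; MongoDB would report a duplicate-key error instead.
    void insertBatch(List<Map<String, Object>> batch) {
        for (Map<String, Object> doc : batch) {
            if (uniqueField != null && !seenKeys.add(doc.get(uniqueField))) {
                continue;
            }
            docs.add(doc);
        }
    }
}
```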
+## Update from Database
+This package also includes a bolt for updating documents in a database collection.
+
+### SimpleMongoUpdateMapper
+`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoUpdateMapper` that maps a Storm tuple to a database document. `SimpleMongoUpdateMapper` assumes that the Storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
+`SimpleMongoUpdateMapper` uses the `$set` operator to set the value of a field in a document. For more information about update operators, see
+https://docs.mongodb.org/manual/reference/operator/update/
+
+```java
+public class SimpleMongoUpdateMapper implements MongoMapper {
+ private String[] fields;
+
+ @Override
+ public Document toDocument(ITuple tuple) {
+ Document document = new Document();
+ for(String field : fields){
+ document.append(field, tuple.getValueByField(field));
+ }
+ return new Document("$set", document);
+ }
+
+ public SimpleMongoUpdateMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+}
+```
+
+
+
+### QueryFilterCreator
+The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
+
+ ```java
+public interface QueryFilterCreator extends Serializable {
+ Bson createFilter(ITuple tuple);
+}
+ ```
+
+### SimpleQueryFilterCreator
+`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that creates a MongoDB query filter from a given tuple. `SimpleQueryFilterCreator` uses the `$eq` operator to match values that are equal to a specified value. For more information about query operators, see
+https://docs.mongodb.org/manual/reference/operator/query/
+
+ ```java
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+ private String field;
+
+ @Override
+ public Bson createFilter(ITuple tuple) {
+ return Filters.eq(field, tuple.getValueByField(field));
+ }
+
+ public SimpleQueryFilterCreator withField(String field) {
+ this.field = field;
+ return this;
+ }
+
+}
+ ```
+
+### MongoUpdateBolt
+To use the `MongoUpdateBolt`, construct an instance of it by specifying the Mongo url, the collectionName, a `QueryFilterCreator` implementation, and a `MongoMapper` implementation that converts a Storm tuple to a DB document.
+
+ ```java
+ MongoMapper mapper = new SimpleMongoUpdateMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+ // To insert a new document when no document matches the query filter, enable upsert:
+ //updateBolt.withUpsert(true);
+ ```
+
+ Or use an anonymous inner class implementation for `QueryFilterCreator`:
+
+ ```java
+ MongoMapper mapper = new SimpleMongoUpdateMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
+ @Override
+ public Bson createFilter(ITuple tuple) {
+ return Filters.gt("count", 3);
+ }
+ };
+
+ MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+ // To insert a new document when no document matches the query filter, enable upsert:
+ //updateBolt.withUpsert(true);
+ ```
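To show what `withUpsert(true)` changes, here is an in-memory sketch (hypothetical, not storm-mongodb code) of an update that uses an equality filter like `SimpleQueryFilterCreator` and a `$set` document like `SimpleMongoUpdateMapper`. Applying `$set` to every matching document is an assumption about the bolt's behaviour. With upsert enabled and no match, MongoDB builds the new document from the equality filter plus the `$set` fields, which the sketch imitates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration of {field: value} filter + {$set: ...} update.
class UpsertSketch {
    final List<Map<String, Object>> docs = new ArrayList<>();

    // Returns the number of documents updated, or 1 if a document was upserted.
    int update(String field, Object value, Map<String, Object> setFields, boolean upsert) {
        int matched = 0;
        for (Map<String, Object> doc : docs) {
            if (value.equals(doc.get(field))) {
                doc.putAll(setFields); // $set overwrites only the given fields
                matched++;
            }
        }
        if (matched == 0 && upsert) {
            Map<String, Object> doc = new LinkedHashMap<>();
            doc.put(field, value); // equality filter becomes part of the new doc
            doc.putAll(setFields);
            docs.add(doc);
            return 1;
        }
        return matched;
    }
}
```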
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+
http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-mqtt.md
----------------------------------------------------------------------
diff --git a/docs/storm-mqtt.md b/docs/storm-mqtt.md
new file mode 100644
index 0000000..b730242
--- /dev/null
+++ b/docs/storm-mqtt.md
@@ -0,0 +1,379 @@
+---
+title: Storm MQTT Integration
+layout: documentation
+documentation: true
+---
+
+## About
+
+MQTT is a lightweight publish/subscribe protocol frequently used in IoT applications.
+
+Further information can be found at http://mqtt.org. The HiveMQ website has a great series on
+[MQTT Essentials](http://www.hivemq.com/mqtt-essentials/).
+
+Features include:
+
+* Full MQTT support (e.g. last will, QoS 0-2, retain, etc.)
+* Spout implementation(s) for subscribing to MQTT topics
+* A bolt implementation for publishing MQTT messages
+* A trident function implementation for publishing MQTT messages
+* Authentication and TLS/SSL support
+* User-defined "mappers" for converting MQTT messages to tuples (subscribers)
+* User-defined "mappers" for converting tuples to MQTT messages (publishers)
+
+
+## Quick Start
+To quickly see MQTT integration in action, follow the instructions below.
+
+**Start an MQTT broker and publisher**
+
+The command below will create an MQTT broker on port 1883, and start a publisher that will publish random
+temperature/humidity values to an MQTT topic.
+
+Open a terminal and execute the following command (change the path as necessary):
+
+```bash
+java -cp examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.mqtt.examples.MqttBrokerPublisher
+```
+
+**Run the example topology**
+
+Run the sample topology using Flux. This will start a local mode cluster and topology that consists of the MQTT Spout
+publishing to a bolt that simply logs the information it receives.
+
+In a separate terminal, run the following command (Note that the `storm` executable must be on your PATH):
+
+```bash
+storm jar ./examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.flux.Flux ./examples/src/main/flux/sample.yaml --local
+```
+
+You should see data from MQTT being logged by the bolt:
+
+```
+27020 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=67.0, humidity=65.0}
+27030 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=47.0, humidity=85.0}
+27040 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=69.0, humidity=94.0}
+27049 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=4.0, humidity=98.0}
+27059 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=51.0, humidity=12.0}
+27069 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=27.0, humidity=65.0}
+```
+
+Either allow the local cluster to exit, or stop it by typing Ctrl-C.
+
+**MQTT Fault Tolerance In Action**
+
+After the topology has been shut down, the MQTT subscription created by the MQTT spout will persist with the broker,
+and it will continue to receive and queue messages (as long as the broker is running).
+
+If you run the topology again (while the broker is still running), when the spout initially connects to the MQTT broker,
+it will receive all the messages it missed while it was down. You should see this as a burst of messages, followed by a
+rate of about two messages per second.
+
+This happens because, by default, the MQTT Spout creates a *session* when it subscribes -- that means it requests that
+the broker hold onto and redeliver any messages it missed while offline. Another important factor is that the
+`MqttBrokerPublisher` publishes messages with an MQTT QoS of `1`, meaning *at least once delivery*.
+
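The behaviour described above can be modelled with a toy broker. This is an illustration only, not the MQTT spout or a real broker: a session created with `cleanSession = false` keeps its subscription while the client is offline, and messages whose effective QoS is at least 1 are queued for it. Per the MQTT specification, a message is delivered at the lower of the publish QoS and the subscription QoS. Whether a broker also stores QoS 0 messages for offline sessions is broker-specific; the sketch simply drops them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of one subscriber's session on a broker (not a real MQTT broker).
class SessionSketch {
    private final boolean cleanSession;
    private final int subscriptionQos;
    private final Deque<String> pending = new ArrayDeque<>();
    private boolean online;
    final List<String> received = new ArrayList<>();

    SessionSketch(boolean cleanSession, int subscriptionQos) {
        this.cleanSession = cleanSession;
        this.subscriptionQos = subscriptionQos;
    }

    void publish(String payload, int publishQos) {
        // Delivery happens at the lower of the publish and subscription QoS.
        int effectiveQos = Math.min(publishQos, subscriptionQos);
        if (online) {
            received.add(payload);
        } else if (!cleanSession && effectiveQos >= 1) {
            pending.add(payload); // queued for the persistent session
        }
        // Otherwise the message is dropped for this subscriber.
    }

    void disconnect() {
        online = false;
    }

    void connect() {
        online = true;
        // A persistent session receives everything queued while it was offline.
        while (!pending.isEmpty()) {
            received.add(pending.poll());
        }
    }
}
```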
+For more information about MQTT fault tolerance, see the **Delivery Guarantees** section below.
+
+
+
+## Delivery Guarantees
+In Storm terms, ***the MQTT Spout provides at least once delivery***, depending on the configuration of the publisher as
+well as the MQTT spout.
+
+The MQTT protocol defines the following QoS levels:
+
+* `0` - At Most Once (AKA "Fire and Forget")
+* `1` - At Least Once
+* `2` - Exactly Once
+
+This can be a little confusing as the MQTT protocol specification does not really address the concept of a node being
+completely incinerated by a catastrophic event. This is in stark contrast with Storm's reliability model, which expects
+and embraces the concept of node failure.
+
+So resiliency ultimately depends on the underlying MQTT implementation and infrastructure.
+
+### Recommendations
+
+*You will never get exactly once processing with this spout. It can be used with Trident, but it won't provide
+transactional semantics. You will only get at least once guarantees.*
+
+If you need reliability guarantees (i.e. *at least once processing*):
+
+1. For MQTT publishers (outside of Storm), publish messages with a QoS of `1` so the broker saves messages if/when the
+spout is offline.
+2. Use the spout defaults (`cleanSession = false` and `qos = 1`)
+3. If you can, make sure any result of receiving an MQTT message is idempotent.
+4. Make sure your MQTT brokers don't die or get isolated due to a network partition. Be prepared for natural and
+man-made disasters and network partitions. Incineration and destruction happen.
+
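Recommendation 3 matters because at least once delivery means the same message can be processed twice. The sketch below (hypothetical, stdlib only) contrasts an idempotent write, keyed by device and an `eventTime` that is assumed to be carried in the message, with a counter that double-counts a redelivered message.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of idempotent vs. non-idempotent handling.
class IdempotentSketch {
    // Keyed by (deviceId, eventTime): a redelivered message overwrites itself.
    private final Map<String, Float> readings = new HashMap<>();
    private int naiveCount;

    void storeReading(String deviceId, long eventTime, float temperature) {
        readings.put(deviceId + "@" + eventTime, temperature);
    }

    // Not idempotent: every delivery, including redeliveries, increments.
    void countReading() {
        naiveCount++;
    }

    int storedReadings() {
        return readings.size();
    }

    int naiveCount() {
        return naiveCount;
    }
}
```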
+
+
+
+
+## Configuration
+For the full range of configuration options, see the JavaDoc for `org.apache.storm.mqtt.common.MqttOptions`.
+
+### Message Mappers
+To define how MQTT messages are mapped to Storm tuples, you configure the MQTT spout with an implementation of the
+`org.apache.storm.mqtt.MqttMessageMapper` interface, which looks like this:
+
+```java
+public interface MqttMessageMapper extends Serializable {
+
+    Values toValues(MqttMessage message);
+
+    Fields outputFields();
+}
+```
+
+The `MqttMessage` class contains the topic to which the message was published (`String`) and the message payload
+(`byte[]`). For example, here is a `MqttMessageMapper` implementation that produces tuples based on the content of both
+the message topic and payload:
+
+```java
+import java.nio.charset.StandardCharsets;
+
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.MqttMessageMapper;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Given a topic name: "/users/{user}/{location}/{deviceId}"
+ * and a payload of "{temperature}/{humidity}"
+ * emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float)
+ */
+public class CustomMessageMapper implements MqttMessageMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(CustomMessageMapper.class);
+
+    @Override
+    public Values toValues(MqttMessage message) {
+        // The leading '/' yields an empty first element: ["", "users", user, location, deviceId]
+        String[] topicElements = message.getTopic().split("/");
+        String[] payloadElements = new String(message.getMessage(), StandardCharsets.UTF_8).split("/");
+
+        return new Values(topicElements[2], topicElements[4], topicElements[3],
+                Float.parseFloat(payloadElements[0]), Float.parseFloat(payloadElements[1]));
+    }
+
+    @Override
+    public Fields outputFields() {
+        return new Fields("user", "deviceId", "location", "temperature", "humidity");
+    }
+}
+```
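+
+The indices above assume a topic with a leading slash, like the subscription filter `/users/tgoetz/#` used in the
+examples below. Java's `String.split` keeps the leading empty string, so the user sits at index `2`, not `1`.
+
+The following standalone sketch isolates that parsing logic so it can be checked without a broker. `TopicParsing` is a
+hypothetical name, and the sample topic and payload are made up:
+
+```java
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Hypothetical standalone version of the parsing done by CustomMessageMapper. */
+final class TopicParsing {
+    private TopicParsing() {
+    }
+
+    static Map<String, Object> parse(String topic, byte[] payload) {
+        // "/users/tgoetz/office/1234".split("/") -> ["", "users", "tgoetz", "office", "1234"]
+        String[] topicElements = topic.split("/");
+        String[] payloadElements = new String(payload, StandardCharsets.UTF_8).split("/");
+
+        Map<String, Object> fields = new LinkedHashMap<>();
+        fields.put("user", topicElements[2]);
+        fields.put("deviceId", topicElements[4]);
+        fields.put("location", topicElements[3]);
+        fields.put("temperature", Float.parseFloat(payloadElements[0]));
+        fields.put("humidity", Float.parseFloat(payloadElements[1]));
+        return fields;
+    }
+}
+```
+
+Without the leading slash, the same code would take the location as the user. It would then fail with an
+`ArrayIndexOutOfBoundsException` when reading the device ID.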
+
+### Tuple Mappers
+When publishing MQTT messages with the MQTT bolt or Trident function, you need to map Storm tuple data to MQTT messages
+(topic/payload). This is done by implementing the `org.apache.storm.mqtt.MqttTupleMapper` interface:
+
+```java
+public interface MqttTupleMapper extends Serializable {
+
+    MqttMessage toMessage(ITuple tuple);
+
+}
+```
+
+For example, a simple `MqttTupleMapper` implementation might look like this:
+
+```java
+public class MyTupleMapper implements MqttTupleMapper {
+    @Override
+    public MqttMessage toMessage(ITuple tuple) {
+        String topic = "users/" + tuple.getStringByField("userId") + "/" + tuple.getStringByField("device");
+        byte[] payload = tuple.getStringByField("message").getBytes();
+        return new MqttMessage(topic, payload);
+    }
+}
+```
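+
+When a topic is assembled from tuple fields, as `MyTupleMapper` does, a field value containing `/`, `+` or `#` would
+change the topic structure. In addition, MQTT forbids the wildcard characters `+` and `#` in the topic name of a
+published message.
+
+Below is a hedged sketch of a guard you might call from `toMessage`. The `TopicNames` class is hypothetical and not
+part of storm-mqtt:
+
+```java
+/** Hypothetical helper that joins topic levels and rejects characters that would alter the topic. */
+final class TopicNames {
+    private TopicNames() {
+    }
+
+    static String join(String... levels) {
+        StringBuilder topic = new StringBuilder();
+        for (String level : levels) {
+            // MQTT allows zero-length levels, but here an empty value most likely means a
+            // missing tuple field, so this sketch rejects it.
+            if (level == null || level.isEmpty()) {
+                throw new IllegalArgumentException("Empty topic level");
+            }
+            // '/' would add an extra level; '+' and '#' are wildcards, not allowed when publishing.
+            if (level.contains("/") || level.contains("+") || level.contains("#")) {
+                throw new IllegalArgumentException("Illegal character in topic level: " + level);
+            }
+            if (topic.length() > 0) {
+                topic.append('/');
+            }
+            topic.append(level);
+        }
+        return topic.toString();
+    }
+}
+```
+
+Inside `toMessage`, the topic would then be built with
+`TopicNames.join("users", tuple.getStringByField("userId"), tuple.getStringByField("device"))`.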
+
+### MQTT Spout Parallelism
+It's recommended that you use a parallelism of 1 for the MQTT spout; otherwise you will end up with multiple instances
+of the spout subscribed to the same topic(s), resulting in duplicate messages.
+
+If you want to parallelize the spout, it's recommended that you use multiple instances of the spout in your topology
+and use MQTT topic filters to partition the data. How you implement the partitioning strategy is ultimately determined
+by your MQTT topic structure. As an example, if you had topics partitioned by region (e.g. east/west), you could do
+something like the following:
+
+```java
+String spout1Topic = "users/east/#";
+String spout2Topic = "users/west/#";
+```
+
+and then combine the resulting streams by subscribing a single bolt to both spouts.
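+
+To confirm that the topic filters of multiple spouts really partition your topics, test them against sample topic
+names. A correct partition means no message is delivered to two spouts.
+
+The sketch below is a simplified implementation of the MQTT wildcard rules. `+` matches exactly one level, and a
+trailing `#` matches the parent level plus any number of child levels. It ignores edge cases such as `$`-prefixed
+system topics, and `TopicFilters` is a hypothetical name:
+
+```java
+/** Hypothetical, simplified MQTT topic-filter matcher (ignores $-prefixed topics). */
+final class TopicFilters {
+    private TopicFilters() {
+    }
+
+    static boolean matches(String filter, String topic) {
+        // A limit of -1 keeps empty levels, e.g. the one created by a leading '/'.
+        String[] f = filter.split("/", -1);
+        String[] t = topic.split("/", -1);
+        for (int i = 0; i < f.length; i++) {
+            if (f[i].equals("#")) {
+                return true; // multi-level wildcard: matches everything from this level on
+            }
+            if (i >= t.length) {
+                return false; // the filter has more levels than the topic
+            }
+            if (!f[i].equals("+") && !f[i].equals(t[i])) {
+                return false; // literal level mismatch
+            }
+        }
+        return f.length == t.length; // no leftover topic levels
+    }
+}
+```
+
+For example, `users/east/device1` matches `users/east/#` but not `users/west/#`, so with the two filters above each
+message reaches exactly one spout.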
+
+
+### Using Flux
+
+The following Flux YAML configuration creates the topology used in this example:
+
+```yaml
+name: "mqtt-topology"
+
+components:
+  ########## MQTT Spout Config ############
+  - id: "mqtt-type"
+    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+  - id: "mqtt-options"
+    className: "org.apache.storm.mqtt.common.MqttOptions"
+    properties:
+      - name: "url"
+        value: "tcp://localhost:1883"
+      - name: "topics"
+        value:
+          - "/users/tgoetz/#"
+
+# topology configuration
+config:
+  topology.workers: 1
+  topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+  - id: "mqtt-spout"
+    className: "org.apache.storm.mqtt.spout.MqttSpout"
+    constructorArgs:
+      - ref: "mqtt-type"
+      - ref: "mqtt-options"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+
+streams:
+  - from: "mqtt-spout"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+
+```
+
+
+### Using Java
+
+Similarly, you can create the same topology using the Storm Core Java API:
+
+```java
+TopologyBuilder builder = new TopologyBuilder();
+MqttOptions options = new MqttOptions();
+options.setTopics(Arrays.asList("/users/tgoetz/#"));
+options.setCleanConnection(false);
+MqttSpout spout = new MqttSpout(new StringMessageMapper(), options);
+
+MqttBolt bolt = new LogInfoBolt();
+
+builder.setSpout("mqtt-spout", spout);
+builder.setBolt("log-bolt", bolt).shuffleGrouping("mqtt-spout");
+
+return builder.createTopology();
+```
+
+## SSL/TLS
+If the MQTT broker you are connecting to requires SSL or SSL client authentication, you need to configure the spout
+with an appropriate URI and the location of the keystore/truststore files containing the necessary certificates.
+
+### SSL/TLS URIs
+To connect over SSL/TLS, use a URI with a prefix of `ssl://` or `tls://` instead of `tcp://`. For further control over
+the protocol version, you can use one of the following prefixes:
+
+ * `ssl://` Use the JVM default version of the SSL protocol.
+ * `sslv*://` Use a specific version of the SSL protocol, where `*` is replaced by the version (e.g. `sslv3://`)
+ * `tls://` Use the JVM default version of the TLS protocol.
+ * `tlsv*://` Use a specific version of the TLS protocol, where `*` is replaced by the version (e.g. `tlsv1.1://`)
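+
+The sketch below maps each URI prefix to a JSSE-style protocol name (`SSL`, `SSLv3`, `TLS`, `TLSv1.1`, ...). It only
+illustrates the naming convention above. Mapping the prefixes to these JSSE names is an assumption, and this is not
+how storm-mqtt or its MQTT client actually parses URIs. `UriProtocols` is a hypothetical name:
+
+```java
+import java.net.URI;
+import java.util.Locale;
+
+/** Hypothetical illustration of the URI prefix convention; not the storm-mqtt parser. */
+final class UriProtocols {
+    private UriProtocols() {
+    }
+
+    /** Returns a JSSE-style protocol name, or null for a plain tcp:// connection. */
+    static String protocolFor(String uri) {
+        String scheme = URI.create(uri).getScheme();
+        if (scheme == null) {
+            throw new IllegalArgumentException("Missing scheme: " + uri);
+        }
+        scheme = scheme.toLowerCase(Locale.ROOT);
+        if (scheme.equals("tcp")) {
+            return null;
+        }
+        if (scheme.equals("ssl") || scheme.equals("tls")) {
+            return scheme.toUpperCase(Locale.ROOT); // JVM default version of the protocol
+        }
+        if (scheme.startsWith("sslv") || scheme.startsWith("tlsv")) {
+            // e.g. "tlsv1.1" -> "TLSv1.1", "sslv3" -> "SSLv3"
+            return scheme.substring(0, 3).toUpperCase(Locale.ROOT) + "v" + scheme.substring(4);
+        }
+        throw new IllegalArgumentException("Unsupported scheme: " + scheme);
+    }
+}
+```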
+
+
+### Specifying Keystore/Truststore Locations
+
+ The `MqttSpout`, `MqttBolt` and `MqttPublishFunction` all have constructors that take a `KeyStoreLoader` instance that
+ is used to load the certificates required for TLS/SSL connections. For example:
+
+```java
+ public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader)
+```
+
+The `DefaultKeyStoreLoader` class can be used to load certificates from the local filesystem. Note that the
+keystore/truststore need to be available on all worker nodes where the spout/bolt might be executed. To use
+`DefaultKeyStoreLoader` you specify the location of the keystore/truststore file(s), and set the necessary passwords:
+
+```java
+DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks", "/path/to/truststore.jks");
+ksl.setKeyStorePassword("password");
+ksl.setTrustStorePassword("password");
+//...
+```
+
+If your keystore/truststore certificates are stored in a single file, you can use the one-argument constructor:
+
+```java
+DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks");
+ksl.setKeyStorePassword("password");
+//...
+```
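+
+For reference, the sketch below shows roughly how to load a keystore and truststore for an SSL/TLS connection with the
+plain JDK APIs (`KeyStore`, `KeyManagerFactory`, `TrustManagerFactory`, `SSLContext`). It is meant to show what a
+`KeyStoreLoader` has to provide, not how `DefaultKeyStoreLoader` is implemented. The `SslContexts` class, the `JKS`
+keystore type and the `TLS` protocol are assumptions:
+
+```java
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyStore;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+/** Hypothetical sketch of building an SSLContext from JKS keystore/truststore files. */
+final class SslContexts {
+    private SslContexts() {
+    }
+
+    static KeyStore load(Path path, char[] password) throws Exception {
+        KeyStore store = KeyStore.getInstance("JKS");
+        try (InputStream in = Files.newInputStream(path)) {
+            store.load(in, password);
+        }
+        return store;
+    }
+
+    static SSLContext create(Path keyStorePath, char[] keyStorePassword, char[] keyPassword,
+                             Path trustStorePath, char[] trustStorePassword) throws Exception {
+        // Key managers present the client certificate (used for SSL client authentication).
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        kmf.init(load(keyStorePath, keyStorePassword), keyPassword);
+
+        // Trust managers decide which broker certificates are trusted.
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        tmf.init(load(trustStorePath, trustStorePassword));
+
+        SSLContext context = SSLContext.getInstance("TLS");
+        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+        return context;
+    }
+}
+```
+
+Because these files are read on each worker, the paths must resolve on every node where the spout or bolt runs.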
+
+SSL/TLS can also be configured using Flux:
+
+```yaml
+name: "mqtt-topology"
+
+components:
+  ########## MQTT Spout Config ############
+  - id: "mqtt-type"
+    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
+
+  - id: "keystore-loader"
+    className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
+    constructorArgs:
+      - "keystore.jks"
+      - "truststore.jks"
+    properties:
+      - name: "keyPassword"
+        value: "password"
+      - name: "keyStorePassword"
+        value: "password"
+      - name: "trustStorePassword"
+        value: "password"
+
+  - id: "mqtt-options"
+    className: "org.apache.storm.mqtt.common.MqttOptions"
+    properties:
+      - name: "url"
+        value: "ssl://raspberrypi.local:8883"
+      - name: "topics"
+        value:
+          - "/users/tgoetz/#"
+
+# topology configuration
+config:
+  topology.workers: 1
+  topology.max.spout.pending: 1000
+
+# spout definitions
+spouts:
+  - id: "mqtt-spout"
+    className: "org.apache.storm.mqtt.spout.MqttSpout"
+    constructorArgs:
+      - ref: "mqtt-type"
+      - ref: "mqtt-options"
+      - ref: "keystore-loader"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+
+streams:
+
+  - from: "mqtt-spout"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+
+```
+
+## Committer Sponsors
+
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
\ No newline at end of file
[2/2] storm git commit: Merge branch 'STORM-1659-DOCS' of
https://github.com/vesense/storm into STORM-1659
Posted by ka...@apache.org.
Merge branch 'STORM-1659-DOCS' of https://github.com/vesense/storm into STORM-1659
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a48fae24
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a48fae24
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a48fae24
Branch: refs/heads/master
Commit: a48fae243d7bf4d374a6a3de8e9d3e46d9bd3b64
Parents: f118060 0a00c67
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Mar 27 20:24:38 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Mar 27 20:24:38 2016 +0900
----------------------------------------------------------------------
docs/Kestrel-and-Storm.md | 2 +-
docs/index.md | 14 +-
docs/storm-cassandra.md | 255 ++++++++++++++++++++++++++
docs/storm-elasticsearch.md | 105 +++++++++++
docs/storm-mongodb.md | 199 ++++++++++++++++++++
docs/storm-mqtt.md | 379 +++++++++++++++++++++++++++++++++++++++
6 files changed, 948 insertions(+), 6 deletions(-)
----------------------------------------------------------------------