Posted to commits@storm.apache.org by xi...@apache.org on 2017/04/13 01:02:57 UTC
[1/2] storm git commit: [STORM-2464] update storm-mongodb.md
Repository: storm
Updated Branches:
refs/heads/master d8368b0e5 -> abcd97209
[STORM-2464] update storm-mongodb.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5de29f50
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5de29f50
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5de29f50
Branch: refs/heads/master
Commit: 5de29f5057b75d72d770cd794ebc79097087c21a
Parents: 65c8b7b
Author: liuzhaokun <li...@zte.com.cn>
Authored: Tue Apr 11 19:30:01 2017 +0800
Committer: 刘兆坤10206665 <li...@zte.com.cn>
Committed: Tue Apr 11 19:30:01 2017 +0800
----------------------------------------------------------------------
docs/storm-mongodb.md | 181 ++++++++++++++++++++++++++++++++++++++-------
1 file changed, 154 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5de29f50/docs/storm-mongodb.md
----------------------------------------------------------------------
diff --git a/docs/storm-mongodb.md b/docs/storm-mongodb.md
index 133bac3..0457dd0 100644
--- a/docs/storm-mongodb.md
+++ b/docs/storm-mongodb.md
@@ -15,6 +15,7 @@ The main API for inserting data in a collection using MongoDB is the `org.apache
```java
public interface MongoMapper extends Serializable {
Document toDocument(ITuple tuple);
+ Document toDocumentByKeys(List<Object> keys);
}
```
@@ -34,6 +35,13 @@ public class SimpleMongoMapper implements MongoMapper {
return document;
}
+ @Override
+ public Document toDocumentByKeys(List<Object> keys) {
+ Document document = new Document();
+ document.append("_id", MongoUtils.getID(keys));
+ return document;
+ }
+
public SimpleMongoMapper withFields(String... fields) {
this.fields = fields;
return this;
@@ -57,38 +65,25 @@ MongoMapper mapper = new SimpleMongoMapper()
MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
```
-### MongoTridentState
-We also support a trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
-
- ```java
- MongoMapper mapper = new SimpleMongoMapper()
- .withFields("word", "count");
-
- MongoState.Options options = new MongoState.Options()
- .withUrl(url)
- .withCollectionName(collectionName)
- .withMapper(mapper);
-
- StateFactory factory = new MongoStateFactory(options);
-
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("spout1", spout);
-
- stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields());
- ```
- **NOTE**:
- >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
## Update from Database
The bolt included in this package for updating data from a database collection.
+### MongoUpdateMapper
+The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
+
+```java
+public interface MongoUpdateMapper extends MongoMapper { }
+```
+
### SimpleMongoUpdateMapper
-`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
+`storm-mongodb` includes a general-purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that maps a Storm tuple to a database document. `SimpleMongoUpdateMapper` assumes that the Storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit
https://docs.mongodb.org/manual/reference/operator/update/
```java
-public class SimpleMongoUpdateMapper implements MongoMapper {
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
private String[] fields;
@Override
@@ -97,6 +92,7 @@ public class SimpleMongoUpdateMapper implements MongoMapper {
for(String field : fields){
document.append(field, tuple.getValueByField(field));
}
+ //$set operator: Sets the value of a field in a document.
return new Document("$set", document);
}
@@ -108,13 +104,13 @@ public class SimpleMongoUpdateMapper implements MongoMapper {
```
-
### QueryFilterCreator
The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
```java
public interface QueryFilterCreator extends Serializable {
Bson createFilter(ITuple tuple);
+ Bson createFilterByKeys(List<Object> keys);
}
```
@@ -124,6 +120,7 @@ https://docs.mongodb.org/manual/reference/operator/query/
```java
public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
private String field;
@Override
@@ -131,6 +128,11 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
return Filters.eq(field, tuple.getValueByField(field));
}
+ @Override
+ public Bson createFilterByKeys(List<Object> keys) {
+ return Filters.eq("_id", MongoUtils.getID(keys));
+ }
+
public SimpleQueryFilterCreator withField(String field) {
this.field = field;
return this;
@@ -140,10 +142,10 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
```
### MongoUpdateBolt
-To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoMapper` implementation that converts storm tuple to DB document.
+To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document.
```java
- MongoMapper mapper = new SimpleMongoUpdateMapper()
+ MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
.withFields("word", "count");
QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
@@ -153,12 +155,15 @@ To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mon
//if a new document should be inserted if there are no matches to the query filter
//updateBolt.withUpsert(true);
+
+ //whether to update all documents that match the query filter
+ //updateBolt.withMany(true);
```
Or use a anonymous inner class implementation for `QueryFilterCreator`:
```java
- MongoMapper mapper = new SimpleMongoUpdateMapper()
+ MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
.withFields("word", "count");
QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
@@ -174,3 +179,125 @@ To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mon
//updateBolt.withUpsert(true);
```
+
+## Lookup from Database
+This package includes a bolt for selecting data from a database collection.
+
+### MongoLookupMapper
+The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
+
+```java
+public interface MongoLookupMapper extends Serializable {
+
+ List<Values> toTuple(ITuple input, Document doc);
+
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+### SimpleMongoLookupMapper
+`storm-mongodb` includes a general-purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that converts a Mongo document to a list of Storm values.
+
+```java
+public class SimpleMongoLookupMapper implements MongoLookupMapper {
+
+ private String[] fields;
+
+ @Override
+ public List<Values> toTuple(ITuple input, Document doc) {
+ Values values = new Values();
+
+ for(String field : fields) {
+ if(input.contains(field)) {
+ values.add(input.getValueByField(field));
+ } else {
+ values.add(doc.get(field));
+ }
+ }
+ List<Values> result = new ArrayList<Values>();
+ result.add(values);
+ return result;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(fields));
+ }
+
+ public SimpleMongoLookupMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+
+}
+```
+
+### MongoLookupBolt
+To use the `MongoLookupBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values.
+
+ ```java
+ MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+ ```
+
+## Mongo Trident State & MapState
+### Trident State
+We support a Trident persistent state that can be used with Trident topologies. To create a Mongo persistent Trident state, initialize it with the url, the collectionName, and a `MongoMapper` instance. See the example below:
+
+ ```java
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ MongoState.Options options = new MongoState.Options()
+ .withUrl(url)
+ .withCollectionName(collectionName)
+ .withMapper(mapper);
+
+ StateFactory factory = new MongoStateFactory(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ stream.partitionPersist(factory, fields,
+ new MongoStateUpdater(), new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new MongoStateQuery(), new Fields("columnName", "columnValue"));
+ stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
+ ```
+ **NOTE**:
+ >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
+
+### Trident MapState
+We also support Trident `MapState`. To create a Mongo Trident `MapState`, initialize it with the url, the collectionName, and `MongoMapper` and `QueryFilterCreator` instances. See the example below:
+
+ ```java
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoMapState.Options options = new MongoMapState.Options();
+ options.url = url;
+ options.collectionName = collectionName;
+ options.mapper = mapper;
+ options.queryCreator = filterCreator;
+
+ StateFactory factory = MongoMapState.transactional(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ TridentState state = stream.groupBy(new Fields("word"))
+ .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+ stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+ .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ ```
\ No newline at end of file
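For readers following the `$set` comment added to `SimpleMongoUpdateMapper` in this diff, here is a minimal sketch of the shape of the update document that `toDocument` returns. It is not the storm-mongodb code: plain `Map`s stand in for `org.bson.Document` and for Storm's `ITuple`, and the names `SetUpdateSketch` and `toSetDocument` are hypothetical, chosen so the sketch compiles without Storm or the MongoDB driver on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in sketch: Map replaces both org.bson.Document and Storm's ITuple.
// SetUpdateSketch and toSetDocument are hypothetical names, not library API.
final class SetUpdateSketch {

    // Copies only the configured fields out of the tuple and wraps them
    // under "$set", the same nesting the mapper in the diff produces.
    static Map<String, Object> toSetDocument(Map<String, Object> tuple, String... fields) {
        Map<String, Object> values = new LinkedHashMap<>();
        for (String field : fields) {
            values.put(field, tuple.get(field));
        }
        Map<String, Object> update = new LinkedHashMap<>();
        update.put("$set", values);
        return update;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("word", "storm");
        tuple.put("count", 3);
        tuple.put("ignored", true); // not listed in fields, so it is left out

        // prints {$set={word=storm, count=3}}
        System.out.println(toSetDocument(tuple, "word", "count"));
    }
}
```

Because only the listed fields end up under `$set`, other fields already stored in a matching document are left untouched by the update.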
[2/2] storm git commit: Merge branch 'master_mongodb_doc' of
https://github.com/liu-zhaokun/storm
Posted by xi...@apache.org.
Merge branch 'master_mongodb_doc' of https://github.com/liu-zhaokun/storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/abcd9720
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/abcd9720
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/abcd9720
Branch: refs/heads/master
Commit: abcd9720991115a9bde9b72b41180d3022641d0d
Parents: d8368b0 5de29f5
Author: vesense <be...@163.com>
Authored: Thu Apr 13 09:02:15 2017 +0800
Committer: vesense <be...@163.com>
Committed: Thu Apr 13 09:02:15 2017 +0800
----------------------------------------------------------------------
docs/storm-mongodb.md | 181 ++++++++++++++++++++++++++++++++++++++-------
1 file changed, 154 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
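The `SimpleMongoLookupMapper.toTuple` method introduced by this change resolves each output field from the input tuple first and falls back to the fetched document otherwise. A minimal sketch of that precedence rule follows. As an assumption for illustration, plain `Map`s and a `List<Object>` stand in for `ITuple`, `Document`, and `Values`, and `LookupMergeSketch` and `toValues` are hypothetical names rather than storm-mongodb API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in sketch: Map replaces ITuple and Document, List<Object> replaces Values.
// LookupMergeSketch and toValues are hypothetical names, not library API.
final class LookupMergeSketch {

    // For each requested field, take the input tuple's value when the tuple
    // has that field; otherwise take the value from the looked-up document.
    static List<Object> toValues(Map<String, Object> input,
                                 Map<String, Object> doc,
                                 String... fields) {
        List<Object> values = new ArrayList<>();
        for (String field : fields) {
            if (input.containsKey(field)) {
                values.add(input.get(field));
            } else {
                values.add(doc.get(field));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("word", "storm");

        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("word", "other");
        doc.put("count", 7);

        // "word" comes from the tuple, "count" from the document.
        // prints [storm, 7]
        System.out.println(toValues(input, doc, "word", "count"));
    }
}
```

One consequence of this ordering is that a field missing from both the tuple and the document yields `null` in the emitted values rather than an error.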