You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/04/13 01:02:57 UTC

[1/2] storm git commit: [STORM-2464] update storm-mongodb.md

Repository: storm
Updated Branches:
  refs/heads/master d8368b0e5 -> abcd97209


[STORM-2464] update storm-mongodb.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5de29f50
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5de29f50
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5de29f50

Branch: refs/heads/master
Commit: 5de29f5057b75d72d770cd794ebc79097087c21a
Parents: 65c8b7b
Author: liuzhaokun <li...@zte.com.cn>
Authored: Tue Apr 11 19:30:01 2017 +0800
Committer: \u5218\u5146\u576410206665 <li...@zte.com.cn>
Committed: Tue Apr 11 19:30:01 2017 +0800

----------------------------------------------------------------------
 docs/storm-mongodb.md | 181 ++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 154 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5de29f50/docs/storm-mongodb.md
----------------------------------------------------------------------
diff --git a/docs/storm-mongodb.md b/docs/storm-mongodb.md
index 133bac3..0457dd0 100644
--- a/docs/storm-mongodb.md
+++ b/docs/storm-mongodb.md
@@ -15,6 +15,7 @@ The main API for inserting data in a collection using MongoDB is the `org.apache
 ```java
 public interface MongoMapper extends Serializable {
     Document toDocument(ITuple tuple);
+    Document toDocumentByKeys(List<Object> keys);
 }
 ```
 
@@ -34,6 +35,13 @@ public class SimpleMongoMapper implements MongoMapper {
         return document;
     }
 
+    @Override
+    public Document toDocumentByKeys(List<Object> keys) {
+        Document document = new Document();
+        document.append("_id", MongoUtils.getID(keys));
+        return document;
+    }
+
     public SimpleMongoMapper withFields(String... fields) {
         this.fields = fields;
         return this;
@@ -57,38 +65,25 @@ MongoMapper mapper = new SimpleMongoMapper()
 MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
  ```
 
-### MongoTridentState
-We also support a trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
-
- ```java
-        MongoMapper mapper = new SimpleMongoMapper()
-                .withFields("word", "count");
-
-        MongoState.Options options = new MongoState.Options()
-                .withUrl(url)
-                .withCollectionName(collectionName)
-                .withMapper(mapper);
-
-        StateFactory factory = new MongoStateFactory(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        stream.partitionPersist(factory, fields,  new MongoStateUpdater(), new Fields());
- ```
- **NOTE**:
- >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
 
 ## Update from Database
 The bolt included in this package for updating data from a database collection.
 
+### MongoUpdateMapper
+The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
+
+```java
+public interface MongoUpdateMapper extends MongoMapper { }
+```
+
 ### SimpleMongoUpdateMapper
-`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
+`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
 `SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit 
 https://docs.mongodb.org/manual/reference/operator/update/
 
 ```java
-public class SimpleMongoUpdateMapper implements MongoMapper {
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
     private String[] fields;
 
     @Override
@@ -97,6 +92,7 @@ public class SimpleMongoUpdateMapper implements MongoMapper {
         for(String field : fields){
             document.append(field, tuple.getValueByField(field));
         }
+        //$set operator: Sets the value of a field in a document.
         return new Document("$set", document);
     }
 
@@ -108,13 +104,13 @@ public class SimpleMongoUpdateMapper implements MongoMapper {
 ```
 
 
- 
 ### QueryFilterCreator
 The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
 
  ```java
 public interface QueryFilterCreator extends Serializable {
     Bson createFilter(ITuple tuple);
+    Bson createFilterByKeys(List<Object> keys);
 }
  ```
 
@@ -124,6 +120,7 @@ https://docs.mongodb.org/manual/reference/operator/query/
 
  ```java
 public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
     private String field;
     
     @Override
@@ -131,6 +128,11 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
         return Filters.eq(field, tuple.getValueByField(field));
     }
 
+    @Override
+    public Bson createFilterByKeys(List<Object> keys) {
+        return Filters.eq("_id", MongoUtils.getID(keys));
+    }
+
     public SimpleQueryFilterCreator withField(String field) {
         this.field = field;
         return this;
@@ -140,10 +142,10 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
  ```
 
 ### MongoUpdateBolt
-To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoMapper` implementation that converts storm tuple to DB document.
+To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document.
 
  ```java
-        MongoMapper mapper = new SimpleMongoUpdateMapper()
+        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
                 .withFields("word", "count");
 
         QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
@@ -153,12 +155,15 @@ To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mon
 
         //if a new document should be inserted if there are no matches to the query filter
         //updateBolt.withUpsert(true);
+
+        //whether find all documents according to the query filter
+        //updateBolt.withMany(true);
  ```
  
  Or use a anonymous inner class implementation for `QueryFilterCreator`:
  
   ```java
-        MongoMapper mapper = new SimpleMongoUpdateMapper()
+        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
                 .withFields("word", "count");
 
         QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
@@ -174,3 +179,125 @@ To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mon
         //updateBolt.withUpsert(true);
  ```
 
+
+## Lookup from Database
+The bolt included in this package for selecting data from a database collection.
+
+### MongoLookupMapper
+The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
+
+```java
+public interface MongoLookupMapper extends Serializable {
+
+    List<Values> toTuple(ITuple input, Document doc);
+
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+### SimpleMongoLookupMapper
+`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that can converts a Mongo document to a list of storm values.
+
+```java
+public class SimpleMongoLookupMapper implements MongoLookupMapper {
+
+    private String[] fields;
+
+    @Override
+    public List<Values> toTuple(ITuple input, Document doc) {
+        Values values = new Values();
+
+        for(String field : fields) {
+            if(input.contains(field)) {
+                values.add(input.getValueByField(field));
+            } else {
+                values.add(doc.get(field));
+            }
+        }
+        List<Values> result = new ArrayList<Values>();
+        result.add(values);
+        return result;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(fields));
+    }
+
+    public SimpleMongoLookupMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+
+}
+```
+
+### MongoLookupBolt
+To use the `MongoLookupBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values.
+
+ ```java
+        MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+ ```
+
+## Mongo Trident State&MapState
+### Trident State
+We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        MongoState.Options options = new MongoState.Options()
+                .withUrl(url)
+                .withCollectionName(collectionName)
+                .withMapper(mapper);
+
+        StateFactory factory = new MongoStateFactory(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory, fields,
+                new MongoStateUpdater(), new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                new MongoStateQuery(), new Fields("columnName", "columnValue"));
+        stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
+ ```
+ **NOTE**:
+ >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
+
+### Trident MapState
+We also support trident `MapState`. To create a Mongo trident `MapState` you need to initialize it with the url, collectionName, the `MongoMapper` and `QueryFilterCreator` instance. See the example below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoMapState.Options options = new MongoMapState.Options();
+        options.url = url;
+        options.collectionName = collectionName;
+        options.mapper = mapper;
+        options.queryCreator = filterCreator;
+
+        StateFactory factory = MongoMapState.transactional(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        TridentState state = stream.groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ ```
\ No newline at end of file


[2/2] storm git commit: Merge branch 'master_mongodb_doc' of https://github.com/liu-zhaokun/storm

Posted by xi...@apache.org.
Merge branch 'master_mongodb_doc' of https://github.com/liu-zhaokun/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/abcd9720
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/abcd9720
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/abcd9720

Branch: refs/heads/master
Commit: abcd9720991115a9bde9b72b41180d3022641d0d
Parents: d8368b0 5de29f5
Author: vesense <be...@163.com>
Authored: Thu Apr 13 09:02:15 2017 +0800
Committer: vesense <be...@163.com>
Committed: Thu Apr 13 09:02:15 2017 +0800

----------------------------------------------------------------------
 docs/storm-mongodb.md | 181 ++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 154 insertions(+), 27 deletions(-)
----------------------------------------------------------------------