You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2016/12/16 03:32:46 UTC

[3/4] storm git commit: update examples and documents

update examples and documents


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a833af41
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a833af41
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a833af41

Branch: refs/heads/master
Commit: a833af4195987ded716a5aa47b476271f47e4c85
Parents: 2993920
Author: vesense <be...@163.com>
Authored: Thu Oct 27 15:53:16 2016 +0800
Committer: vesense <be...@163.com>
Committed: Thu Oct 27 15:53:16 2016 +0800

----------------------------------------------------------------------
 .../storm/mongodb/topology/LookupWordCount.java |  80 ++++++++
 .../mongodb/topology/TotalWordCounter.java      |  69 +++++++
 .../storm/mongodb/topology/UpdateWordCount.java |  16 +-
 .../storm/mongodb/trident/PrintFunction.java    |  40 ++++
 .../storm/mongodb/trident/WordCountTrident.java |   9 +-
 .../mongodb/trident/WordCountTridentMap.java    |  95 ++++++++++
 external/storm-mongodb/README.md                | 185 ++++++++++++++++---
 7 files changed, 456 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
new file mode 100644
index 0000000..5140685
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.mongodb.bolt.MongoLookupBolt;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoLookupMapper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+public class LookupWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+    private static final String TOTAL_COUNT_BOLT = "TOTAL_COUNT_BOLT";
+
+    private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test";
+    private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        String url = TEST_MONGODB_URL;
+        String collectionName = TEST_MONGODB_COLLECTION_NAME;
+
+        if (args.length >= 2) {
+            url = args[0];
+            collectionName = args[1];
+        }
+
+        WordSpout spout = new WordSpout();
+        TotalWordCounter totalBolt = new TotalWordCounter();
+
+        MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+
+        //wordspout -> lookupbolt -> totalCountBolt
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(TOTAL_COUNT_BOLT, totalBolt, 1).fieldsGrouping(LOOKUP_BOLT, new Fields("word"));
+
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 3) {
+            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+        } else{
+            System.out.println("Usage: LookupWordCount <mongodb url> <mongodb collection> [topology name]");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
new file mode 100644
index 0000000..137aac4
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+public class TotalWordCounter implements IBasicBolt {
+
+    private BigInteger total = BigInteger.ZERO;
+    private static final Logger LOG = LoggerFactory.getLogger(TotalWordCounter.class);
+    private static final Random RANDOM = new Random();
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map stormConf, TopologyContext context) {
+    }
+
+    /*
+     * Just output the word value with a count of 1.
+     */
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        total = total.add(new BigInteger(input.getValues().get(1).toString()));
+        collector.emit(tuple(total.toString()));
+        //prints the total with low probability.
+        if(RANDOM.nextInt(1000) > 995) {
+            LOG.info("Running total = " + total);
+        }
+    }
+
+    public void cleanup() {
+        LOG.info("Final total = " + total);
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("total"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
index 7895f35..daca619 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
@@ -20,19 +20,14 @@ package org.apache.storm.mongodb.topology;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
-import org.apache.storm.mongodb.bolt.MongoInsertBolt;
 import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
 import org.apache.storm.mongodb.common.QueryFilterCreator;
 import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
 import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper;
 
-import java.util.HashMap;
-import java.util.Map;
-
 public class UpdateWordCount {
     private static final String WORD_SPOUT = "WORD_SPOUT";
     private static final String COUNT_BOLT = "COUNT_BOLT";
@@ -56,17 +51,20 @@ public class UpdateWordCount {
         WordSpout spout = new WordSpout();
         WordCounter bolt = new WordCounter();
 
-        MongoMapper mapper = new SimpleMongoUpdateMapper()
+        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
                 .withFields("word", "count");
 
-        QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
                 .withField("word");
         
-        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator , mapper);
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, filterCreator, mapper);
 
         //if a new document should be inserted if there are no matches to the query filter
         //updateBolt.withUpsert(true);
 
+        //whether find all documents according to the query filter
+        //updateBolt.withMany(true);
+
         // wordSpout ==> countBolt ==> MongoUpdateBolt
         TopologyBuilder builder = new TopologyBuilder();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
new file mode 100644
index 0000000..3db91f9
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident;
+
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+public class PrintFunction extends BaseFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrintFunction.class);
+
+    private static final Random RANDOM = new Random();
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
+        if(RANDOM.nextInt(1000) > 995) {
+            LOG.info(tuple.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
index 44447be..fe1bb03 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
@@ -25,6 +25,7 @@ import org.apache.storm.mongodb.common.mapper.MongoMapper;
 import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
 import org.apache.storm.mongodb.trident.state.MongoState;
 import org.apache.storm.mongodb.trident.state.MongoStateFactory;
+import org.apache.storm.mongodb.trident.state.MongoStateQuery;
 import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
@@ -59,7 +60,13 @@ public class WordCountTrident {
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);
 
-        stream.partitionPersist(factory, fields,  new MongoStateUpdater(), new Fields());
+        stream.partitionPersist(factory, fields,
+                new MongoStateUpdater(), new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                new MongoStateQuery(), new Fields("columnName", "columnValue"));
+        stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
         return topology.build();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
new file mode 100644
index 0000000..83c0caf
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
+import org.apache.storm.mongodb.trident.state.*;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class WordCountTridentMap {
+
+    public static StormTopology buildTopology(String url, String collectionName){
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        spout.setCycle(true);
+
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoMapState.Options options = new MongoMapState.Options();
+        options.url = url;
+        options.collectionName = collectionName;
+        options.mapper = mapper;
+        options.queryCreator = filterCreator;
+
+        StateFactory factory = MongoMapState.transactional(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        TridentState state = stream.groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));
+            Thread.sleep(60 * 1000);
+            cluster.killTopology("wordCounter");
+            cluster.shutdown();
+            System.exit(0);
+        }
+        else if(args.length == 3) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
+        } else{
+            System.out.println("Usage: WordCountTrident <mongodb url> <mongodb collection> [topology name]");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/external/storm-mongodb/README.md
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/README.md b/external/storm-mongodb/README.md
index 614b52f..00b8e4b 100644
--- a/external/storm-mongodb/README.md
+++ b/external/storm-mongodb/README.md
@@ -1,6 +1,6 @@
 #Storm MongoDB
 
-Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute update queries against a database collection in a storm topology.
+Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute select/update queries against a database collection in a storm topology.
 
 ## Insert into Database
 The bolt and trident state included in this package for inserting data into a database collection.
@@ -11,6 +11,7 @@ The main API for inserting data in a collection using MongoDB is the `org.apache
 ```java
 public interface MongoMapper extends Serializable {
     Document toDocument(ITuple tuple);
+    Document toDocumentByKeys(List<Object> keys);
 }
 ```
 
@@ -30,6 +31,13 @@ public class SimpleMongoMapper implements MongoMapper {
         return document;
     }
 
+    @Override
+    public Document toDocumentByKeys(List<Object> keys) {
+        Document document = new Document();
+        document.append("_id", MongoUtils.getID(keys));
+        return document;
+    }
+
     public SimpleMongoMapper withFields(String... fields) {
         this.fields = fields;
         return this;
@@ -53,38 +61,25 @@ MongoMapper mapper = new SimpleMongoMapper()
 MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
  ```
 
-### MongoTridentState
-We also support a trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
-
- ```java
-        MongoMapper mapper = new SimpleMongoMapper()
-                .withFields("word", "count");
-
-        MongoState.Options options = new MongoState.Options()
-                .withUrl(url)
-                .withCollectionName(collectionName)
-                .withMapper(mapper);
-
-        StateFactory factory = new MongoStateFactory(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        stream.partitionPersist(factory, fields,  new MongoStateUpdater(), new Fields());
- ```
- **NOTE**:
- >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
 
 ## Update from Database
 The bolt included in this package for updating data from a database collection.
 
+### MongoUpdateMapper
+The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
+
+```java
+public interface MongoUpdateMapper extends MongoMapper { }
+```
+
 ### SimpleMongoUpdateMapper
-`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
+`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
 `SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit 
 https://docs.mongodb.org/manual/reference/operator/update/
 
 ```java
-public class SimpleMongoUpdateMapper implements MongoMapper {
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
     private String[] fields;
 
     @Override
@@ -93,6 +88,7 @@ public class SimpleMongoUpdateMapper implements MongoMapper {
         for(String field : fields){
             document.append(field, tuple.getValueByField(field));
         }
+        //$set operator: Sets the value of a field in a document.
         return new Document("$set", document);
     }
 
@@ -104,13 +100,13 @@ public class SimpleMongoUpdateMapper implements MongoMapper {
 ```
 
 
- 
 ### QueryFilterCreator
 The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
 
  ```java
 public interface QueryFilterCreator extends Serializable {
     Bson createFilter(ITuple tuple);
+    Bson createFilterByKeys(List<Object> keys);
 }
  ```
 
@@ -120,6 +116,7 @@ https://docs.mongodb.org/manual/reference/operator/query/
 
  ```java
 public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
     private String field;
     
     @Override
@@ -127,6 +124,11 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
         return Filters.eq(field, tuple.getValueByField(field));
     }
 
+    @Override
+    public Bson createFilterByKeys(List<Object> keys) {
+        return Filters.eq("_id", MongoUtils.getID(keys));
+    }
+
     public SimpleQueryFilterCreator withField(String field) {
         this.field = field;
         return this;
@@ -136,10 +138,10 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
  ```
 
 ### MongoUpdateBolt
-To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoMapper` implementation that converts storm tuple to DB document.
+To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document.
 
  ```java
-        MongoMapper mapper = new SimpleMongoUpdateMapper()
+        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
                 .withFields("word", "count");
 
         QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
@@ -149,12 +151,15 @@ To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mon
 
         //if a new document should be inserted if there are no matches to the query filter
         //updateBolt.withUpsert(true);
+
+        //whether find all documents according to the query filter
+        //updateBolt.withMany(true);
  ```
  
  Or use a anonymous inner class implementation for `QueryFilterCreator`:
  
   ```java
-        MongoMapper mapper = new SimpleMongoUpdateMapper()
+        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
                 .withFields("word", "count");
 
         QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
@@ -170,6 +175,130 @@ To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mon
         //updateBolt.withUpsert(true);
  ```
 
+
+## Lookup from Database
+The bolt included in this package for selecting data from a database collection.
+
+### MongoLookupMapper
+The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
+
+```java
+public interface MongoLookupMapper extends Serializable {
+
+    List<Values> toTuple(ITuple input, Document doc);
+
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+### SimpleMongoLookupMapper
+`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that can converts a Mongo document to a list of storm values.
+
+```java
+public class SimpleMongoLookupMapper implements MongoLookupMapper {
+
+    private String[] fields;
+
+    @Override
+    public List<Values> toTuple(ITuple input, Document doc) {
+        Values values = new Values();
+
+        for(String field : fields) {
+            if(input.contains(field)) {
+                values.add(input.getValueByField(field));
+            } else {
+                values.add(doc.get(field));
+            }
+        }
+        List<Values> result = new ArrayList<Values>();
+        result.add(values);
+        return result;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(fields));
+    }
+
+    public SimpleMongoLookupMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+
+}
+```
+
+### MongoLookupBolt
+To use the `MongoLookupBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values.
+
+ ```java
+        MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+ ```
+
+## Mongo Trident State&MapState
+### Trident State
+We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        MongoState.Options options = new MongoState.Options()
+                .withUrl(url)
+                .withCollectionName(collectionName)
+                .withMapper(mapper);
+
+        StateFactory factory = new MongoStateFactory(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory, fields,
+                new MongoStateUpdater(), new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                new MongoStateQuery(), new Fields("columnName", "columnValue"));
+        stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
+ ```
+ **NOTE**:
+ >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
+
+### Trident MapState
+We also support trident `MapState`. To create a Mongo trident `MapState` you need to initialize it with the url, collectionName, the `MongoMapper` and `QueryFilterCreator` instance. See the example below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoMapState.Options options = new MongoMapState.Options();
+        options.url = url;
+        options.collectionName = collectionName;
+        options.mapper = mapper;
+        options.queryCreator = filterCreator;
+
+        StateFactory factory = MongoMapState.transactional(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        TridentState state = stream.groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ ```
+ 
+
 ## License
 
 Licensed to the Apache Software Foundation (ASF) under one