Posted to commits@storm.apache.org by sr...@apache.org on 2016/03/17 01:52:08 UTC

[1/3] storm git commit: STORM-1483: add storm-mongodb connector

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 53e1ab0c6 -> eeeb7b9c3


STORM-1483: add storm-mongodb connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/45fe4595
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/45fe4595
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/45fe4595

Branch: refs/heads/1.x-branch
Commit: 45fe4595c64bb7772cafd3d4bd9923b2429e835b
Parents: f0abfff
Author: Xin Wang <be...@163.com>
Authored: Sun Jan 24 22:05:23 2016 +0800
Committer: vesense <be...@163.com>
Committed: Wed Mar 16 14:51:13 2016 +0800

----------------------------------------------------------------------
 external/storm-mongodb/README.md                | 195 +++++++++++++++++++
 external/storm-mongodb/pom.xml                  |  74 +++++++
 .../storm/mongodb/bolt/AbstractMongoBolt.java   |  56 ++++++
 .../storm/mongodb/bolt/MongoInsertBolt.java     |  62 ++++++
 .../storm/mongodb/bolt/MongoUpdateBolt.java     |  75 +++++++
 .../storm/mongodb/common/MongoDBClient.java     |  91 +++++++++
 .../mongodb/common/QueryFilterCreator.java      |  38 ++++
 .../common/SimpleQueryFilterCreator.java        |  39 ++++
 .../mongodb/common/mapper/MongoMapper.java      |  38 ++++
 .../common/mapper/SimpleMongoMapper.java        |  40 ++++
 .../common/mapper/SimpleMongoUpdateMapper.java  |  41 ++++
 .../storm/mongodb/trident/state/MongoState.java |  97 +++++++++
 .../trident/state/MongoStateFactory.java        |  42 ++++
 .../trident/state/MongoStateUpdater.java        |  34 ++++
 .../storm/mongodb/topology/InsertWordCount.java |  81 ++++++++
 .../storm/mongodb/topology/UpdateWordCount.java |  91 +++++++++
 .../storm/mongodb/topology/WordCounter.java     |  67 +++++++
 .../storm/mongodb/topology/WordSpout.java       |  88 +++++++++
 .../storm/mongodb/trident/WordCountTrident.java |  85 ++++++++
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |  14 ++
 21 files changed, 1349 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/README.md
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/README.md b/external/storm-mongodb/README.md
new file mode 100644
index 0000000..614b52f
--- /dev/null
+++ b/external/storm-mongodb/README.md
@@ -0,0 +1,195 @@
+# Storm MongoDB
+
+Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and Trident states that allow a Storm topology either to insert Storm tuples into a database collection or to execute update queries against a database collection.
+
+## Insert into Database
+This package includes a bolt and a Trident state for inserting data into a database collection.
+
+### MongoMapper
+The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
+
+```java
+public interface MongoMapper extends Serializable {
+    Document toDocument(ITuple tuple);
+}
+```
+
+### SimpleMongoMapper
+`storm-mongodb` includes a general-purpose `MongoMapper` implementation called `SimpleMongoMapper` that maps a Storm tuple to a database document. `SimpleMongoMapper` assumes that the Storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
+
+```java
+public class SimpleMongoMapper implements MongoMapper {
+    private String[] fields;
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for(String field : fields){
+            document.append(field, tuple.getValueByField(field));
+        }
+        return document;
+    }
+
+    public SimpleMongoMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
+```
+
+### MongoInsertBolt
+To use the `MongoInsertBolt`, construct an instance of it by specifying the url, the collectionName, and a `MongoMapper` implementation that converts a Storm tuple to a DB document. The following is the standard URI connection scheme:
+ `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
+
+For more information about Mongo URI options (e.g. write concern options), see https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
+
+ ```java
+String url = "mongodb://127.0.0.1:27017/test";
+String collectionName = "wordcount";
+
+MongoMapper mapper = new SimpleMongoMapper()
+        .withFields("word", "count");
+
+MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
+ ```
+
+### MongoTridentState
+We also support a persistent Trident state that can be used with Trident topologies. To create a Mongo persistent Trident state, initialize it with the url, the collectionName, and a `MongoMapper` instance. See the example below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        MongoState.Options options = new MongoState.Options()
+                .withUrl(url)
+                .withCollectionName(collectionName)
+                .withMapper(mapper);
+
+        StateFactory factory = new MongoStateFactory(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory, fields,  new MongoStateUpdater(), new Fields());
+ ```
+ **NOTE**:
+ >If the collection has no unique index, Trident state inserts that are retried after a failure may result in duplicate documents.
+
+## Update from Database
+This package includes a bolt for updating data in a database collection.
+
+### SimpleMongoUpdateMapper
+`storm-mongodb` includes a general-purpose `MongoMapper` implementation called `SimpleMongoUpdateMapper` that maps a Storm tuple to a database document. `SimpleMongoUpdateMapper` assumes that the Storm tuple has fields with the same names as the document fields in the database collection that you intend to write to.
+`SimpleMongoUpdateMapper` uses the `$set` operator to set the value of a field in a document. For more information about update operators, see 
+https://docs.mongodb.org/manual/reference/operator/update/
+
+```java
+public class SimpleMongoUpdateMapper implements MongoMapper {
+    private String[] fields;
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for(String field : fields){
+            document.append(field, tuple.getValueByField(field));
+        }
+        return new Document("$set", document);
+    }
+
+    public SimpleMongoUpdateMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
+```
+
+
+ 
+### QueryFilterCreator
+The main API for creating a MongoDB query filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
+
+ ```java
+public interface QueryFilterCreator extends Serializable {
+    Bson createFilter(ITuple tuple);
+}
+ ```
+
+### SimpleQueryFilterCreator
+`storm-mongodb` includes a general-purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that creates a MongoDB query filter from a given tuple. `SimpleQueryFilterCreator` uses the `$eq` operator to match values that are equal to a specified value. For more information about query operators, see 
+https://docs.mongodb.org/manual/reference/operator/query/
+
+ ```java
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+    private String field;
+    
+    @Override
+    public Bson createFilter(ITuple tuple) {
+        return Filters.eq(field, tuple.getValueByField(field));
+    }
+
+    public SimpleQueryFilterCreator withField(String field) {
+        this.field = field;
+        return this;
+    }
+
+}
+ ```
+
+### MongoUpdateBolt
+To use the `MongoUpdateBolt`, construct an instance of it by specifying the Mongo url, the collectionName, a `QueryFilterCreator` implementation, and a `MongoMapper` implementation that converts a Storm tuple to a DB document.
+
+ ```java
+        MongoMapper mapper = new SimpleMongoUpdateMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+        
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+        //to insert a new document when nothing matches the query filter, enable upsert:
+        //updateBolt.withUpsert(true);
+ ```
+ 
+ Or use an anonymous inner class implementation of `QueryFilterCreator`:
+ 
+  ```java
+        MongoMapper mapper = new SimpleMongoUpdateMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
+            @Override
+            public Bson createFilter(ITuple tuple) {
+                return Filters.gt("count", 3);
+            }
+        };
+        
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+        //to insert a new document when nothing matches the query filter, enable upsert:
+        //updateBolt.withUpsert(true);
+ ```
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ 
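
Note on the README above: `SimpleMongoUpdateMapper` wraps the mapped fields in a `$set` operator. The following is only a minimal sketch of the resulting document shape, not code from this commit. It assumes plain `java.util.Map` stand-ins for Storm's `ITuple` and for `org.bson.Document`, and the class name `UpdateDocumentSketch` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in: plain maps replace Storm's ITuple and org.bson.Document.
class UpdateDocumentSketch {

    // Builds the same shape the README describes: {"$set": {field: value, ...}}.
    static Map<String, Object> toUpdateDocument(Map<String, Object> tuple, String... fields) {
        Map<String, Object> values = new LinkedHashMap<>();
        for (String field : fields) {
            values.put(field, tuple.get(field));
        }
        Map<String, Object> update = new LinkedHashMap<>();
        update.put("$set", values);
        return update;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("word", "storm");
        tuple.put("count", 7);
        // prints {$set={word=storm, count=7}}
        System.out.println(toUpdateDocument(tuple, "word", "count"));
    }
}
```

With `withFields("word", "count")` and a filter on `word`, the update therefore rewrites `word` with the value it already matched and replaces `count`.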

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/pom.xml b/external/storm-mongodb/pom.xml
new file mode 100644
index 0000000..7653ac8
--- /dev/null
+++ b/external/storm-mongodb/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-mongodb</artifactId>
+
+    <developers>
+        <developer>
+            <id>vesense</id>
+            <name>Xin Wang</name>
+            <email>data.xinwang@gmail.com</email>
+        </developer>
+    </developers>
+
+    <properties>
+        <mongodb.version>3.2.0</mongodb.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongo-java-driver</artifactId>
+            <version>${mongodb.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+        </dependency>
+        <!--test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
new file mode 100644
index 0000000..f730ec7
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.bolt;
+
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.MongoDBClient;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.base.BaseRichBolt;
+
+public abstract class AbstractMongoBolt extends BaseRichBolt {
+
+    private String url;
+    private String collectionName;
+
+    protected OutputCollector collector;
+    protected MongoDBClient mongoClient;
+
+    public AbstractMongoBolt(String url, String collectionName) {
+       Validate.notEmpty(url, "url can not be blank or null");
+       Validate.notEmpty(collectionName, "collectionName can not be blank or null");
+
+       this.url = url;
+       this.collectionName = collectionName;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context,
+            OutputCollector collector) {
+        this.collector = collector;
+        this.mongoClient = new MongoDBClient(url, collectionName);
+    }
+
+    @Override
+    public void cleanup() {
+       this.mongoClient.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
new file mode 100644
index 0000000..26cd150
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.bolt;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.bson.Document;
+
+/**
+ * Basic bolt for writing to MongoDB.
+ *
+ * Note: Each MongoInsertBolt defined in a topology is tied to a specific collection.
+ *
+ */
+public class MongoInsertBolt extends AbstractMongoBolt {
+
+    private MongoMapper mapper;
+
+    public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) {
+        super(url, collectionName);
+
+        Validate.notNull(mapper, "MongoMapper can not be null");
+
+        this.mapper = mapper;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try{
+            //get document
+            Document doc = mapper.toDocument(tuple);
+            mongoClient.insert(doc);
+            this.collector.ack(tuple);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(tuple);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        
+    }
+
+}
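
Note on replays: the README warns that, without a unique index, retried inserts may produce duplicate documents. The same can apply to `MongoInsertBolt` if a tuple is replayed after its document was already written (for example, when the tuple times out while the insert is still in flight). The sketch below only simulates that effect with in-memory collections. The class `ReplaySketch` and its method names are hypothetical, and a `Set` of keys stands in for a unique index on `word`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation: counts stored documents when a tuple is replayed.
class ReplaySketch {

    // No unique index: every insert, including a replayed one, adds a document.
    static int storedWithoutUniqueIndex(List<String> words) {
        List<String> collection = new ArrayList<>();
        for (String word : words) {
            collection.add(word);
        }
        return collection.size();
    }

    // Unique index on "word" (simulated by a Set): a replayed insert is rejected.
    static int storedWithUniqueIndex(List<String> words) {
        Set<String> uniqueKeys = new HashSet<>();
        List<String> collection = new ArrayList<>();
        for (String word : words) {
            if (uniqueKeys.add(word)) {
                collection.add(word);
            }
            // else: a real unique index would raise a duplicate-key error here
        }
        return collection.size();
    }
}
```

In the bolt as committed, any exception from the insert fails the tuple, so a duplicate-key rejection would itself be reported as a failure. Treating it as success would be a separate design decision.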

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
new file mode 100644
index 0000000..1994993
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.bolt;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+/**
+ * Basic bolt for updating documents in MongoDB.
+ *
+ * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection.
+ *
+ */
+public class MongoUpdateBolt extends AbstractMongoBolt {
+
+    private QueryFilterCreator queryCreator;
+    private MongoMapper mapper;
+
+    private boolean upsert;  //The default is false.
+
+    public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoMapper mapper) {
+        super(url, collectionName);
+
+        Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
+        Validate.notNull(mapper, "MongoMapper can not be null");
+
+        this.queryCreator = queryCreator;
+        this.mapper = mapper;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try{
+            //get document
+            Document doc = mapper.toDocument(tuple);
+            //get query filter
+            Bson filter = queryCreator.createFilter(tuple);
+            mongoClient.update(filter, doc, upsert);
+            this.collector.ack(tuple);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(tuple);
+        }
+    }
+
+    public void withUpsert(boolean upsert) {
+        this.upsert = upsert;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
new file mode 100644
index 0000000..be2e376
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common;
+
+import java.util.List;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.UpdateOptions;
+
+public class MongoDBClient {
+
+    private MongoClient client;
+    private MongoCollection<Document> collection;
+
+    public MongoDBClient(String url, String collectionName) {
+        //Creates a MongoURI from the given string.
+        MongoClientURI uri = new MongoClientURI(url);
+        //Creates a MongoClient described by a URI.
+        this.client = new MongoClient(uri);
+        //Gets a Database.
+        MongoDatabase db = client.getDatabase(uri.getDatabase());
+        //Gets a collection.
+        this.collection = db.getCollection(collectionName);
+    }
+
+    /**
+     * Inserts the provided document.
+     * 
+     * @param document
+     */
+    public void insert(Document document) {
+        collection.insertOne(document);
+    }
+
+    /**
+     * Inserts one or more documents.
+     * This method is equivalent to a call to the bulkWrite method.
+     * The documents will be inserted in the order provided, 
+     * stopping on the first failed insertion. 
+     * 
+     * @param documents
+     */
+    public void insert(List<Document> documents) {
+        collection.insertMany(documents);
+    }
+
+    /**
+     * Updates all documents in the collection that match the specified query filter.
+     * When upsert is set to true, a new document is inserted if no documents match the query filter.
+     * 
+     * @param filter
+     * @param update
+     * @param upsert
+     */
+    public void update(Bson filter, Bson update, boolean upsert) {
+        UpdateOptions options = new UpdateOptions();
+        if(upsert) {
+            options.upsert(true);
+        }
+        collection.updateMany(filter, update, options);
+    }
+
+    /**
+     * Closes all resources associated with this instance.
+     */
+    public void close(){
+        client.close();
+    }
+
+}
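
Note on the connection URL: the `MongoDBClient` constructor takes the database name from the URL via `uri.getDatabase()`, so the URL needs a database segment such as `/test`. As far as I can tell, the driver returns null for a URL without one, and the constructor would then fail. The sketch below is one way to check this up front. It is an assumption-laden illustration, not part of this commit: `MongoUrlSketch.databaseOf` is a hypothetical helper that does simple string parsing of the standard `mongodb://` scheme and does not cover every form the driver accepts.

```java
// Hypothetical helper: extracts the database segment from a mongodb:// URL.
class MongoUrlSketch {

    // Returns the database name, or null when the URL does not specify one.
    static String databaseOf(String url) {
        String prefix = "mongodb://";
        if (url == null || !url.startsWith(prefix)) {
            throw new IllegalArgumentException("not a mongodb:// URL: " + url);
        }
        String rest = url.substring(prefix.length());
        int slash = rest.indexOf('/');
        if (slash < 0) {
            return null; // hosts only, no database segment
        }
        String database = rest.substring(slash + 1);
        int query = database.indexOf('?');
        if (query >= 0) {
            database = database.substring(0, query); // drop ?options
        }
        return database.isEmpty() ? null : database;
    }

    public static void main(String[] args) {
        // prints test
        System.out.println(databaseOf("mongodb://127.0.0.1:27017/test"));
        // prints null
        System.out.println(databaseOf("mongodb://127.0.0.1:27017"));
    }
}
```

A caller could run such a check before constructing the bolt and fail fast with a clear message instead of failing later inside `prepare`.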

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
new file mode 100644
index 0000000..d95f717
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common;
+
+import java.io.Serializable;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.conversions.Bson;
+
+/**
+ * Creates a MongoDB query filter from the given Tuple.
+ */
+public interface QueryFilterCreator extends Serializable {
+
+    /**
+     * Creates a query filter from the given Tuple.
+     * 
+     * @param tuple
+     * @return query Filter
+     */
+    Bson createFilter(ITuple tuple);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
new file mode 100644
index 0000000..8b4f1c3
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.conversions.Bson;
+
+import com.mongodb.client.model.Filters;
+
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
+    private String field;
+    
+    @Override
+    public Bson createFilter(ITuple tuple) {
+        return Filters.eq(field, tuple.getValueByField(field));
+    }
+
+    public SimpleQueryFilterCreator withField(String field) {
+        this.field = field;
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
new file mode 100644
index 0000000..7bcd499
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common.mapper;
+
+import java.io.Serializable;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+/**
+ * Given a Tuple, converts it to a MongoDB document.
+ */
+public interface MongoMapper extends Serializable {
+
+    /**
+     * Converts a Tuple to a Document
+     * 
+     * @param tuple the incoming tuple
+     * @return the MongoDB document
+     */
+    Document toDocument(ITuple tuple);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
new file mode 100644
index 0000000..4440962
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common.mapper;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+public class SimpleMongoMapper implements MongoMapper {
+
+    private String[] fields;
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for(String field : fields){
+            document.append(field, tuple.getValueByField(field));
+        }
+        return document;
+    }
+
+    public SimpleMongoMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
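
Illustration only, not part of the commit: the field projection done by SimpleMongoMapper.toDocument can be sketched with standard-library types. A plain Map stands in for both the Storm tuple and the BSON Document, and the names FieldProjectionSketch and project are hypothetical. One difference to keep in mind: Map.get returns null for a missing key, which may not match how a real tuple reacts to an unknown field.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in sketch: plain maps replace ITuple and org.bson.Document (assumption).
class FieldProjectionSketch {

    // Copies only the configured fields, in the configured order,
    // mirroring the loop in SimpleMongoMapper.toDocument.
    static Map<String, Object> project(Map<String, Object> tuple, String... fields) {
        Map<String, Object> document = new LinkedHashMap<>();
        for (String field : fields) {
            document.put(field, tuple.get(field));
        }
        return document;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("word", "apple");
        tuple.put("count", 3);
        tuple.put("ignored", true);
        // prints {word=apple, count=3}
        System.out.println(project(tuple, "word", "count"));
    }
}
```

Fields that are not listed (here "ignored") never reach the document, which is why withFields must name every column that should be stored.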

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
new file mode 100644
index 0000000..f07d4dc
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common.mapper;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+public class SimpleMongoUpdateMapper implements MongoMapper {
+
+    private String[] fields;
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for(String field : fields){
+            document.append(field, tuple.getValueByField(field));
+        }
+        //$set operator: Sets the value of a field in a document.
+        return new Document("$set", document);
+    }
+
+    public SimpleMongoUpdateMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
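
Illustration only, not part of the commit: SimpleMongoUpdateMapper builds the same field projection and then nests it under the "$set" operator, so an update changes only the listed fields. The sketch below shows the resulting shape with plain maps standing in for the tuple and for org.bson.Document; SetUpdateSketch and toSetUpdate are hypothetical names.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in sketch: plain maps replace ITuple and org.bson.Document (assumption).
class SetUpdateSketch {

    // Projects the configured fields, then wraps them as {"$set": {...}}.
    static Map<String, Object> toSetUpdate(Map<String, Object> tuple, String... fields) {
        Map<String, Object> values = new LinkedHashMap<>();
        for (String field : fields) {
            values.put(field, tuple.get(field));
        }
        return Collections.<String, Object>singletonMap("$set", values);
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("word", "apple");
        tuple.put("count", 3);
        // prints {$set={word=apple, count=3}}
        System.out.println(toSetUpdate(tuple, "word", "count"));
    }
}
```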

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
new file mode 100644
index 0000000..843fcee
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident.state;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.MongoDBClient;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class MongoState implements State {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
+
+    private Options options;
+    private MongoDBClient mongoClient;
+    private Map map;
+
+    protected MongoState(Map map, Options options) {
+        this.options = options;
+        this.map = map;
+    }
+
+    public static class Options implements Serializable {
+        private String url;
+        private String collectionName;
+        private MongoMapper mapper;
+
+        public Options withUrl(String url) {
+            this.url = url;
+            return this;
+        }
+
+        public Options withCollectionName(String collectionName) {
+            this.collectionName = collectionName;
+            return this;
+        }
+
+        public Options withMapper(MongoMapper mapper) {
+            this.mapper = mapper;
+            return this;
+        }
+    }
+
+    protected void prepare() {
+        Validate.notEmpty(options.url, "url can not be blank or null");
+        Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
+        Validate.notNull(options.mapper, "MongoMapper can not be null");
+
+        this.mongoClient = new MongoDBClient(options.url, options.collectionName);
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is noop.");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is noop.");
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        List<Document> documents = Lists.newArrayList();
+        for (TridentTuple tuple : tuples) {
+            Document document = options.mapper.toDocument(tuple);
+            documents.add(document);
+        }
+        this.mongoClient.insert(documents);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
new file mode 100644
index 0000000..d6cd3a5
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident.state;
+
+import java.util.Map;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+public class MongoStateFactory implements StateFactory {
+
+    private MongoState.Options options;
+
+    public MongoStateFactory(MongoState.Options options) {
+        this.options = options;
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics,
+            int partitionIndex, int numPartitions) {
+        MongoState state = new MongoState(conf, options);
+        state.prepare();
+        return state;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
new file mode 100644
index 0000000..3173f6c
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident.state;
+
+import java.util.List;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class MongoStateUpdater extends BaseStateUpdater<MongoState>  {
+
+    @Override
+    public void updateState(MongoState state, List<TridentTuple> tuples,
+            TridentCollector collector) {
+        state.updateState(tuples, collector);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/InsertWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/InsertWordCount.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/InsertWordCount.java
new file mode 100644
index 0000000..c83bdbd
--- /dev/null
+++ b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/InsertWordCount.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.mongodb.bolt.MongoInsertBolt;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class InsertWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String COUNT_BOLT = "COUNT_BOLT";
+    private static final String INSERT_BOLT = "INSERT_BOLT";
+
+    private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test";
+    private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
+    
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        String url = TEST_MONGODB_URL;
+        String collectionName = TEST_MONGODB_COLLECTION_NAME;
+
+        if (args.length >= 2) {
+            url = args[0];
+            collectionName = args[1];
+        }
+
+        WordSpout spout = new WordSpout();
+        WordCounter bolt = new WordCounter();
+
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+        
+        MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
+
+        // wordSpout ==> countBolt ==> MongoInsertBolt
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(INSERT_BOLT, insertBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 3) {
+            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+        } else{
+            System.out.println("Usage: InsertWordCount <mongodb url> <mongodb collection> [topology name]");
+        }
+    }
+}
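
Illustration only, not part of the commit: the argument handling at the end of main can be read as a three-way decision on args.length. The sketch below restates it with hypothetical names (RunModeSketch, Mode, resolve). Note that, as written in the diff, running with no arguments prints the usage line, so the TEST_MONGODB_URL and TEST_MONGODB_COLLECTION_NAME defaults are assigned but no topology is started.

```java
// Hypothetical restatement of the args.length branches in InsertWordCount.main.
class RunModeSketch {

    enum Mode { LOCAL, REMOTE, USAGE }

    // 2 args: run in a LocalCluster; 3 args: submit via StormSubmitter; otherwise print usage.
    static Mode resolve(String[] args) {
        switch (args.length) {
            case 2:
                return Mode.LOCAL;
            case 3:
                return Mode.REMOTE;
            default:
                return Mode.USAGE;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(args));
    }
}
```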

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/UpdateWordCount.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
new file mode 100644
index 0000000..071708e
--- /dev/null
+++ b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.mongodb.bolt.MongoInsertBolt;
+import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class UpdateWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String COUNT_BOLT = "COUNT_BOLT";
+    private static final String UPDATE_BOLT = "UPDATE_BOLT";
+
+    private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test";
+    private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
+    
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        String url = TEST_MONGODB_URL;
+        String collectionName = TEST_MONGODB_COLLECTION_NAME;
+
+        if (args.length >= 2) {
+            url = args[0];
+            collectionName = args[1];
+        }
+
+        WordSpout spout = new WordSpout();
+        WordCounter bolt = new WordCounter();
+
+        MongoMapper mapper = new SimpleMongoUpdateMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+        
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator , mapper);
+
+        //uncomment to insert a new document when nothing matches the query filter
+        //updateBolt.withUpsert(true);
+
+        // wordSpout ==> countBolt ==> MongoUpdateBolt
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(UPDATE_BOLT, updateBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 3) {
+            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+        } else{
+            System.out.println("Usage: UpdateWordCount <mongodb url> <mongodb collection> [topology name]");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordCounter.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordCounter.java
new file mode 100644
index 0000000..481f959
--- /dev/null
+++ b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordCounter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+public class WordCounter implements IBasicBolt {
+    private Map<String, Integer> wordCounter = Maps.newHashMap();
+
+    public void prepare(Map stormConf, TopologyContext context) {
+        
+    }
+
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        String word = input.getStringByField("word");
+        int count;
+        if (wordCounter.containsKey(word)) {
+            count = wordCounter.get(word) + 1;
+            // the updated count is stored once, after this branch
+        } else {
+            count = 1;
+        }
+
+        wordCounter.put(word, count);
+        collector.emit(new Values(word, String.valueOf(count)));
+    }
+
+    public void cleanup() {
+
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}
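
Illustration only, not part of the commit: the counting step in WordCounter.execute can be written more compactly with Map.merge, which inserts 1 for an unseen word and otherwise adds 1 to the stored value. The sketch keeps only the counting logic and leaves out the Storm bolt interface; WordCountSketch and increment are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Counting logic only; the Storm IBasicBolt plumbing is intentionally omitted.
class WordCountSketch {

    private final Map<String, Integer> counts = new HashMap<>();

    // Returns the new count for the word after recording one more occurrence.
    int increment(String word) {
        return counts.merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        WordCountSketch counter = new WordCountSketch();
        System.out.println(counter.increment("apple"));  // prints 1
        System.out.println(counter.increment("apple"));  // prints 2
        System.out.println(counter.increment("banana")); // prints 1
    }
}
```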

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordSpout.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordSpout.java
new file mode 100644
index 0000000..284f228
--- /dev/null
+++ b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordSpout.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+public class WordSpout implements IRichSpout {
+    boolean isDistributed;
+    SpoutOutputCollector collector;
+    public static final String[] words = new String[] { "apple", "orange", "pineapple", "banana", "watermelon" };
+
+    public WordSpout() {
+        this(true);
+    }
+
+    public WordSpout(boolean isDistributed) {
+        this.isDistributed = isDistributed;
+    }
+
+    public boolean isDistributed() {
+        return this.isDistributed;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+    }
+
+    public void close() {
+
+    }
+
+    public void nextTuple() {
+        final Random rand = new Random();
+        final String word = words[rand.nextInt(words.length)];
+        this.collector.emit(new Values(word), UUID.randomUUID());
+        Thread.yield();
+    }
+
+    public void ack(Object msgId) {
+
+    }
+
+    public void fail(Object msgId) {
+
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }
+
+    @Override
+    public void activate() {
+    }
+
+    @Override
+    public void deactivate() {
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}
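
Illustration only, not part of the commit: nextTuple creates a new Random on every call. A possible alternative, sketched here without the Storm spout interface, is to hold one Random for the lifetime of the object and reuse it. WordPickerSketch and next are hypothetical names; the word list is copied from WordSpout.

```java
import java.util.Random;

// Word selection only; the Storm IRichSpout plumbing is intentionally omitted.
class WordPickerSketch {

    static final String[] WORDS = { "apple", "orange", "pineapple", "banana", "watermelon" };

    private final Random random;

    // The Random is injected so callers can pass a seeded instance in tests.
    WordPickerSketch(Random random) {
        this.random = random;
    }

    // Picks one of the fixed words uniformly at random.
    String next() {
        return WORDS[random.nextInt(WORDS.length)];
    }

    public static void main(String[] args) {
        WordPickerSketch picker = new WordPickerSketch(new Random());
        System.out.println(picker.next());
    }
}
```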

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/trident/WordCountTrident.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/trident/WordCountTrident.java
new file mode 100644
index 0000000..7a18863
--- /dev/null
+++ b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/trident/WordCountTrident.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
+import org.apache.storm.mongodb.trident.state.MongoState;
+import org.apache.storm.mongodb.trident.state.MongoStateFactory;
+import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class WordCountTrident {
+
+    public static StormTopology buildTopology(String url, String collectionName){
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        spout.setCycle(true);
+
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        MongoState.Options options = new MongoState.Options()
+                .withUrl(url)
+                .withCollectionName(collectionName)
+                .withMapper(mapper);
+
+        StateFactory factory = new MongoStateFactory(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory, fields,  new MongoStateUpdater(), new Fields());
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));
+            Thread.sleep(60 * 1000);
+            cluster.killTopology("wordCounter");
+            cluster.shutdown();
+            System.exit(0);
+        }
+        else if(args.length == 3) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
+        } else{
+            System.out.println("Usage: WordCountTrident <mongodb url> <mongodb collection> [topology name]");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index facd824..213e5ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -277,6 +277,7 @@
         <module>external/storm-metrics</module>
         <module>external/storm-cassandra</module>
         <module>external/storm-mqtt</module>
+        <module>external/storm-mongodb</module>
         <module>examples/storm-starter</module>
     </modules>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 6d40c19..9332283 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -303,6 +303,20 @@
                 <include>storm*jar</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-mongodb/target</directory>
+            <outputDirectory>external/storm-mongodb</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-mongodb</directory>
+            <outputDirectory>external/storm-mongodb</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
 
         <!-- $STORM_HOME/extlib -->
         <fileSet>


[2/3] storm git commit: Merge branch 'STORM-1483-1.x' of https://github.com/vesense/storm into STORM-1483

Posted by sr...@apache.org.
Merge branch 'STORM-1483-1.x' of https://github.com/vesense/storm into STORM-1483


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/25657d89
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/25657d89
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/25657d89

Branch: refs/heads/1.x-branch
Commit: 25657d899ddfe1b01be959d32fae7806da4ac410
Parents: 53e1ab0 45fe459
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Mar 16 16:38:45 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Mar 16 16:38:45 2016 -0700

----------------------------------------------------------------------
 external/storm-mongodb/README.md                | 195 +++++++++++++++++++
 external/storm-mongodb/pom.xml                  |  74 +++++++
 .../storm/mongodb/bolt/AbstractMongoBolt.java   |  56 ++++++
 .../storm/mongodb/bolt/MongoInsertBolt.java     |  62 ++++++
 .../storm/mongodb/bolt/MongoUpdateBolt.java     |  75 +++++++
 .../storm/mongodb/common/MongoDBClient.java     |  91 +++++++++
 .../mongodb/common/QueryFilterCreator.java      |  38 ++++
 .../common/SimpleQueryFilterCreator.java        |  39 ++++
 .../mongodb/common/mapper/MongoMapper.java      |  38 ++++
 .../common/mapper/SimpleMongoMapper.java        |  40 ++++
 .../common/mapper/SimpleMongoUpdateMapper.java  |  41 ++++
 .../storm/mongodb/trident/state/MongoState.java |  97 +++++++++
 .../trident/state/MongoStateFactory.java        |  42 ++++
 .../trident/state/MongoStateUpdater.java        |  34 ++++
 .../storm/mongodb/topology/InsertWordCount.java |  81 ++++++++
 .../storm/mongodb/topology/UpdateWordCount.java |  91 +++++++++
 .../storm/mongodb/topology/WordCounter.java     |  67 +++++++
 .../storm/mongodb/topology/WordSpout.java       |  88 +++++++++
 .../storm/mongodb/trident/WordCountTrident.java |  85 ++++++++
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |  14 ++
 21 files changed, 1349 insertions(+)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-1483 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1483 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eeeb7b9c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eeeb7b9c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eeeb7b9c

Branch: refs/heads/1.x-branch
Commit: eeeb7b9c39167d3065f198aad8e91dfbb1b631f4
Parents: 25657d8
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Mar 16 17:51:14 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Mar 16 17:51:14 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/eeeb7b9c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5562ee1..f765315 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1483: add storm-mongodb connector
  * STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector
  * STORM-971: Metric for messages lost due to kafka retention
  * STORM-1608: Fix stateful topology acking behavior