You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/03/23 17:10:05 UTC

[2/5] storm git commit: set '* text=auto' in .gitattributes in order to avoid merge work because of line feed changes

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/README.md
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/README.md b/external/storm-mongodb/README.md
index b5b9e96..5c81710 100644
--- a/external/storm-mongodb/README.md
+++ b/external/storm-mongodb/README.md
@@ -1,325 +1,325 @@
-#Storm MongoDB
-
-Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute select/update queries against a database collection in a storm topology.
-
-## Insert into Database
-The bolt and trident state included in this package for inserting data into a database collection.
-
-### MongoMapper
-The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
-
-```java
-public interface MongoMapper extends Serializable {
-    Document toDocument(ITuple tuple);
-    Document toDocumentByKeys(List<Object> keys);
-}
-```
-
-### SimpleMongoMapper
-`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that can map Storm tuple to a Database document.  `SimpleMongoMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
-
-```java
-public class SimpleMongoMapper implements MongoMapper {
-    private String[] fields;
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-        Document document = new Document();
-        for(String field : fields){
-            document.append(field, tuple.getValueByField(field));
-        }
-        return document;
-    }
-
-    @Override
-    public Document toDocumentByKeys(List<Object> keys) {
-        Document document = new Document();
-        document.append("_id", MongoUtils.getID(keys));
-        return document;
-    }
-
-    public SimpleMongoMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-}
-```
-
-### MongoInsertBolt
-To use the `MongoInsertBolt`, you construct an instance of it by specifying url, collectionName and a `MongoMapper` implementation that converts storm tuple to DB document. The following is the standard URI connection scheme:
- `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
-
-More options information(eg: Write Concern Options) about Mongo URI, you can visit https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
-
- ```java
-String url = "mongodb://127.0.0.1:27017/test";
-String collectionName = "wordcount";
-
-MongoMapper mapper = new SimpleMongoMapper()
-        .withFields("word", "count");
-
-MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
- ```
-
-
-## Update from Database
-The bolt included in this package for updating data from a database collection.
-
-### MongoUpdateMapper
-The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
-
-```java
-public interface MongoUpdateMapper extends MongoMapper { }
-```
-
-### SimpleMongoUpdateMapper
-`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
-`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit 
-https://docs.mongodb.org/manual/reference/operator/update/
-
-```java
-public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
-
-    private String[] fields;
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-        Document document = new Document();
-        for(String field : fields){
-            document.append(field, tuple.getValueByField(field));
-        }
-        //$set operator: Sets the value of a field in a document.
-        return new Document("$set", document);
-    }
-
-    public SimpleMongoUpdateMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-}
-```
-
-
-### QueryFilterCreator
-The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
-
- ```java
-public interface QueryFilterCreator extends Serializable {
-    Bson createFilter(ITuple tuple);
-    Bson createFilterByKeys(List<Object> keys);
-}
- ```
-
-### SimpleQueryFilterCreator
-`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that can create a MongoDB query Filter by given Tuple.  `QueryFilterCreator` uses `$eq` operator for matching values that are equal to a specified value. More information about query operator, you can visit 
-https://docs.mongodb.org/manual/reference/operator/query/
-
- ```java
-public class SimpleQueryFilterCreator implements QueryFilterCreator {
-
-    private String field;
-    
-    @Override
-    public Bson createFilter(ITuple tuple) {
-        return Filters.eq(field, tuple.getValueByField(field));
-    }
-
-    @Override
-    public Bson createFilterByKeys(List<Object> keys) {
-        return Filters.eq("_id", MongoUtils.getID(keys));
-    }
-
-    public SimpleQueryFilterCreator withField(String field) {
-        this.field = field;
-        return this;
-    }
-
-}
- ```
-
-### MongoUpdateBolt
-To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document.
-
- ```java
-        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
-                .withField("word");
-        
-        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
-
-        //if a new document should be inserted if there are no matches to the query filter
-        //updateBolt.withUpsert(true);
-
-        //whether find all documents according to the query filter
-        //updateBolt.withMany(true);
- ```
- 
- Or use a anonymous inner class implementation for `QueryFilterCreator`:
- 
-  ```java
-        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
-            @Override
-            public Bson createFilter(ITuple tuple) {
-                return Filters.gt("count", 3);
-            }
-        };
-        
-        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
-
-        //if a new document should be inserted if there are no matches to the query filter
-        //updateBolt.withUpsert(true);
- ```
-
-
-## Lookup from Database
-The bolt included in this package for selecting data from a database collection.
-
-### MongoLookupMapper
-The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
-
-```java
-public interface MongoLookupMapper extends Serializable {
-
-    List<Values> toTuple(ITuple input, Document doc);
-
-    void declareOutputFields(OutputFieldsDeclarer declarer);
-}
-```
-
-### SimpleMongoLookupMapper
-`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that can converts a Mongo document to a list of storm values.
-
-```java
-public class SimpleMongoLookupMapper implements MongoLookupMapper {
-
-    private String[] fields;
-
-    @Override
-    public List<Values> toTuple(ITuple input, Document doc) {
-        Values values = new Values();
-
-        for(String field : fields) {
-            if(input.contains(field)) {
-                values.add(input.getValueByField(field));
-            } else {
-                values.add(doc.get(field));
-            }
-        }
-        List<Values> result = new ArrayList<Values>();
-        result.add(values);
-        return result;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(fields));
-    }
-
-    public SimpleMongoLookupMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-
-}
-```
-
-### MongoLookupBolt
-To use the `MongoLookupBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values.
-
- ```java
-        MongoLookupMapper mapper = new SimpleMongoLookupMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
-                .withField("word");
-
-        MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
- ```
-
-## Mongo Trident State&MapState
-### Trident State
-We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
-
- ```java
-        MongoMapper mapper = new SimpleMongoMapper()
-                .withFields("word", "count");
-
-        MongoState.Options options = new MongoState.Options()
-                .withUrl(url)
-                .withCollectionName(collectionName)
-                .withMapper(mapper);
-
-        StateFactory factory = new MongoStateFactory(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        stream.partitionPersist(factory, fields,
-                new MongoStateUpdater(), new Fields());
-
-        TridentState state = topology.newStaticState(factory);
-        stream = stream.stateQuery(state, new Fields("word"),
-                new MongoStateQuery(), new Fields("columnName", "columnValue"));
-        stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
- ```
- **NOTE**:
- >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
-
-### Trident MapState
-We also support trident `MapState`. To create a Mongo trident `MapState` you need to initialize it with the url, collectionName, the `MongoMapper` and `QueryFilterCreator` instance. See the example below:
-
- ```java
-        MongoMapper mapper = new SimpleMongoMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
-                .withField("word");
-
-        MongoMapState.Options options = new MongoMapState.Options();
-        options.url = url;
-        options.collectionName = collectionName;
-        options.mapper = mapper;
-        options.queryCreator = filterCreator;
-
-        StateFactory factory = MongoMapState.transactional(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        TridentState state = stream.groupBy(new Fields("word"))
-                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
-
-        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
-                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
- ```
- 
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
-
- * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- * Xin Wang ([xinwang@apache.org](mailto:xinwang@apache.org))
- 
+#Storm MongoDB
+
+Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute select/update queries against a database collection in a storm topology.
+
+## Insert into Database
+The bolt and trident state included in this package for inserting data into a database collection.
+
+### MongoMapper
+The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
+
+```java
+public interface MongoMapper extends Serializable {
+    Document toDocument(ITuple tuple);
+    Document toDocumentByKeys(List<Object> keys);
+}
+```
+
+### SimpleMongoMapper
+`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that can map Storm tuple to a Database document.  `SimpleMongoMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
+
+```java
+public class SimpleMongoMapper implements MongoMapper {
+    private String[] fields;
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for(String field : fields){
+            document.append(field, tuple.getValueByField(field));
+        }
+        return document;
+    }
+
+    @Override
+    public Document toDocumentByKeys(List<Object> keys) {
+        Document document = new Document();
+        document.append("_id", MongoUtils.getID(keys));
+        return document;
+    }
+
+    public SimpleMongoMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
+```
+
+### MongoInsertBolt
+To use the `MongoInsertBolt`, you construct an instance of it by specifying url, collectionName and a `MongoMapper` implementation that converts storm tuple to DB document. The following is the standard URI connection scheme:
+ `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
+
+More options information(eg: Write Concern Options) about Mongo URI, you can visit https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
+
+ ```java
+String url = "mongodb://127.0.0.1:27017/test";
+String collectionName = "wordcount";
+
+MongoMapper mapper = new SimpleMongoMapper()
+        .withFields("word", "count");
+
+MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
+ ```
+
+
+## Update from Database
+The bolt included in this package for updating data from a database collection.
+
+### MongoUpdateMapper
+The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
+
+```java
+public interface MongoUpdateMapper extends MongoMapper { }
+```
+
+### SimpleMongoUpdateMapper
+`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
+`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit 
+https://docs.mongodb.org/manual/reference/operator/update/
+
+```java
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
+    private String[] fields;
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for(String field : fields){
+            document.append(field, tuple.getValueByField(field));
+        }
+        //$set operator: Sets the value of a field in a document.
+        return new Document("$set", document);
+    }
+
+    public SimpleMongoUpdateMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}
+```
+
+
+### QueryFilterCreator
+The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
+
+ ```java
+public interface QueryFilterCreator extends Serializable {
+    Bson createFilter(ITuple tuple);
+    Bson createFilterByKeys(List<Object> keys);
+}
+ ```
+
+### SimpleQueryFilterCreator
+`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that can create a MongoDB query Filter by given Tuple.  `QueryFilterCreator` uses `$eq` operator for matching values that are equal to a specified value. More information about query operator, you can visit 
+https://docs.mongodb.org/manual/reference/operator/query/
+
+ ```java
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
+    private String field;
+    
+    @Override
+    public Bson createFilter(ITuple tuple) {
+        return Filters.eq(field, tuple.getValueByField(field));
+    }
+
+    @Override
+    public Bson createFilterByKeys(List<Object> keys) {
+        return Filters.eq("_id", MongoUtils.getID(keys));
+    }
+
+    public SimpleQueryFilterCreator withField(String field) {
+        this.field = field;
+        return this;
+    }
+
+}
+ ```
+
+### MongoUpdateBolt
+To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document.
+
+ ```java
+        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+        
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+        //if a new document should be inserted if there are no matches to the query filter
+        //updateBolt.withUpsert(true);
+
+        //whether find all documents according to the query filter
+        //updateBolt.withMany(true);
+ ```
+ 
+ Or use a anonymous inner class implementation for `QueryFilterCreator`:
+ 
+  ```java
+        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
+            @Override
+            public Bson createFilter(ITuple tuple) {
+                return Filters.gt("count", 3);
+            }
+        };
+        
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+        //if a new document should be inserted if there are no matches to the query filter
+        //updateBolt.withUpsert(true);
+ ```
+
+
+## Lookup from Database
+The bolt included in this package for selecting data from a database collection.
+
+### MongoLookupMapper
+The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
+
+```java
+public interface MongoLookupMapper extends Serializable {
+
+    List<Values> toTuple(ITuple input, Document doc);
+
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+### SimpleMongoLookupMapper
+`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that can converts a Mongo document to a list of storm values.
+
+```java
+public class SimpleMongoLookupMapper implements MongoLookupMapper {
+
+    private String[] fields;
+
+    @Override
+    public List<Values> toTuple(ITuple input, Document doc) {
+        Values values = new Values();
+
+        for(String field : fields) {
+            if(input.contains(field)) {
+                values.add(input.getValueByField(field));
+            } else {
+                values.add(doc.get(field));
+            }
+        }
+        List<Values> result = new ArrayList<Values>();
+        result.add(values);
+        return result;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(fields));
+    }
+
+    public SimpleMongoLookupMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+
+}
+```
+
+### MongoLookupBolt
+To use the `MongoLookupBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values.
+
+ ```java
+        MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+ ```
+
+## Mongo Trident State&MapState
+### Trident State
+We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        MongoState.Options options = new MongoState.Options()
+                .withUrl(url)
+                .withCollectionName(collectionName)
+                .withMapper(mapper);
+
+        StateFactory factory = new MongoStateFactory(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory, fields,
+                new MongoStateUpdater(), new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                new MongoStateQuery(), new Fields("columnName", "columnValue"));
+        stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
+ ```
+ **NOTE**:
+ >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
+
+### Trident MapState
+We also support trident `MapState`. To create a Mongo trident `MapState` you need to initialize it with the url, collectionName, the `MongoMapper` and `QueryFilterCreator` instance. See the example below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoMapState.Options options = new MongoMapState.Options();
+        options.url = url;
+        options.collectionName = collectionName;
+        options.mapper = mapper;
+        options.queryCreator = filterCreator;
+
+        StateFactory factory = MongoMapState.transactional(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        TridentState state = stream.groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ ```
+ 
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ * Xin Wang ([xinwang@apache.org](mailto:xinwang@apache.org))
+ 

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
index ee66811..13049de 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
@@ -1,62 +1,62 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.MongoDbClient;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.base.BaseRichBolt;
-
-public abstract class AbstractMongoBolt extends BaseRichBolt {
-
-    private String url;
-    private String collectionName;
-
-    protected OutputCollector collector;
-    protected MongoDbClient mongoClient;
-
-    /**
-     * AbstractMongoBolt Constructor.
-     * @param url The MongoDB server url
-     * @param collectionName The collection where reading/writing data
-     */
-    public AbstractMongoBolt(String url, String collectionName) {
-        Validate.notEmpty(url, "url can not be blank or null");
-        Validate.notEmpty(collectionName, "collectionName can not be blank or null");
-
-        this.url = url;
-        this.collectionName = collectionName;
-    }
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context,
-            OutputCollector collector) {
-        this.collector = collector;
-        this.mongoClient = new MongoDbClient(url, collectionName);
-    }
-
-    @Override
-    public void cleanup() {
-        this.mongoClient.close();
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.bolt;
+
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.MongoDbClient;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.base.BaseRichBolt;
+
+public abstract class AbstractMongoBolt extends BaseRichBolt {
+
+    private String url;
+    private String collectionName;
+
+    protected OutputCollector collector;
+    protected MongoDbClient mongoClient;
+
+    /**
+     * AbstractMongoBolt Constructor.
+     * @param url The MongoDB server url
+     * @param collectionName The collection where reading/writing data
+     */
+    public AbstractMongoBolt(String url, String collectionName) {
+        Validate.notEmpty(url, "url can not be blank or null");
+        Validate.notEmpty(collectionName, "collectionName can not be blank or null");
+
+        this.url = url;
+        this.collectionName = collectionName;
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context,
+            OutputCollector collector) {
+        this.collector = collector;
+        this.mongoClient = new MongoDbClient(url, collectionName);
+    }
+
+    @Override
+    public void cleanup() {
+        this.mongoClient.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
index afd1142..5d233da 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
@@ -1,124 +1,124 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.BatchHelper;
-import org.apache.storm.utils.TupleUtils;
-import org.bson.Document;
-
-/**
- * Basic bolt for writing to MongoDB.
- * Note: Each MongoInsertBolt defined in a topology is tied to a specific collection.
- */
-public class MongoInsertBolt extends AbstractMongoBolt {
-
-    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
-
-    private MongoMapper mapper;
-
-    private boolean ordered = true;  //default is ordered.
-
-    private int batchSize;
-
-    private BatchHelper batchHelper;
-
-    private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
-
-    /**
-     * MongoInsertBolt Constructor.
-     * @param url The MongoDB server url
-     * @param collectionName The collection where reading/writing data
-     * @param mapper MongoMapper converting tuple to an MongoDB document
-     */
-    public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) {
-        super(url, collectionName);
-
-        Validate.notNull(mapper, "MongoMapper can not be null");
-
-        this.mapper = mapper;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        try {
-            if (batchHelper.shouldHandle(tuple)) {
-                batchHelper.addBatch(tuple);
-            }
-
-            if (batchHelper.shouldFlush()) {
-                flushTuples();
-                batchHelper.ack();
-            }
-        } catch (Exception e) {
-            batchHelper.fail(e);
-        }
-    }
-
-    private void flushTuples() {
-        List<Document> docs = new LinkedList<>();
-        for (Tuple t : batchHelper.getBatchTuples()) {
-            Document doc = mapper.toDocument(t);
-            docs.add(doc);
-        }
-        mongoClient.insert(docs, ordered);
-    }
-
-    public MongoInsertBolt withBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-        return this;
-    }
-
-    public MongoInsertBolt withOrdered(boolean ordered) {
-        this.ordered = ordered;
-        return this;
-    }
-
-    public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) {
-        this.flushIntervalSecs = flushIntervalSecs;
-        return this;
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
-    }
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context,
-            OutputCollector collector) {
-        super.prepare(topoConf, context, collector);
-        this.batchHelper = new BatchHelper(batchSize, collector);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.bolt;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.BatchHelper;
+import org.apache.storm.utils.TupleUtils;
+import org.bson.Document;
+
+/**
+ * Basic bolt for writing to MongoDB.
+ * Note: Each MongoInsertBolt defined in a topology is tied to a specific collection.
+ */
+public class MongoInsertBolt extends AbstractMongoBolt {
+
+    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
+
+    private MongoMapper mapper;
+
+    private boolean ordered = true;  //default is ordered.
+
+    private int batchSize;
+
+    private BatchHelper batchHelper;
+
+    private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
+
+    /**
+     * MongoInsertBolt Constructor.
+     * @param url The MongoDB server url
+     * @param collectionName The collection where reading/writing data
+     * @param mapper MongoMapper converting tuple to an MongoDB document
+     */
+    public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) {
+        super(url, collectionName);
+
+        Validate.notNull(mapper, "MongoMapper can not be null");
+
+        this.mapper = mapper;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            if (batchHelper.shouldHandle(tuple)) {
+                batchHelper.addBatch(tuple);
+            }
+
+            if (batchHelper.shouldFlush()) {
+                flushTuples();
+                batchHelper.ack();
+            }
+        } catch (Exception e) {
+            batchHelper.fail(e);
+        }
+    }
+
+    private void flushTuples() {
+        List<Document> docs = new LinkedList<>();
+        for (Tuple t : batchHelper.getBatchTuples()) {
+            Document doc = mapper.toDocument(t);
+            docs.add(doc);
+        }
+        mongoClient.insert(docs, ordered);
+    }
+
+    public MongoInsertBolt withBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    public MongoInsertBolt withOrdered(boolean ordered) {
+        this.ordered = ordered;
+        return this;
+    }
+
+    public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) {
+        this.flushIntervalSecs = flushIntervalSecs;
+        return this;
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context,
+            OutputCollector collector) {
+        super.prepare(topoConf, context, collector);
+        this.batchHelper = new BatchHelper(batchSize, collector);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
index 5579b8e..4f92b32 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
@@ -1,93 +1,93 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-/**
- * Basic bolt for updating from MongoDB.
- * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection.
- */
-public class MongoUpdateBolt extends AbstractMongoBolt {
-
-    private QueryFilterCreator queryCreator;
-    private MongoUpdateMapper mapper;
-
-    private boolean upsert;  //the default is false.
-    private boolean many;  //the default is false.
-
-    /**
-     * MongoUpdateBolt Constructor.
-     * @param url The MongoDB server url
-     * @param collectionName The collection where reading/writing data
-     * @param queryCreator QueryFilterCreator
-     * @param mapper MongoMapper converting tuple to an MongoDB document
-     */
-    public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoUpdateMapper mapper) {
-        super(url, collectionName);
-
-        Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
-        Validate.notNull(mapper, "MongoUpdateMapper can not be null");
-
-        this.queryCreator = queryCreator;
-        this.mapper = mapper;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        if (TupleUtils.isTick(tuple)) {
-            return;
-        }
-
-        try {
-            //get document
-            Document doc = mapper.toDocument(tuple);
-            //get query filter
-            Bson filter = queryCreator.createFilter(tuple);
-            mongoClient.update(filter, doc, upsert, many);
-            this.collector.ack(tuple);
-        } catch (Exception e) {
-            this.collector.reportError(e);
-            this.collector.fail(tuple);
-        }
-    }
-
-    public MongoUpdateBolt withUpsert(boolean upsert) {
-        this.upsert = upsert;
-        return this;
-    }
-
-    public MongoUpdateBolt withMany(boolean many) {
-        this.many = many;
-        return this;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.bolt;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+/**
+ * Basic bolt for updating from MongoDB.
+ * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection.
+ */
+public class MongoUpdateBolt extends AbstractMongoBolt {
+
+    private QueryFilterCreator queryCreator;
+    private MongoUpdateMapper mapper;
+
+    private boolean upsert;  //the default is false.
+    private boolean many;  //the default is false.
+
+    /**
+     * MongoUpdateBolt Constructor.
+     * @param url The MongoDB server url
+     * @param collectionName The collection where reading/writing data
+     * @param queryCreator QueryFilterCreator
+     * @param mapper MongoMapper converting tuple to an MongoDB document
+     */
+    public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoUpdateMapper mapper) {
+        super(url, collectionName);
+
+        Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
+        Validate.notNull(mapper, "MongoUpdateMapper can not be null");
+
+        this.queryCreator = queryCreator;
+        this.mapper = mapper;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        if (TupleUtils.isTick(tuple)) {
+            return;
+        }
+
+        try {
+            //get document
+            Document doc = mapper.toDocument(tuple);
+            //get query filter
+            Bson filter = queryCreator.createFilter(tuple);
+            mongoClient.update(filter, doc, upsert, many);
+            this.collector.ack(tuple);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(tuple);
+        }
+    }
+
+    public MongoUpdateBolt withUpsert(boolean upsert) {
+        this.upsert = upsert;
+        return this;
+    }
+
+    public MongoUpdateBolt withMany(boolean many) {
+        this.many = many;
+        return this;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
index c264f03..52ed237 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
@@ -1,109 +1,109 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.InsertManyOptions;
-import com.mongodb.client.model.UpdateOptions;
-
-import java.util.List;
-
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-public class MongoDbClient {
-
-    private MongoClient client;
-    private MongoCollection<Document> collection;
-
-    /**
-     * The MongoDbClient constructor.
-     * @param url The Mongo server url
-     * @param collectionName The Mongo collection to read/write data
-     */
-    public MongoDbClient(String url, String collectionName) {
-        //Creates a MongoURI from the given string.
-        MongoClientURI uri = new MongoClientURI(url);
-        //Creates a MongoClient described by a URI.
-        this.client = new MongoClient(uri);
-        //Gets a Database.
-        MongoDatabase db = client.getDatabase(uri.getDatabase());
-        //Gets a collection.
-        this.collection = db.getCollection(collectionName);
-    }
-
-    /**
-     * Inserts one or more documents.
-     * This method is equivalent to a call to the bulkWrite method.
-     * The documents will be inserted in the order provided, 
-     * stopping on the first failed insertion. 
-     * 
-     * @param documents documents
-     */
-    public void insert(List<Document> documents, boolean ordered) {
-        InsertManyOptions options = new InsertManyOptions();
-        if (!ordered) {
-            options.ordered(false);
-        }
-        collection.insertMany(documents, options);
-    }
-
-    /**
-     * Update a single or all documents in the collection according to the specified arguments.
-     * When upsert set to true, the new document will be inserted if there are no matches to the query filter.
-     * 
-     * @param filter Bson filter
-     * @param document Bson document
-     * @param upsert a new document should be inserted if there are no matches to the query filter
-     * @param many whether find all documents according to the query filter
-     */
-    public void update(Bson filter, Bson document, boolean upsert, boolean many) {
-        //TODO batch updating
-        UpdateOptions options = new UpdateOptions();
-        if (upsert) {
-            options.upsert(true);
-        }
-        if (many) {
-            collection.updateMany(filter, document, options);
-        } else {
-            collection.updateOne(filter, document, options);
-        }
-    }
-
-    /**
-     * Finds a single document in the collection according to the specified arguments.
-     *
-     * @param filter Bson filter
-     */
-    public Document find(Bson filter) {
-        //TODO batch finding
-        return collection.find(filter).first();
-    }
-
-    /**
-     * Closes all resources associated with this instance.
-     */
-    public void close() {
-        client.close();
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
+import com.mongodb.client.model.UpdateOptions;
+
+import java.util.List;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+public class MongoDbClient {
+
+    private MongoClient client;
+    private MongoCollection<Document> collection;
+
+    /**
+     * The MongoDbClient constructor.
+     * @param url The Mongo server url
+     * @param collectionName The Mongo collection to read/write data
+     */
+    public MongoDbClient(String url, String collectionName) {
+        //Creates a MongoURI from the given string.
+        MongoClientURI uri = new MongoClientURI(url);
+        //Creates a MongoClient described by a URI.
+        this.client = new MongoClient(uri);
+        //Gets a Database.
+        MongoDatabase db = client.getDatabase(uri.getDatabase());
+        //Gets a collection.
+        this.collection = db.getCollection(collectionName);
+    }
+
+    /**
+     * Inserts one or more documents.
+     * This method is equivalent to a call to the bulkWrite method.
+     * The documents will be inserted in the order provided, 
+     * stopping on the first failed insertion. 
+     * 
+     * @param documents documents
+     */
+    public void insert(List<Document> documents, boolean ordered) {
+        InsertManyOptions options = new InsertManyOptions();
+        if (!ordered) {
+            options.ordered(false);
+        }
+        collection.insertMany(documents, options);
+    }
+
+    /**
+     * Update a single or all documents in the collection according to the specified arguments.
+     * When upsert set to true, the new document will be inserted if there are no matches to the query filter.
+     * 
+     * @param filter Bson filter
+     * @param document Bson document
+     * @param upsert a new document should be inserted if there are no matches to the query filter
+     * @param many whether find all documents according to the query filter
+     */
+    public void update(Bson filter, Bson document, boolean upsert, boolean many) {
+        //TODO batch updating
+        UpdateOptions options = new UpdateOptions();
+        if (upsert) {
+            options.upsert(true);
+        }
+        if (many) {
+            collection.updateMany(filter, document, options);
+        } else {
+            collection.updateOne(filter, document, options);
+        }
+    }
+
+    /**
+     * Finds a single document in the collection according to the specified arguments.
+     *
+     * @param filter Bson filter
+     */
+    public Document find(Bson filter) {
+        //TODO batch finding
+        return collection.find(filter).first();
+    }
+
+    /**
+     * Closes all resources associated with this instance.
+     */
+    public void close() {
+        client.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
index 371b1ed..521678e 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.conversions.Bson;
-
-/**
- * Create a MongoDB query Filter by given Tuple/trident keys.
- */
-public interface QueryFilterCreator extends Serializable {
-
-    /**
-     * Create a query Filter by given Tuple.
-     * 
-     * @param tuple ITuple tuple
-     * @return query Filter
-     */
-    Bson createFilter(ITuple tuple);
-
-    /**
-     * Create a query Filter by given trident keys.
-     *
-     * @param keys keys
-     * @return query Filter
-     */
-    Bson createFilterByKeys(List<Object> keys);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.conversions.Bson;
+
+/**
+ * Create a MongoDB query Filter by given Tuple/trident keys.
+ */
+public interface QueryFilterCreator extends Serializable {
+
+    /**
+     * Create a query Filter by given Tuple.
+     * 
+     * @param tuple ITuple tuple
+     * @return query Filter
+     */
+    Bson createFilter(ITuple tuple);
+
+    /**
+     * Create a query Filter by given trident keys.
+     *
+     * @param keys keys
+     * @return query Filter
+     */
+    Bson createFilterByKeys(List<Object> keys);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
index 1f5774a..16a71ec 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import com.mongodb.client.model.Filters;
-
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.conversions.Bson;
-
-public class SimpleQueryFilterCreator implements QueryFilterCreator {
-
-    private String field;
-    
-    @Override
-    public Bson createFilter(ITuple tuple) {
-        return Filters.eq(field, tuple.getValueByField(field));
-    }
-
-    @Override
-    public Bson createFilterByKeys(List<Object> keys) {
-        return Filters.eq("_id", MongoUtils.getId(keys));
-    }
-
-    public SimpleQueryFilterCreator withField(String field) {
-        this.field = field;
-        return this;
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common;
+
+import com.mongodb.client.model.Filters;
+
+import java.util.List;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.conversions.Bson;
+
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
+    private String field;
+    
+    @Override
+    public Bson createFilter(ITuple tuple) {
+        return Filters.eq(field, tuple.getValueByField(field));
+    }
+
+    @Override
+    public Bson createFilterByKeys(List<Object> keys) {
+        return Filters.eq("_id", MongoUtils.getId(keys));
+    }
+
+    public SimpleQueryFilterCreator withField(String field) {
+        this.field = field;
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
index 3ea9c16..ed27c92 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-/**
- * Given a Tuple/trident keys, converts it to an MongoDB document.
- */
-public interface MongoMapper extends Serializable {
-
-    /**
-     * Converts a Tuple to a Document.
-     * 
-     * @param tuple the incoming tuple
-     * @return the MongoDB document
-     */
-    Document toDocument(ITuple tuple);
-
-    /**
-     * Converts a keys to a Document.
-     *
-     * @param keys the trident keys
-     * @return the MongoDB document
-     */
-    Document toDocumentByKeys(List<Object> keys);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common.mapper;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+/**
+ * Given a Tuple/trident keys, converts it to an MongoDB document.
+ */
+public interface MongoMapper extends Serializable {
+
+    /**
+     * Converts a Tuple to a Document.
+     * 
+     * @param tuple the incoming tuple
+     * @return the MongoDB document
+     */
+    Document toDocument(ITuple tuple);
+
+    /**
+     * Converts a keys to a Document.
+     *
+     * @param keys the trident keys
+     * @return the MongoDB document
+     */
+    Document toDocumentByKeys(List<Object> keys);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
index 7bb0a06..1a38828 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
@@ -1,55 +1,55 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import java.util.List;
-
-import org.apache.storm.mongodb.common.MongoUtils;
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-public class SimpleMongoMapper implements MongoMapper {
-
-    private String[] fields;
-
-    public SimpleMongoMapper(String... fields) {
-        this.fields = fields;
-    }
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-        Document document = new Document();
-        for (String field : fields) {
-            document.append(field, tuple.getValueByField(field));
-        }
-        return document;
-    }
-
-    @Override
-    public Document toDocumentByKeys(List<Object> keys) {
-        Document document = new Document();
-        document.append("_id", MongoUtils.getId(keys));
-        return document;
-    }
-
-    public SimpleMongoMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common.mapper;
+
+import java.util.List;
+
+import org.apache.storm.mongodb.common.MongoUtils;
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+public class SimpleMongoMapper implements MongoMapper {
+
+    private String[] fields;
+
+    public SimpleMongoMapper(String... fields) {
+        this.fields = fields;
+    }
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for (String field : fields) {
+            document.append(field, tuple.getValueByField(field));
+        }
+        return document;
+    }
+
+    @Override
+    public Document toDocumentByKeys(List<Object> keys) {
+        Document document = new Document();
+        document.append("_id", MongoUtils.getId(keys));
+        return document;
+    }
+
+    public SimpleMongoMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
index 3ed17ec..72328c9 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
@@ -1,46 +1,46 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
-
-    private String[] fields;
-
-    public SimpleMongoUpdateMapper(String... fields) {
-        this.fields = fields;
-    }
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-        Document document = new Document();
-        for (String field : fields) {
-            document.append(field, tuple.getValueByField(field));
-        }
-        //$set operator: Sets the value of a field in a document.
-        return new Document("$set", document);
-    }
-
-    public SimpleMongoUpdateMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common.mapper;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
+    private String[] fields;
+
+    public SimpleMongoUpdateMapper(String... fields) {
+        this.fields = fields;
+    }
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+        Document document = new Document();
+        for (String field : fields) {
+            document.append(field, tuple.getValueByField(field));
+        }
+        //$set operator: Sets the value of a field in a document.
+        return new Document("$set", document);
+    }
+
+    public SimpleMongoUpdateMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
index 77c394c..100f931 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
@@ -1,145 +1,145 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import com.google.common.collect.Lists;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.MongoDbClient;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MongoState implements State {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
-
-    private Options options;
-    private MongoDbClient mongoClient;
-    private Map<String, Object> map;
-
-    protected MongoState(Map<String, Object> map, Options options) {
-        this.options = options;
-        this.map = map;
-    }
-
-    public static class Options implements Serializable {
-        private String url;
-        private String collectionName;
-        private MongoMapper mapper;
-        private MongoLookupMapper lookupMapper;
-        private QueryFilterCreator queryCreator;
-
-        public Options withUrl(String url) {
-            this.url = url;
-            return this;
-        }
-
-        public Options withCollectionName(String collectionName) {
-            this.collectionName = collectionName;
-            return this;
-        }
-
-        public Options withMapper(MongoMapper mapper) {
-            this.mapper = mapper;
-            return this;
-        }
-
-        public Options withMongoLookupMapper(MongoLookupMapper lookupMapper) {
-            this.lookupMapper = lookupMapper;
-            return this;
-        }
-
-        public Options withQueryFilterCreator(QueryFilterCreator queryCreator) {
-            this.queryCreator = queryCreator;
-            return this;
-        }
-    }
-
-    protected void prepare() {
-        Validate.notEmpty(options.url, "url can not be blank or null");
-        Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
-
-        this.mongoClient = new MongoDbClient(options.url, options.collectionName);
-    }
-
-    @Override
-    public void beginCommit(Long txid) {
-        LOG.debug("beginCommit is noop.");
-    }
-
-    @Override
-    public void commit(Long txid) {
-        LOG.debug("commit is noop.");
-    }
-
-    /**
-     * Update Mongo state.
-     * @param tuples trident tuples
-     * @param collector trident collector
-     */
-    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
-        List<Document> documents = Lists.newArrayList();
-        for (TridentTuple tuple : tuples) {
-            Document document = options.mapper.toDocument(tuple);
-            documents.add(document);
-        }
-
-        try {
-            this.mongoClient.insert(documents, true);
-        } catch (Exception e) {
-            LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
-            throw new FailedException(e);
-        }
-    }
-
-    /**
-     * Batch retrieve values.
-     * @param tridentTuples trident tuples
-     * @return values
-     */
-    public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
-        List<List<Values>> batchRetrieveResult = Lists.newArrayList();
-        try {
-            for (TridentTuple tuple : tridentTuples) {
-                Bson filter = options.queryCreator.createFilter(tuple);
-                Document doc = mongoClient.find(filter);
-                List<Values> values = options.lookupMapper.toTuple(tuple, doc);
-                batchRetrieveResult.add(values);
-            }
-        } catch (Exception e) {
-            LOG.warn("Batch get operation failed. Triggering replay.", e);
-            throw new FailedException(e);
-        }
-        return batchRetrieveResult;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.trident.state;
+
+import com.google.common.collect.Lists;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.MongoDbClient;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MongoState implements State {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
+
+    private Options options;
+    private MongoDbClient mongoClient;
+    private Map<String, Object> map;
+
+    protected MongoState(Map<String, Object> map, Options options) {
+        this.options = options;
+        this.map = map;
+    }
+
+    public static class Options implements Serializable {
+        private String url;
+        private String collectionName;
+        private MongoMapper mapper;
+        private MongoLookupMapper lookupMapper;
+        private QueryFilterCreator queryCreator;
+
+        public Options withUrl(String url) {
+            this.url = url;
+            return this;
+        }
+
+        public Options withCollectionName(String collectionName) {
+            this.collectionName = collectionName;
+            return this;
+        }
+
+        public Options withMapper(MongoMapper mapper) {
+            this.mapper = mapper;
+            return this;
+        }
+
+        public Options withMongoLookupMapper(MongoLookupMapper lookupMapper) {
+            this.lookupMapper = lookupMapper;
+            return this;
+        }
+
+        public Options withQueryFilterCreator(QueryFilterCreator queryCreator) {
+            this.queryCreator = queryCreator;
+            return this;
+        }
+    }
+
+    protected void prepare() {
+        Validate.notEmpty(options.url, "url can not be blank or null");
+        Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
+
+        this.mongoClient = new MongoDbClient(options.url, options.collectionName);
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is noop.");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is noop.");
+    }
+
+    /**
+     * Update Mongo state.
+     * @param tuples trident tuples
+     * @param collector trident collector
+     */
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        List<Document> documents = Lists.newArrayList();
+        for (TridentTuple tuple : tuples) {
+            Document document = options.mapper.toDocument(tuple);
+            documents.add(document);
+        }
+
+        try {
+            this.mongoClient.insert(documents, true);
+        } catch (Exception e) {
+            LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+            throw new FailedException(e);
+        }
+    }
+
+    /**
+     * Batch retrieve values.
+     * @param tridentTuples trident tuples
+     * @return values
+     */
+    public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
+        List<List<Values>> batchRetrieveResult = Lists.newArrayList();
+        try {
+            for (TridentTuple tuple : tridentTuples) {
+                Bson filter = options.queryCreator.createFilter(tuple);
+                Document doc = mongoClient.find(filter);
+                List<Values> values = options.lookupMapper.toTuple(tuple, doc);
+                batchRetrieveResult.add(values);
+            }
+        } catch (Exception e) {
+            LOG.warn("Batch get operation failed. Triggering replay.", e);
+            throw new FailedException(e);
+        }
+        return batchRetrieveResult;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
index a6797e2..d27d9a9 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
@@ -1,43 +1,43 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import java.util.Map;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-public class MongoStateFactory implements StateFactory {
-
-    private MongoState.Options options;
-
-    public MongoStateFactory(MongoState.Options options) {
-        this.options = options;
-    }
-
-    @Override
-    public State makeState(Map<String, Object> conf, IMetricsContext metrics,
-            int partitionIndex, int numPartitions) {
-        MongoState state = new MongoState(conf, options);
-        state.prepare();
-        return state;
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.trident.state;
+
+import java.util.Map;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+public class MongoStateFactory implements StateFactory {
+
+    private MongoState.Options options;
+
+    public MongoStateFactory(MongoState.Options options) {
+        this.options = options;
+    }
+
+    @Override
+    public State makeState(Map<String, Object> conf, IMetricsContext metrics,
+            int partitionIndex, int numPartitions) {
+        MongoState state = new MongoState(conf, options);
+        state.prepare();
+        return state;
+    }
+
+}