You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/03/23 17:10:05 UTC
[2/5] storm git commit: set '* text=auto' in .gitattributes in order
to avoid merge work because of line feed changes
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/README.md
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/README.md b/external/storm-mongodb/README.md
index b5b9e96..5c81710 100644
--- a/external/storm-mongodb/README.md
+++ b/external/storm-mongodb/README.md
@@ -1,325 +1,325 @@
-#Storm MongoDB
-
-Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute select/update queries against a database collection in a storm topology.
-
-## Insert into Database
-The bolt and trident state included in this package for inserting data into a database collection.
-
-### MongoMapper
-The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
-
-```java
-public interface MongoMapper extends Serializable {
- Document toDocument(ITuple tuple);
- Document toDocumentByKeys(List<Object> keys);
-}
-```
-
-### SimpleMongoMapper
-`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that can map Storm tuple to a Database document. `SimpleMongoMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
-
-```java
-public class SimpleMongoMapper implements MongoMapper {
- private String[] fields;
-
- @Override
- public Document toDocument(ITuple tuple) {
- Document document = new Document();
- for(String field : fields){
- document.append(field, tuple.getValueByField(field));
- }
- return document;
- }
-
- @Override
- public Document toDocumentByKeys(List<Object> keys) {
- Document document = new Document();
- document.append("_id", MongoUtils.getID(keys));
- return document;
- }
-
- public SimpleMongoMapper withFields(String... fields) {
- this.fields = fields;
- return this;
- }
-}
-```
-
-### MongoInsertBolt
-To use the `MongoInsertBolt`, you construct an instance of it by specifying url, collectionName and a `MongoMapper` implementation that converts storm tuple to DB document. The following is the standard URI connection scheme:
- `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
-
-More options information(eg: Write Concern Options) about Mongo URI, you can visit https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
-
- ```java
-String url = "mongodb://127.0.0.1:27017/test";
-String collectionName = "wordcount";
-
-MongoMapper mapper = new SimpleMongoMapper()
- .withFields("word", "count");
-
-MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
- ```
-
-
-## Update from Database
-The bolt included in this package for updating data from a database collection.
-
-### MongoUpdateMapper
-The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
-
-```java
-public interface MongoUpdateMapper extends MongoMapper { }
-```
-
-### SimpleMongoUpdateMapper
-`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
-`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit
-https://docs.mongodb.org/manual/reference/operator/update/
-
-```java
-public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
-
- private String[] fields;
-
- @Override
- public Document toDocument(ITuple tuple) {
- Document document = new Document();
- for(String field : fields){
- document.append(field, tuple.getValueByField(field));
- }
- //$set operator: Sets the value of a field in a document.
- return new Document("$set", document);
- }
-
- public SimpleMongoUpdateMapper withFields(String... fields) {
- this.fields = fields;
- return this;
- }
-}
-```
-
-
-### QueryFilterCreator
-The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
-
- ```java
-public interface QueryFilterCreator extends Serializable {
- Bson createFilter(ITuple tuple);
- Bson createFilterByKeys(List<Object> keys);
-}
- ```
-
-### SimpleQueryFilterCreator
-`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that can create a MongoDB query Filter by given Tuple. `QueryFilterCreator` uses `$eq` operator for matching values that are equal to a specified value. More information about query operator, you can visit
-https://docs.mongodb.org/manual/reference/operator/query/
-
- ```java
-public class SimpleQueryFilterCreator implements QueryFilterCreator {
-
- private String field;
-
- @Override
- public Bson createFilter(ITuple tuple) {
- return Filters.eq(field, tuple.getValueByField(field));
- }
-
- @Override
- public Bson createFilterByKeys(List<Object> keys) {
- return Filters.eq("_id", MongoUtils.getID(keys));
- }
-
- public SimpleQueryFilterCreator withField(String field) {
- this.field = field;
- return this;
- }
-
-}
- ```
-
-### MongoUpdateBolt
-To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document.
-
- ```java
- MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
- .withFields("word", "count");
-
- QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
- .withField("word");
-
- MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
-
- //if a new document should be inserted if there are no matches to the query filter
- //updateBolt.withUpsert(true);
-
- //whether find all documents according to the query filter
- //updateBolt.withMany(true);
- ```
-
- Or use a anonymous inner class implementation for `QueryFilterCreator`:
-
- ```java
- MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
- .withFields("word", "count");
-
- QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
- @Override
- public Bson createFilter(ITuple tuple) {
- return Filters.gt("count", 3);
- }
- };
-
- MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
-
- //if a new document should be inserted if there are no matches to the query filter
- //updateBolt.withUpsert(true);
- ```
-
-
-## Lookup from Database
-The bolt included in this package for selecting data from a database collection.
-
-### MongoLookupMapper
-The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
-
-```java
-public interface MongoLookupMapper extends Serializable {
-
- List<Values> toTuple(ITuple input, Document doc);
-
- void declareOutputFields(OutputFieldsDeclarer declarer);
-}
-```
-
-### SimpleMongoLookupMapper
-`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that can converts a Mongo document to a list of storm values.
-
-```java
-public class SimpleMongoLookupMapper implements MongoLookupMapper {
-
- private String[] fields;
-
- @Override
- public List<Values> toTuple(ITuple input, Document doc) {
- Values values = new Values();
-
- for(String field : fields) {
- if(input.contains(field)) {
- values.add(input.getValueByField(field));
- } else {
- values.add(doc.get(field));
- }
- }
- List<Values> result = new ArrayList<Values>();
- result.add(values);
- return result;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(fields));
- }
-
- public SimpleMongoLookupMapper withFields(String... fields) {
- this.fields = fields;
- return this;
- }
-
-}
-```
-
-### MongoLookupBolt
-To use the `MongoLookupBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values.
-
- ```java
- MongoLookupMapper mapper = new SimpleMongoLookupMapper()
- .withFields("word", "count");
-
- QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
- .withField("word");
-
- MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
- ```
-
-## Mongo Trident State&MapState
-### Trident State
-We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
-
- ```java
- MongoMapper mapper = new SimpleMongoMapper()
- .withFields("word", "count");
-
- MongoState.Options options = new MongoState.Options()
- .withUrl(url)
- .withCollectionName(collectionName)
- .withMapper(mapper);
-
- StateFactory factory = new MongoStateFactory(options);
-
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("spout1", spout);
-
- stream.partitionPersist(factory, fields,
- new MongoStateUpdater(), new Fields());
-
- TridentState state = topology.newStaticState(factory);
- stream = stream.stateQuery(state, new Fields("word"),
- new MongoStateQuery(), new Fields("columnName", "columnValue"));
- stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
- ```
- **NOTE**:
- >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
-
-### Trident MapState
-We also support trident `MapState`. To create a Mongo trident `MapState` you need to initialize it with the url, collectionName, the `MongoMapper` and `QueryFilterCreator` instance. See the example below:
-
- ```java
- MongoMapper mapper = new SimpleMongoMapper()
- .withFields("word", "count");
-
- QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
- .withField("word");
-
- MongoMapState.Options options = new MongoMapState.Options();
- options.url = url;
- options.collectionName = collectionName;
- options.mapper = mapper;
- options.queryCreator = filterCreator;
-
- StateFactory factory = MongoMapState.transactional(options);
-
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("spout1", spout);
-
- TridentState state = stream.groupBy(new Fields("word"))
- .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
-
- stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
- .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
- ```
-
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
-
- * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- * Xin Wang ([xinwang@apache.org](mailto:xinwang@apache.org))
-
+#Storm MongoDB
+
+Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute select/update queries against a database collection in a storm topology.
+
+## Insert into Database
+The bolt and trident state included in this package for inserting data into a database collection.
+
+### MongoMapper
+The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
+
+```java
+public interface MongoMapper extends Serializable {
+ Document toDocument(ITuple tuple);
+ Document toDocumentByKeys(List<Object> keys);
+}
+```
+
+### SimpleMongoMapper
+`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that can map Storm tuple to a Database document. `SimpleMongoMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
+
+```java
+public class SimpleMongoMapper implements MongoMapper {
+ private String[] fields;
+
+ @Override
+ public Document toDocument(ITuple tuple) {
+ Document document = new Document();
+ for(String field : fields){
+ document.append(field, tuple.getValueByField(field));
+ }
+ return document;
+ }
+
+ @Override
+ public Document toDocumentByKeys(List<Object> keys) {
+ Document document = new Document();
+ document.append("_id", MongoUtils.getID(keys));
+ return document;
+ }
+
+ public SimpleMongoMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+}
+```
+
+### MongoInsertBolt
+To use the `MongoInsertBolt`, you construct an instance of it by specifying url, collectionName and a `MongoMapper` implementation that converts storm tuple to DB document. The following is the standard URI connection scheme:
+ `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
+
+More options information(eg: Write Concern Options) about Mongo URI, you can visit https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
+
+ ```java
+String url = "mongodb://127.0.0.1:27017/test";
+String collectionName = "wordcount";
+
+MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
+ ```
+
+
+## Update from Database
+The bolt included in this package for updating data from a database collection.
+
+### MongoUpdateMapper
+The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
+
+```java
+public interface MongoUpdateMapper extends MongoMapper { }
+```
+
+### SimpleMongoUpdateMapper
+`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
+`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit
+https://docs.mongodb.org/manual/reference/operator/update/
+
+```java
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
+ private String[] fields;
+
+ @Override
+ public Document toDocument(ITuple tuple) {
+ Document document = new Document();
+ for(String field : fields){
+ document.append(field, tuple.getValueByField(field));
+ }
+ //$set operator: Sets the value of a field in a document.
+ return new Document("$set", document);
+ }
+
+ public SimpleMongoUpdateMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+}
+```
+
+
+### QueryFilterCreator
+The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
+
+ ```java
+public interface QueryFilterCreator extends Serializable {
+ Bson createFilter(ITuple tuple);
+ Bson createFilterByKeys(List<Object> keys);
+}
+ ```
+
+### SimpleQueryFilterCreator
+`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that can create a MongoDB query Filter by given Tuple. `QueryFilterCreator` uses `$eq` operator for matching values that are equal to a specified value. More information about query operator, you can visit
+https://docs.mongodb.org/manual/reference/operator/query/
+
+ ```java
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
+ private String field;
+
+ @Override
+ public Bson createFilter(ITuple tuple) {
+ return Filters.eq(field, tuple.getValueByField(field));
+ }
+
+ @Override
+ public Bson createFilterByKeys(List<Object> keys) {
+ return Filters.eq("_id", MongoUtils.getID(keys));
+ }
+
+ public SimpleQueryFilterCreator withField(String field) {
+ this.field = field;
+ return this;
+ }
+
+}
+ ```
+
+### MongoUpdateBolt
+To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document.
+
+ ```java
+ MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+ //if a new document should be inserted if there are no matches to the query filter
+ //updateBolt.withUpsert(true);
+
+ //whether find all documents according to the query filter
+ //updateBolt.withMany(true);
+ ```
+
+ Or use a anonymous inner class implementation for `QueryFilterCreator`:
+
+ ```java
+ MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
+ @Override
+ public Bson createFilter(ITuple tuple) {
+ return Filters.gt("count", 3);
+ }
+ };
+
+ MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
+
+ //if a new document should be inserted if there are no matches to the query filter
+ //updateBolt.withUpsert(true);
+ ```
+
+
+## Lookup from Database
+The bolt included in this package for selecting data from a database collection.
+
+### MongoLookupMapper
+The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
+
+```java
+public interface MongoLookupMapper extends Serializable {
+
+ List<Values> toTuple(ITuple input, Document doc);
+
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+### SimpleMongoLookupMapper
+`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that can converts a Mongo document to a list of storm values.
+
+```java
+public class SimpleMongoLookupMapper implements MongoLookupMapper {
+
+ private String[] fields;
+
+ @Override
+ public List<Values> toTuple(ITuple input, Document doc) {
+ Values values = new Values();
+
+ for(String field : fields) {
+ if(input.contains(field)) {
+ values.add(input.getValueByField(field));
+ } else {
+ values.add(doc.get(field));
+ }
+ }
+ List<Values> result = new ArrayList<Values>();
+ result.add(values);
+ return result;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(fields));
+ }
+
+ public SimpleMongoLookupMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+
+}
+```
+
+### MongoLookupBolt
+To use the `MongoLookupBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values.
+
+ ```java
+ MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+ ```
+
+## Mongo Trident State&MapState
+### Trident State
+We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
+
+ ```java
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ MongoState.Options options = new MongoState.Options()
+ .withUrl(url)
+ .withCollectionName(collectionName)
+ .withMapper(mapper);
+
+ StateFactory factory = new MongoStateFactory(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ stream.partitionPersist(factory, fields,
+ new MongoStateUpdater(), new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new MongoStateQuery(), new Fields("columnName", "columnValue"));
+ stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
+ ```
+ **NOTE**:
+ >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
+
+### Trident MapState
+We also support trident `MapState`. To create a Mongo trident `MapState` you need to initialize it with the url, collectionName, the `MongoMapper` and `QueryFilterCreator` instance. See the example below:
+
+ ```java
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoMapState.Options options = new MongoMapState.Options();
+ options.url = url;
+ options.collectionName = collectionName;
+ options.mapper = mapper;
+ options.queryCreator = filterCreator;
+
+ StateFactory factory = MongoMapState.transactional(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ TridentState state = stream.groupBy(new Fields("word"))
+ .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+ stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+ .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ ```
+
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ * Xin Wang ([xinwang@apache.org](mailto:xinwang@apache.org))
+
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
index ee66811..13049de 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
@@ -1,62 +1,62 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.MongoDbClient;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.base.BaseRichBolt;
-
-public abstract class AbstractMongoBolt extends BaseRichBolt {
-
- private String url;
- private String collectionName;
-
- protected OutputCollector collector;
- protected MongoDbClient mongoClient;
-
- /**
- * AbstractMongoBolt Constructor.
- * @param url The MongoDB server url
- * @param collectionName The collection where reading/writing data
- */
- public AbstractMongoBolt(String url, String collectionName) {
- Validate.notEmpty(url, "url can not be blank or null");
- Validate.notEmpty(collectionName, "collectionName can not be blank or null");
-
- this.url = url;
- this.collectionName = collectionName;
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context,
- OutputCollector collector) {
- this.collector = collector;
- this.mongoClient = new MongoDbClient(url, collectionName);
- }
-
- @Override
- public void cleanup() {
- this.mongoClient.close();
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.bolt;
+
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.MongoDbClient;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.base.BaseRichBolt;
+
+public abstract class AbstractMongoBolt extends BaseRichBolt {
+
+ private String url;
+ private String collectionName;
+
+ protected OutputCollector collector;
+ protected MongoDbClient mongoClient;
+
+ /**
+ * AbstractMongoBolt Constructor.
+ * @param url The MongoDB server url
+ * @param collectionName The collection where reading/writing data
+ */
+ public AbstractMongoBolt(String url, String collectionName) {
+ Validate.notEmpty(url, "url can not be blank or null");
+ Validate.notEmpty(collectionName, "collectionName can not be blank or null");
+
+ this.url = url;
+ this.collectionName = collectionName;
+ }
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context,
+ OutputCollector collector) {
+ this.collector = collector;
+ this.mongoClient = new MongoDbClient(url, collectionName);
+ }
+
+ @Override
+ public void cleanup() {
+ this.mongoClient.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
index afd1142..5d233da 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
@@ -1,124 +1,124 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.BatchHelper;
-import org.apache.storm.utils.TupleUtils;
-import org.bson.Document;
-
-/**
- * Basic bolt for writing to MongoDB.
- * Note: Each MongoInsertBolt defined in a topology is tied to a specific collection.
- */
-public class MongoInsertBolt extends AbstractMongoBolt {
-
- private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
-
- private MongoMapper mapper;
-
- private boolean ordered = true; //default is ordered.
-
- private int batchSize;
-
- private BatchHelper batchHelper;
-
- private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
-
- /**
- * MongoInsertBolt Constructor.
- * @param url The MongoDB server url
- * @param collectionName The collection where reading/writing data
- * @param mapper MongoMapper converting tuple to an MongoDB document
- */
- public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) {
- super(url, collectionName);
-
- Validate.notNull(mapper, "MongoMapper can not be null");
-
- this.mapper = mapper;
- }
-
- @Override
- public void execute(Tuple tuple) {
- try {
- if (batchHelper.shouldHandle(tuple)) {
- batchHelper.addBatch(tuple);
- }
-
- if (batchHelper.shouldFlush()) {
- flushTuples();
- batchHelper.ack();
- }
- } catch (Exception e) {
- batchHelper.fail(e);
- }
- }
-
- private void flushTuples() {
- List<Document> docs = new LinkedList<>();
- for (Tuple t : batchHelper.getBatchTuples()) {
- Document doc = mapper.toDocument(t);
- docs.add(doc);
- }
- mongoClient.insert(docs, ordered);
- }
-
- public MongoInsertBolt withBatchSize(int batchSize) {
- this.batchSize = batchSize;
- return this;
- }
-
- public MongoInsertBolt withOrdered(boolean ordered) {
- this.ordered = ordered;
- return this;
- }
-
- public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) {
- this.flushIntervalSecs = flushIntervalSecs;
- return this;
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
- }
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context,
- OutputCollector collector) {
- super.prepare(topoConf, context, collector);
- this.batchHelper = new BatchHelper(batchSize, collector);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.bolt;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.BatchHelper;
+import org.apache.storm.utils.TupleUtils;
+import org.bson.Document;
+
+/**
+ * Basic bolt for writing to MongoDB.
+ * Note: Each MongoInsertBolt defined in a topology is tied to a specific collection.
+ */
+public class MongoInsertBolt extends AbstractMongoBolt {
+
+ private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
+
+ private MongoMapper mapper;
+
+ private boolean ordered = true; //default is ordered.
+
+ private int batchSize;
+
+ private BatchHelper batchHelper;
+
+ private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
+
+ /**
+ * MongoInsertBolt Constructor.
+ * @param url The MongoDB server url
+ * @param collectionName The collection where reading/writing data
+ * @param mapper MongoMapper converting tuple to an MongoDB document
+ */
+ public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) {
+ super(url, collectionName);
+
+ Validate.notNull(mapper, "MongoMapper can not be null");
+
+ this.mapper = mapper;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try {
+ if (batchHelper.shouldHandle(tuple)) {
+ batchHelper.addBatch(tuple);
+ }
+
+ if (batchHelper.shouldFlush()) {
+ flushTuples();
+ batchHelper.ack();
+ }
+ } catch (Exception e) {
+ batchHelper.fail(e);
+ }
+ }
+
+ private void flushTuples() {
+ List<Document> docs = new LinkedList<>();
+ for (Tuple t : batchHelper.getBatchTuples()) {
+ Document doc = mapper.toDocument(t);
+ docs.add(doc);
+ }
+ mongoClient.insert(docs, ordered);
+ }
+
+ public MongoInsertBolt withBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public MongoInsertBolt withOrdered(boolean ordered) {
+ this.ordered = ordered;
+ return this;
+ }
+
+ public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) {
+ this.flushIntervalSecs = flushIntervalSecs;
+ return this;
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
+ }
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context,
+ OutputCollector collector) {
+ super.prepare(topoConf, context, collector);
+ this.batchHelper = new BatchHelper(batchSize, collector);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
index 5579b8e..4f92b32 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
@@ -1,93 +1,93 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-/**
- * Basic bolt for updating from MongoDB.
- * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection.
- */
-public class MongoUpdateBolt extends AbstractMongoBolt {
-
- private QueryFilterCreator queryCreator;
- private MongoUpdateMapper mapper;
-
- private boolean upsert; //the default is false.
- private boolean many; //the default is false.
-
- /**
- * MongoUpdateBolt Constructor.
- * @param url The MongoDB server url
- * @param collectionName The collection where reading/writing data
- * @param queryCreator QueryFilterCreator
- * @param mapper MongoMapper converting tuple to an MongoDB document
- */
- public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoUpdateMapper mapper) {
- super(url, collectionName);
-
- Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
- Validate.notNull(mapper, "MongoUpdateMapper can not be null");
-
- this.queryCreator = queryCreator;
- this.mapper = mapper;
- }
-
- @Override
- public void execute(Tuple tuple) {
- if (TupleUtils.isTick(tuple)) {
- return;
- }
-
- try {
- //get document
- Document doc = mapper.toDocument(tuple);
- //get query filter
- Bson filter = queryCreator.createFilter(tuple);
- mongoClient.update(filter, doc, upsert, many);
- this.collector.ack(tuple);
- } catch (Exception e) {
- this.collector.reportError(e);
- this.collector.fail(tuple);
- }
- }
-
- public MongoUpdateBolt withUpsert(boolean upsert) {
- this.upsert = upsert;
- return this;
- }
-
- public MongoUpdateBolt withMany(boolean many) {
- this.many = many;
- return this;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.bolt;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+/**
+ * Basic bolt for updating from MongoDB.
+ * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection.
+ */
+public class MongoUpdateBolt extends AbstractMongoBolt {
+
+ private QueryFilterCreator queryCreator;
+ private MongoUpdateMapper mapper;
+
+ private boolean upsert; //the default is false.
+ private boolean many; //the default is false.
+
+ /**
+ * MongoUpdateBolt Constructor.
+ * @param url The MongoDB server url
+ * @param collectionName The collection where reading/writing data
+ * @param queryCreator QueryFilterCreator
+ * @param mapper MongoMapper converting tuple to an MongoDB document
+ */
+ public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoUpdateMapper mapper) {
+ super(url, collectionName);
+
+ Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
+ Validate.notNull(mapper, "MongoUpdateMapper can not be null");
+
+ this.queryCreator = queryCreator;
+ this.mapper = mapper;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ if (TupleUtils.isTick(tuple)) {
+ return;
+ }
+
+ try {
+ //get document
+ Document doc = mapper.toDocument(tuple);
+ //get query filter
+ Bson filter = queryCreator.createFilter(tuple);
+ mongoClient.update(filter, doc, upsert, many);
+ this.collector.ack(tuple);
+ } catch (Exception e) {
+ this.collector.reportError(e);
+ this.collector.fail(tuple);
+ }
+ }
+
+ public MongoUpdateBolt withUpsert(boolean upsert) {
+ this.upsert = upsert;
+ return this;
+ }
+
+ public MongoUpdateBolt withMany(boolean many) {
+ this.many = many;
+ return this;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
index c264f03..52ed237 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
@@ -1,109 +1,109 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.InsertManyOptions;
-import com.mongodb.client.model.UpdateOptions;
-
-import java.util.List;
-
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-public class MongoDbClient {
-
- private MongoClient client;
- private MongoCollection<Document> collection;
-
- /**
- * The MongoDbClient constructor.
- * @param url The Mongo server url
- * @param collectionName The Mongo collection to read/write data
- */
- public MongoDbClient(String url, String collectionName) {
- //Creates a MongoURI from the given string.
- MongoClientURI uri = new MongoClientURI(url);
- //Creates a MongoClient described by a URI.
- this.client = new MongoClient(uri);
- //Gets a Database.
- MongoDatabase db = client.getDatabase(uri.getDatabase());
- //Gets a collection.
- this.collection = db.getCollection(collectionName);
- }
-
- /**
- * Inserts one or more documents.
- * This method is equivalent to a call to the bulkWrite method.
- * The documents will be inserted in the order provided,
- * stopping on the first failed insertion.
- *
- * @param documents documents
- */
- public void insert(List<Document> documents, boolean ordered) {
- InsertManyOptions options = new InsertManyOptions();
- if (!ordered) {
- options.ordered(false);
- }
- collection.insertMany(documents, options);
- }
-
- /**
- * Update a single or all documents in the collection according to the specified arguments.
- * When upsert set to true, the new document will be inserted if there are no matches to the query filter.
- *
- * @param filter Bson filter
- * @param document Bson document
- * @param upsert a new document should be inserted if there are no matches to the query filter
- * @param many whether find all documents according to the query filter
- */
- public void update(Bson filter, Bson document, boolean upsert, boolean many) {
- //TODO batch updating
- UpdateOptions options = new UpdateOptions();
- if (upsert) {
- options.upsert(true);
- }
- if (many) {
- collection.updateMany(filter, document, options);
- } else {
- collection.updateOne(filter, document, options);
- }
- }
-
- /**
- * Finds a single document in the collection according to the specified arguments.
- *
- * @param filter Bson filter
- */
- public Document find(Bson filter) {
- //TODO batch finding
- return collection.find(filter).first();
- }
-
- /**
- * Closes all resources associated with this instance.
- */
- public void close() {
- client.close();
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
+import com.mongodb.client.model.UpdateOptions;
+
+import java.util.List;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+public class MongoDbClient {
+
+ private MongoClient client;
+ private MongoCollection<Document> collection;
+
+ /**
+ * The MongoDbClient constructor.
+ * @param url The Mongo server url
+ * @param collectionName The Mongo collection to read/write data
+ */
+ public MongoDbClient(String url, String collectionName) {
+ //Creates a MongoURI from the given string.
+ MongoClientURI uri = new MongoClientURI(url);
+ //Creates a MongoClient described by a URI.
+ this.client = new MongoClient(uri);
+ //Gets a Database.
+ MongoDatabase db = client.getDatabase(uri.getDatabase());
+ //Gets a collection.
+ this.collection = db.getCollection(collectionName);
+ }
+
+ /**
+ * Inserts one or more documents.
+ * This method is equivalent to a call to the bulkWrite method.
+ * The documents will be inserted in the order provided,
+ * stopping on the first failed insertion.
+ *
+ * @param documents documents
+ */
+ public void insert(List<Document> documents, boolean ordered) {
+ InsertManyOptions options = new InsertManyOptions();
+ if (!ordered) {
+ options.ordered(false);
+ }
+ collection.insertMany(documents, options);
+ }
+
+ /**
+ * Update a single or all documents in the collection according to the specified arguments.
+ * When upsert set to true, the new document will be inserted if there are no matches to the query filter.
+ *
+ * @param filter Bson filter
+ * @param document Bson document
+ * @param upsert a new document should be inserted if there are no matches to the query filter
+ * @param many whether find all documents according to the query filter
+ */
+ public void update(Bson filter, Bson document, boolean upsert, boolean many) {
+ //TODO batch updating
+ UpdateOptions options = new UpdateOptions();
+ if (upsert) {
+ options.upsert(true);
+ }
+ if (many) {
+ collection.updateMany(filter, document, options);
+ } else {
+ collection.updateOne(filter, document, options);
+ }
+ }
+
+ /**
+ * Finds a single document in the collection according to the specified arguments.
+ *
+ * @param filter Bson filter
+ */
+ public Document find(Bson filter) {
+ //TODO batch finding
+ return collection.find(filter).first();
+ }
+
+ /**
+ * Closes all resources associated with this instance.
+ */
+ public void close() {
+ client.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
index 371b1ed..521678e 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.conversions.Bson;
-
-/**
- * Create a MongoDB query Filter by given Tuple/trident keys.
- */
-public interface QueryFilterCreator extends Serializable {
-
- /**
- * Create a query Filter by given Tuple.
- *
- * @param tuple ITuple tuple
- * @return query Filter
- */
- Bson createFilter(ITuple tuple);
-
- /**
- * Create a query Filter by given trident keys.
- *
- * @param keys keys
- * @return query Filter
- */
- Bson createFilterByKeys(List<Object> keys);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.conversions.Bson;
+
+/**
+ * Create a MongoDB query Filter by given Tuple/trident keys.
+ */
+public interface QueryFilterCreator extends Serializable {
+
+ /**
+ * Create a query Filter by given Tuple.
+ *
+ * @param tuple ITuple tuple
+ * @return query Filter
+ */
+ Bson createFilter(ITuple tuple);
+
+ /**
+ * Create a query Filter by given trident keys.
+ *
+ * @param keys keys
+ * @return query Filter
+ */
+ Bson createFilterByKeys(List<Object> keys);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
index 1f5774a..16a71ec 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import com.mongodb.client.model.Filters;
-
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.conversions.Bson;
-
-public class SimpleQueryFilterCreator implements QueryFilterCreator {
-
- private String field;
-
- @Override
- public Bson createFilter(ITuple tuple) {
- return Filters.eq(field, tuple.getValueByField(field));
- }
-
- @Override
- public Bson createFilterByKeys(List<Object> keys) {
- return Filters.eq("_id", MongoUtils.getId(keys));
- }
-
- public SimpleQueryFilterCreator withField(String field) {
- this.field = field;
- return this;
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common;
+
+import com.mongodb.client.model.Filters;
+
+import java.util.List;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.conversions.Bson;
+
+public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
+ private String field;
+
+ @Override
+ public Bson createFilter(ITuple tuple) {
+ return Filters.eq(field, tuple.getValueByField(field));
+ }
+
+ @Override
+ public Bson createFilterByKeys(List<Object> keys) {
+ return Filters.eq("_id", MongoUtils.getId(keys));
+ }
+
+ public SimpleQueryFilterCreator withField(String field) {
+ this.field = field;
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
index 3ea9c16..ed27c92 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
@@ -1,47 +1,47 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-/**
- * Given a Tuple/trident keys, converts it to an MongoDB document.
- */
-public interface MongoMapper extends Serializable {
-
- /**
- * Converts a Tuple to a Document.
- *
- * @param tuple the incoming tuple
- * @return the MongoDB document
- */
- Document toDocument(ITuple tuple);
-
- /**
- * Converts a keys to a Document.
- *
- * @param keys the trident keys
- * @return the MongoDB document
- */
- Document toDocumentByKeys(List<Object> keys);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common.mapper;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+/**
+ * Given a Tuple/trident keys, converts it to an MongoDB document.
+ */
+public interface MongoMapper extends Serializable {
+
+ /**
+ * Converts a Tuple to a Document.
+ *
+ * @param tuple the incoming tuple
+ * @return the MongoDB document
+ */
+ Document toDocument(ITuple tuple);
+
+ /**
+ * Converts a keys to a Document.
+ *
+ * @param keys the trident keys
+ * @return the MongoDB document
+ */
+ Document toDocumentByKeys(List<Object> keys);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
index 7bb0a06..1a38828 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
@@ -1,55 +1,55 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import java.util.List;
-
-import org.apache.storm.mongodb.common.MongoUtils;
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-public class SimpleMongoMapper implements MongoMapper {
-
- private String[] fields;
-
- public SimpleMongoMapper(String... fields) {
- this.fields = fields;
- }
-
- @Override
- public Document toDocument(ITuple tuple) {
- Document document = new Document();
- for (String field : fields) {
- document.append(field, tuple.getValueByField(field));
- }
- return document;
- }
-
- @Override
- public Document toDocumentByKeys(List<Object> keys) {
- Document document = new Document();
- document.append("_id", MongoUtils.getId(keys));
- return document;
- }
-
- public SimpleMongoMapper withFields(String... fields) {
- this.fields = fields;
- return this;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common.mapper;
+
+import java.util.List;
+
+import org.apache.storm.mongodb.common.MongoUtils;
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+public class SimpleMongoMapper implements MongoMapper {
+
+ private String[] fields;
+
+ public SimpleMongoMapper(String... fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public Document toDocument(ITuple tuple) {
+ Document document = new Document();
+ for (String field : fields) {
+ document.append(field, tuple.getValueByField(field));
+ }
+ return document;
+ }
+
+ @Override
+ public Document toDocumentByKeys(List<Object> keys) {
+ Document document = new Document();
+ document.append("_id", MongoUtils.getId(keys));
+ return document;
+ }
+
+ public SimpleMongoMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
index 3ed17ec..72328c9 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
@@ -1,46 +1,46 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
-
- private String[] fields;
-
- public SimpleMongoUpdateMapper(String... fields) {
- this.fields = fields;
- }
-
- @Override
- public Document toDocument(ITuple tuple) {
- Document document = new Document();
- for (String field : fields) {
- document.append(field, tuple.getValueByField(field));
- }
- //$set operator: Sets the value of a field in a document.
- return new Document("$set", document);
- }
-
- public SimpleMongoUpdateMapper withFields(String... fields) {
- this.fields = fields;
- return this;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.common.mapper;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
+ private String[] fields;
+
+ public SimpleMongoUpdateMapper(String... fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public Document toDocument(ITuple tuple) {
+ Document document = new Document();
+ for (String field : fields) {
+ document.append(field, tuple.getValueByField(field));
+ }
+ //$set operator: Sets the value of a field in a document.
+ return new Document("$set", document);
+ }
+
+ public SimpleMongoUpdateMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
index 77c394c..100f931 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
@@ -1,145 +1,145 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import com.google.common.collect.Lists;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.MongoDbClient;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MongoState implements State {
-
- private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
-
- private Options options;
- private MongoDbClient mongoClient;
- private Map<String, Object> map;
-
- protected MongoState(Map<String, Object> map, Options options) {
- this.options = options;
- this.map = map;
- }
-
- public static class Options implements Serializable {
- private String url;
- private String collectionName;
- private MongoMapper mapper;
- private MongoLookupMapper lookupMapper;
- private QueryFilterCreator queryCreator;
-
- public Options withUrl(String url) {
- this.url = url;
- return this;
- }
-
- public Options withCollectionName(String collectionName) {
- this.collectionName = collectionName;
- return this;
- }
-
- public Options withMapper(MongoMapper mapper) {
- this.mapper = mapper;
- return this;
- }
-
- public Options withMongoLookupMapper(MongoLookupMapper lookupMapper) {
- this.lookupMapper = lookupMapper;
- return this;
- }
-
- public Options withQueryFilterCreator(QueryFilterCreator queryCreator) {
- this.queryCreator = queryCreator;
- return this;
- }
- }
-
- protected void prepare() {
- Validate.notEmpty(options.url, "url can not be blank or null");
- Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
-
- this.mongoClient = new MongoDbClient(options.url, options.collectionName);
- }
-
- @Override
- public void beginCommit(Long txid) {
- LOG.debug("beginCommit is noop.");
- }
-
- @Override
- public void commit(Long txid) {
- LOG.debug("commit is noop.");
- }
-
- /**
- * Update Mongo state.
- * @param tuples trident tuples
- * @param collector trident collector
- */
- public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
- List<Document> documents = Lists.newArrayList();
- for (TridentTuple tuple : tuples) {
- Document document = options.mapper.toDocument(tuple);
- documents.add(document);
- }
-
- try {
- this.mongoClient.insert(documents, true);
- } catch (Exception e) {
- LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
- throw new FailedException(e);
- }
- }
-
- /**
- * Batch retrieve values.
- * @param tridentTuples trident tuples
- * @return values
- */
- public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
- List<List<Values>> batchRetrieveResult = Lists.newArrayList();
- try {
- for (TridentTuple tuple : tridentTuples) {
- Bson filter = options.queryCreator.createFilter(tuple);
- Document doc = mongoClient.find(filter);
- List<Values> values = options.lookupMapper.toTuple(tuple, doc);
- batchRetrieveResult.add(values);
- }
- } catch (Exception e) {
- LOG.warn("Batch get operation failed. Triggering replay.", e);
- throw new FailedException(e);
- }
- return batchRetrieveResult;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.trident.state;
+
+import com.google.common.collect.Lists;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.MongoDbClient;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MongoState implements State {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
+
+ private Options options;
+ private MongoDbClient mongoClient;
+ private Map<String, Object> map;
+
+ protected MongoState(Map<String, Object> map, Options options) {
+ this.options = options;
+ this.map = map;
+ }
+
+ public static class Options implements Serializable {
+ private String url;
+ private String collectionName;
+ private MongoMapper mapper;
+ private MongoLookupMapper lookupMapper;
+ private QueryFilterCreator queryCreator;
+
+ public Options withUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public Options withCollectionName(String collectionName) {
+ this.collectionName = collectionName;
+ return this;
+ }
+
+ public Options withMapper(MongoMapper mapper) {
+ this.mapper = mapper;
+ return this;
+ }
+
+ public Options withMongoLookupMapper(MongoLookupMapper lookupMapper) {
+ this.lookupMapper = lookupMapper;
+ return this;
+ }
+
+ public Options withQueryFilterCreator(QueryFilterCreator queryCreator) {
+ this.queryCreator = queryCreator;
+ return this;
+ }
+ }
+
+ protected void prepare() {
+ Validate.notEmpty(options.url, "url can not be blank or null");
+ Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
+
+ this.mongoClient = new MongoDbClient(options.url, options.collectionName);
+ }
+
+ @Override
+ public void beginCommit(Long txid) {
+ LOG.debug("beginCommit is noop.");
+ }
+
+ @Override
+ public void commit(Long txid) {
+ LOG.debug("commit is noop.");
+ }
+
+ /**
+ * Update Mongo state.
+ * @param tuples trident tuples
+ * @param collector trident collector
+ */
+ public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+ List<Document> documents = Lists.newArrayList();
+ for (TridentTuple tuple : tuples) {
+ Document document = options.mapper.toDocument(tuple);
+ documents.add(document);
+ }
+
+ try {
+ this.mongoClient.insert(documents, true);
+ } catch (Exception e) {
+ LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+ throw new FailedException(e);
+ }
+ }
+
+ /**
+ * Batch retrieve values.
+ * @param tridentTuples trident tuples
+ * @return values
+ */
+ public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
+ List<List<Values>> batchRetrieveResult = Lists.newArrayList();
+ try {
+ for (TridentTuple tuple : tridentTuples) {
+ Bson filter = options.queryCreator.createFilter(tuple);
+ Document doc = mongoClient.find(filter);
+ List<Values> values = options.lookupMapper.toTuple(tuple, doc);
+ batchRetrieveResult.add(values);
+ }
+ } catch (Exception e) {
+ LOG.warn("Batch get operation failed. Triggering replay.", e);
+ throw new FailedException(e);
+ }
+ return batchRetrieveResult;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9c427cb/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
index a6797e2..d27d9a9 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
@@ -1,43 +1,43 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import java.util.Map;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-public class MongoStateFactory implements StateFactory {
-
- private MongoState.Options options;
-
- public MongoStateFactory(MongoState.Options options) {
- this.options = options;
- }
-
- @Override
- public State makeState(Map<String, Object> conf, IMetricsContext metrics,
- int partitionIndex, int numPartitions) {
- MongoState state = new MongoState(conf, options);
- state.prepare();
- return state;
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.mongodb.trident.state;
+
+import java.util.Map;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+public class MongoStateFactory implements StateFactory {
+
+ private MongoState.Options options;
+
+ public MongoStateFactory(MongoState.Options options) {
+ this.options = options;
+ }
+
+ @Override
+ public State makeState(Map<String, Object> conf, IMetricsContext metrics,
+ int partitionIndex, int numPartitions) {
+ MongoState state = new MongoState(conf, options);
+ state.prepare();
+ return state;
+ }
+
+}