You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2016/12/16 03:32:46 UTC
[3/4] storm git commit: update examples and documents
update examples and documents
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a833af41
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a833af41
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a833af41
Branch: refs/heads/master
Commit: a833af4195987ded716a5aa47b476271f47e4c85
Parents: 2993920
Author: vesense <be...@163.com>
Authored: Thu Oct 27 15:53:16 2016 +0800
Committer: vesense <be...@163.com>
Committed: Thu Oct 27 15:53:16 2016 +0800
----------------------------------------------------------------------
.../storm/mongodb/topology/LookupWordCount.java | 80 ++++++++
.../mongodb/topology/TotalWordCounter.java | 69 +++++++
.../storm/mongodb/topology/UpdateWordCount.java | 16 +-
.../storm/mongodb/trident/PrintFunction.java | 40 ++++
.../storm/mongodb/trident/WordCountTrident.java | 9 +-
.../mongodb/trident/WordCountTridentMap.java | 95 ++++++++++
external/storm-mongodb/README.md | 185 ++++++++++++++++---
7 files changed, 456 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
new file mode 100644
index 0000000..5140685
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.mongodb.bolt.MongoLookupBolt;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoLookupMapper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+public class LookupWordCount { // example topology: WordSpout -> MongoLookupBolt -> TotalWordCounter
+ private static final String WORD_SPOUT = "WORD_SPOUT";
+ private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+ private static final String TOTAL_COUNT_BOLT = "TOTAL_COUNT_BOLT";
+
+ private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test";
+ private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
+
+ public static void main(String[] args) throws Exception {
+ Config config = new Config();
+ // Built-in defaults; replaced by args[0]/args[1] when at least two arguments are given.
+ String url = TEST_MONGODB_URL;
+ String collectionName = TEST_MONGODB_COLLECTION_NAME;
+ // NOTE(review): with fewer than two arguments the usage branch at the end is taken, so the defaults never run a topology.
+ if (args.length >= 2) {
+ url = args[0];
+ collectionName = args[1];
+ }
+
+ WordSpout spout = new WordSpout();
+ TotalWordCounter totalBolt = new TotalWordCounter();
+ // Lookup mapper configured with the output fields "word" and "count".
+ MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+ .withFields("word", "count");
+ // Query filter creator configured with the tuple field "word".
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+
+ //wordspout -> lookupbolt -> totalCountBolt
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(WORD_SPOUT, spout, 1);
+ builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(TOTAL_COUNT_BOLT, totalBolt, 1).fieldsGrouping(LOOKUP_BOLT, new Fields("word"));
+ // Two args: run in a local cluster for 30 seconds; three args: submit under the name args[2]; otherwise print usage.
+ if (args.length == 2) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", config, builder.createTopology());
+ Thread.sleep(30000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ System.exit(0);
+ } else if (args.length == 3) {
+ StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+ } else{
+ System.out.println("Usage: LookupWordCount <mongodb url> <mongodb collection> [topology name]");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
new file mode 100644
index 0000000..137aac4
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+public class TotalWordCounter implements IBasicBolt {
+ // Bolt that keeps a running sum of the second value of every tuple it receives.
+ private BigInteger total = BigInteger.ZERO;
+ private static final Logger LOG = LoggerFactory.getLogger(TotalWordCounter.class);
+ private static final Random RANDOM = new Random();
+ @SuppressWarnings("rawtypes")
+ public void prepare(Map stormConf, TopologyContext context) {
+ }
+
+ /*
+ * Adds the second value of the input tuple to the running total and emits the new total.
+ */
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ total = total.add(new BigInteger(input.getValues().get(1).toString()));
+ collector.emit(tuple(total.toString()));
+ //logs the running total for roughly 0.4% of tuples (only when nextInt(1000) is 996..999).
+ if(RANDOM.nextInt(1000) > 995) {
+ LOG.info("Running total = " + total);
+ }
+ }
+ // Logs the total accumulated so far when cleanup() is called.
+ public void cleanup() {
+ LOG.info("Final total = " + total);
+ }
+ // Declares the single output field "total".
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("total"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
index 7895f35..daca619 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
@@ -20,19 +20,14 @@ package org.apache.storm.mongodb.topology;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
+import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
-import org.apache.storm.mongodb.bolt.MongoInsertBolt;
import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
import org.apache.storm.mongodb.common.QueryFilterCreator;
import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper;
-import java.util.HashMap;
-import java.util.Map;
-
public class UpdateWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
private static final String COUNT_BOLT = "COUNT_BOLT";
@@ -56,17 +51,20 @@ public class UpdateWordCount {
WordSpout spout = new WordSpout();
WordCounter bolt = new WordCounter();
- MongoMapper mapper = new SimpleMongoUpdateMapper()
+ MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
.withFields("word", "count");
- QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
.withField("word");
- MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator , mapper);
+ MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, filterCreator, mapper);
//if a new document should be inserted if there are no matches to the query filter
//updateBolt.withUpsert(true);
+ //whether to apply the update to all documents that match the query filter
+ //updateBolt.withMany(true);
+
// wordSpout ==> countBolt ==> MongoUpdateBolt
TopologyBuilder builder = new TopologyBuilder();
http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
new file mode 100644
index 0000000..3db91f9
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident;
+
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+public class PrintFunction extends BaseFunction {
+ // Trident function that logs a small random sample of the tuples it receives and emits nothing.
+ private static final Logger LOG = LoggerFactory.getLogger(PrintFunction.class);
+
+ private static final Random RANDOM = new Random();
+ // A tuple is logged only when nextInt(1000) > 995, i.e. for roughly 0.4% of calls.
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
+ if(RANDOM.nextInt(1000) > 995) {
+ LOG.info(tuple.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
index 44447be..fe1bb03 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
@@ -25,6 +25,7 @@ import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
import org.apache.storm.mongodb.trident.state.MongoState;
import org.apache.storm.mongodb.trident.state.MongoStateFactory;
+import org.apache.storm.mongodb.trident.state.MongoStateQuery;
import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
@@ -59,7 +60,13 @@ public class WordCountTrident {
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
- stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields());
+ stream.partitionPersist(factory, fields,
+ new MongoStateUpdater(), new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new MongoStateQuery(), new Fields("columnName", "columnValue"));
+ stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
return topology.build();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
new file mode 100644
index 0000000..83c0caf
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
+import org.apache.storm.mongodb.trident.state.*;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class WordCountTridentMap {
+ // Trident example: sums word counts into a Mongo-backed MapState and logs sampled query results.
+ public static StormTopology buildTopology(String url, String collectionName){
+ Fields fields = new Fields("word", "count");
+ FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+ new Values("storm", 1),
+ new Values("trident", 1),
+ new Values("needs", 1),
+ new Values("javadoc", 1)
+ );
+ spout.setCycle(true);
+ // Mapper and query filter creator handed to the map state through its Options below.
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoMapState.Options options = new MongoMapState.Options();
+ options.url = url;
+ options.collectionName = collectionName;
+ options.mapper = mapper;
+ options.queryCreator = filterCreator;
+
+ StateFactory factory = MongoMapState.transactional(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+ // Group by "word" and persist the per-word sum of "count" as field "sum".
+ TridentState state = stream.groupBy(new Fields("word"))
+ .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+ // Query the persisted sum for each word and pass (word, sum) to PrintFunction.
+ stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+ .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+ if (args.length == 2) { // two args: run in a local cluster for 60 seconds, then shut down
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));
+ Thread.sleep(60 * 1000);
+ cluster.killTopology("wordCounter");
+ cluster.shutdown();
+ System.exit(0);
+ }
+ else if(args.length == 3) { // three args: submit under the topology name args[2]
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
+ } else{
+ System.out.println("Usage: WordCountTridentMap <mongodb url> <mongodb collection> [topology name]");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a833af41/external/storm-mongodb/README.md
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/README.md b/external/storm-mongodb/README.md
index 614b52f..00b8e4b 100644
--- a/external/storm-mongodb/README.md
+++ b/external/storm-mongodb/README.md
@@ -1,6 +1,6 @@
#Storm MongoDB
-Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute update queries against a database collection in a storm topology.
+Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allow a storm topology to either insert storm tuples in a database collection or to execute select/update queries against a database collection in a storm topology.
## Insert into Database
The bolt and trident state included in this package for inserting data into a database collection.
@@ -11,6 +11,7 @@ The main API for inserting data in a collection using MongoDB is the `org.apache
```java
public interface MongoMapper extends Serializable {
Document toDocument(ITuple tuple);
+ Document toDocumentByKeys(List<Object> keys);
}
```
@@ -30,6 +31,13 @@ public class SimpleMongoMapper implements MongoMapper {
return document;
}
+ @Override
+ public Document toDocumentByKeys(List<Object> keys) {
+ Document document = new Document();
+ document.append("_id", MongoUtils.getID(keys));
+ return document;
+ }
+
public SimpleMongoMapper withFields(String... fields) {
this.fields = fields;
return this;
@@ -53,38 +61,25 @@ MongoMapper mapper = new SimpleMongoMapper()
MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
```
-### MongoTridentState
-We also support a trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
-
- ```java
- MongoMapper mapper = new SimpleMongoMapper()
- .withFields("word", "count");
-
- MongoState.Options options = new MongoState.Options()
- .withUrl(url)
- .withCollectionName(collectionName)
- .withMapper(mapper);
-
- StateFactory factory = new MongoStateFactory(options);
-
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("spout1", spout);
-
- stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields());
- ```
- **NOTE**:
- >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
## Update from Database
The bolt included in this package for updating data from a database collection.
+### MongoUpdateMapper
+The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
+
+```java
+public interface MongoUpdateMapper extends MongoMapper { }
+```
+
### SimpleMongoUpdateMapper
-`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
+`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit
https://docs.mongodb.org/manual/reference/operator/update/
```java
-public class SimpleMongoUpdateMapper implements MongoMapper {
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
private String[] fields;
@Override
@@ -93,6 +88,7 @@ public class SimpleMongoUpdateMapper implements MongoMapper {
for(String field : fields){
document.append(field, tuple.getValueByField(field));
}
+ //$set operator: Sets the value of a field in a document.
return new Document("$set", document);
}
@@ -104,13 +100,13 @@ public class SimpleMongoUpdateMapper implements MongoMapper {
```
-
### QueryFilterCreator
The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
```java
public interface QueryFilterCreator extends Serializable {
Bson createFilter(ITuple tuple);
+ Bson createFilterByKeys(List<Object> keys);
}
```
@@ -120,6 +116,7 @@ https://docs.mongodb.org/manual/reference/operator/query/
```java
public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
private String field;
@Override
@@ -127,6 +124,11 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
return Filters.eq(field, tuple.getValueByField(field));
}
+ @Override
+ public Bson createFilterByKeys(List<Object> keys) {
+ return Filters.eq("_id", MongoUtils.getID(keys));
+ }
+
public SimpleQueryFilterCreator withField(String field) {
this.field = field;
return this;
@@ -136,10 +138,10 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
```
### MongoUpdateBolt
-To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoMapper` implementation that converts storm tuple to DB document.
+To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document.
```java
- MongoMapper mapper = new SimpleMongoUpdateMapper()
+ MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
.withFields("word", "count");
QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
@@ -149,12 +151,15 @@ To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mon
//if a new document should be inserted if there are no matches to the query filter
//updateBolt.withUpsert(true);
+
+ //whether to apply the update to all documents that match the query filter
+ //updateBolt.withMany(true);
```
Or use a anonymous inner class implementation for `QueryFilterCreator`:
```java
- MongoMapper mapper = new SimpleMongoUpdateMapper()
+ MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
.withFields("word", "count");
QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
@@ -170,6 +175,130 @@ To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mon
//updateBolt.withUpsert(true);
```
+
+## Lookup from Database
+This package includes a bolt for selecting data from a database collection.
+
+### MongoLookupMapper
+The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
+
+```java
+public interface MongoLookupMapper extends Serializable {
+
+ List<Values> toTuple(ITuple input, Document doc);
+
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+### SimpleMongoLookupMapper
+`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that can convert a Mongo document to a list of storm values.
+
+```java
+public class SimpleMongoLookupMapper implements MongoLookupMapper {
+
+ private String[] fields;
+
+ @Override
+ public List<Values> toTuple(ITuple input, Document doc) {
+ Values values = new Values();
+
+ for(String field : fields) {
+ if(input.contains(field)) {
+ values.add(input.getValueByField(field));
+ } else {
+ values.add(doc.get(field));
+ }
+ }
+ List<Values> result = new ArrayList<Values>();
+ result.add(values);
+ return result;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(fields));
+ }
+
+ public SimpleMongoLookupMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+
+}
+```
+
+### MongoLookupBolt
+To use the `MongoLookupBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values.
+
+ ```java
+ MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+ ```
+
+## Mongo Trident State & MapState
+### Trident State
+We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, the collectionName, and a `MongoMapper` instance. See the example below:
+
+ ```java
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ MongoState.Options options = new MongoState.Options()
+ .withUrl(url)
+ .withCollectionName(collectionName)
+ .withMapper(mapper);
+
+ StateFactory factory = new MongoStateFactory(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ stream.partitionPersist(factory, fields,
+ new MongoStateUpdater(), new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new MongoStateQuery(), new Fields("columnName", "columnValue"));
+ stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
+ ```
+ **NOTE**:
+ >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
+
+### Trident MapState
+We also support trident `MapState`. To create a Mongo trident `MapState` you need to initialize it with the url, the collectionName, and `MongoMapper` and `QueryFilterCreator` instances. See the example below:
+
+ ```java
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoMapState.Options options = new MongoMapState.Options();
+ options.url = url;
+ options.collectionName = collectionName;
+ options.mapper = mapper;
+ options.queryCreator = filterCreator;
+
+ StateFactory factory = MongoMapState.transactional(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ TridentState state = stream.groupBy(new Fields("word"))
+ .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+ stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+ .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ ```
+
+
## License
Licensed to the Apache Software Foundation (ASF) under one