You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2016/12/16 05:27:12 UTC
[1/3] storm git commit: STORM-1607: Add MongoMapState for supporting
trident's exactly once semantics [Forced Update!]
Repository: storm
Updated Branches:
refs/heads/master 68d4016e7 -> f63078c90 (forced update)
STORM-1607: Add MongoMapState for supporting trident's exactly once semantics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/021c9336
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/021c9336
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/021c9336
Branch: refs/heads/master
Commit: 021c9336ee1bdaefa658fd0b5e9255fde2aef2c9
Parents: f052669
Author: vesense <be...@163.com>
Authored: Fri Oct 21 13:52:27 2016 +0800
Committer: vesense <be...@163.com>
Committed: Fri Dec 16 13:19:55 2016 +0800
----------------------------------------------------------------------
.../storm/mongodb/bolt/MongoLookupBolt.java | 76 +++++++
.../storm/mongodb/bolt/MongoUpdateBolt.java | 18 +-
.../storm/mongodb/common/MongoDBClient.java | 26 ++-
.../apache/storm/mongodb/common/MongoUtils.java | 38 ++++
.../mongodb/common/QueryFilterCreator.java | 10 +-
.../common/SimpleQueryFilterCreator.java | 7 +
.../common/mapper/MongoLookupMapper.java | 44 +++++
.../mongodb/common/mapper/MongoMapper.java | 10 +-
.../common/mapper/MongoUpdateMapper.java | 31 +++
.../common/mapper/SimpleMongoLookupMapper.java | 64 ++++++
.../common/mapper/SimpleMongoMapper.java | 14 ++
.../common/mapper/SimpleMongoUpdateMapper.java | 8 +-
.../mongodb/trident/state/MongoMapState.java | 196 +++++++++++++++++++
.../storm/mongodb/trident/state/MongoState.java | 41 +++-
.../mongodb/trident/state/MongoStateQuery.java | 40 ++++
15 files changed, 607 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
new file mode 100644
index 0000000..909826d
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.bolt;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.util.List;
+
+/**
+ * Basic bolt for querying from MongoDB.
+ *
+ * Note: Each MongoLookupBolt defined in a topology is tied to a specific collection.
+ *
+ */
+public class MongoLookupBolt extends AbstractMongoBolt {
+
+ private QueryFilterCreator queryCreator;
+ private MongoLookupMapper mapper;
+
+ public MongoLookupBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoLookupMapper mapper) {
+ super(url, collectionName);
+
+ Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
+ Validate.notNull(mapper, "MongoLookupMapper can not be null");
+
+ this.queryCreator = queryCreator;
+ this.mapper = mapper;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try{
+ //get query filter
+ Bson filter = queryCreator.createFilter(tuple);
+ //find document from mongodb
+ Document doc = mongoClient.find(filter);
+ //get storm values and emit
+ List<Values> valuesList = mapper.toTuple(tuple, doc);
+ for (Values values : valuesList) {
+ this.collector.emit(tuple, values);
+ }
+ this.collector.ack(tuple);
+ } catch (Exception e) {
+ this.collector.reportError(e);
+ this.collector.fail(tuple);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ mapper.declareOutputFields(declarer);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
index 510a3d0..9fb6c56 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
@@ -19,7 +19,7 @@ package org.apache.storm.mongodb.bolt;
import org.apache.commons.lang.Validate;
import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.bson.Document;
@@ -34,15 +34,16 @@ import org.bson.conversions.Bson;
public class MongoUpdateBolt extends AbstractMongoBolt {
private QueryFilterCreator queryCreator;
- private MongoMapper mapper;
+ private MongoUpdateMapper mapper;
- private boolean upsert; //The default is false.
+ private boolean upsert; //the default is false.
+ private boolean many; //the default is false.
- public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoMapper mapper) {
+ public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoUpdateMapper mapper) {
super(url, collectionName);
Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
- Validate.notNull(mapper, "MongoMapper can not be null");
+ Validate.notNull(mapper, "MongoUpdateMapper can not be null");
this.queryCreator = queryCreator;
this.mapper = mapper;
@@ -55,7 +56,7 @@ public class MongoUpdateBolt extends AbstractMongoBolt {
Document doc = mapper.toDocument(tuple);
//get query filter
Bson filter = queryCreator.createFilter(tuple);
- mongoClient.update(filter, doc, upsert);
+ mongoClient.update(filter, doc, upsert, many);
this.collector.ack(tuple);
} catch (Exception e) {
this.collector.reportError(e);
@@ -68,6 +69,11 @@ public class MongoUpdateBolt extends AbstractMongoBolt {
return this;
}
+ public MongoUpdateBolt withMany(boolean many) {
+ this.many = many;
+ return this;
+ }
+
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
index cb4c454..f86b969 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
@@ -62,19 +62,35 @@ public class MongoDBClient {
}
/**
- * Update all documents in the collection according to the specified query filter.
+ * Update a single or all documents in the collection according to the specified arguments.
* When upsert set to true, the new document will be inserted if there are no matches to the query filter.
*
* @param filter
- * @param update
- * @param upsert
+ * @param document
+ * @param upsert a new document should be inserted if there are no matches to the query filter
+ * @param many whether find all documents according to the query filter
*/
- public void update(Bson filter, Bson update, boolean upsert) {
+ public void update(Bson filter, Bson document, boolean upsert, boolean many) {
+ //TODO batch updating
UpdateOptions options = new UpdateOptions();
if (upsert) {
options.upsert(true);
}
- collection.updateMany(filter, update, options);
+ if (many) {
+ collection.updateMany(filter, document, options);
+ }else {
+ collection.updateOne(filter, document, options);
+ }
+ }
+
+ /**
+ * Finds a single document in the collection according to the specified arguments.
+ *
+ * @param filter
+ */
+ public Document find(Bson filter) {
+ //TODO batch finding
+ return collection.find(filter).first();
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java
new file mode 100644
index 0000000..7c62beb
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+public final class MongoUtils {
+
+ public static byte[] getID(List<Object> keys) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
+ for (Object key : keys) {
+ bos.write(String.valueOf(key).getBytes());
+ }
+ bos.close();
+ } catch (IOException e){
+ throw new RuntimeException("IOException creating Mongo document _id.", e);
+ }
+ return bos.toByteArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
index d95f717..4ef0a02 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
@@ -18,12 +18,13 @@
package org.apache.storm.mongodb.common;
import java.io.Serializable;
+import java.util.List;
import org.apache.storm.tuple.ITuple;
import org.bson.conversions.Bson;
/**
- * Create a MongoDB query Filter by given Tuple.
+ * Create a MongoDB query Filter by given Tuple/trident keys.
*/
public interface QueryFilterCreator extends Serializable {
@@ -35,4 +36,11 @@ public interface QueryFilterCreator extends Serializable {
*/
Bson createFilter(ITuple tuple);
+ /**
+ * Create a query Filter by given trident keys
+ *
+ * @param keys
+ * @return query Filter
+ */
+ Bson createFilterByKeys(List<Object> keys);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
index 8b4f1c3..5bfe58a 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
@@ -22,6 +22,8 @@ import org.bson.conversions.Bson;
import com.mongodb.client.model.Filters;
+import java.util.List;
+
public class SimpleQueryFilterCreator implements QueryFilterCreator {
private String field;
@@ -31,6 +33,11 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
return Filters.eq(field, tuple.getValueByField(field));
}
+ @Override
+ public Bson createFilterByKeys(List<Object> keys) {
+ return Filters.eq("_id", MongoUtils.getID(keys));
+ }
+
public SimpleQueryFilterCreator withField(String field) {
this.field = field;
return this;
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java
new file mode 100644
index 0000000..48347bb
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common.mapper;
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+import org.bson.Document;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface MongoLookupMapper extends Serializable {
+
+ /**
+ * Converts a Mongo document to a list of storm values that can be emitted. This is done to allow a single
+ * storm input tuple and a single Mongo document to result in multiple output values.
+ * @param input the input tuple.
+ * @param doc the mongo document
+ * @return a List of storm values that can be emitted. Each item in list is emitted as an output tuple.
+ */
+ List<Values> toTuple(ITuple input, Document doc);
+
+ /**
+ * declare what are the fields that this code will output.
+ * @param declarer
+ */
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
index 7bcd499..9b55661 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
@@ -18,12 +18,13 @@
package org.apache.storm.mongodb.common.mapper;
import java.io.Serializable;
+import java.util.List;
import org.apache.storm.tuple.ITuple;
import org.bson.Document;
/**
- * Given a Tuple, converts it to an MongoDB document.
+ * Given a Tuple/trident keys, converts it to an MongoDB document.
*/
public interface MongoMapper extends Serializable {
@@ -35,4 +36,11 @@ public interface MongoMapper extends Serializable {
*/
Document toDocument(ITuple tuple);
+ /**
+ * Converts a keys to a Document
+ *
+ * @param keys the trident keys
+ * @return the MongoDB document
+ */
+ Document toDocumentByKeys(List<Object> keys);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java
new file mode 100644
index 0000000..af9a4ca
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common.mapper;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * MongoUpdateMapper is for defining spec. which is used for converting Tuple/ trident keys to an MongoDB document.
+ */
+public interface MongoUpdateMapper extends MongoMapper {
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java
new file mode 100644
index 0000000..d4bc19c
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common.mapper;
+
+import org.apache.storm.mongodb.common.MongoUtils;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+import org.bson.Document;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SimpleMongoLookupMapper implements MongoLookupMapper {
+
+ private String[] fields;
+
+ public SimpleMongoLookupMapper(String... fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public List<Values> toTuple(ITuple input, Document doc) {
+ Values values = new Values();
+
+ for(String field : fields) {
+ if(input.contains(field)) {
+ values.add(input.getValueByField(field));
+ } else {
+ values.add(doc.get(field));
+ }
+ }
+ List<Values> result = new ArrayList<Values>();
+ result.add(values);
+ return result;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(fields));
+ }
+
+ public SimpleMongoLookupMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
index 4440962..00f240d 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
@@ -17,13 +17,20 @@
*/
package org.apache.storm.mongodb.common.mapper;
+import org.apache.storm.mongodb.common.MongoUtils;
import org.apache.storm.tuple.ITuple;
import org.bson.Document;
+import java.util.List;
+
public class SimpleMongoMapper implements MongoMapper {
private String[] fields;
+ public SimpleMongoMapper(String... fields) {
+ this.fields = fields;
+ }
+
@Override
public Document toDocument(ITuple tuple) {
Document document = new Document();
@@ -33,6 +40,13 @@ public class SimpleMongoMapper implements MongoMapper {
return document;
}
+ @Override
+ public Document toDocumentByKeys(List<Object> keys) {
+ Document document = new Document();
+ document.append("_id", MongoUtils.getID(keys));
+ return document;
+ }
+
public SimpleMongoMapper withFields(String... fields) {
this.fields = fields;
return this;
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
index f07d4dc..d13fc21 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
@@ -20,10 +20,16 @@ package org.apache.storm.mongodb.common.mapper;
import org.apache.storm.tuple.ITuple;
import org.bson.Document;
-public class SimpleMongoUpdateMapper implements MongoMapper {
+import java.util.List;
+
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
private String[] fields;
+ public SimpleMongoUpdateMapper(String... fields) {
+ this.fields = fields;
+ }
+
@Override
public Document toDocument(ITuple tuple) {
Document document = new Document();
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
new file mode 100644
index 0000000..878d002
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident.state;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.MongoDBClient;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.state.*;
+import org.apache.storm.trident.state.map.*;
+import org.apache.storm.tuple.Values;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class MongoMapState<T> implements IBackingMap<T> {
+ private static Logger LOG = LoggerFactory.getLogger(MongoMapState.class);
+
+ @SuppressWarnings("rawtypes")
+ private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = Maps.newHashMap();
+
+ static {
+ DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer());
+ DEFAULT_SERIALZERS.put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer());
+ DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer());
+ }
+
+ private Options<T> options;
+ private Serializer<T> serializer;
+ private MongoDBClient mongoClient;
+ private Map map;
+
+ protected MongoMapState(Map map, Options options) {
+ this.options = options;
+ this.map = map;
+ this.serializer = options.serializer;
+
+ Validate.notEmpty(options.url, "url can not be blank or null");
+ Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
+ Validate.notNull(options.queryCreator, "queryCreator can not be null");
+ Validate.notNull(options.mapper, "mapper can not be null");
+
+ this.mongoClient = new MongoDBClient(options.url, options.collectionName);
+ }
+
+ public static class Options<T> implements Serializable {
+ public String url;
+ public String collectionName;
+ public MongoMapper mapper;
+ public QueryFilterCreator queryCreator;
+ public Serializer<T> serializer;
+ public int cacheSize = 5000;
+ public String globalKey = "$MONGO-MAP-STATE-GLOBAL";
+ public String serDocumentField = "tridentSerField";
+ }
+
+
+ @SuppressWarnings("rawtypes")
+ public static StateFactory opaque() {
+ Options<OpaqueValue> options = new Options<OpaqueValue>();
+ return opaque(options);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public static StateFactory opaque(Options<OpaqueValue> opts) {
+
+ return new Factory(StateType.OPAQUE, opts);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public static StateFactory transactional() {
+ Options<TransactionalValue> options = new Options<TransactionalValue>();
+ return transactional(options);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public static StateFactory transactional(Options<TransactionalValue> opts) {
+ return new Factory(StateType.TRANSACTIONAL, opts);
+ }
+
+ public static StateFactory nonTransactional() {
+ Options<Object> options = new Options<Object>();
+ return nonTransactional(options);
+ }
+
+ public static StateFactory nonTransactional(Options<Object> opts) {
+ return new Factory(StateType.NON_TRANSACTIONAL, opts);
+ }
+
+
+ protected static class Factory implements StateFactory {
+ private StateType stateType;
+ private Options options;
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public Factory(StateType stateType, Options options) {
+ this.stateType = stateType;
+ this.options = options;
+
+ if (this.options.serializer == null) {
+ this.options.serializer = DEFAULT_SERIALZERS.get(stateType);
+ }
+
+ if (this.options.serializer == null) {
+ throw new RuntimeException("Serializer should be specified for type: " + stateType);
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+ IBackingMap state = new MongoMapState(conf, options);
+
+ if(options.cacheSize > 0) {
+ state = new CachedMap(state, options.cacheSize);
+ }
+
+ MapState mapState;
+ switch (stateType) {
+ case NON_TRANSACTIONAL:
+ mapState = NonTransactionalMap.build(state);
+ break;
+ case OPAQUE:
+ mapState = OpaqueMap.build(state);
+ break;
+ case TRANSACTIONAL:
+ mapState = TransactionalMap.build(state);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown state type: " + stateType);
+ }
+ return new SnapshottableMap(mapState, new Values(options.globalKey));
+ }
+
+ }
+
+ @Override
+ public List<T> multiGet(List<List<Object>> keysList) {
+ List<T> retval = new ArrayList<>();
+ try {
+ for (List<Object> keys : keysList) {
+ Bson filter = options.queryCreator.createFilterByKeys(keys);
+ Document doc = mongoClient.find(filter);
+ if(doc != null) {
+ retval.add(this.serializer.deserialize((byte[])doc.get(options.serDocumentField)));
+ } else {
+ retval.add(null);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Batch get operation failed.", e);
+ throw new FailedException(e);
+ }
+ return retval;
+ }
+
+ @Override
+ public void multiPut(List<List<Object>> keysList, List<T> values) {
+ try {
+ for (int i = 0; i < keysList.size(); i++) {
+ List<Object> keys = keysList.get(i);
+ T value = values.get(i);
+ Bson filter = options.queryCreator.createFilterByKeys(keys);
+ Document document = options.mapper.toDocumentByKeys(keys);
+ document.append(options.serDocumentField, this.serializer.serialize(value));
+ this.mongoClient.update(filter, document, true, false);
+ }
+ } catch (Exception e) {
+ LOG.warn("Batch write operation failed.", e);
+ throw new FailedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
index 112b170..06485e3 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
@@ -23,11 +23,16 @@ import java.util.Map;
import org.apache.commons.lang.Validate;
import org.apache.storm.mongodb.common.MongoDBClient;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
import org.bson.Document;
+import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +55,8 @@ public class MongoState implements State {
private String url;
private String collectionName;
private MongoMapper mapper;
+ private MongoLookupMapper lookupMapper;
+ private QueryFilterCreator queryCreator;
public Options withUrl(String url) {
this.url = url;
@@ -65,12 +72,21 @@ public class MongoState implements State {
this.mapper = mapper;
return this;
}
+
+ public Options withMongoLookupMapper(MongoLookupMapper lookupMapper) {
+ this.lookupMapper = lookupMapper;
+ return this;
+ }
+
+ public Options withQueryFilterCreator(QueryFilterCreator queryCreator) {
+ this.queryCreator = queryCreator;
+ return this;
+ }
}
protected void prepare() {
Validate.notEmpty(options.url, "url can not be blank or null");
Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
- Validate.notNull(options.mapper, "MongoMapper can not be null");
this.mongoClient = new MongoDBClient(options.url, options.collectionName);
}
@@ -91,7 +107,28 @@ public class MongoState implements State {
Document document = options.mapper.toDocument(tuple);
documents.add(document);
}
- this.mongoClient.insert(documents, true);
+
+ try {
+ this.mongoClient.insert(documents, true);
+ } catch (Exception e) {
+ LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+ throw new FailedException(e);
+ }
}
+ public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
+ List<List<Values>> batchRetrieveResult = Lists.newArrayList();
+ try {
+ for (TridentTuple tuple : tridentTuples) {
+ Bson filter = options.queryCreator.createFilter(tuple);
+ Document doc = mongoClient.find(filter);
+ List<Values> values = options.lookupMapper.toTuple(tuple, doc);
+ batchRetrieveResult.add(values);
+ }
+ } catch (Exception e) {
+ LOG.warn("Batch get operation failed. Triggering replay.", e);
+ throw new FailedException(e);
+ }
+ return batchRetrieveResult;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java
new file mode 100644
index 0000000..8dc43c5
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident.state;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseQueryFunction;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class MongoStateQuery extends BaseQueryFunction<MongoState, List<Values>> {
+
+ @Override
+ public List<List<Values>> batchRetrieve(MongoState mongoState, List<TridentTuple> tridentTuples) {
+ return mongoState.batchRetrieve(tridentTuples);
+ }
+
+ @Override
+ public void execute(TridentTuple tuples, List<Values> values, TridentCollector tridentCollector) {
+ for (Values value : values) {
+ tridentCollector.emit(value);
+ }
+ }
+}
[2/3] storm git commit: update examples and documents
Posted by xi...@apache.org.
update examples and documents
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2dfd4401
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2dfd4401
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2dfd4401
Branch: refs/heads/master
Commit: 2dfd4401fc8c6a805abb38a674a5bcea14188b0a
Parents: 021c933
Author: vesense <be...@163.com>
Authored: Thu Oct 27 15:53:16 2016 +0800
Committer: vesense <be...@163.com>
Committed: Fri Dec 16 13:23:29 2016 +0800
----------------------------------------------------------------------
.../storm/mongodb/topology/LookupWordCount.java | 80 ++++++++
.../mongodb/topology/TotalWordCounter.java | 69 +++++++
.../storm/mongodb/topology/UpdateWordCount.java | 16 +-
.../storm/mongodb/trident/PrintFunction.java | 40 ++++
.../storm/mongodb/trident/WordCountTrident.java | 9 +-
.../mongodb/trident/WordCountTridentMap.java | 95 ++++++++++
.../sql/mongodb/MongoDataSourcesProvider.java | 5 +
external/storm-mongodb/README.md | 185 ++++++++++++++++---
8 files changed, 461 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
new file mode 100644
index 0000000..5140685
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.mongodb.bolt.MongoLookupBolt;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoLookupMapper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+public class LookupWordCount {
+ private static final String WORD_SPOUT = "WORD_SPOUT";
+ private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+ private static final String TOTAL_COUNT_BOLT = "TOTAL_COUNT_BOLT";
+
+ private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test";
+ private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
+
+ public static void main(String[] args) throws Exception {
+ Config config = new Config();
+
+ String url = TEST_MONGODB_URL;
+ String collectionName = TEST_MONGODB_COLLECTION_NAME;
+
+ if (args.length >= 2) {
+ url = args[0];
+ collectionName = args[1];
+ }
+
+ WordSpout spout = new WordSpout();
+ TotalWordCounter totalBolt = new TotalWordCounter();
+
+ MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+
+ //wordspout -> lookupbolt -> totalCountBolt
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(WORD_SPOUT, spout, 1);
+ builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(TOTAL_COUNT_BOLT, totalBolt, 1).fieldsGrouping(LOOKUP_BOLT, new Fields("word"));
+
+ if (args.length == 2) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", config, builder.createTopology());
+ Thread.sleep(30000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ System.exit(0);
+ } else if (args.length == 3) {
+ StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+ } else{
+ System.out.println("Usage: LookupWordCount <mongodb url> <mongodb collection> [topology name]");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
new file mode 100644
index 0000000..137aac4
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+public class TotalWordCounter implements IBasicBolt {
+
+ private BigInteger total = BigInteger.ZERO;
+ private static final Logger LOG = LoggerFactory.getLogger(TotalWordCounter.class);
+ private static final Random RANDOM = new Random();
+ @SuppressWarnings("rawtypes")
+ public void prepare(Map stormConf, TopologyContext context) {
+ }
+
+ /*
+ * Just output the word value with a count of 1.
+ */
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ total = total.add(new BigInteger(input.getValues().get(1).toString()));
+ collector.emit(tuple(total.toString()));
+ //prints the total with low probability.
+ if(RANDOM.nextInt(1000) > 995) {
+ LOG.info("Running total = " + total);
+ }
+ }
+
+ public void cleanup() {
+ LOG.info("Final total = " + total);
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("total"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
index f80cc7c..b4af4ca 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
@@ -21,19 +21,14 @@ import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
+import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
-import org.apache.storm.mongodb.bolt.MongoInsertBolt;
import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
import org.apache.storm.mongodb.common.QueryFilterCreator;
import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper;
-import java.util.HashMap;
-import java.util.Map;
-
public class UpdateWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
private static final String COUNT_BOLT = "COUNT_BOLT";
@@ -57,17 +52,20 @@ public class UpdateWordCount {
WordSpout spout = new WordSpout();
WordCounter bolt = new WordCounter();
- MongoMapper mapper = new SimpleMongoUpdateMapper()
+ MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
.withFields("word", "count");
- QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
.withField("word");
- MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator , mapper);
+ MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, filterCreator, mapper);
//if a new document should be inserted if there are no matches to the query filter
//updateBolt.withUpsert(true);
+ //whether find all documents according to the query filter
+ //updateBolt.withMany(true);
+
// wordSpout ==> countBolt ==> MongoUpdateBolt
TopologyBuilder builder = new TopologyBuilder();
http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
new file mode 100644
index 0000000..3db91f9
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident;
+
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+public class PrintFunction extends BaseFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PrintFunction.class);
+
+ private static final Random RANDOM = new Random();
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
+ if(RANDOM.nextInt(1000) > 995) {
+ LOG.info(tuple.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
index ae97961..14dccbd 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
@@ -26,6 +26,7 @@ import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
import org.apache.storm.mongodb.trident.state.MongoState;
import org.apache.storm.mongodb.trident.state.MongoStateFactory;
+import org.apache.storm.mongodb.trident.state.MongoStateQuery;
import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
@@ -60,7 +61,13 @@ public class WordCountTrident {
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
- stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields());
+ stream.partitionPersist(factory, fields,
+ new MongoStateUpdater(), new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new MongoStateQuery(), new Fields("columnName", "columnValue"));
+ stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
return topology.build();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
new file mode 100644
index 0000000..83c0caf
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
+import org.apache.storm.mongodb.trident.state.*;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class WordCountTridentMap {
+
+ public static StormTopology buildTopology(String url, String collectionName){
+ Fields fields = new Fields("word", "count");
+ FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+ new Values("storm", 1),
+ new Values("trident", 1),
+ new Values("needs", 1),
+ new Values("javadoc", 1)
+ );
+ spout.setCycle(true);
+
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoMapState.Options options = new MongoMapState.Options();
+ options.url = url;
+ options.collectionName = collectionName;
+ options.mapper = mapper;
+ options.queryCreator = filterCreator;
+
+ StateFactory factory = MongoMapState.transactional(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ TridentState state = stream.groupBy(new Fields("word"))
+ .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+ stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+ .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+ if (args.length == 2) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));
+ Thread.sleep(60 * 1000);
+ cluster.killTopology("wordCounter");
+ cluster.shutdown();
+ System.exit(0);
+ }
+ else if(args.length == 3) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
+ } else{
+ System.out.println("Usage: WordCountTrident <mongodb url> <mongodb collection> [topology name]");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
index 6eb014e..60d52d1 100644
--- a/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
+++ b/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
@@ -97,6 +97,11 @@ public class MongoDataSourcesProvider implements DataSourcesProvider {
document.append(serField, array);
return document;
}
+
+ @Override
+ public Document toDocumentByKeys(List<Object> keys) {
+ return null;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/external/storm-mongodb/README.md
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/README.md b/external/storm-mongodb/README.md
index 614b52f..00b8e4b 100644
--- a/external/storm-mongodb/README.md
+++ b/external/storm-mongodb/README.md
@@ -1,6 +1,6 @@
#Storm MongoDB
-Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute update queries against a database collection in a storm topology.
+Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute select/update queries against a database collection in a storm topology.
## Insert into Database
The bolt and trident state included in this package for inserting data into a database collection.
@@ -11,6 +11,7 @@ The main API for inserting data in a collection using MongoDB is the `org.apache
```java
public interface MongoMapper extends Serializable {
Document toDocument(ITuple tuple);
+ Document toDocumentByKeys(List<Object> keys);
}
```
@@ -30,6 +31,13 @@ public class SimpleMongoMapper implements MongoMapper {
return document;
}
+ @Override
+ public Document toDocumentByKeys(List<Object> keys) {
+ Document document = new Document();
+ document.append("_id", MongoUtils.getID(keys));
+ return document;
+ }
+
public SimpleMongoMapper withFields(String... fields) {
this.fields = fields;
return this;
@@ -53,38 +61,25 @@ MongoMapper mapper = new SimpleMongoMapper()
MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
```
-### MongoTridentState
-We also support a trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
-
- ```java
- MongoMapper mapper = new SimpleMongoMapper()
- .withFields("word", "count");
-
- MongoState.Options options = new MongoState.Options()
- .withUrl(url)
- .withCollectionName(collectionName)
- .withMapper(mapper);
-
- StateFactory factory = new MongoStateFactory(options);
-
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("spout1", spout);
-
- stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields());
- ```
- **NOTE**:
- >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
## Update from Database
The bolt included in this package for updating data from a database collection.
+### MongoUpdateMapper
+The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
+
+```java
+public interface MongoUpdateMapper extends MongoMapper { }
+```
+
### SimpleMongoUpdateMapper
-`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
+`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit
https://docs.mongodb.org/manual/reference/operator/update/
```java
-public class SimpleMongoUpdateMapper implements MongoMapper {
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
private String[] fields;
@Override
@@ -93,6 +88,7 @@ public class SimpleMongoUpdateMapper implements MongoMapper {
for(String field : fields){
document.append(field, tuple.getValueByField(field));
}
+ //$set operator: Sets the value of a field in a document.
return new Document("$set", document);
}
@@ -104,13 +100,13 @@ public class SimpleMongoUpdateMapper implements MongoMapper {
```
-
### QueryFilterCreator
The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
```java
public interface QueryFilterCreator extends Serializable {
Bson createFilter(ITuple tuple);
+ Bson createFilterByKeys(List<Object> keys);
}
```
@@ -120,6 +116,7 @@ https://docs.mongodb.org/manual/reference/operator/query/
```java
public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
private String field;
@Override
@@ -127,6 +124,11 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
return Filters.eq(field, tuple.getValueByField(field));
}
+ @Override
+ public Bson createFilterByKeys(List<Object> keys) {
+ return Filters.eq("_id", MongoUtils.getID(keys));
+ }
+
public SimpleQueryFilterCreator withField(String field) {
this.field = field;
return this;
@@ -136,10 +138,10 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
```
### MongoUpdateBolt
-To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoMapper` implementation that converts storm tuple to DB document.
+To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document.
```java
- MongoMapper mapper = new SimpleMongoUpdateMapper()
+ MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
.withFields("word", "count");
QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
@@ -149,12 +151,15 @@ To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mon
//if a new document should be inserted if there are no matches to the query filter
//updateBolt.withUpsert(true);
+
+ //whether find all documents according to the query filter
+ //updateBolt.withMany(true);
```
Or use a anonymous inner class implementation for `QueryFilterCreator`:
```java
- MongoMapper mapper = new SimpleMongoUpdateMapper()
+ MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
.withFields("word", "count");
QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
@@ -170,6 +175,130 @@ To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mon
//updateBolt.withUpsert(true);
```
+
+## Lookup from Database
+The bolt included in this package for selecting data from a database collection.
+
+### MongoLookupMapper
+The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
+
+```java
+public interface MongoLookupMapper extends Serializable {
+
+ List<Values> toTuple(ITuple input, Document doc);
+
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+### SimpleMongoLookupMapper
+`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that can converts a Mongo document to a list of storm values.
+
+```java
+public class SimpleMongoLookupMapper implements MongoLookupMapper {
+
+ private String[] fields;
+
+ @Override
+ public List<Values> toTuple(ITuple input, Document doc) {
+ Values values = new Values();
+
+ for(String field : fields) {
+ if(input.contains(field)) {
+ values.add(input.getValueByField(field));
+ } else {
+ values.add(doc.get(field));
+ }
+ }
+ List<Values> result = new ArrayList<Values>();
+ result.add(values);
+ return result;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(fields));
+ }
+
+ public SimpleMongoLookupMapper withFields(String... fields) {
+ this.fields = fields;
+ return this;
+ }
+
+}
+```
+
+### MongoLookupBolt
+To use the `MongoLookupBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values.
+
+ ```java
+ MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+ ```
+
+## Mongo Trident State&MapState
+### Trident State
+We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
+
+ ```java
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ MongoState.Options options = new MongoState.Options()
+ .withUrl(url)
+ .withCollectionName(collectionName)
+ .withMapper(mapper);
+
+ StateFactory factory = new MongoStateFactory(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ stream.partitionPersist(factory, fields,
+ new MongoStateUpdater(), new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new MongoStateQuery(), new Fields("columnName", "columnValue"));
+ stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
+ ```
+ **NOTE**:
+ >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
+
+### Trident MapState
+We also support trident `MapState`. To create a Mongo trident `MapState` you need to initialize it with the url, collectionName, the `MongoMapper` and `QueryFilterCreator` instance. See the example below:
+
+ ```java
+ MongoMapper mapper = new SimpleMongoMapper()
+ .withFields("word", "count");
+
+ QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+ .withField("word");
+
+ MongoMapState.Options options = new MongoMapState.Options();
+ options.url = url;
+ options.collectionName = collectionName;
+ options.mapper = mapper;
+ options.queryCreator = filterCreator;
+
+ StateFactory factory = MongoMapState.transactional(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ TridentState state = stream.groupBy(new Fields("word"))
+ .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+ stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+ .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ ```
+
+
## License
Licensed to the Apache Software Foundation (ASF) under one
[3/3] storm git commit: Add STORM-1607 to Changelog
Posted by xi...@apache.org.
Add STORM-1607 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f63078c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f63078c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f63078c9
Branch: refs/heads/master
Commit: f63078c90d0671205ecef873d00efd35dd90d98d
Parents: 2dfd440
Author: vesense <be...@163.com>
Authored: Fri Dec 16 11:45:58 2016 +0800
Committer: vesense <be...@163.com>
Committed: Fri Dec 16 13:24:21 2016 +0800
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f63078c9/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index db1a01d..5aa28a9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-1607: Add MongoMapState for supporting trident's exactly once semantics
* STORM-2204: Adding caching capabilities in HBaseLookupBolt
* STORM-2104: More graceful handling of acked/failed tuples after partition reassignment
* STORM-1281: LocalCluster, testing4j and testing.clj to java