You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2016/12/16 05:27:12 UTC

[1/3] storm git commit: STORM-1607: Add MongoMapState for supporting trident's exactly once semantics [Forced Update!]

Repository: storm
Updated Branches:
  refs/heads/master 68d4016e7 -> f63078c90 (forced update)


STORM-1607: Add MongoMapState for supporting trident's exactly once semantics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/021c9336
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/021c9336
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/021c9336

Branch: refs/heads/master
Commit: 021c9336ee1bdaefa658fd0b5e9255fde2aef2c9
Parents: f052669
Author: vesense <be...@163.com>
Authored: Fri Oct 21 13:52:27 2016 +0800
Committer: vesense <be...@163.com>
Committed: Fri Dec 16 13:19:55 2016 +0800

----------------------------------------------------------------------
 .../storm/mongodb/bolt/MongoLookupBolt.java     |  76 +++++++
 .../storm/mongodb/bolt/MongoUpdateBolt.java     |  18 +-
 .../storm/mongodb/common/MongoDBClient.java     |  26 ++-
 .../apache/storm/mongodb/common/MongoUtils.java |  38 ++++
 .../mongodb/common/QueryFilterCreator.java      |  10 +-
 .../common/SimpleQueryFilterCreator.java        |   7 +
 .../common/mapper/MongoLookupMapper.java        |  44 +++++
 .../mongodb/common/mapper/MongoMapper.java      |  10 +-
 .../common/mapper/MongoUpdateMapper.java        |  31 +++
 .../common/mapper/SimpleMongoLookupMapper.java  |  64 ++++++
 .../common/mapper/SimpleMongoMapper.java        |  14 ++
 .../common/mapper/SimpleMongoUpdateMapper.java  |   8 +-
 .../mongodb/trident/state/MongoMapState.java    | 196 +++++++++++++++++++
 .../storm/mongodb/trident/state/MongoState.java |  41 +++-
 .../mongodb/trident/state/MongoStateQuery.java  |  40 ++++
 15 files changed, 607 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
new file mode 100644
index 0000000..909826d
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.bolt;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.util.List;
+
+/**
+ * Basic bolt for querying from MongoDB.
+ *
+ * Note: Each MongoLookupBolt defined in a topology is tied to a specific collection.
+ *
+ */
+public class MongoLookupBolt extends AbstractMongoBolt {
+
+    private QueryFilterCreator queryCreator;
+    private MongoLookupMapper mapper;
+
+    public MongoLookupBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoLookupMapper mapper) {
+        super(url, collectionName);
+
+        Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
+        Validate.notNull(mapper, "MongoLookupMapper can not be null");
+
+        this.queryCreator = queryCreator;
+        this.mapper = mapper;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try{
+            //get query filter
+            Bson filter = queryCreator.createFilter(tuple);
+            //find document from mongodb
+            Document doc = mongoClient.find(filter);
+            //get storm values and emit
+            List<Values> valuesList = mapper.toTuple(tuple, doc);
+            for (Values values : valuesList) {
+                this.collector.emit(tuple, values);
+            }
+            this.collector.ack(tuple);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(tuple);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        mapper.declareOutputFields(declarer);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
index 510a3d0..9fb6c56 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
@@ -19,7 +19,7 @@ package org.apache.storm.mongodb.bolt;
 
 import org.apache.commons.lang.Validate;
 import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
 import org.bson.Document;
@@ -34,15 +34,16 @@ import org.bson.conversions.Bson;
 public class MongoUpdateBolt extends AbstractMongoBolt {
 
     private QueryFilterCreator queryCreator;
-    private MongoMapper mapper;
+    private MongoUpdateMapper mapper;
 
-    private boolean upsert;  //The default is false.
+    private boolean upsert;  //the default is false.
+    private boolean many;  //the default is false.
 
-    public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoMapper mapper) {
+    public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoUpdateMapper mapper) {
         super(url, collectionName);
 
         Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
-        Validate.notNull(mapper, "MongoMapper can not be null");
+        Validate.notNull(mapper, "MongoUpdateMapper can not be null");
 
         this.queryCreator = queryCreator;
         this.mapper = mapper;
@@ -55,7 +56,7 @@ public class MongoUpdateBolt extends AbstractMongoBolt {
             Document doc = mapper.toDocument(tuple);
             //get query filter
             Bson filter = queryCreator.createFilter(tuple);
-            mongoClient.update(filter, doc, upsert);
+            mongoClient.update(filter, doc, upsert, many);
             this.collector.ack(tuple);
         } catch (Exception e) {
             this.collector.reportError(e);
@@ -68,6 +69,11 @@ public class MongoUpdateBolt extends AbstractMongoBolt {
         return this;
     }
 
+    public MongoUpdateBolt withMany(boolean many) {
+        this.many = many;
+        return this;
+    }
+
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
index cb4c454..f86b969 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
@@ -62,19 +62,35 @@ public class MongoDBClient {
     }
 
     /**
-     * Update all documents in the collection according to the specified query filter.
+     * Updates either a single document or all documents in the collection that match the query filter.
      * When upsert set to true, the new document will be inserted if there are no matches to the query filter.
      * 
      * @param filter
-     * @param update
-     * @param upsert
+     * @param document
+     * @param upsert whether a new document should be inserted if there are no matches to the query filter
+     * @param many whether to update all documents matching the query filter (true) or only a single one (false)
      */
-    public void update(Bson filter, Bson update, boolean upsert) {
+    public void update(Bson filter, Bson document, boolean upsert, boolean many) {
+        //TODO batch updating
         UpdateOptions options = new UpdateOptions();
         if (upsert) {
             options.upsert(true);
         }
-        collection.updateMany(filter, update, options);
+        if (many) {
+            collection.updateMany(filter, document, options);
+        }else {
+            collection.updateOne(filter, document, options);
+        }
+    }
+
+    /**
+     * Finds a single document in the collection according to the specified arguments.
+     *
+     * @param filter
+     */
+    public Document find(Bson filter) {
+        //TODO batch finding
+        return collection.find(filter).first();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java
new file mode 100644
index 0000000..7c62beb
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+public final class MongoUtils {
+    /** Builds _id bytes by concatenating String.valueOf(key) of each key in order; UTF-8 is explicit so the result does not depend on the JVM default charset. */
+    public static byte[] getID(List<Object> keys) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try {
+            for (Object key : keys) {
+                bos.write(String.valueOf(key).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            }
+            bos.close();
+        } catch (IOException e) {
+            throw new RuntimeException("IOException creating Mongo document _id.", e);
+        }
+        return bos.toByteArray();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
index d95f717..4ef0a02 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
@@ -18,12 +18,13 @@
 package org.apache.storm.mongodb.common;
 
 import java.io.Serializable;
+import java.util.List;
 
 import org.apache.storm.tuple.ITuple;
 import org.bson.conversions.Bson;
 
 /**
- * Create a MongoDB query Filter by given Tuple.
+ * Creates a MongoDB query filter from a given Tuple or from Trident keys.
  */
 public interface QueryFilterCreator extends Serializable {
 
@@ -35,4 +36,11 @@ public interface QueryFilterCreator extends Serializable {
      */
     Bson createFilter(ITuple tuple);
 
+    /**
+     * Creates a query filter from the given Trident keys.
+     *
+     * @param keys
+     * @return query Filter
+     */
+    Bson createFilterByKeys(List<Object> keys);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
index 8b4f1c3..5bfe58a 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
@@ -22,6 +22,8 @@ import org.bson.conversions.Bson;
 
 import com.mongodb.client.model.Filters;
 
+import java.util.List;
+
 public class SimpleQueryFilterCreator implements QueryFilterCreator {
 
     private String field;
@@ -31,6 +33,11 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
         return Filters.eq(field, tuple.getValueByField(field));
     }
 
+    @Override
+    public Bson createFilterByKeys(List<Object> keys) {
+        return Filters.eq("_id", MongoUtils.getID(keys));
+    }
+
     public SimpleQueryFilterCreator withField(String field) {
         this.field = field;
         return this;

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java
new file mode 100644
index 0000000..48347bb
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common.mapper;
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+import org.bson.Document;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface MongoLookupMapper extends Serializable {
+
+    /**
+     * Converts a Mongo document to a list of storm values that can be emitted. This is done to allow a single
+     * storm input tuple and a single Mongo document to result in multiple output values.
+     * @param input the input tuple.
+     * @param doc the mongo document
+     * @return a List of storm values that can be emitted. Each item in list is emitted as an output tuple.
+     */
+    List<Values> toTuple(ITuple input, Document doc);
+
+    /**
+     * Declares the fields that this mapper will output.
+     * @param declarer
+     */
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
index 7bcd499..9b55661 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
@@ -18,12 +18,13 @@
 package org.apache.storm.mongodb.common.mapper;
 
 import java.io.Serializable;
+import java.util.List;
 
 import org.apache.storm.tuple.ITuple;
 import org.bson.Document;
 
 /**
- * Given a Tuple, converts it to an MongoDB document.
+ * Given a Tuple or Trident keys, converts them to a MongoDB document.
  */
 public interface MongoMapper extends Serializable {
 
@@ -35,4 +36,11 @@ public interface MongoMapper extends Serializable {
      */
     Document toDocument(ITuple tuple);
 
+    /**
+     * Converts Trident keys to a Document.
+     *
+     * @param keys the trident keys
+     * @return the MongoDB document
+     */
+    Document toDocumentByKeys(List<Object> keys);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java
new file mode 100644
index 0000000..af9a4ca
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common.mapper;
+
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * MongoUpdateMapper defines the spec used for converting a Tuple or Trident keys to a MongoDB document.
+ */
+public interface MongoUpdateMapper extends MongoMapper {
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java
new file mode 100644
index 0000000..d4bc19c
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.common.mapper;
+
+import org.apache.storm.mongodb.common.MongoUtils;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+import org.bson.Document;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SimpleMongoLookupMapper implements MongoLookupMapper {
+
+    private String[] fields;
+
+    public SimpleMongoLookupMapper(String... fields) {
+        this.fields = fields;
+    }
+
+    @Override
+    public List<Values> toTuple(ITuple input, Document doc) {
+        Values values = new Values();
+
+        for(String field : fields) {
+            if(input.contains(field)) {
+                values.add(input.getValueByField(field));
+            } else {
+                values.add(doc.get(field));
+            }
+        }
+        List<Values> result = new ArrayList<Values>();
+        result.add(values);
+        return result;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(fields));
+    }
+
+    public SimpleMongoLookupMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
index 4440962..00f240d 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
@@ -17,13 +17,20 @@
  */
 package org.apache.storm.mongodb.common.mapper;
 
+import org.apache.storm.mongodb.common.MongoUtils;
 import org.apache.storm.tuple.ITuple;
 import org.bson.Document;
 
+import java.util.List;
+
 public class SimpleMongoMapper implements MongoMapper {
 
     private String[] fields;
 
+    public SimpleMongoMapper(String... fields) {
+        this.fields = fields;
+    }
+
     @Override
     public Document toDocument(ITuple tuple) {
         Document document = new Document();
@@ -33,6 +40,13 @@ public class SimpleMongoMapper implements MongoMapper {
         return document;
     }
 
+    @Override
+    public Document toDocumentByKeys(List<Object> keys) {
+        Document document = new Document();
+        document.append("_id", MongoUtils.getID(keys));
+        return document;
+    }
+
     public SimpleMongoMapper withFields(String... fields) {
         this.fields = fields;
         return this;

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
index f07d4dc..d13fc21 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
@@ -20,10 +20,16 @@ package org.apache.storm.mongodb.common.mapper;
 import org.apache.storm.tuple.ITuple;
 import org.bson.Document;
 
-public class SimpleMongoUpdateMapper implements MongoMapper {
+import java.util.List;
+
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
 
     private String[] fields;
 
+    public SimpleMongoUpdateMapper(String... fields) {
+        this.fields = fields;
+    }
+
     @Override
     public Document toDocument(ITuple tuple) {
         Document document = new Document();

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
new file mode 100644
index 0000000..878d002
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident.state;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.Validate;
+import org.apache.storm.mongodb.common.MongoDBClient;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.state.*;
+import org.apache.storm.trident.state.map.*;
+import org.apache.storm.tuple.Values;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class MongoMapState<T> implements IBackingMap<T> {
+    private static Logger LOG = LoggerFactory.getLogger(MongoMapState.class);
+
+    @SuppressWarnings("rawtypes")
+    private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = Maps.newHashMap();
+
+    static {
+        DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer());
+        DEFAULT_SERIALZERS.put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer());
+        DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer());
+    }
+
+    private Options<T> options;
+    private Serializer<T> serializer;
+    private MongoDBClient mongoClient;
+    private Map map;
+
+    protected MongoMapState(Map map, Options options) {
+        this.options = options;
+        this.map = map;
+        this.serializer = options.serializer;
+
+        Validate.notEmpty(options.url, "url can not be blank or null");
+        Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
+        Validate.notNull(options.queryCreator, "queryCreator can not be null");
+        Validate.notNull(options.mapper, "mapper can not be null");
+
+        this.mongoClient = new MongoDBClient(options.url, options.collectionName);
+    }
+
+    public static class Options<T> implements Serializable {
+        public String url;
+        public String collectionName;
+        public MongoMapper mapper;
+        public QueryFilterCreator queryCreator;
+        public Serializer<T> serializer;
+        public int cacheSize = 5000;
+        public String globalKey = "$MONGO-MAP-STATE-GLOBAL";
+        public String serDocumentField = "tridentSerField";
+    }
+
+
+    @SuppressWarnings("rawtypes")
+    public static StateFactory opaque() {
+        Options<OpaqueValue> options = new Options<OpaqueValue>();
+        return opaque(options);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static StateFactory opaque(Options<OpaqueValue> opts) {
+
+        return new Factory(StateType.OPAQUE, opts);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static StateFactory transactional() {
+        Options<TransactionalValue> options = new Options<TransactionalValue>();
+        return transactional(options);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static StateFactory transactional(Options<TransactionalValue> opts) {
+        return new Factory(StateType.TRANSACTIONAL, opts);
+    }
+
+    public static StateFactory nonTransactional() {
+        Options<Object> options = new Options<Object>();
+        return nonTransactional(options);
+    }
+
+    public static StateFactory nonTransactional(Options<Object> opts) {
+        return new Factory(StateType.NON_TRANSACTIONAL, opts);
+    }
+
+
+    protected static class Factory implements StateFactory {
+        private StateType stateType;
+        private Options options;
+
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        public Factory(StateType stateType, Options options) {
+            this.stateType = stateType;
+            this.options = options;
+
+            if (this.options.serializer == null) {
+                this.options.serializer = DEFAULT_SERIALZERS.get(stateType);
+            }
+
+            if (this.options.serializer == null) {
+                throw new RuntimeException("Serializer should be specified for type: " + stateType);
+            }
+        }
+
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+            IBackingMap state = new MongoMapState(conf, options);
+
+            if(options.cacheSize > 0) {
+                state = new CachedMap(state, options.cacheSize);
+            }
+
+            MapState mapState;
+            switch (stateType) {
+                case NON_TRANSACTIONAL:
+                    mapState = NonTransactionalMap.build(state);
+                    break;
+                case OPAQUE:
+                    mapState = OpaqueMap.build(state);
+                    break;
+                case TRANSACTIONAL:
+                    mapState = TransactionalMap.build(state);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown state type: " + stateType);
+            }
+            return new SnapshottableMap(mapState, new Values(options.globalKey));
+        }
+
+    }
+
+    @Override
+    public List<T> multiGet(List<List<Object>> keysList) {
+        List<T> retval = new ArrayList<>();
+        try {
+            for (List<Object> keys : keysList) {
+                Bson filter = options.queryCreator.createFilterByKeys(keys);
+                Document doc = mongoClient.find(filter);
+                if(doc != null) {
+                    retval.add(this.serializer.deserialize((byte[])doc.get(options.serDocumentField)));
+                } else {
+                    retval.add(null);
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Batch get operation failed.", e);
+            throw new FailedException(e);
+        }
+        return retval;
+    }
+
+    @Override
+    public void multiPut(List<List<Object>> keysList, List<T> values) {
+        try {
+            for (int i = 0; i < keysList.size(); i++) {
+                List<Object> keys = keysList.get(i);
+                T value = values.get(i);
+                Bson filter = options.queryCreator.createFilterByKeys(keys);
+                Document document = options.mapper.toDocumentByKeys(keys);
+                document.append(options.serDocumentField, this.serializer.serialize(value));
+                this.mongoClient.update(filter, document, true, false);
+            }
+        } catch (Exception e) {
+            LOG.warn("Batch write operation failed.", e);
+            throw new FailedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
index 112b170..06485e3 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
@@ -23,11 +23,16 @@ import java.util.Map;
 
 import org.apache.commons.lang.Validate;
 import org.apache.storm.mongodb.common.MongoDBClient;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
 import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.topology.FailedException;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
 import org.bson.Document;
+import org.bson.conversions.Bson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +55,8 @@ public class MongoState implements State {
         private String url;
         private String collectionName;
         private MongoMapper mapper;
+        private MongoLookupMapper lookupMapper;
+        private QueryFilterCreator queryCreator;
 
         public Options withUrl(String url) {
             this.url = url;
@@ -65,12 +72,21 @@ public class MongoState implements State {
             this.mapper = mapper;
             return this;
         }
+
+        /**
+         * Sets the mapper that {@code batchRetrieve} uses to turn an input tuple
+         * and the document found for it into output values.
+         *
+         * @param lookupMapper mapper applied to each looked-up document
+         * @return this options instance, for call chaining
+         */
+        public Options withMongoLookupMapper(MongoLookupMapper lookupMapper) {
+            this.lookupMapper = lookupMapper;
+            return this;
+        }
+
+        /**
+         * Sets the creator that {@code batchRetrieve} uses to build the query
+         * filter for each input tuple.
+         *
+         * @param queryCreator filter factory applied to each input tuple
+         * @return this options instance, for call chaining
+         */
+        public Options withQueryFilterCreator(QueryFilterCreator queryCreator) {
+            this.queryCreator = queryCreator;
+            return this;
+        }
     }
 
     protected void prepare() {
         Validate.notEmpty(options.url, "url can not be blank or null");
         Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
-        Validate.notNull(options.mapper, "MongoMapper can not be null");
 
         this.mongoClient = new MongoDBClient(options.url, options.collectionName);
     }
@@ -91,7 +107,28 @@ public class MongoState implements State {
             Document document = options.mapper.toDocument(tuple);
             documents.add(document);
         }
-        this.mongoClient.insert(documents, true);
+
+        try {
+            this.mongoClient.insert(documents, true);
+        } catch (Exception e) {
+            LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+            throw new FailedException(e);
+        }
     }
 
+    /**
+     * Looks up one document per input tuple and maps it to output values.
+     * The filter for each tuple comes from {@code options.queryCreator}; the
+     * document found (as returned by the Mongo client) is handed, together
+     * with the tuple, to {@code options.lookupMapper}.
+     *
+     * @param tridentTuples tuples of the current batch
+     * @return one list of values per input tuple, in input order
+     * @throws FailedException wrapping any exception raised during lookup or mapping
+     */
+    public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
+        List<List<Values>> results = Lists.newArrayList();
+        try {
+            for (TridentTuple input : tridentTuples) {
+                Document found = mongoClient.find(options.queryCreator.createFilter(input));
+                results.add(options.lookupMapper.toTuple(input, found));
+            }
+        } catch (Exception failure) {
+            LOG.warn("Batch get operation failed. Triggering replay.", failure);
+            throw new FailedException(failure);
+        }
+        return results;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/021c9336/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java
new file mode 100644
index 0000000..8dc43c5
--- /dev/null
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident.state;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseQueryFunction;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+/**
+ * Trident query function that resolves input tuples against a
+ * {@link MongoState} and emits every value list retrieved for them.
+ */
+public class MongoStateQuery extends BaseQueryFunction<MongoState, List<Values>> {
+
+    /** Hands the whole batch to the state and returns its lookup results unchanged. */
+    @Override
+    public List<List<Values>> batchRetrieve(MongoState mongoState, List<TridentTuple> tridentTuples) {
+        List<List<Values>> lookedUp = mongoState.batchRetrieve(tridentTuples);
+        return lookedUp;
+    }
+
+    /** Emits, in order, each value list that was retrieved for the given input tuple. */
+    @Override
+    public void execute(TridentTuple tuples, List<Values> values, TridentCollector tridentCollector) {
+        for (int i = 0; i < values.size(); i++) {
+            tridentCollector.emit(values.get(i));
+        }
+    }
+}


[2/3] storm git commit: update examples and documents

Posted by xi...@apache.org.
update examples and documents


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2dfd4401
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2dfd4401
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2dfd4401

Branch: refs/heads/master
Commit: 2dfd4401fc8c6a805abb38a674a5bcea14188b0a
Parents: 021c933
Author: vesense <be...@163.com>
Authored: Thu Oct 27 15:53:16 2016 +0800
Committer: vesense <be...@163.com>
Committed: Fri Dec 16 13:23:29 2016 +0800

----------------------------------------------------------------------
 .../storm/mongodb/topology/LookupWordCount.java |  80 ++++++++
 .../mongodb/topology/TotalWordCounter.java      |  69 +++++++
 .../storm/mongodb/topology/UpdateWordCount.java |  16 +-
 .../storm/mongodb/trident/PrintFunction.java    |  40 ++++
 .../storm/mongodb/trident/WordCountTrident.java |   9 +-
 .../mongodb/trident/WordCountTridentMap.java    |  95 ++++++++++
 .../sql/mongodb/MongoDataSourcesProvider.java   |   5 +
 external/storm-mongodb/README.md                | 185 ++++++++++++++++---
 8 files changed, 461 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
new file mode 100644
index 0000000..5140685
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.mongodb.bolt.MongoLookupBolt;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoLookupMapper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Example topology wiring WordSpout -> MongoLookupBolt -> TotalWordCounter.
+ * With two arguments it runs for 30 seconds on a local cluster; with three it
+ * is submitted under the given topology name; otherwise usage is printed.
+ */
+public class LookupWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+    private static final String TOTAL_COUNT_BOLT = "TOTAL_COUNT_BOLT";
+
+    private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test";
+    private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        // The built-in defaults are replaced once url and collection are both supplied.
+        boolean mongoArgsGiven = args.length >= 2;
+        String url = mongoArgsGiven ? args[0] : TEST_MONGODB_URL;
+        String collectionName = mongoArgsGiven ? args[1] : TEST_MONGODB_COLLECTION_NAME;
+
+        WordSpout spout = new WordSpout();
+        TotalWordCounter totalBolt = new TotalWordCounter();
+
+        MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+
+        //wordspout -> lookupbolt -> totalCountBolt
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(TOTAL_COUNT_BOLT, totalBolt, 1).fieldsGrouping(LOOKUP_BOLT, new Fields("word"));
+
+        switch (args.length) {
+            case 2:
+                // local run: submit, let it work for 30 seconds, then tear down
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology("test", config, builder.createTopology());
+                Thread.sleep(30000);
+                cluster.killTopology("test");
+                cluster.shutdown();
+                System.exit(0);
+                break;
+            case 3:
+                StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+                break;
+            default:
+                System.out.println("Usage: LookupWordCount <mongodb url> <mongodb collection> [topology name]");
+                break;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
new file mode 100644
index 0000000..137aac4
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.topology;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.storm.utils.Utils.tuple;
+
+/**
+ * Bolt that keeps a running sum of the numeric value found at index 1 of
+ * every incoming tuple and emits the updated total after each tuple.
+ */
+public class TotalWordCounter implements IBasicBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TotalWordCounter.class);
+    private static final Random RANDOM = new Random();
+
+    private BigInteger total = BigInteger.ZERO;
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map stormConf, TopologyContext context) {
+    }
+
+    /*
+     * Adds the tuple's second value to the running total and emits the new total.
+     */
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        total = total.add(new BigInteger(input.getValues().get(1).toString()));
+        collector.emit(tuple(total.toString()));
+        //prints the total with low probability.
+        if (RANDOM.nextInt(1000) > 995) {
+            LOG.info("Running total = {}", total);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        LOG.info("Final total = {}", total);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("total"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
index f80cc7c..b4af4ca 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
@@ -21,19 +21,14 @@ import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
-import org.apache.storm.mongodb.bolt.MongoInsertBolt;
 import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
 import org.apache.storm.mongodb.common.QueryFilterCreator;
 import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
 import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper;
 
-import java.util.HashMap;
-import java.util.Map;
-
 public class UpdateWordCount {
     private static final String WORD_SPOUT = "WORD_SPOUT";
     private static final String COUNT_BOLT = "COUNT_BOLT";
@@ -57,17 +52,20 @@ public class UpdateWordCount {
         WordSpout spout = new WordSpout();
         WordCounter bolt = new WordCounter();
 
-        MongoMapper mapper = new SimpleMongoUpdateMapper()
+        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
                 .withFields("word", "count");
 
-        QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
                 .withField("word");
         
-        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator , mapper);
+        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, filterCreator, mapper);
 
         //if a new document should be inserted if there are no matches to the query filter
         //updateBolt.withUpsert(true);
 
+        //whether to update all documents that match the query filter
+        //updateBolt.withMany(true);
+
         // wordSpout ==> countBolt ==> MongoUpdateBolt
         TopologyBuilder builder = new TopologyBuilder();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
new file mode 100644
index 0000000..3db91f9
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident;
+
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+/**
+ * Trident function that logs a small random sample of the tuples it sees
+ * (those for which a draw from 0..999 exceeds 995) and emits nothing.
+ */
+public class PrintFunction extends BaseFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrintFunction.class);
+
+    private static final Random RANDOM = new Random();
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
+        boolean sampled = RANDOM.nextInt(1000) > 995;
+        if (!sampled) {
+            return;
+        }
+        LOG.info(tuple.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
index ae97961..14dccbd 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
@@ -26,6 +26,7 @@ import org.apache.storm.mongodb.common.mapper.MongoMapper;
 import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
 import org.apache.storm.mongodb.trident.state.MongoState;
 import org.apache.storm.mongodb.trident.state.MongoStateFactory;
+import org.apache.storm.mongodb.trident.state.MongoStateQuery;
 import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
@@ -60,7 +61,13 @@ public class WordCountTrident {
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);
 
-        stream.partitionPersist(factory, fields,  new MongoStateUpdater(), new Fields());
+        stream.partitionPersist(factory, fields,
+                new MongoStateUpdater(), new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                new MongoStateQuery(), new Fields("columnName", "columnValue"));
+        stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
         return topology.build();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
new file mode 100644
index 0000000..83c0caf
--- /dev/null
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mongodb.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.mongodb.common.QueryFilterCreator;
+import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
+import org.apache.storm.mongodb.trident.state.*;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Example Trident topology that sums word counts into a transactional
+ * {@code MongoMapState} and queries the stored sums back with {@code MapGet}.
+ */
+public class WordCountTridentMap {
+
+    /**
+     * Builds the topology: a cycling fixed-batch spout of (word, count) pairs,
+     * grouped by word and aggregated with {@code Sum} into the Mongo-backed state,
+     * plus a state query whose results are passed to {@code PrintFunction}.
+     *
+     * @param url            MongoDB connection url
+     * @param collectionName collection backing the map state
+     * @return the assembled topology
+     */
+    public static StormTopology buildTopology(String url, String collectionName){
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        spout.setCycle(true);
+
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoMapState.Options options = new MongoMapState.Options();
+        options.url = url;
+        options.collectionName = collectionName;
+        options.mapper = mapper;
+        options.queryCreator = filterCreator;
+
+        StateFactory factory = MongoMapState.transactional(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        TridentState state = stream.groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+        return topology.build();
+    }
+
+    /**
+     * Runs the example: two arguments start a 60-second local-cluster run,
+     * three arguments submit the topology under the given name, anything else
+     * prints the usage line.
+     */
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));
+            Thread.sleep(60 * 1000);
+            cluster.killTopology("wordCounter");
+            cluster.shutdown();
+            System.exit(0);
+        }
+        else if(args.length == 3) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
+        } else{
+            // The usage line must name this class, not WordCountTrident.
+            System.out.println("Usage: WordCountTridentMap <mongodb url> <mongodb collection> [topology name]");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
index 6eb014e..60d52d1 100644
--- a/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
+++ b/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
@@ -97,6 +97,11 @@ public class MongoDataSourcesProvider implements DataSourcesProvider {
       document.append(serField, array);
       return document;
     }
+
+    // Stub for the key-based mapping method of MongoMapper: this mapper always returns null.
+    // NOTE(review): MongoMapState.multiPut appends to the document returned by
+    // toDocumentByKeys, so a null here would fail there -- confirm this mapper is
+    // never used with MongoMapState.
+    @Override
+    public Document toDocumentByKeys(List<Object> keys) {
+      return null;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/2dfd4401/external/storm-mongodb/README.md
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/README.md b/external/storm-mongodb/README.md
index 614b52f..00b8e4b 100644
--- a/external/storm-mongodb/README.md
+++ b/external/storm-mongodb/README.md
@@ -1,6 +1,6 @@
 #Storm MongoDB
 
-Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute update queries against a database collection in a storm topology.
+Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allow a storm topology to either insert storm tuples in a database collection or to execute select/update queries against a database collection in a storm topology.
 
 ## Insert into Database
 The bolt and trident state included in this package for inserting data into a database collection.
@@ -11,6 +11,7 @@ The main API for inserting data in a collection using MongoDB is the `org.apache
 ```java
 public interface MongoMapper extends Serializable {
     Document toDocument(ITuple tuple);
+    Document toDocumentByKeys(List<Object> keys);
 }
 ```
 
@@ -30,6 +31,13 @@ public class SimpleMongoMapper implements MongoMapper {
         return document;
     }
 
+    @Override
+    public Document toDocumentByKeys(List<Object> keys) {
+        Document document = new Document();
+        document.append("_id", MongoUtils.getID(keys));
+        return document;
+    }
+
     public SimpleMongoMapper withFields(String... fields) {
         this.fields = fields;
         return this;
@@ -53,38 +61,25 @@ MongoMapper mapper = new SimpleMongoMapper()
 MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
  ```
 
-### MongoTridentState
-We also support a trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below:
-
- ```java
-        MongoMapper mapper = new SimpleMongoMapper()
-                .withFields("word", "count");
-
-        MongoState.Options options = new MongoState.Options()
-                .withUrl(url)
-                .withCollectionName(collectionName)
-                .withMapper(mapper);
-
-        StateFactory factory = new MongoStateFactory(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        stream.partitionPersist(factory, fields,  new MongoStateUpdater(), new Fields());
- ```
- **NOTE**:
- >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
 
 ## Update from Database
 The bolt included in this package for updating data from a database collection.
 
+### MongoUpdateMapper
+The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
+
+```java
+public interface MongoUpdateMapper extends MongoMapper { }
+```
+
 ### SimpleMongoUpdateMapper
-`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
+`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to.
 `SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit 
 https://docs.mongodb.org/manual/reference/operator/update/
 
 ```java
-public class SimpleMongoUpdateMapper implements MongoMapper {
+public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper {
+
     private String[] fields;
 
     @Override
@@ -93,6 +88,7 @@ public class SimpleMongoUpdateMapper implements MongoMapper {
         for(String field : fields){
             document.append(field, tuple.getValueByField(field));
         }
+        //$set operator: Sets the value of a field in a document.
         return new Document("$set", document);
     }
 
@@ -104,13 +100,13 @@ public class SimpleMongoUpdateMapper implements MongoMapper {
 ```
 
 
- 
 ### QueryFilterCreator
 The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface:
 
  ```java
 public interface QueryFilterCreator extends Serializable {
     Bson createFilter(ITuple tuple);
+    Bson createFilterByKeys(List<Object> keys);
 }
  ```
 
@@ -120,6 +116,7 @@ https://docs.mongodb.org/manual/reference/operator/query/
 
  ```java
 public class SimpleQueryFilterCreator implements QueryFilterCreator {
+
     private String field;
     
     @Override
@@ -127,6 +124,11 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
         return Filters.eq(field, tuple.getValueByField(field));
     }
 
+    @Override
+    public Bson createFilterByKeys(List<Object> keys) {
+        return Filters.eq("_id", MongoUtils.getID(keys));
+    }
+
     public SimpleQueryFilterCreator withField(String field) {
         this.field = field;
         return this;
@@ -136,10 +138,10 @@ public class SimpleQueryFilterCreator implements QueryFilterCreator {
  ```
 
 ### MongoUpdateBolt
-To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoMapper` implementation that converts storm tuple to DB document.
+To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document.
 
  ```java
-        MongoMapper mapper = new SimpleMongoUpdateMapper()
+        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
                 .withFields("word", "count");
 
         QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
@@ -149,12 +151,15 @@ To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mon
 
         //if a new document should be inserted if there are no matches to the query filter
         //updateBolt.withUpsert(true);
+
+        //whether to find all documents matching the query filter
+        //updateBolt.withMany(true);
  ```
  
  Or use a anonymous inner class implementation for `QueryFilterCreator`:
  
   ```java
-        MongoMapper mapper = new SimpleMongoUpdateMapper()
+        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
                 .withFields("word", "count");
 
         QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
@@ -170,6 +175,130 @@ To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mon
         //updateBolt.withUpsert(true);
  ```
 
+
+## Lookup from Database
+This package includes a bolt for selecting data from a database collection.
+
+### MongoLookupMapper
+The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
+
+```java
+public interface MongoLookupMapper extends Serializable {
+
+    List<Values> toTuple(ITuple input, Document doc);
+
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+### SimpleMongoLookupMapper
+`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that converts a Mongo document to a list of Storm values.
+
+```java
+public class SimpleMongoLookupMapper implements MongoLookupMapper {
+
+    private String[] fields;
+
+    @Override
+    public List<Values> toTuple(ITuple input, Document doc) {
+        Values values = new Values();
+
+        for(String field : fields) {
+            if(input.contains(field)) {
+                values.add(input.getValueByField(field));
+            } else {
+                values.add(doc.get(field));
+            }
+        }
+        List<Values> result = new ArrayList<Values>();
+        result.add(values);
+        return result;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(fields));
+    }
+
+    public SimpleMongoLookupMapper withFields(String... fields) {
+        this.fields = fields;
+        return this;
+    }
+
+}
+```
+
+### MongoLookupBolt
+To use the `MongoLookupBolt`,  you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values.
+
+ ```java
+        MongoLookupMapper mapper = new SimpleMongoLookupMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
+ ```
+
+## Mongo Trident State&MapState
+### Trident State
+We support a Trident persistent state that can be used with Trident topologies. To create a Mongo persistent Trident state you need to initialize it with the url, the collectionName, and a `MongoMapper` instance. See the example below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        MongoState.Options options = new MongoState.Options()
+                .withUrl(url)
+                .withCollectionName(collectionName)
+                .withMapper(mapper);
+
+        StateFactory factory = new MongoStateFactory(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory, fields,
+                new MongoStateUpdater(), new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                new MongoStateQuery(), new Fields("columnName", "columnValue"));
+        stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields());
+ ```
+ **NOTE**:
+ >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents.
+
+### Trident MapState
+We also support Trident `MapState`. To create a Mongo Trident `MapState` you need to initialize it with the url, the collectionName, and the `MongoMapper` and `QueryFilterCreator` instances. See the example below:
+
+ ```java
+        MongoMapper mapper = new SimpleMongoMapper()
+                .withFields("word", "count");
+
+        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
+                .withField("word");
+
+        MongoMapState.Options options = new MongoMapState.Options();
+        options.url = url;
+        options.collectionName = collectionName;
+        options.mapper = mapper;
+        options.queryCreator = filterCreator;
+
+        StateFactory factory = MongoMapState.transactional(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        TridentState state = stream.groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ ```
+ 
+
 ## License
 
 Licensed to the Apache Software Foundation (ASF) under one


[3/3] storm git commit: Add STORM-1607 to Changelog

Posted by xi...@apache.org.
Add STORM-1607 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f63078c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f63078c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f63078c9

Branch: refs/heads/master
Commit: f63078c90d0671205ecef873d00efd35dd90d98d
Parents: 2dfd440
Author: vesense <be...@163.com>
Authored: Fri Dec 16 11:45:58 2016 +0800
Committer: vesense <be...@163.com>
Committed: Fri Dec 16 13:24:21 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f63078c9/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index db1a01d..5aa28a9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-1607: Add MongoMapState for supporting trident's exactly once semantics
  * STORM-2204: Adding caching capabilities in HBaseLookupBolt
  * STORM-2104: More graceful handling of acked/failed tuples after partition reassignment
  * STORM-1281: LocalCluster, testing4j and testing.clj to java