You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text version).
Posted to commits@storm.apache.org by ka...@apache.org on 2016/03/31 07:47:34 UTC

[1/3] storm git commit: STORM-1573: Add batch support for MongoInsertBolt

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 652d2f6eb -> 0e4af514d


STORM-1573: Add batch support for MongoInsertBolt


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bd6f2b06
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bd6f2b06
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bd6f2b06

Branch: refs/heads/1.x-branch
Commit: bd6f2b06e81b7aa296615b4541cffba9655bcd86
Parents: 139a8a3
Author: vesense <be...@163.com>
Authored: Thu Mar 17 20:51:51 2016 +0800
Committer: vesense <be...@163.com>
Committed: Thu Mar 31 12:48:50 2016 +0800

----------------------------------------------------------------------
 .../storm/mongodb/bolt/MongoInsertBolt.java     | 74 ++++++++++++++++++--
 .../storm/mongodb/bolt/MongoUpdateBolt.java     |  3 +-
 .../storm/mongodb/common/MongoDBClient.java     | 20 +++---
 .../storm/mongodb/trident/state/MongoState.java |  2 +-
 4 files changed, 80 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bd6f2b06/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
index 26cd150..a030a6c 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
@@ -17,11 +17,18 @@
  */
 package org.apache.storm.mongodb.bolt;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.lang.Validate;
 import org.apache.storm.mongodb.common.mapper.MongoMapper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
 import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Basic bolt for writing to MongoDB.
@@ -30,30 +37,87 @@ import org.bson.Document;
  *
  */
 public class MongoInsertBolt extends AbstractMongoBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(MongoInsertBolt.class);
+
+    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
 
     private MongoMapper mapper;
 
+    private boolean ordered = true;  //default is ordered.
+
+    private int batchSize = 15000;
+
+    private List<Tuple> tupleBatch;
+
+    private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
+
     public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) {
         super(url, collectionName);
 
         Validate.notNull(mapper, "MongoMapper can not be null");
 
         this.mapper = mapper;
+
+        this.tupleBatch = new LinkedList<>();
     }
 
     @Override
     public void execute(Tuple tuple) {
+        boolean forceFlush = false;
         try{
-            //get document
-            Document doc = mapper.toDocument(tuple);
-            mongoClient.insert(doc);
-            this.collector.ack(tuple);
+            if (TupleUtils.isTick(tuple)) {
+                LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), batchSize);
+                collector.ack(tuple);
+                forceFlush = true;
+            } else {
+                tupleBatch.add(tuple);
+                if (tupleBatch.size() >= batchSize) {
+                    forceFlush = true;
+                }
+            }
+
+            if(forceFlush && !tupleBatch.isEmpty()) {
+                List<Document> docs = new LinkedList<>();
+                for (Tuple t : tupleBatch) {
+                    Document doc = mapper.toDocument(t);
+                    docs.add(doc);
+                }
+                mongoClient.insert(docs, ordered);
+
+                for(Tuple t : tupleBatch) {
+                    collector.ack(t);
+                }
+                tupleBatch.clear();
+            }
         } catch (Exception e) {
             this.collector.reportError(e);
-            this.collector.fail(tuple);
+            for (Tuple t : tupleBatch) {
+                collector.fail(t);
+            }
+            tupleBatch.clear();
         }
     }
 
+    public MongoInsertBolt withBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    public MongoInsertBolt withOrdered(boolean ordered) {
+        this.ordered = ordered;
+        return this;
+    }
+
+    public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) {
+        this.flushIntervalSecs = flushIntervalSecs;
+        return this;
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
+    }
+
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         

http://git-wip-us.apache.org/repos/asf/storm/blob/bd6f2b06/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
index 1994993..510a3d0 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
@@ -63,8 +63,9 @@ public class MongoUpdateBolt extends AbstractMongoBolt {
         }
     }
 
-    public void withUpsert(boolean upsert) {
+    public MongoUpdateBolt withUpsert(boolean upsert) {
         this.upsert = upsert;
+        return this;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/bd6f2b06/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
index be2e376..cb4c454 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
@@ -26,6 +26,7 @@ import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
 import com.mongodb.client.model.UpdateOptions;
 
 public class MongoDBClient {
@@ -45,15 +46,6 @@ public class MongoDBClient {
     }
 
     /**
-     * Inserts the provided document.
-     * 
-     * @param document
-     */
-    public void insert(Document document) {
-        collection.insertOne(document);
-    }
-
-    /**
      * Inserts one or more documents.
      * This method is equivalent to a call to the bulkWrite method.
      * The documents will be inserted in the order provided, 
@@ -61,8 +53,12 @@ public class MongoDBClient {
      * 
      * @param documents
      */
-    public void insert(List<Document> documents) {
-        collection.insertMany(documents);
+    public void insert(List<Document> documents, boolean ordered) {
+        InsertManyOptions options = new InsertManyOptions();
+        if (!ordered) {
+            options.ordered(false);
+        }
+        collection.insertMany(documents, options);
     }
 
     /**
@@ -75,7 +71,7 @@ public class MongoDBClient {
      */
     public void update(Bson filter, Bson update, boolean upsert) {
         UpdateOptions options = new UpdateOptions();
-        if(upsert) {
+        if (upsert) {
             options.upsert(true);
         }
         collection.updateMany(filter, update, options);

http://git-wip-us.apache.org/repos/asf/storm/blob/bd6f2b06/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
index 843fcee..112b170 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
@@ -91,7 +91,7 @@ public class MongoState implements State {
             Document document = options.mapper.toDocument(tuple);
             documents.add(document);
         }
-        this.mongoClient.insert(documents);
+        this.mongoClient.insert(documents, true);
     }
 
 }


[3/3] storm git commit: add STORM-1573 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-1573 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e4af514
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e4af514
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e4af514

Branch: refs/heads/1.x-branch
Commit: 0e4af514d786cd8d8ad5411405de34ed07ea436a
Parents: fac10d7
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Mar 31 14:47:16 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Mar 31 14:47:16 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0e4af514/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c37ef4a..89391a0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1573: Add batch support for MongoInsertBolt
  * STORM-1660: remove flux gitignore file and move rules to top level gitignore
  * STORM-1622: Rename classes with older third party shaded packages
  * STORM-1537: Upgrade to kryo 3


[2/3] storm git commit: Merge branch 'STORM-1573-1.x' of https://github.com/vesense/storm into STORM-1573-1.x

Posted by ka...@apache.org.
Merge branch 'STORM-1573-1.x' of https://github.com/vesense/storm into STORM-1573-1.x


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fac10d78
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fac10d78
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fac10d78

Branch: refs/heads/1.x-branch
Commit: fac10d78e65053e416f5d85378971a3a4f2dcafe
Parents: 652d2f6 bd6f2b0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Mar 31 14:46:41 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Mar 31 14:46:41 2016 +0900

----------------------------------------------------------------------
 .../storm/mongodb/bolt/MongoInsertBolt.java     | 74 ++++++++++++++++++--
 .../storm/mongodb/bolt/MongoUpdateBolt.java     |  3 +-
 .../storm/mongodb/common/MongoDBClient.java     | 20 +++---
 .../storm/mongodb/trident/state/MongoState.java |  2 +-
 4 files changed, 80 insertions(+), 19 deletions(-)
----------------------------------------------------------------------