You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/03/31 07:47:34 UTC
[1/3] storm git commit: STORM-1573: Add batch support for MongoInsertBolt
Repository: storm
Updated Branches:
refs/heads/1.x-branch 652d2f6eb -> 0e4af514d
STORM-1573: Add batch support for MongoInsertBolt
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bd6f2b06
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bd6f2b06
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bd6f2b06
Branch: refs/heads/1.x-branch
Commit: bd6f2b06e81b7aa296615b4541cffba9655bcd86
Parents: 139a8a3
Author: vesense <be...@163.com>
Authored: Thu Mar 17 20:51:51 2016 +0800
Committer: vesense <be...@163.com>
Committed: Thu Mar 31 12:48:50 2016 +0800
----------------------------------------------------------------------
.../storm/mongodb/bolt/MongoInsertBolt.java | 74 ++++++++++++++++++--
.../storm/mongodb/bolt/MongoUpdateBolt.java | 3 +-
.../storm/mongodb/common/MongoDBClient.java | 20 +++---
.../storm/mongodb/trident/state/MongoState.java | 2 +-
4 files changed, 80 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bd6f2b06/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
index 26cd150..a030a6c 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
@@ -17,11 +17,18 @@
*/
package org.apache.storm.mongodb.bolt;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.lang.Validate;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Basic bolt for writing to MongoDB.
@@ -30,30 +37,87 @@ import org.bson.Document;
*
*/
public class MongoInsertBolt extends AbstractMongoBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(MongoInsertBolt.class);
+
+ private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
private MongoMapper mapper;
+ private boolean ordered = true; //default is ordered.
+
+ private int batchSize = 15000;
+
+ private List<Tuple> tupleBatch;
+
+ private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
+
public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) {
super(url, collectionName);
Validate.notNull(mapper, "MongoMapper can not be null");
this.mapper = mapper;
+
+ this.tupleBatch = new LinkedList<>();
}
@Override
public void execute(Tuple tuple) {
+ boolean forceFlush = false;
try{
- //get document
- Document doc = mapper.toDocument(tuple);
- mongoClient.insert(doc);
- this.collector.ack(tuple);
+ if (TupleUtils.isTick(tuple)) {
+ LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), batchSize);
+ collector.ack(tuple);
+ forceFlush = true;
+ } else {
+ tupleBatch.add(tuple);
+ if (tupleBatch.size() >= batchSize) {
+ forceFlush = true;
+ }
+ }
+
+ if(forceFlush && !tupleBatch.isEmpty()) {
+ List<Document> docs = new LinkedList<>();
+ for (Tuple t : tupleBatch) {
+ Document doc = mapper.toDocument(t);
+ docs.add(doc);
+ }
+ mongoClient.insert(docs, ordered);
+
+ for(Tuple t : tupleBatch) {
+ collector.ack(t);
+ }
+ tupleBatch.clear();
+ }
} catch (Exception e) {
this.collector.reportError(e);
- this.collector.fail(tuple);
+ for (Tuple t : tupleBatch) {
+ collector.fail(t);
+ }
+ tupleBatch.clear();
}
}
+ public MongoInsertBolt withBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public MongoInsertBolt withOrdered(boolean ordered) {
+ this.ordered = ordered;
+ return this;
+ }
+
+ public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) {
+ this.flushIntervalSecs = flushIntervalSecs;
+ return this;
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
+ }
+
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
http://git-wip-us.apache.org/repos/asf/storm/blob/bd6f2b06/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
index 1994993..510a3d0 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
@@ -63,8 +63,9 @@ public class MongoUpdateBolt extends AbstractMongoBolt {
}
}
- public void withUpsert(boolean upsert) {
+ public MongoUpdateBolt withUpsert(boolean upsert) {
this.upsert = upsert;
+ return this;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/bd6f2b06/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
index be2e376..cb4c454 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
@@ -26,6 +26,7 @@ import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
import com.mongodb.client.model.UpdateOptions;
public class MongoDBClient {
@@ -45,15 +46,6 @@ public class MongoDBClient {
}
/**
- * Inserts the provided document.
- *
- * @param document
- */
- public void insert(Document document) {
- collection.insertOne(document);
- }
-
- /**
* Inserts one or more documents.
* This method is equivalent to a call to the bulkWrite method.
* The documents will be inserted in the order provided,
@@ -61,8 +53,12 @@ public class MongoDBClient {
*
* @param documents
*/
- public void insert(List<Document> documents) {
- collection.insertMany(documents);
+ public void insert(List<Document> documents, boolean ordered) {
+ InsertManyOptions options = new InsertManyOptions();
+ if (!ordered) {
+ options.ordered(false);
+ }
+ collection.insertMany(documents, options);
}
/**
@@ -75,7 +71,7 @@ public class MongoDBClient {
*/
public void update(Bson filter, Bson update, boolean upsert) {
UpdateOptions options = new UpdateOptions();
- if(upsert) {
+ if (upsert) {
options.upsert(true);
}
collection.updateMany(filter, update, options);
http://git-wip-us.apache.org/repos/asf/storm/blob/bd6f2b06/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
index 843fcee..112b170 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
@@ -91,7 +91,7 @@ public class MongoState implements State {
Document document = options.mapper.toDocument(tuple);
documents.add(document);
}
- this.mongoClient.insert(documents);
+ this.mongoClient.insert(documents, true);
}
}
[3/3] storm git commit: add STORM-1573 to CHANGELOG.md
Posted by ka...@apache.org.
add STORM-1573 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e4af514
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e4af514
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e4af514
Branch: refs/heads/1.x-branch
Commit: 0e4af514d786cd8d8ad5411405de34ed07ea436a
Parents: fac10d7
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Mar 31 14:47:16 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Mar 31 14:47:16 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0e4af514/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c37ef4a..89391a0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.0.0
+ * STORM-1573: Add batch support for MongoInsertBolt
* STORM-1660: remove flux gitignore file and move rules to top level gitignore
* STORM-1622: Rename classes with older third party shaded packages
* STORM-1537: Upgrade to kryo 3
[2/3] storm git commit: Merge branch 'STORM-1573-1.x' of https://github.com/vesense/storm into STORM-1573-1.x
Posted by ka...@apache.org.
Merge branch 'STORM-1573-1.x' of https://github.com/vesense/storm into STORM-1573-1.x
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fac10d78
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fac10d78
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fac10d78
Branch: refs/heads/1.x-branch
Commit: fac10d78e65053e416f5d85378971a3a4f2dcafe
Parents: 652d2f6 bd6f2b0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Mar 31 14:46:41 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Mar 31 14:46:41 2016 +0900
----------------------------------------------------------------------
.../storm/mongodb/bolt/MongoInsertBolt.java | 74 ++++++++++++++++++--
.../storm/mongodb/bolt/MongoUpdateBolt.java | 3 +-
.../storm/mongodb/common/MongoDBClient.java | 20 +++---
.../storm/mongodb/trident/state/MongoState.java | 2 +-
4 files changed, 80 insertions(+), 19 deletions(-)
----------------------------------------------------------------------