You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/11/13 16:45:11 UTC
nifi git commit: NIFI-4588 Added the ability to use update operators like $push and $set to PutMongo.
Repository: nifi
Updated Branches:
refs/heads/master 387dce5ad -> fe3f28894
NIFI-4588 Added the ability to use update operators like $push and $set to PutMongo.
Removed commented out code
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2259
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fe3f2889
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fe3f2889
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fe3f2889
Branch: refs/heads/master
Commit: fe3f288944eb02243ec446af17d23527f05a080d
Parents: 387dce5
Author: Mike Thomsen <mi...@gmail.com>
Authored: Thu Nov 9 08:54:41 2017 -0500
Committer: Matthew Burgess <ma...@apache.org>
Committed: Mon Nov 13 11:44:22 2017 -0500
----------------------------------------------------------------------
.../nifi/processors/mongodb/PutMongo.java | 58 ++++++++++++++------
.../nifi/processors/mongodb/PutMongoTest.java | 31 +++++++++++
2 files changed, 72 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe3f2889/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
index cd42635..03d3e0c 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -16,20 +16,17 @@
*/
package org.apache.nifi.processors.mongodb;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
+import com.mongodb.BasicDBObject;
+import com.mongodb.WriteConcern;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.util.JSON;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -42,9 +39,15 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.bson.Document;
-import com.mongodb.WriteConcern;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.model.UpdateOptions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
@EventDriven
@Tags({ "mongodb", "insert", "update", "write", "put" })
@@ -59,6 +62,9 @@ public class PutMongo extends AbstractMongoProcessor {
static final String MODE_INSERT = "insert";
static final String MODE_UPDATE = "update";
+ static final AllowableValue UPDATE_WITH_DOC = new AllowableValue("doc", "With whole document");
+ static final AllowableValue UPDATE_WITH_OPERATORS = new AllowableValue("operators", "With operators enabled");
+
static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("Mode")
.description("Indicates whether the processor should insert or update content")
@@ -83,6 +89,15 @@ public class PutMongo extends AbstractMongoProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("_id")
.build();
+ static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder()
+ .displayName("Update Mode")
+ .name("put-mongo-update-mode")
+ .required(true)
+ .allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS)
+ .defaultValue(UPDATE_WITH_DOC.getValue())
+ .description("Choose an update mode. You can either supply a JSON document to use as a direct replacement " +
+ "or specify a document that contains update operators like $set and $unset")
+ .build();
static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set in which the data is encoded")
@@ -100,6 +115,7 @@ public class PutMongo extends AbstractMongoProcessor {
_propertyDescriptors.add(MODE);
_propertyDescriptors.add(UPSERT);
_propertyDescriptors.add(UPDATE_QUERY_KEY);
+ _propertyDescriptors.add(UPDATE_MODE);
_propertyDescriptors.add(WRITE_CONCERN);
_propertyDescriptors.add(CHARACTER_SET);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
@@ -131,6 +147,7 @@ public class PutMongo extends AbstractMongoProcessor {
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final String mode = context.getProperty(MODE).getValue();
+ final String updateMode = context.getProperty(UPDATE_MODE).getValue();
final WriteConcern writeConcern = getWriteConcern(context);
final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
@@ -146,18 +163,25 @@ public class PutMongo extends AbstractMongoProcessor {
});
// parse
- final Document doc = Document.parse(new String(content, charset));
+ final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
+ ? Document.parse(new String(content, charset)) : JSON.parse(new String(content, charset));
if (MODE_INSERT.equalsIgnoreCase(mode)) {
- collection.insertOne(doc);
+ collection.insertOne((Document)doc);
logger.info("inserted {} into MongoDB", new Object[] { flowFile });
} else {
// update
final boolean upsert = context.getProperty(UPSERT).asBoolean();
final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue();
- final Document query = new Document(updateKey, doc.get(updateKey));
-
- collection.replaceOne(query, doc, new UpdateOptions().upsert(upsert));
+ final Document query = new Document(updateKey, ((Map)doc).get(updateKey));
+
+ if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
+ collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert));
+ } else {
+ BasicDBObject update = (BasicDBObject)doc;
+ update.remove("_id");
+ collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
+ }
logger.info("updated {} into MongoDB", new Object[] { flowFile });
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe3f2889/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
index 10f81d1..8828333 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
@@ -256,4 +256,35 @@ public class PutMongoTest {
assertEquals(1, collection.count());
assertEquals(doc, collection.find().first());
}
+
+ @Test
+ public void testUpsertWithOperators() throws Exception {
+ String upsert = "{\n" +
+ " \"_id\": \"Test\",\n" +
+ " \"$push\": {\n" +
+ " \"testArr\": { \"msg\": \"Hi\" }\n" +
+ " }\n" +
+ "}";
+ runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
+ runner.setProperty(PutMongo.MODE, "update");
+ runner.setProperty(PutMongo.UPSERT, "true");
+ for (int x = 0; x < 3; x++) {
+ runner.enqueue(upsert.getBytes());
+ }
+ runner.run(3, true, true);
+ runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
+ runner.assertTransferCount(PutMongo.REL_SUCCESS, 3);
+
+ Document query = new Document("_id", "Test");
+ Document result = collection.find(query).first();
+ List array = (List)result.get("testArr");
+ Assert.assertNotNull("Array was empty", array);
+ Assert.assertEquals("Wrong size", array.size(), 3);
+ for (int index = 0; index < array.size(); index++) {
+ Document doc = (Document)array.get(index);
+ String msg = doc.getString("msg");
+ Assert.assertNotNull("Msg was null", msg);
+ Assert.assertEquals("Msg had wrong value", msg, "Hi");
+ }
+ }
}