You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/11/13 16:45:11 UTC

nifi git commit: NIFI-4588 Added the ability to use update operators like $push and $set to PutMongo.

Repository: nifi
Updated Branches:
  refs/heads/master 387dce5ad -> fe3f28894


NIFI-4588 Added the ability to use update operators like $push and $set to PutMongo.

Removed commented out code

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2259


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fe3f2889
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fe3f2889
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fe3f2889

Branch: refs/heads/master
Commit: fe3f288944eb02243ec446af17d23527f05a080d
Parents: 387dce5
Author: Mike Thomsen <mi...@gmail.com>
Authored: Thu Nov 9 08:54:41 2017 -0500
Committer: Matthew Burgess <ma...@apache.org>
Committed: Mon Nov 13 11:44:22 2017 -0500

----------------------------------------------------------------------
 .../nifi/processors/mongodb/PutMongo.java       | 58 ++++++++++++++------
 .../nifi/processors/mongodb/PutMongoTest.java   | 31 +++++++++++
 2 files changed, 72 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fe3f2889/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
index cd42635..03d3e0c 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -16,20 +16,17 @@
  */
 package org.apache.nifi.processors.mongodb;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
+import com.mongodb.BasicDBObject;
+import com.mongodb.WriteConcern;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.util.JSON;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -42,9 +39,15 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.bson.Document;
 
-import com.mongodb.WriteConcern;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.model.UpdateOptions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 @EventDriven
 @Tags({ "mongodb", "insert", "update", "write", "put" })
@@ -59,6 +62,9 @@ public class PutMongo extends AbstractMongoProcessor {
     static final String MODE_INSERT = "insert";
     static final String MODE_UPDATE = "update";
 
+    static final AllowableValue UPDATE_WITH_DOC = new AllowableValue("doc", "With whole document");
+    static final AllowableValue UPDATE_WITH_OPERATORS = new AllowableValue("operators", "With operators enabled");
+
     static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
         .name("Mode")
         .description("Indicates whether the processor should insert or update content")
@@ -83,6 +89,15 @@ public class PutMongo extends AbstractMongoProcessor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .defaultValue("_id")
         .build();
+    static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder()
+            .displayName("Update Mode")
+            .name("put-mongo-update-mode")
+            .required(true)
+            .allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS)
+            .defaultValue(UPDATE_WITH_DOC.getValue())
+            .description("Choose an update mode. You can either supply a JSON document to use as a direct replacement " +
+                    "or specify a document that contains update operators like $set and $unset")
+            .build();
     static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
         .name("Character Set")
         .description("The Character Set in which the data is encoded")
@@ -100,6 +115,7 @@ public class PutMongo extends AbstractMongoProcessor {
         _propertyDescriptors.add(MODE);
         _propertyDescriptors.add(UPSERT);
         _propertyDescriptors.add(UPDATE_QUERY_KEY);
+        _propertyDescriptors.add(UPDATE_MODE);
         _propertyDescriptors.add(WRITE_CONCERN);
         _propertyDescriptors.add(CHARACTER_SET);
         propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
@@ -131,6 +147,7 @@ public class PutMongo extends AbstractMongoProcessor {
 
         final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
         final String mode = context.getProperty(MODE).getValue();
+        final String updateMode = context.getProperty(UPDATE_MODE).getValue();
         final WriteConcern writeConcern = getWriteConcern(context);
 
         final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
@@ -146,18 +163,25 @@ public class PutMongo extends AbstractMongoProcessor {
             });
 
             // parse
-            final Document doc = Document.parse(new String(content, charset));
+            final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
+                    ? Document.parse(new String(content, charset)) : JSON.parse(new String(content, charset));
 
             if (MODE_INSERT.equalsIgnoreCase(mode)) {
-                collection.insertOne(doc);
+                collection.insertOne((Document)doc);
                 logger.info("inserted {} into MongoDB", new Object[] { flowFile });
             } else {
                 // update
                 final boolean upsert = context.getProperty(UPSERT).asBoolean();
                 final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue();
-                final Document query = new Document(updateKey, doc.get(updateKey));
-
-                collection.replaceOne(query, doc, new UpdateOptions().upsert(upsert));
+                final Document query = new Document(updateKey, ((Map)doc).get(updateKey));
+
+                if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
+                    collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert));
+                } else {
+                    BasicDBObject update = (BasicDBObject)doc;
+                    update.remove("_id");
+                    collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
+                }
                 logger.info("updated {} into MongoDB", new Object[] { flowFile });
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe3f2889/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
index 10f81d1..8828333 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
@@ -256,4 +256,35 @@ public class PutMongoTest {
         assertEquals(1, collection.count());
         assertEquals(doc, collection.find().first());
     }
+
+    @Test
+    public void testUpsertWithOperators() throws Exception {
+        String upsert = "{\n" +
+                "  \"_id\": \"Test\",\n" +
+                "  \"$push\": {\n" +
+                "     \"testArr\": { \"msg\": \"Hi\" }\n" +
+                "  }\n" +
+                "}";
+        runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
+        runner.setProperty(PutMongo.MODE, "update");
+        runner.setProperty(PutMongo.UPSERT, "true");
+        for (int x = 0; x < 3; x++) {
+            runner.enqueue(upsert.getBytes());
+        }
+        runner.run(3, true, true);
+        runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
+        runner.assertTransferCount(PutMongo.REL_SUCCESS, 3);
+
+        Document query = new Document("_id", "Test");
+        Document result = collection.find(query).first();
+        List array = (List)result.get("testArr");
+        Assert.assertNotNull("Array was empty", array);
+        Assert.assertEquals("Wrong size", array.size(), 3);
+        for (int index = 0; index < array.size(); index++) {
+            Document doc = (Document)array.get(index);
+            String msg = doc.getString("msg");
+            Assert.assertNotNull("Msg was null", msg);
+            Assert.assertEquals("Msg had wrong value", msg, "Hi");
+        }
+    }
 }