You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/01/17 20:48:45 UTC

nifi git commit: NIFI-4759 - Fixed a bug that left a hard-coded reference to _id in as the update key for MongoDB upserts.

Repository: nifi
Updated Branches:
  refs/heads/master ea2519e3e -> ca54186b6


NIFI-4759 - Fixed a bug that left a hard-coded reference to _id in as the update key for MongoDB upserts.

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2401.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ca54186b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ca54186b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ca54186b

Branch: refs/heads/master
Commit: ca54186b608682d028719ae836bd8d8f83fd24d7
Parents: ea2519e
Author: Mike Thomsen <mi...@gmail.com>
Authored: Wed Jan 10 08:35:09 2018 -0500
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed Jan 17 21:48:31 2018 +0100

----------------------------------------------------------------------
 .../nifi/processors/mongodb/PutMongo.java       | 21 ++++----
 .../nifi/processors/mongodb/PutMongoTest.java   | 52 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ca54186b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
index 03d3e0c..2df59b6 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -34,13 +34,11 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.bson.Document;
+import org.bson.types.ObjectId;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -155,12 +153,7 @@ public class PutMongo extends AbstractMongoProcessor {
         try {
             // Read the contents of the FlowFile into a byte array
             final byte[] content = new byte[(int) flowFile.getSize()];
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    StreamUtils.fillBuffer(in, content, true);
-                }
-            });
+            session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true));
 
             // parse
             final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
@@ -173,13 +166,19 @@ public class PutMongo extends AbstractMongoProcessor {
                 // update
                 final boolean upsert = context.getProperty(UPSERT).asBoolean();
                 final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue();
-                final Document query = new Document(updateKey, ((Map)doc).get(updateKey));
+
+                Object keyVal = ((Map)doc).get(updateKey);
+                if (updateKey.equals("_id") && ObjectId.isValid(((String)keyVal))) {
+                    keyVal = new ObjectId((String) keyVal);
+                }
+
+                final Document query = new Document(updateKey, keyVal);
 
                 if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
                     collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert));
                 } else {
                     BasicDBObject update = (BasicDBObject)doc;
-                    update.remove("_id");
+                    update.remove(updateKey);
                     collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
                 }
                 logger.info("updated {} into MongoDB", new Object[] { flowFile });

http://git-wip-us.apache.org/repos/asf/nifi/blob/ca54186b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
index f019704..d0b1a9d 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
@@ -23,6 +23,7 @@ import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.bson.Document;
+import org.bson.types.ObjectId;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -256,4 +257,55 @@ public class PutMongoTest extends MongoWriteTestBase {
             Assert.assertEquals("Msg had wrong value", msg, "Hi");
         }
     }
+
+    /*
+     * Start NIFI-4759 Regression Tests
+     *
+     * 2 issues with ID field:
+     *
+     * * Assumed _id is the update key, causing failures when the user configured a different one in the UI.
+     * * Treated _id as a string even when it is an ObjectID sent from another processor as a string value.
+     *
+     * Expected behavior:
+     *
+     * * update key field should work no matter what (legal) value it is set to be.
+     * * _ids that are ObjectID should become real ObjectIDs when added to Mongo.
+     * * _ids that are arbitrary strings should still go in as strings.
+     *
+     */
+    @Test
+    public void testNiFi_4759_Regressions() {
+        String[] upserts = new String[]{
+                "{ \"_id\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }",
+                "{ \"_id\": \"5a5617b9c1f5de6d8276e87d\", \"$set\": { \"msg\": \"Hello, world\" } }",
+                "{ \"updateKey\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }"
+        };
+
+        String[] updateKeyProps = new String[] { "_id", "_id", "updateKey" };
+        Object[] updateKeys = new Object[] { "12345", new ObjectId("5a5617b9c1f5de6d8276e87d"), "12345" };
+        int index = 0;
+
+        runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
+        runner.setProperty(PutMongo.MODE, "update");
+        runner.setProperty(PutMongo.UPSERT, "true");
+
+        final int LIMIT = 2;
+
+        for (String upsert : upserts) {
+            runner.setProperty(PutMongo.UPDATE_QUERY_KEY, updateKeyProps[index]);
+            for (int x = 0; x < LIMIT; x++) {
+                runner.enqueue(upsert);
+            }
+            runner.run(LIMIT, true, true);
+            runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
+            runner.assertTransferCount(PutMongo.REL_SUCCESS, LIMIT);
+
+            Document query = new Document(updateKeyProps[index], updateKeys[index]);
+            Document result = collection.find(query).first();
+            Assert.assertNotNull("Result was null", result);
+            Assert.assertEquals("Count was wrong", 1, collection.count(query));
+            runner.clearTransferState();
+            index++;
+        }
+    }
 }