You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2018/07/03 11:52:23 UTC

nifi git commit: NIFI-5334: GetMongo keeps original attributes when it has incoming connections

Repository: nifi
Updated Branches:
  refs/heads/master df4931280 -> 3164759d7


NIFI-5334: GetMongo keeps original attributes when it has incoming connections

This closes #2815

Signed-off-by: Mike Thomsen <mi...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3164759d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3164759d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3164759d

Branch: refs/heads/master
Commit: 3164759d7ffb8ba5be360796fd4bfc004f2c79c0
Parents: df49312
Author: zenfenan <si...@gmail.com>
Authored: Tue Jun 26 20:01:59 2018 +0530
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Tue Jul 3 07:51:20 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/mongodb/GetMongo.java       | 18 +++++++++-------
 .../nifi/processors/mongodb/GetMongoIT.java     | 22 +++++++++++++++++++-
 2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3164759d/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index e744577..c5201fb 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -45,6 +45,7 @@ import org.bson.json.JsonWriterSettings;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -209,6 +210,8 @@ public class GetMongo extends AbstractMongoProcessor {
         attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
 
         final Document query;
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
+
         String queryStr;
         if (context.getProperty(QUERY).isSet()) {
             queryStr = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
@@ -268,7 +271,7 @@ public class GetMongo extends AbstractMongoProcessor {
             final MongoCursor<Document> cursor = it.iterator();
             ComponentLog log = getLogger();
             try {
-                FlowFile flowFile = null;
+                FlowFile outgoingFlowFile;
                 if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
                     int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger();
                     List<Document> batch = new ArrayList<>();
@@ -297,21 +300,20 @@ public class GetMongo extends AbstractMongoProcessor {
                     }
                 } else {
                     while (cursor.hasNext()) {
-                        final FlowFile ffPtr = input;
-                        flowFile = session.create();
-                        flowFile = session.write(flowFile, out -> {
+                        outgoingFlowFile = (input == null) ? session.create() : session.create(input);
+                        outgoingFlowFile = session.write(outgoingFlowFile, out -> {
                             String json;
                             if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
                                 json = getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next());
                             } else {
                                 json = cursor.next().toJson();
                             }
-                            out.write(json.getBytes(context.getProperty(CHARSET).evaluateAttributeExpressions(ffPtr).getValue()));
+                            out.write(json.getBytes(charset));
                         });
-                        flowFile = session.putAllAttributes(flowFile, attributes);
+                        outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
 
-                        session.getProvenanceReporter().receive(flowFile, getURI(context));
-                        session.transfer(flowFile, REL_SUCCESS);
+                        session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context));
+                        session.transfer(outgoingFlowFile, REL_SUCCESS);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/3164759d/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
index 2c9b6d2..1823edf 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
@@ -406,7 +406,6 @@ public class GetMongoIT {
         runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
         runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
         runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
-
     }
 
     @Test
@@ -445,6 +444,27 @@ public class GetMongoIT {
         runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
         runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
     }
+
+    @Test
+    public void testKeepOriginalAttributes() {
+        final String query = "{ \"c\": { \"$gte\": 4 }}";
+        final Map<String, String> attributesMap = new HashMap<>(1);
+        attributesMap.put("property.1", "value-1");
+
+        runner.setIncomingConnection(true);
+        runner.removeProperty(GetMongo.QUERY);
+        runner.enqueue(query, attributesMap);
+
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
+        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS).get(0);
+        Assert.assertTrue(flowFile.getAttributes().containsKey("property.1"));
+        flowFile.assertAttributeEquals("property.1", "value-1");
+    }
     /*
      * End query read behavior tests
      */