You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/06/01 15:28:15 UTC

[nifi] branch master updated: NIFI-5916 Added an option to enable empty flowfiles to be sent if there are no results from a query.

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 407add7  NIFI-5916 Added an option to enable empty flowfiles to be sent if there are no results from a query.
407add7 is described below

commit 407add7847f9d79ea9c7b604fd57c3160ff30c06
Author: Mike Thomsen <mi...@gmail.com>
AuthorDate: Sat Feb 16 07:27:25 2019 -0500

    NIFI-5916 Added an option to enable empty flowfiles to be sent if there are no results from a query.
    
    NIFI-5916 Fixed potential NPE.
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3315
---
 .../apache/nifi/processors/mongodb/GetMongo.java   | 30 +++++++++++++++++++++-
 .../apache/nifi/processors/mongodb/GetMongoIT.java | 21 +++++++++++++--
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index 286c47e..b3b82b0 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -29,15 +29,18 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.bson.Document;
 import org.bson.json.JsonWriterSettings;
 
@@ -58,7 +61,16 @@ import java.util.Set;
     @WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.")
 })
 public class GetMongo extends AbstractMongoQueryProcessor {
-
+    /** Opt-in flag: when true, a query that yields no results still sends one empty flowfile to success. */
+    public static final PropertyDescriptor SEND_EMPTY_RESULTS = new PropertyDescriptor.Builder()
+        .name("get-mongo-send-empty")
+        .displayName("Send Empty Result")
+        .description("If a query executes successfully, but returns no results, send an empty flowfile " +
+                "(zero-length content) signifying no result.")
+        .allowableValues("true", "false").defaultValue("false")
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .required(false)
+        .build();
 
     static final AllowableValue YES_PP = new AllowableValue("true", "True");
     static final AllowableValue NO_PP  = new AllowableValue("false", "False");
@@ -94,6 +106,7 @@ public class GetMongo extends AbstractMongoQueryProcessor {
         _propertyDescriptors.add(DATE_FORMAT);
         _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
         _propertyDescriptors.add(CLIENT_AUTH);
+        _propertyDescriptors.add(SEND_EMPTY_RESULTS);
         propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
 
         final Set<Relationship> _relationships = new HashSet<>();
@@ -103,6 +116,12 @@ public class GetMongo extends AbstractMongoQueryProcessor {
         relationships = Collections.unmodifiableSet(_relationships);
     }
 
+    private volatile boolean sendEmpty; // set in onScheduled(), read when results are emitted; volatile for cross-thread visibility
+    @OnScheduled
+    public void onScheduled(PropertyContext context) {
+        sendEmpty = context.getProperty(SEND_EMPTY_RESULTS).asBoolean();
+    }
+
     @Override
     public Set<Relationship> getRelationships() {
         return relationships;
@@ -191,6 +210,7 @@ public class GetMongo extends AbstractMongoQueryProcessor {
             it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
         }
 
+        long sent = 0;
         try (MongoCursor<Document> cursor = it.iterator()) {
             configureMapper(jsonTypeSetting, dateFormat);
 
@@ -209,6 +229,7 @@ public class GetMongo extends AbstractMongoQueryProcessor {
                             logger.error("Error building batch due to {}", new Object[] {e});
                         }
                     }
+                    sent++;
                 }
 
                 if (batch.size() > 0) {
@@ -234,12 +255,19 @@ public class GetMongo extends AbstractMongoQueryProcessor {
                     outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
                     session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context));
                     session.transfer(outgoingFlowFile, REL_SUCCESS);
+                    sent++;
                 }
             }
 
             if (input != null) {
                 session.transfer(input, REL_ORIGINAL);
             }
+
+            if (sent == 0 && sendEmpty) {
+                FlowFile empty = input != null ? session.create(input) : session.create();
+                empty = session.putAllAttributes(empty, attributes);
+                session.transfer(empty, REL_SUCCESS);
+            }
         }
 
     }
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
index 286a70d..ddea9a8 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
@@ -620,7 +620,7 @@ public class GetMongoIT {
         //Test a bad flowfile attribute
         runner.setIncomingConnection(true);
         runner.setProperty(GetMongo.QUERY, "${badfromff}");
-        runner.enqueue("<<?>>", new HashMap<String, String>(){{
+        runner.enqueue("<<?>>", new HashMap<String, String>() {{
             put("badfromff", "{\"prop\":}");
         }});
         runner.run();
@@ -633,7 +633,7 @@ public class GetMongoIT {
         //Test for regression on a good query from a flowfile attribute
         runner.setIncomingConnection(true);
         runner.setProperty(GetMongo.QUERY, "${badfromff}");
-        runner.enqueue("<<?>>", new HashMap<String, String>(){{
+        runner.enqueue("<<?>>", new HashMap<String, String>() {{
             put("badfromff", "{}");
         }});
         runner.run();
@@ -651,4 +651,21 @@ public class GetMongoIT {
         runner.assertTransferCount(GetMongo.REL_SUCCESS, 0);
         runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
     }
+
+    @Test
+    public void testSendEmpty() throws Exception {
+        runner.setIncomingConnection(true);
+        runner.setProperty(GetMongo.SEND_EMPTY_RESULTS, "true");
+        runner.setProperty(GetMongo.QUERY, "{ \"nothing\": true }");
+        runner.assertValid();
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
+        runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
+        // The single success flowfile must be the zero-length "no results" marker.
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS).get(0);
+        Assert.assertEquals(0, flowFile.getSize());
+    }
 }