You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2020/05/20 23:55:50 UTC

[nifi] branch master updated: NIFI-7463 Create empty relationship for RunMongoAggregation

This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c7edcd6  NIFI-7463 Create empty relationship for RunMongoAggregation
c7edcd6 is described below

commit c7edcd68e1d6d98472953897ec8c9a18d2f7d290
Author: eduardofontes <ed...@gmail.com>
AuthorDate: Sun May 17 02:27:15 2020 -0300

    NIFI-7463
    Create empty relationship for RunMongoAggregation
    
    Fix default autoterminate and condition to redirect to REL_EMPTY
    
    Change from new relationship to write an empty FlowFile to RESULT
    
    Fix MONGO_URI
    
    This closes #4281
    
    Signed-off-by: Mike Thomsen <mt...@apache.org>
---
 .../processors/mongodb/RunMongoAggregation.java    |  8 ++++++-
 .../processors/mongodb/RunMongoAggregationIT.java  | 27 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
index 0c3bb762..3c4ddef 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
@@ -183,17 +183,23 @@ public class RunMongoAggregation extends AbstractMongoProcessor {
 
             iter = it.iterator();
             List<Document> batch = new ArrayList<>();
+            Boolean doneSomething = false;
 
             while (iter.hasNext()) {
                 batch.add(iter.next());
                 if (batch.size() == resultsPerFlowfile) {
                     writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
                     batch = new ArrayList<>();
+                    doneSomething |= true;
                 }
             }
 
-            if (batch.size() > 0) {
+            if (! batch.isEmpty()) {
+                // Something remains in batch list, write it to RESULT
                 writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
+            } else if (! doneSomething) {
+                // The batch list is empty and no batch was written (empty result!), so write empty string to RESULT
+                writeBatch("", flowFile, context, session, attrs, REL_RESULTS);
             }
 
             if (flowFile != null) {
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
index 23ac4f0..2aadc5f 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
@@ -263,4 +263,31 @@ public class RunMongoAggregationIT {
 
         runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, mappings.size());
     }
+
+    @Test
+    public void testEmptyResponse() throws Exception {
+        final String queryInput = "[\n" +
+                "  {\n" +
+                "    \"$match\": {\n" +
+                "      \"val\": \"no_exists\"\n" +
+                "    }\n" +
+                "  },\n" +
+                "  {\n" +
+                "    \"$group\": {\n" +
+                "      \"_id\": \"null\",\n" +
+                "      \"doc_count\": {\n" +
+                "        \"$sum\": 1\n" +
+                "      }\n" +
+                "    }\n" +
+                "  }\n" +
+                "]";
+
+        runner.setProperty(RunMongoAggregation.QUERY, queryInput);
+        runner.enqueue("test");
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, 1);
+        runner.assertTransferCount(RunMongoAggregation.REL_FAILURE, 0);
+        runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, 1);
+    }
 }