You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ch...@apache.org on 2022/11/23 19:27:17 UTC

[nifi] branch main updated: NIFI-10845 - JsonQueryElasticsearch processors are not outputting an empty flow file for a combined response with output_no_hits set to true

This is an automated email from the ASF dual-hosted git repository.

chriss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ad33eea80 NIFI-10845 - JsonQueryElasticsearch processors are not outputting an empty flow file for a combined response with output_no_hits set to true
2ad33eea80 is described below

commit 2ad33eea8002db00b5bf80edc7b2dc30cb3d1557
Author: Ryan Van Den Bos <ry...@naimuri.com>
AuthorDate: Tue Nov 22 09:41:44 2022 +0000

    NIFI-10845 - JsonQueryElasticsearch processors are not outputting an empty flow file for a combined response with output_no_hits set to true
    
    Signed-off-by: Chris Sampson <ch...@gmail.com>
    
    This closes #6701
---
 .../AbstractJsonQueryElasticsearch.java            |  8 +++--
 .../AbstractPaginatedJsonQueryElasticsearch.java   |  3 ++
 ...tractPaginatedJsonQueryElasticsearchTest.groovy | 39 ++++++++++++++++++++++
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
index 1ce0f7f414..f154cf2790 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -105,6 +105,10 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
     private String splitUpAggregations;
     private boolean outputNoHits;
 
+    boolean getOutputNoHits() { // package-private accessor; the outputNoHits field itself is private to this class
+        return outputNoHits;
+    }
+
     final ObjectMapper mapper = new ObjectMapper();
 
     final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
@@ -277,8 +281,8 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
         }
     }
 
-    private FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
-                                      final FlowFile hitFlowFile, final Map<String, String> attributes) {
+    FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
+                              final FlowFile hitFlowFile, final Map<String, String> attributes) {
         final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes()));
         attributes.put("hit.count", Integer.toString(count));
 
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
index 2e8eab6eeb..ec1a020ad7 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
@@ -273,6 +273,9 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
 
             hitsFlowFiles.add(writeCombinedHitFlowFile(paginatedJsonQueryParameters.getHitCount() + hits.size(),
                     hits, session, hitFlowFile, attributes, append));
+        } else if (getOutputNoHits()) {
+            final FlowFile hitFlowFile = createChildFlowFile(session, parent);
+            hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile, attributes));
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
index 274e6a2fd5..0ec4470f76 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
@@ -268,4 +268,43 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
             assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND}).count(), is(0L))
         }
     }
+
+    @Test
+    void testNoHitsFlowFileIsProducedForEachResultSplitSetup() {
+        final TestRunner runner = createRunner(false)
+        final TestElasticsearchClientService service = getService(runner)
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]]])))
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS, "true")
+        service.setMaxPages(0)
+
+        // test that an empty flow file is produced for a per query setup
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+        runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0")
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
+        assert runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0 // explicit assert: the hits flow file must have no content
+        reset(runner)
+
+        // test that an empty flow file is produced for a per hit setup
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+        runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0")
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
+        assert runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0 // explicit assert: the hits flow file must have no content
+        reset(runner)
+
+        // test that an empty flow file is produced for a per response setup
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE)
+        runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0")
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
+        assert runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0 // explicit assert: the hits flow file must have no content
+        reset(runner)
+    }
 }