You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2022/02/22 11:34:01 UTC

[nifi] branch main updated: NIFI-9715 add option to output empty FlowFile from Elasticsearch REST API Json Query processors when there are no hits from query

This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 07131a6  NIFI-9715 add option to output empty FlowFile from Elasticsearch REST API Json Query processors when there are no hits from query
07131a6 is described below

commit 07131a66ea39155f4523b974912b6820058cad5b
Author: Chris Sampson <ch...@gmail.com>
AuthorDate: Mon Feb 21 21:12:24 2022 +0000

    NIFI-9715 add option to output empty FlowFile from Elasticsearch REST API Json Query processors when there are no hits from query
    
    This closes #5786
    
    Signed-off-by: Mike Thomsen <mt...@apache.org>
---
 .../AbstractJsonQueryElasticsearch.java            | 22 +++++++++-
 .../AbstractPaginatedJsonQueryElasticsearch.java   |  5 ++-
 .../AbstractJsonQueryElasticsearchTest.groovy      | 49 +++++++++++++++++++++-
 3 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
index 8af8770..25c9417 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -86,6 +86,17 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .build();
+    public static final PropertyDescriptor OUTPUT_NO_HITS = new PropertyDescriptor.Builder() // required "true"/"false" flag (default "false"): emit an empty hits FlowFile when a query yields no hits
+            .name("el-rest-output-no-hits")
+            .displayName("Output No Hits")
+            .description("Output a \"" + REL_HITS.getName() + "\" flowfile even if no hits found for query. " +
+                    "If true, an empty \"" + REL_HITS.getName() + "\" flowfile will be output even if \"" +
+                    REL_AGGREGATIONS.getName() + "\" are output.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
 
     private static final Set<Relationship> relationships;
     private static final List<PropertyDescriptor> propertyDescriptors;
@@ -93,6 +104,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
     AtomicReference<ElasticSearchClientService> clientService;
     String splitUpHits;
     private String splitUpAggregations;
+    private boolean outputNoHits;
 
     final ObjectMapper mapper = new ObjectMapper();
 
@@ -112,6 +124,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
         descriptors.add(CLIENT_SERVICE);
         descriptors.add(SEARCH_RESULTS_SPLIT);
         descriptors.add(AGGREGATION_RESULTS_SPLIT);
+        descriptors.add(OUTPUT_NO_HITS);
 
         propertyDescriptors = Collections.unmodifiableList(descriptors);
     }
@@ -143,6 +156,8 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
 
         splitUpHits = context.getProperty(SEARCH_RESULTS_SPLIT).getValue();
         splitUpAggregations = context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue();
+
+        outputNoHits = context.getProperty(OUTPUT_NO_HITS).asBoolean();
     }
 
     @OnStopped
@@ -271,7 +286,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
      * for paginated queries, the List could contain one (or more) FlowFiles, to which further hits may be appended when the next
      * SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries.
      */
-    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final Q queryJsonParameters, final ProcessSession session,
+    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final Q queryJsonParameters, final ProcessSession session,
                               final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles,
                               final String transitUri, final StopWatch stopWatch) throws IOException {
         if (hits != null && !hits.isEmpty()) {
@@ -286,6 +301,9 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
                 final String json = mapper.writeValueAsString(hits);
                 hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, hitFlowFile, attributes));
             }
+        } else if (newQuery && outputNoHits) {
+            final FlowFile hitFlowFile = createChildFlowFile(session, parent);
+            hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile, attributes));
         }
 
         transferResultFlowFiles(session, hitsFlowFiles, transitUri, stopWatch);
@@ -319,7 +337,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
             handleAggregations(response.getAggregations(), session, input, attributes, transitUri, stopWatch);
         }
 
-        final List<FlowFile> resultFlowFiles = handleHits(response.getHits(), queryJsonParameters, session, input,
+        final List<FlowFile> resultFlowFiles = handleHits(response.getHits(), newQuery, queryJsonParameters, session, input,
                 attributes, hitsFlowFiles, transitUri, stopWatch);
         queryJsonParameters.addHitCount(response.getHits().size());
 
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
index e5b47ee..2e8eab6 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
@@ -109,6 +109,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
         descriptors.add(AGGREGATION_RESULTS_SPLIT);
         descriptors.add(PAGINATION_TYPE);
         descriptors.add(PAGINATION_KEEP_ALIVE);
+        descriptors.add(OUTPUT_NO_HITS);
 
         paginatedPropertyDescriptors = Collections.unmodifiableList(descriptors);
     }
@@ -282,7 +283,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
      * SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries.
      */
     @Override
-    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
+    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
                               final ProcessSession session, final FlowFile parent, final Map<String, String> attributes,
                               final List<FlowFile> hitsFlowFiles, final String transitUri, final StopWatch stopWatch) throws IOException {
         paginatedJsonQueryParameters.incrementPageCount();
@@ -298,7 +299,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
                 hitsFlowFiles.clear();
             }
         } else {
-            super.handleHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, transitUri, stopWatch);
+            super.handleHits(hits, newQuery, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, transitUri, stopWatch);
         }
 
         return hitsFlowFiles;
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
index 3ae81a2..b33f944 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
@@ -52,6 +52,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
         runner.removeProperty(AbstractJsonQueryElasticsearch.QUERY_ATTRIBUTE)
         runner.removeProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT)
         runner.removeProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT)
+        runner.removeProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS)
 
         final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
         if (processor instanceof SearchElasticsearch) {
@@ -82,13 +83,14 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, "not-json")
         runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, "not-enum")
         runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, "not-enum2")
+        runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "not-boolean")
 
         final String expectedAllowedSplitHits = processor instanceof AbstractPaginatedJsonQueryElasticsearch
             ? [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY].join(", ")
             : [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", ")
 
         final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
-        assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 7 validation failures:\n" +
+        assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 8 validation failures:\n" +
                 "'%s' validated against 'not-json' is invalid because %s is not a valid JSON representation due to Unrecognized token 'not': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n" +
                 " at [Source: (String)\"not-json\"; line: 1, column: 4]\n" +
                 "'%s' validated against '' is invalid because %s cannot be empty\n" +
@@ -96,6 +98,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
                 "'%s' validated against 'not-a-service' is invalid because Property references a Controller Service that does not exist\n" +
                 "'%s' validated against 'not-enum2' is invalid because Given value not found in allowed set '%s'\n" +
                 "'%s' validated against 'not-enum' is invalid because Given value not found in allowed set '%s'\n" +
+                "'%s' validated against 'not-boolean' is invalid because Given value not found in allowed set 'true, false'\n" +
                 "'%s' validated against 'not-a-service' is invalid because Invalid Controller Service: not-a-service is not a valid Controller Service Identifier\n",
                 AbstractJsonQueryElasticsearch.QUERY.getName(), AbstractJsonQueryElasticsearch.QUERY.getName(),
                 AbstractJsonQueryElasticsearch.INDEX.getName(), AbstractJsonQueryElasticsearch.INDEX.getName(),
@@ -103,6 +106,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
                 AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName(),
                 AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT.getName(), expectedAllowedSplitHits,
                 AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT.getName(), [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", "),
+                AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS.getName(),
                 AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName()
         )))
     }
@@ -147,6 +151,45 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
     }
 
     @Test
+    void testNoHits() throws Exception {
+        // Case 1 - no hits, OUTPUT_NO_HITS=false: expect no hits FlowFile and no matching RECEIVE provenance event
+        final TestRunner runner = createRunner(false)
+        final TestElasticsearchClientService service = getService(runner)
+        service.setMaxPages(0) // presumably makes the test service return no hits - TODO confirm
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "false")
+        runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 0, 0, 0)
+        assertThat(
+                runner.getProvenanceEvents().stream().filter({ pe ->
+                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                            pe.getAttribute("uuid") == hits.getAttribute("uuid") // NOTE(review): `hits` is not declared in this method - confirm it resolves; only evaluated if a RECEIVE event exists
+                }).count(),
+                is(0L)
+        )
+        reset(runner)
+
+
+        // Case 2 - no hits, OUTPUT_NO_HITS=true: each hits FlowFile must have hit.count "0", empty content and exactly one RECEIVE provenance event
+        runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "true")
+        runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
+                { hit ->
+                    hit.assertAttributeEquals("hit.count", "0")
+                    assertOutputContent(hit.getContent(), 0, false)
+                    assertThat(
+                            runner.getProvenanceEvents().stream().filter({ pe ->
+                                pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                        pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                            }).count(),
+                            is(1L)
+                    )
+                }
+        )
+    }
+
+    @Test
     void testAggregations() throws Exception {
         String query = prettyPrint(toJson([
                 query: [ match_all: [:] ],
@@ -302,7 +345,9 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
         if (ndjson) {
             assertThat(content.split("\n").length, is(count))
         } else {
-            if (count == 1) {
+            if (count == 0) {
+                assertThat(content, is(""))
+            } else if (count == 1) {
                 assertThat(content.startsWith("{") && content.endsWith("}"), is(true))
             } else {
                 assertThat(content.startsWith("[") && content.endsWith("]"), is(true))