Posted to issues@nifi.apache.org by "r-vandenbos (via GitHub)" <gi...@apache.org> on 2023/04/11 15:08:00 UTC

[GitHub] [nifi] r-vandenbos opened a new pull request, #7163: Nifi 11430 - Fixed empty file being produced when grouping by query and fixed _source and _meta extraction by query

r-vandenbos opened a new pull request, #7163:
URL: https://github.com/apache/nifi/pull/7163

   
   # Summary
   
   [NIFI-11430](https://issues.apache.org/jira/browse/NIFI-11430)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [x] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [x] Pull Request based on current revision of the `main` branch
   - [x] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [x] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [x] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [x] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [x] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] r-vandenbos commented on a diff in pull request #7163: NIFI-11430 - Fixed empty file being produced when grouping by query and fixed _source and _meta extraction by query

Posted by "r-vandenbos (via GitHub)" <gi...@apache.org>.
r-vandenbos commented on code in PR #7163:
URL: https://github.com/apache/nifi/pull/7163#discussion_r1190413586


##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java:
##########
@@ -264,14 +265,16 @@ private void combineHits(final List<Map<String, Object>> hits, final PaginatedJs
      * SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries.
      */
     @Override
-    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
+    List<FlowFile> handleHits(List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
                               final ProcessSession session, final FlowFile parent, final Map<String, String> attributes,
                               final List<FlowFile> hitsFlowFiles, final String transitUri, final StopWatch stopWatch) throws IOException {
         paginatedJsonQueryParameters.incrementPageCount();
         attributes.put("page.number", Integer.toString(paginatedJsonQueryParameters.getPageCount()));
 
         if (hitStrategy == ResultOutputStrategy.PER_QUERY) {
-            combineHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles);
+
+            hits = formatHits(hits);
+            combineHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, newQuery);

Review Comment:
   👍🏻 



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java:
##########
@@ -264,14 +265,16 @@ private void combineHits(final List<Map<String, Object>> hits, final PaginatedJs
      * SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries.
      */
     @Override
-    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
+    List<FlowFile> handleHits(List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,

Review Comment:
   👍🏻 



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -32,15 +32,15 @@ import static groovy.json.JsonOutput.toJson
 import static org.hamcrest.CoreMatchers.equalTo
 import static org.hamcrest.CoreMatchers.is
 import static org.hamcrest.MatcherAssert.assertThat
-import static org.junit.jupiter.api.Assertions.assertThrows
+import static org.junit.jupiter.api.Assertions.*

Review Comment:
   👍🏻 



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -117,13 +117,184 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         assertSendEvent(runner, input)
     }
 
+    @Test
+    void testSourceExtraction() throws Exception {
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
+            final TestRunner runner = createRunner(false)
+            runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
+
+            int flowFileCount
+            String hitsCount
+            boolean ndjson = false
+
+            switch (resultOutputStrategy) {
+                case ResultOutputStrategy.PER_QUERY:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    ndjson = true
+                    break
+                case ResultOutputStrategy.PER_HIT:
+                    flowFileCount = 10
+                    hitsCount = "1"
+                    break
+                case ResultOutputStrategy.PER_RESPONSE:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    break
+            }
+
+            // test _source only format
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue())
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY.getValue())
+
+
+            // Test against each pagination type
+            for (PaginationType paginationType : PaginationType.values()) {

Review Comment:
   👍🏻 





[GitHub] [nifi] r-vandenbos commented on a diff in pull request #7163: NIFI-11430 - Fixed empty file being produced when grouping by query and fixed _source and _meta extraction by query

Posted by "r-vandenbos (via GitHub)" <gi...@apache.org>.
r-vandenbos commented on code in PR #7163:
URL: https://github.com/apache/nifi/pull/7163#discussion_r1190414030


##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -265,46 +436,35 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
                     is(1L)
             )
         } else {
-            assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND}).count(), is(0L))
+            assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND }).count(), is(0L))
         }
     }
 
     @Test
-    void testNoHitsFlowFileIsProducedForEachResultSplitSetup() {
+    void testEmptyHitsFlowFileIsProducedForEachResultSplitSetup() {
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
         runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS, "true")
         service.setMaxPages(0)
 
-        // test that an empty flow file is produced for a per query setup
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
-        runOnce(runner)
-        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
 
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0")
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0
-        reset(runner)
+        for (PaginationType paginationType : PaginationType.values()) {

Review Comment:
   👍🏻 



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -265,46 +436,35 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
                     is(1L)
             )
         } else {
-            assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND}).count(), is(0L))
+            assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND }).count(), is(0L))
         }
     }
 
     @Test
-    void testNoHitsFlowFileIsProducedForEachResultSplitSetup() {
+    void testEmptyHitsFlowFileIsProducedForEachResultSplitSetup() {
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
         runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS, "true")
         service.setMaxPages(0)
 
-        // test that an empty flow file is produced for a per query setup
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
-        runOnce(runner)
-        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
 
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0")
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0
-        reset(runner)
+        for (PaginationType paginationType : PaginationType.values()) {
+            runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
 
-        // test that an empty flow file is produced for a per hit setup
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
-        runOnce(runner)
-        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+            for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {

Review Comment:
   👍🏻 



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -117,13 +117,184 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         assertSendEvent(runner, input)
     }
 
+    @Test
+    void testSourceExtraction() throws Exception {
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
+            final TestRunner runner = createRunner(false)
+            runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
+
+            int flowFileCount
+            String hitsCount
+            boolean ndjson = false
+
+            switch (resultOutputStrategy) {
+                case ResultOutputStrategy.PER_QUERY:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    ndjson = true
+                    break
+                case ResultOutputStrategy.PER_HIT:
+                    flowFileCount = 10
+                    hitsCount = "1"
+                    break
+                case ResultOutputStrategy.PER_RESPONSE:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    break
+            }
+
+            // test _source only format
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue())
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY.getValue())
+
+
+            // Test against each pagination type
+            for (PaginationType paginationType : PaginationType.values()) {
+                runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
+
+                runOnce(runner)
+
+                // Test Relationship counts
+                testCounts(runner, isInput() ? 1 : 0, flowFileCount, 0, 0)
+
+                // Per response outputs an array of values
+                if (resultOutputStrategy.equals(ResultOutputStrategy.PER_RESPONSE)) {
+                    runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
+                        hit.assertAttributeEquals("hit.count", hitsCount)
+                        assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
+                        OBJECT_MAPPER.readValue(hit.getContent(), ArrayList.class).forEach(h -> {
+                            assertFalse(h.isEmpty())
+                            assertFalse(h.containsKey("_source"))
+                            assertFalse(h.containsKey("_index"))
+                            // should be the _source content only
+                            assertTrue(h.containsKey("msg"))
+
+                        })
+                        assertThat(
+                                runner.getProvenanceEvents().stream().filter({ pe ->
+                                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                            pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                                }).count(),
+                                is(1L)
+                        )
+                    })
+                } else {
+                    runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
+                        hit.assertAttributeEquals("hit.count", hitsCount)
+                        assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
+                        final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
+                        assertFalse(h.isEmpty())
+                        assertFalse(h.containsKey("_source"))
+                        assertFalse(h.containsKey("_index"))
+                        // should be the _source content only
+                        assertTrue(h.containsKey("msg"))
+
+                        assertThat(
+                                runner.getProvenanceEvents().stream().filter({ pe ->
+                                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                            pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                                }).count(),
+                                is(1L)
+                        )
+                    })
+                }
+
+                reset(runner)
+            }
+        }
+    }
+
+    @Test
+    void testMetaExtraction() throws Exception {

Review Comment:
   👍🏻 





[GitHub] [nifi] r-vandenbos commented on a diff in pull request #7163: NIFI-11430 - Fixed empty file being produced when grouping by query and fixed _source and _meta extraction by query

Posted by "r-vandenbos (via GitHub)" <gi...@apache.org>.
r-vandenbos commented on code in PR #7163:
URL: https://github.com/apache/nifi/pull/7163#discussion_r1190413790


##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -117,13 +117,184 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         assertSendEvent(runner, input)
     }
 
+    @Test
+    void testSourceExtraction() throws Exception {
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {

Review Comment:
   👍🏻 



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -117,13 +117,184 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         assertSendEvent(runner, input)
     }
 
+    @Test
+    void testSourceExtraction() throws Exception {
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
+            final TestRunner runner = createRunner(false)
+            runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
+
+            int flowFileCount
+            String hitsCount
+            boolean ndjson = false
+
+            switch (resultOutputStrategy) {
+                case ResultOutputStrategy.PER_QUERY:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    ndjson = true
+                    break
+                case ResultOutputStrategy.PER_HIT:
+                    flowFileCount = 10
+                    hitsCount = "1"
+                    break
+                case ResultOutputStrategy.PER_RESPONSE:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    break
+            }
+
+            // test _source only format
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue())
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY.getValue())
+
+
+            // Test against each pagination type
+            for (PaginationType paginationType : PaginationType.values()) {
+                runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
+
+                runOnce(runner)
+
+                // Test Relationship counts
+                testCounts(runner, isInput() ? 1 : 0, flowFileCount, 0, 0)
+
+                // Per response outputs an array of values
+                if (resultOutputStrategy.equals(ResultOutputStrategy.PER_RESPONSE)) {
+                    runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
+                        hit.assertAttributeEquals("hit.count", hitsCount)
+                        assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
+                        OBJECT_MAPPER.readValue(hit.getContent(), ArrayList.class).forEach(h -> {
+                            assertFalse(h.isEmpty())
+                            assertFalse(h.containsKey("_source"))
+                            assertFalse(h.containsKey("_index"))
+                            // should be the _source content only
+                            assertTrue(h.containsKey("msg"))
+
+                        })
+                        assertThat(
+                                runner.getProvenanceEvents().stream().filter({ pe ->
+                                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                            pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                                }).count(),
+                                is(1L)
+                        )
+                    })
+                } else {
+                    runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
+                        hit.assertAttributeEquals("hit.count", hitsCount)
+                        assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
+                        final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
+                        assertFalse(h.isEmpty())
+                        assertFalse(h.containsKey("_source"))
+                        assertFalse(h.containsKey("_index"))
+                        // should be the _source content only
+                        assertTrue(h.containsKey("msg"))
+
+                        assertThat(
+                                runner.getProvenanceEvents().stream().filter({ pe ->
+                                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                            pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                                }).count(),
+                                is(1L)
+                        )
+                    })
+                }
+
+                reset(runner)
+            }
+        }
+    }
+
+    @Test
+    void testMetaExtraction() throws Exception {
+
+        // Test against each result strategy type
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
+            final TestRunner runner = createRunner(false)
+            runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
+
+            int flowFileCount
+            String hitsCount
+            boolean ndjson = false
+
+            switch (resultOutputStrategy) {
+                case ResultOutputStrategy.PER_QUERY:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    ndjson = true
+                    break
+                case ResultOutputStrategy.PER_HIT:
+                    flowFileCount = 10
+                    hitsCount = "1"
+                    break
+                case ResultOutputStrategy.PER_RESPONSE:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    break
+            }

Review Comment:
   👍🏻 





[GitHub] [nifi] exceptionfactory commented on pull request #7163: Nifi 11430 - Fixed empty file being produced when grouping by query and fixed _source and _meta extraction by query

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on PR #7163:
URL: https://github.com/apache/nifi/pull/7163#issuecomment-1503603605

   Thanks for the contribution @r-vandenbos!
   
   One procedural note: the username on this pull request, @r-vandenbos, does not match the username on the Git commit, which is @vandenbos. In general, these usernames should match for proper attribution of both the PR and the commits. It should be possible to update the author on the commit to match.




[GitHub] [nifi] r-vandenbos commented on a diff in pull request #7163: NIFI-11430 - Fixed empty file being produced when grouping by query and fixed _source and _meta extraction by query

Posted by "r-vandenbos (via GitHub)" <gi...@apache.org>.
r-vandenbos commented on code in PR #7163:
URL: https://github.com/apache/nifi/pull/7163#discussion_r1190414178


##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -61,6 +61,21 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
         reset(runner)
 
 
+        // NIFI-11430 test that 1 flow file is output when hits output per response with output empty flow file set to true
+        runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "true")

Review Comment:
   👍🏻 





[GitHub] [nifi] r-vandenbos commented on pull request #7163: Nifi 11430 - Fixed empty file being produced when grouping by query and fixed _source and _meta extraction by query

Posted by "r-vandenbos (via GitHub)" <gi...@apache.org>.
r-vandenbos commented on PR #7163:
URL: https://github.com/apache/nifi/pull/7163#issuecomment-1504954127

   @exceptionfactory thanks for pointing this out. I got into a spot of trouble with local vs global settings.
   
   This has been fixed now.




[GitHub] [nifi] asfgit closed pull request #7163: NIFI-11430 - Fixed empty file being produced when grouping by query and fixed _source and _meta extraction by query

Posted by "asfgit (via GitHub)" <gi...@apache.org>.
asfgit closed pull request #7163: NIFI-11430 - Fixed empty file being produced when grouping by query and fixed _source and _meta extraction by query
URL: https://github.com/apache/nifi/pull/7163




[GitHub] [nifi] ChrisSamo632 commented on a diff in pull request #7163: NIFI-11430 - Fixed empty file being produced when grouping by query and fixed _source and _meta extraction by query

Posted by "ChrisSamo632 (via GitHub)" <gi...@apache.org>.
ChrisSamo632 commented on code in PR #7163:
URL: https://github.com/apache/nifi/pull/7163#discussion_r1163945827


##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java:
##########
@@ -264,14 +265,16 @@ private void combineHits(final List<Map<String, Object>> hits, final PaginatedJs
      * SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries.
      */
     @Override
-    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
+    List<FlowFile> handleHits(List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
                               final ProcessSession session, final FlowFile parent, final Map<String, String> attributes,
                               final List<FlowFile> hitsFlowFiles, final String transitUri, final StopWatch stopWatch) throws IOException {
         paginatedJsonQueryParameters.incrementPageCount();
         attributes.put("page.number", Integer.toString(paginatedJsonQueryParameters.getPageCount()));
 
         if (hitStrategy == ResultOutputStrategy.PER_QUERY) {
-            combineHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles);
+
+            hits = formatHits(hits);
+            combineHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, newQuery);

Review Comment:
   ```suggestion
               final List<Map<String, Object>> formattedHits = formatHits(hits);
               combineHits(formattedHits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, newQuery);
   ```
   
   Prefer to leave the original input `hits` unchanged for clarity (same as done in `AbstractJsonQueryElasticsearch#handleHits`)



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -32,15 +32,15 @@ import static groovy.json.JsonOutput.toJson
 import static org.hamcrest.CoreMatchers.equalTo
 import static org.hamcrest.CoreMatchers.is
 import static org.hamcrest.MatcherAssert.assertThat
-import static org.junit.jupiter.api.Assertions.assertThrows
+import static org.junit.jupiter.api.Assertions.*

Review Comment:
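   Don't use wildcard imports; import each assertion the tests use explicitly (e.g. `assertFalse`, `assertThrows`, `assertTrue`).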



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -117,13 +117,184 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         assertSendEvent(runner, input)
     }
 
+    @Test
+    void testSourceExtraction() throws Exception {
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
+            final TestRunner runner = createRunner(false)
+            runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
+
+            int flowFileCount
+            String hitsCount
+            boolean ndjson = false
+
+            switch (resultOutputStrategy) {
+                case ResultOutputStrategy.PER_QUERY:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    ndjson = true
+                    break
+                case ResultOutputStrategy.PER_HIT:
+                    flowFileCount = 10
+                    hitsCount = "1"
+                    break
+                case ResultOutputStrategy.PER_RESPONSE:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    break
+            }

Review Comment:
   A `switch` should really have a `default` clause. Here it would likely throw an `IllegalArgumentException` (or similar) to indicate that there is no test for the current `resultOutputStrategy` value, e.g. if a new one is added to the enum in future.
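   
   As a rough illustration of the idea (plain Java rather than the Groovy test itself; `OutputStrategy` is a hypothetical stand-in for `ResultOutputStrategy`, and `PER_BATCH` is an invented constant with no expectations defined), something along these lines:
   
   ```java
   import java.util.Arrays;
   
   final class DefaultClauseDemo {
       // Hypothetical stand-in for ResultOutputStrategy; PER_BATCH is invented for this sketch.
       enum OutputStrategy { PER_QUERY, PER_HIT, PER_RESPONSE, PER_BATCH }
   
       // Returns {flowFileCount, hitsPerFlowFile} for ten hits, mirroring the values in the test.
       static int[] expectedCounts(final OutputStrategy strategy) {
           switch (strategy) {
               case PER_QUERY:
               case PER_RESPONSE:
                   return new int[] {1, 10};
               case PER_HIT:
                   return new int[] {10, 1};
               default:
                   // Fail loudly instead of asserting against expectations that were never set.
                   throw new IllegalArgumentException("Unknown OutputStrategy value: " + strategy);
           }
       }
   
       public static void main(final String[] args) {
           System.out.println(Arrays.toString(expectedCounts(OutputStrategy.PER_HIT)));
           try {
               expectedCounts(OutputStrategy.PER_BATCH);
           } catch (final IllegalArgumentException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```
   
   With the `default` clause in place, an unhandled enum constant fails the test immediately rather than running the later assertions against unset counts.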



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -117,13 +117,184 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         assertSendEvent(runner, input)
     }
 
+    @Test
+    void testSourceExtraction() throws Exception {
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
+            final TestRunner runner = createRunner(false)
+            runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
+
+            int flowFileCount
+            String hitsCount
+            boolean ndjson = false
+
+            switch (resultOutputStrategy) {
+                case ResultOutputStrategy.PER_QUERY:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    ndjson = true
+                    break
+                case ResultOutputStrategy.PER_HIT:
+                    flowFileCount = 10
+                    hitsCount = "1"
+                    break
+                case ResultOutputStrategy.PER_RESPONSE:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    break
+            }
+
+            // test _source only format
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue())
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY.getValue())
+
+
+            // Test against each pagination type
+            for (PaginationType paginationType : PaginationType.values()) {
+                runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
+
+                runOnce(runner)
+
+                // Test Relationship counts
+                testCounts(runner, isInput() ? 1 : 0, flowFileCount, 0, 0)
+
+                // Per response outputs an array of values
+                if (resultOutputStrategy.equals(ResultOutputStrategy.PER_RESPONSE)) {
+                    runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
+                        hit.assertAttributeEquals("hit.count", hitsCount)
+                        assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
+                        OBJECT_MAPPER.readValue(hit.getContent(), ArrayList.class).forEach(h -> {
+                            assertFalse(h.isEmpty())
+                            assertFalse(h.containsKey("_source"))
+                            assertFalse(h.containsKey("_index"))
+                            // should be the _source content only
+                            assertTrue(h.containsKey("msg"))
+
+                        })
+                        assertThat(
+                                runner.getProvenanceEvents().stream().filter({ pe ->
+                                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                            pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                                }).count(),
+                                is(1L)
+                        )
+                    })
+                } else {
+                    runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
+                        hit.assertAttributeEquals("hit.count", hitsCount)
+                        assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
+                        final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
+                        assertFalse(h.isEmpty())
+                        assertFalse(h.containsKey("_source"))
+                        assertFalse(h.containsKey("_index"))
+                        // should be the _source content only
+                        assertTrue(h.containsKey("msg"))
+
+                        assertThat(
+                                runner.getProvenanceEvents().stream().filter({ pe ->
+                                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                            pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                                }).count(),
+                                is(1L)
+                        )
+                    })
+                }
+
+                reset(runner)
+            }
+        }
+    }
+
+    @Test
+    void testMetaExtraction() throws Exception {
+
+        // Test against each result strategy type
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {

Review Comment:
   ```suggestion
           for (final ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
   ```



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -265,46 +436,35 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
                     is(1L)
             )
         } else {
-            assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND}).count(), is(0L))
+            assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND }).count(), is(0L))
         }
     }
 
     @Test
-    void testNoHitsFlowFileIsProducedForEachResultSplitSetup() {
+    void testEmptyHitsFlowFileIsProducedForEachResultSplitSetup() {
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
         runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS, "true")
         service.setMaxPages(0)
 
-        // test that an empty flow file is produced for a per query setup
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
-        runOnce(runner)
-        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
 
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0")
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0
-        reset(runner)
+        for (PaginationType paginationType : PaginationType.values()) {

Review Comment:
   ```suggestion
           for (final PaginationType paginationType : PaginationType.values()) {
   ```



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -117,13 +117,184 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         assertSendEvent(runner, input)
     }
 
+    @Test
+    void testSourceExtraction() throws Exception {
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
+            final TestRunner runner = createRunner(false)
+            runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
+
+            int flowFileCount
+            String hitsCount
+            boolean ndjson = false
+
+            switch (resultOutputStrategy) {
+                case ResultOutputStrategy.PER_QUERY:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    ndjson = true
+                    break
+                case ResultOutputStrategy.PER_HIT:
+                    flowFileCount = 10
+                    hitsCount = "1"
+                    break
+                case ResultOutputStrategy.PER_RESPONSE:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    break
+            }
+
+            // test _source only format
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue())
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY.getValue())
+
+
+            // Test against each pagination type
+            for (PaginationType paginationType : PaginationType.values()) {
+                runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
+
+                runOnce(runner)
+
+                // Test Relationship counts
+                testCounts(runner, isInput() ? 1 : 0, flowFileCount, 0, 0)
+
+                // Per response outputs an array of values
+                if (resultOutputStrategy.equals(ResultOutputStrategy.PER_RESPONSE)) {
+                    runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
+                        hit.assertAttributeEquals("hit.count", hitsCount)
+                        assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
+                        OBJECT_MAPPER.readValue(hit.getContent(), ArrayList.class).forEach(h -> {
+                            assertFalse(h.isEmpty())
+                            assertFalse(h.containsKey("_source"))
+                            assertFalse(h.containsKey("_index"))
+                            // should be the _source content only
+                            assertTrue(h.containsKey("msg"))
+
+                        })
+                        assertThat(
+                                runner.getProvenanceEvents().stream().filter({ pe ->
+                                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                            pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                                }).count(),
+                                is(1L)
+                        )
+                    })
+                } else {
+                    runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
+                        hit.assertAttributeEquals("hit.count", hitsCount)
+                        assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
+                        final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
+                        assertFalse(h.isEmpty())
+                        assertFalse(h.containsKey("_source"))
+                        assertFalse(h.containsKey("_index"))
+                        // should be the _source content only
+                        assertTrue(h.containsKey("msg"))
+
+                        assertThat(
+                                runner.getProvenanceEvents().stream().filter({ pe ->
+                                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                            pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                                }).count(),
+                                is(1L)
+                        )
+                    })
+                }
+
+                reset(runner)
+            }
+        }
+    }
+
+    @Test
+    void testMetaExtraction() throws Exception {

Review Comment:
   There's a lot of repetition between this and `testSourceExtraction` above. Would it be better to combine them and reduce the repetitive code? For example:
   
   ```groovy
       static void assertFormattedResult(final SearchResultsFormat searchResultsFormat, final Map<String, Object> hit) {
           assertFalse(hit.isEmpty())
           switch(searchResultsFormat) {
               case SearchResultsFormat.SOURCE_ONLY:
                   assertFalse(hit.containsKey("_source"))
                   assertFalse(hit.containsKey("_index"))
                   assertTrue(hit.containsKey("msg"))
                   break
               case SearchResultsFormat.METADATA_ONLY:
                   assertFalse(hit.containsKey("_source"))
                   assertTrue(hit.containsKey("_index"))
                   assertFalse(hit.containsKey("msg"))
                   break
               case SearchResultsFormat.FULL:
                   assertTrue(hit.containsKey("_source"))
                   assertTrue(hit.containsKey("_index"))
                   assertFalse(hit.containsKey("msg"))
                   break
               default:
                   throw new IllegalArgumentException("Unknown SearchResultsFormat value: " + searchResultsFormat.toString())
           }
       }
   
       private void assertResultsFormat(final TestRunner runner, final ResultOutputStrategy resultOutputStrategy, final SearchResultsFormat searchResultsFormat) {
           int flowFileCount
           String hitsCount
           boolean ndjson = false
   
           switch (resultOutputStrategy) {
               case ResultOutputStrategy.PER_QUERY:
                   flowFileCount = 1
                   hitsCount = "10"
                   ndjson = true
                   break
               case ResultOutputStrategy.PER_HIT:
                   flowFileCount = 10
                   hitsCount = "1"
                   break
               case ResultOutputStrategy.PER_RESPONSE:
                   flowFileCount = 1
                   hitsCount = "10"
                   break
               default:
                   throw new IllegalArgumentException("Unknown ResultOutputStrategy value: " + resultOutputStrategy.toString())
           }
   
           // Test Relationship counts
           testCounts(runner, isInput() ? 1 : 0, flowFileCount, 0, 0)
   
           // Per response outputs an array of values
           runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
               hit.assertAttributeEquals("hit.count", hitsCount)
               assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
               if (ResultOutputStrategy.PER_RESPONSE == resultOutputStrategy) {
                   OBJECT_MAPPER.readValue(hit.getContent(), ArrayList.class).forEach(h -> {
                       assertFormattedResult(searchResultsFormat, h as Map<String, Object>)
                   })
               } else {
                   final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
                   assertFormattedResult(searchResultsFormat, h)
               }
               assertThat(
                       runner.getProvenanceEvents().stream().filter({ pe ->
                           pe.getEventType() == ProvenanceEventType.RECEIVE &&
                                   pe.getAttribute("uuid") == hit.getAttribute("uuid")
                       }).count(),
                       is(1L)
               )
           })
       }
   
       @Test
       void testResultsFormat() throws Exception {
           for (final ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
               final TestRunner runner = createRunner(false)
               runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
               runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue())
   
               // Test against each results format
               for (final SearchResultsFormat searchResultsFormat : SearchResultsFormat.values()) {
                   runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, searchResultsFormat.getValue())
   
                   // Test against each pagination type
                   for (final PaginationType paginationType : PaginationType.values()) {
                       runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
   
                       runOnce(runner)
                       assertResultsFormat(runner, resultOutputStrategy, searchResultsFormat)
                       reset(runner)
                   }
               }
           }
       }
   ```



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java:
##########
@@ -264,14 +265,16 @@ private void combineHits(final List<Map<String, Object>> hits, final PaginatedJs
      * SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries.
      */
     @Override
-    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
+    List<FlowFile> handleHits(List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,

Review Comment:
   ```suggestion
       List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
   ```
   
   It would be better for `hits` to remain unchanged and `final`.
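   
   A minimal, self-contained sketch of the pattern, in case it helps. The `formatHits` here is only a hypothetical stand-in that keeps each hit's `_source` (the real method's behaviour and signature may differ), and `sampleHits` is invented test data:
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   final class FinalParameterDemo {
       // Hypothetical stand-in for formatHits: keeps only each hit's "_source" map.
       static List<Map<String, Object>> formatHits(final List<Map<String, Object>> hits) {
           final List<Map<String, Object>> formatted = new ArrayList<>();
           for (final Map<String, Object> hit : hits) {
               @SuppressWarnings("unchecked")
               final Map<String, Object> source = (Map<String, Object>) hit.get("_source");
               formatted.add(source);
           }
           return formatted;
       }
   
       // The parameter stays final; the transformed list gets its own descriptive name.
       static List<Map<String, Object>> handleHits(final List<Map<String, Object>> hits) {
           final List<Map<String, Object>> formattedHits = formatHits(hits);
           return formattedHits;
       }
   
       // Invented sample data: one hit with an index name and a one-field source document.
       static List<Map<String, Object>> sampleHits() {
           final Map<String, Object> source = new HashMap<>();
           source.put("msg", "one");
           final Map<String, Object> hit = new HashMap<>();
           hit.put("_index", "messages");
           hit.put("_source", source);
           final List<Map<String, Object>> hits = new ArrayList<>();
           hits.add(hit);
           return hits;
       }
   
       public static void main(final String[] args) {
           final List<Map<String, Object>> hits = sampleHits();
           System.out.println(handleHits(hits));
           System.out.println(hits);
       }
   }
   ```
   
   Because the parameter is never reassigned, the raw response and the formatted output stay distinguishable by name throughout the method.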



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -117,13 +117,184 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         assertSendEvent(runner, input)
     }
 
+    @Test
+    void testSourceExtraction() throws Exception {
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
+            final TestRunner runner = createRunner(false)
+            runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
+
+            int flowFileCount
+            String hitsCount
+            boolean ndjson = false
+
+            switch (resultOutputStrategy) {
+                case ResultOutputStrategy.PER_QUERY:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    ndjson = true
+                    break
+                case ResultOutputStrategy.PER_HIT:
+                    flowFileCount = 10
+                    hitsCount = "1"
+                    break
+                case ResultOutputStrategy.PER_RESPONSE:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    break
+            }
+
+            // test _source only format
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue())
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY.getValue())
+
+
+            // Test against each pagination type
+            for (PaginationType paginationType : PaginationType.values()) {

Review Comment:
   ```suggestion
               for (final PaginationType paginationType : PaginationType.values()) {
   ```



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -265,46 +436,35 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
                     is(1L)
             )
         } else {
-            assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND}).count(), is(0L))
+            assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND }).count(), is(0L))
         }
     }
 
     @Test
-    void testNoHitsFlowFileIsProducedForEachResultSplitSetup() {
+    void testEmptyHitsFlowFileIsProducedForEachResultSplitSetup() {
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
         runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS, "true")
         service.setMaxPages(0)
 
-        // test that an empty flow file is produced for a per query setup
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
-        runOnce(runner)
-        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
 
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0")
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0
-        reset(runner)
+        for (PaginationType paginationType : PaginationType.values()) {
+            runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
 
-        // test that an empty flow file is produced for a per hit setup
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
-        runOnce(runner)
-        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+            for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {

Review Comment:
   ```suggestion
               for (final ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
   ```



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -117,13 +117,184 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         assertSendEvent(runner, input)
     }
 
+    @Test
+    void testSourceExtraction() throws Exception {
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {

Review Comment:
   ```suggestion
           for (final ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
   ```
   
   (I like `final`s 🙂 )



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -117,13 +117,184 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         assertSendEvent(runner, input)
     }
 
+    @Test
+    void testSourceExtraction() throws Exception {
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
+            final TestRunner runner = createRunner(false)
+            runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
+
+            int flowFileCount
+            String hitsCount
+            boolean ndjson = false
+
+            switch (resultOutputStrategy) {
+                case ResultOutputStrategy.PER_QUERY:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    ndjson = true
+                    break
+                case ResultOutputStrategy.PER_HIT:
+                    flowFileCount = 10
+                    hitsCount = "1"
+                    break
+                case ResultOutputStrategy.PER_RESPONSE:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    break
+            }
+
+            // test _source only format
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue())
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY.getValue())
+
+
+            // Test against each pagination type
+            for (PaginationType paginationType : PaginationType.values()) {
+                runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
+
+                runOnce(runner)
+
+                // Test Relationship counts
+                testCounts(runner, isInput() ? 1 : 0, flowFileCount, 0, 0)
+
+                // Per response outputs an array of values
+                if (resultOutputStrategy.equals(ResultOutputStrategy.PER_RESPONSE)) {
+                    runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
+                        hit.assertAttributeEquals("hit.count", hitsCount)
+                        assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
+                        OBJECT_MAPPER.readValue(hit.getContent(), ArrayList.class).forEach(h -> {
+                            assertFalse(h.isEmpty())
+                            assertFalse(h.containsKey("_source"))
+                            assertFalse(h.containsKey("_index"))
+                            // should be the _source content only
+                            assertTrue(h.containsKey("msg"))
+
+                        })
+                        assertThat(
+                                runner.getProvenanceEvents().stream().filter({ pe ->
+                                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                            pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                                }).count(),
+                                is(1L)
+                        )
+                    })
+                } else {
+                    runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
+                        hit.assertAttributeEquals("hit.count", hitsCount)
+                        assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
+                        final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
+                        assertFalse(h.isEmpty())
+                        assertFalse(h.containsKey("_source"))
+                        assertFalse(h.containsKey("_index"))
+                        // should be the _source content only
+                        assertTrue(h.containsKey("msg"))
+
+                        assertThat(
+                                runner.getProvenanceEvents().stream().filter({ pe ->
+                                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                            pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                                }).count(),
+                                is(1L)
+                        )
+                    })
+                }
+
+                reset(runner)
+            }
+        }
+    }
+
+    @Test
+    void testMetaExtraction() throws Exception {
+
+        // Test against each result strategy type
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
+            final TestRunner runner = createRunner(false)
+            runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
+
+            int flowFileCount
+            String hitsCount
+            boolean ndjson = false
+
+            switch (resultOutputStrategy) {
+                case ResultOutputStrategy.PER_QUERY:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    ndjson = true
+                    break
+                case ResultOutputStrategy.PER_HIT:
+                    flowFileCount = 10
+                    hitsCount = "1"
+                    break
+                case ResultOutputStrategy.PER_RESPONSE:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    break
+            }

Review Comment:
   Consider a `default` block, as above



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -61,6 +61,21 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
         reset(runner)
 
 
+        // NIFI-11430 test that 1 flow file is output when hits output per response with output empty flow file set to true
+        runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "true")

Review Comment:
   Once a property has been set for the processor on the `runner`, it doesn't get reset or changed unless the test code explicitly modifies it. The existing tests were presumably using the default of not outputting `no_hits`; the new tests set this to `true` and leave it that way with little or no other change to the assertions (i.e. we basically run the exact same test and assertions twice).
   
   Be sure to set `runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "false")` for each of the existing sets of tests here before the `runOnce(runner)` calls
   
   Even better, consider refactoring to loop through the different values (`OUTPUT_NO_HITS`, but possibly `SEARCH_RESULTS_SPLIT` too), similar to how you've done it in `AbstractPaginatedJsonQueryElasticsearchTest.groovy`
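   
   To illustrate the point about properties sticking, here is a toy sketch in plain Java. `FakeRunner` is a hypothetical stand-in and not the NiFi `TestRunner` API, the `output-no-hits` key is an invented name, and the `"false"` default is an assumption:
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   final class StickyPropertyDemo {
       // Hypothetical stand-in for a test runner: properties persist across runs until changed.
       static final class FakeRunner {
           private final Map<String, String> properties = new HashMap<>();
   
           void setProperty(final String name, final String value) {
               properties.put(name, value);
           }
   
           // "Runs" the processor and reports the setting it saw; "false" is the assumed default.
           String runOnce() {
               return properties.getOrDefault("output-no-hits", "false");
           }
       }
   
       // Sets every value explicitly so no run depends on what an earlier run left behind.
       static List<String> runAllValues(final FakeRunner runner) {
           final List<String> seen = new ArrayList<>();
           for (final String outputNoHits : new String[] {"false", "true"}) {
               runner.setProperty("output-no-hits", outputNoHits);
               seen.add(runner.runOnce());
           }
           return seen;
       }
   
       public static void main(final String[] args) {
           final FakeRunner runner = new FakeRunner();
           runner.setProperty("output-no-hits", "true");
           System.out.println(runner.runOnce());
           System.out.println(runner.runOnce());
           System.out.println(runAllValues(runner));
       }
   }
   ```
   
   The second `runOnce()` call still sees the earlier value, which is why each group of assertions needs its property set explicitly, or a loop like `runAllValues` that does so for every value.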



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org