You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "r-vandenbos (via GitHub)" <gi...@apache.org> on 2023/05/10 21:32:55 UTC

[GitHub] [nifi] r-vandenbos commented on a diff in pull request #7163: NIFI-11430 - Fixed empty file being produced when grouping by query and fixed _source and _meta extraction by query

r-vandenbos commented on code in PR #7163:
URL: https://github.com/apache/nifi/pull/7163#discussion_r1190413586


##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java:
##########
@@ -264,14 +265,16 @@ private void combineHits(final List<Map<String, Object>> hits, final PaginatedJs
      * SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries.
      */
     @Override
-    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
+    List<FlowFile> handleHits(List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
                               final ProcessSession session, final FlowFile parent, final Map<String, String> attributes,
                               final List<FlowFile> hitsFlowFiles, final String transitUri, final StopWatch stopWatch) throws IOException {
         paginatedJsonQueryParameters.incrementPageCount();
         attributes.put("page.number", Integer.toString(paginatedJsonQueryParameters.getPageCount()));
 
         if (hitStrategy == ResultOutputStrategy.PER_QUERY) {
-            combineHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles);
+
+            hits = formatHits(hits);
+            combineHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, newQuery);

Review Comment:
   👍🏻 



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java:
##########
@@ -264,14 +265,16 @@ private void combineHits(final List<Map<String, Object>> hits, final PaginatedJs
      * SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries.
      */
     @Override
-    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
+    List<FlowFile> handleHits(List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,

Review Comment:
   👍🏻 



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -32,15 +32,15 @@ import static groovy.json.JsonOutput.toJson
 import static org.hamcrest.CoreMatchers.equalTo
 import static org.hamcrest.CoreMatchers.is
 import static org.hamcrest.MatcherAssert.assertThat
-import static org.junit.jupiter.api.Assertions.assertThrows
+import static org.junit.jupiter.api.Assertions.*

Review Comment:
   👍🏻 



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy:
##########
@@ -117,13 +117,184 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         assertSendEvent(runner, input)
     }
 
+    @Test
+    void testSourceExtraction() throws Exception {
+        for (ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
+            final TestRunner runner = createRunner(false)
+            runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
+
+            int flowFileCount
+            String hitsCount
+            boolean ndjson = false
+
+            switch (resultOutputStrategy) {
+                case ResultOutputStrategy.PER_QUERY:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    ndjson = true
+                    break
+                case ResultOutputStrategy.PER_HIT:
+                    flowFileCount = 10
+                    hitsCount = "1"
+                    break
+                case ResultOutputStrategy.PER_RESPONSE:
+                    flowFileCount = 1
+                    hitsCount = "10"
+                    break
+            }
+
+            // test _source only format
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue())
+            runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY.getValue())
+
+
+            // Test against each pagination type
+            for (PaginationType paginationType : PaginationType.values()) {

Review Comment:
   👍🏻 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org