Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/06/28 16:16:36 UTC

[GitHub] [nifi] ChrisSamo632 opened a new pull request #5193: NIFI-8002 Creation of a PaginatedJsonQueryElasticsearch and ConsumeEl…

ChrisSamo632 opened a new pull request #5193:
URL: https://github.com/apache/nifi/pull/5193


   …asticsearch processors
   
   Integration Tests for ES Client Service with version and flavour-specific tests for applicable functionality
   
   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   _Enables X functionality; fixes bug NIFI-YYYY._
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [x] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [x] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [x] Have you verified that the full build is successful on JDK 11?
   - ~[ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?~
   - ~[ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?~
   - ~[ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?~
   - [x] If adding new Properties, have you added `.displayName` in addition to `.name` (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - ~[ ] Have you ensured that format looks appropriate for the output in which it is rendered?~
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676494882



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       See my [response above](https://github.com/apache/nifi/pull/5193#discussion_r676401159) regarding your comment on the `ConsumeElasticsearch` processor name.







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676406392



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
##########
@@ -159,62 +166,141 @@
             <artifactId>nifi-ssl-context-service</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-artifact</artifactId>
+            <version>3.6.3</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-json</artifactId>
             <version>${nifi.groovy.version}</version>
             <scope>test</scope>
-	</dependency>
-	<dependency>
+        </dependency>
+        <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
             <version>5.6.16</version>
         </dependency>
     </dependencies>
 
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-failsafe-plugin</artifactId>
+                    <version>3.0.0-M3</version>
+                </plugin>
+                <plugin>
+                    <groupId>com.github.alexcojocaru</groupId>
+                    <artifactId>elasticsearch-maven-plugin</artifactId>
+                    <version>6.19</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
     <profiles>
         <profile>
+            <!-- use with integration-tests only (-default can be used if x-pack-ml permissions fixed) -->
             <id>integration-6</id>

Review comment:
       I run:
   
   ```bash
   mvnd -P integration-tests clean verify
   sleep 2s
   mvnd -P integration-tests,integration-6 clean verify
   sleep 2s
   mvnd -P integration-tests-default,integration-7 clean verify
   ```
   
   for:
   * ES 5
   * ES 6
   * ES 7 (includes X-Pack via the `-default` ES artefact, for testing the "Point in Time" functionality)
   
   Note that you need to `clean` for each run so that the Elasticsearch instance directory is deleted by Maven, otherwise it gets itself in a knot because of mixed libraries, etc. (e.g. trying to start an ES6 instance with ES5 libraries still in the `target` directory)
   
   I'll add something to a README or the `pom.xml` to help future developers run the same tests.







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676401159



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       Correct - I wasn't really sure what to call this processor, but I'm also not convinced about the use of `Consume`
   
   Originally I combined this with the new `Paginated` processor, but the logic needed to handle "with input" vs. "no input" was starting to look difficult for a future maintainer to understand, so I felt separating the two made sense. The code is cleaner and it's more obvious what's going on with the two different processors; I just need to figure out a better name.







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r677730099



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
##########
@@ -116,7 +117,7 @@ default String getQuery(FlowFile input, ProcessContext context, ProcessSession s
             session.exportTo(input, out);
             out.close();
 
-            retVal = out.toString();
+            retVal = out.toString(StandardCharsets.UTF_8);

Review comment:
       This rings a bell from when I originally changed it... I forgot to change the module settings in IntelliJ to check for JDK 8 compatibility, d'oh.
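
For context on the JDK 8 point: `ByteArrayOutputStream.toString(Charset)` was only added in Java 10, so the changed line in the hunk above compiles on JDK 11 but not on JDK 8. The following is a minimal sketch of two JDK 8-compatible ways to decode the buffer as UTF-8. The class and method names are hypothetical and are not taken from the PR; which form (if any) the PR finally adopted is not stated in this thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of the PR.
class Jdk8CharsetExample {

    // Works on JDK 8: decode the buffered bytes explicitly with a Charset.
    static String decodeViaByteArray(ByteArrayOutputStream out) {
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    // Also works on JDK 8: toString(String charsetName) predates toString(Charset),
    // but it declares the checked UnsupportedEncodingException.
    static String decodeViaCharsetName(ByteArrayOutputStream out) {
        try {
            return out.toString(StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every Java platform is required to support.
            throw new IllegalStateException("UTF-8 is unexpectedly unsupported", e);
        }
    }

    public static void main(String[] args) {
        byte[] query = "{\"query\":{\"match_all\":{}}}".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(query, 0, query.length);
        System.out.println(decodeViaByteArray(out));
        System.out.println(decodeViaCharsetName(out));
    }
}
```

Both methods return the same string; the first avoids the checked exception, which is usually the simpler choice when a `Charset` constant is already at hand.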







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676386332



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;

Review comment:
       "Least Privilege" principle: all uses of the field (and of the methods, etc. that you've asked about elsewhere) are within the declaring class or other classes in the same package (in either `src` or `test`), i.e. no classes outside the package need to inherit or use the declared items through extension (`protected`) or reference (`public`). Package-private (i.e. default) access is therefore the most appropriate choice, because it grants the required access while restricting use as much as possible.
   
   If these items are needed outside the current package in future, developers of future PRs can widen the access as required.
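
The access-level argument above can be sketched as follows. This is a simplified illustration, assuming all classes live in one package; the class names and the `String` stand-in for the client service are hypothetical and do not reproduce the PR's code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an abstract processor; not the PR's class.
abstract class AbstractQueryProcessor {
    // No modifier means package-private: visible to classes in the same package
    // (including tests placed in that package), hidden from all other packages.
    final AtomicReference<String> clientService = new AtomicReference<>();

    // private: visible only inside this class.
    private final String splitUpAggregations = "splitUp-no";

    String describe() {
        return clientService.get() + "/" + splitUpAggregations;
    }
}

// Same package, so the package-private field is reachable
// without widening it to protected or public.
class PaginatedQueryProcessor extends AbstractQueryProcessor {
    void configure(String serviceName) {
        clientService.set(serviceName);
    }
}

class LeastPrivilegeExample {
    public static void main(String[] args) {
        PaginatedQueryProcessor processor = new PaginatedQueryProcessor();
        processor.configure("es-client");
        System.out.println(processor.describe()); // prints "es-client/splitUp-no"
    }
}
```

A subclass declared in a different package would not compile against `clientService`; at that point the field would need to become `protected`, which is the later widening the comment describes.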







[GitHub] [nifi] gresockj commented on pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#issuecomment-886595976


   > > Are there any features you can think of in ScrollElasticsearchHttp and QueryElasticsearchHttp that aren't covered by these new processors? I'm considering marking them as deprecated in favor of these ones.
   > 
   > I tried to cover everything I could see functionality wise, so hopefully there's nothing missed out. I decided not to go so far as marking the older processors as deprecated - should we do that here or in a separate PR do you think @gresockj?
   
   We should do it in a separate PR, as part of the 2.0 efforts.





[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676486405



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public SearchResponse scroll(String scroll) {
+        try {
+            final HttpEntity scrollEntity = new NStringEntity(scroll, ContentType.APPLICATION_JSON);
+            final Response response = client.performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity);
+
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public String initialisePointInTime(String index, String keepAlive) {
+        try {
+            final Map<String, String> params = new HashMap<String, String>() {{
+                if (StringUtils.isNotBlank(keepAlive)) {
+                    put("keep_alive", keepAlive);
+                }
+            }};
+            final Response response = client.performRequest("POST", index + "/_pit", params);
+            final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);

Review comment:
       If you name it DEFAULT_CHARSET, it could add some semantic value here. Since it's only used twice, perhaps it's overkill, though.
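
As a rough sketch of the suggestion, a named constant can carry the intent ("the charset this client assumes for response bodies") in one place. Only the name `DEFAULT_CHARSET` comes from the comment; the class and the `readBody` helper are hypothetical and use plain JDK streams rather than the `IOUtils` call in the hunk.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical class for illustration only; not part of the PR.
class DefaultCharsetExample {
    // One named constant instead of repeating StandardCharsets.UTF_8 at each call site.
    static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

    // Reads the whole stream and decodes it with the shared constant.
    static String readBody(InputStream content) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int read;
            while ((read = content.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
            return new String(buffer.toByteArray(), DEFAULT_CHARSET);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = "{\"id\":\"abc\"}".getBytes(DEFAULT_CHARSET);
        System.out.println(readBody(new ByteArrayInputStream(body)));
    }
}
```

With only two call sites the gain is modest, as the comment itself notes; the constant mainly helps if the charset ever needs to change or be documented in one place.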







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676474863



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();

Review comment:
       As for `ObjectReader` above, the `ObjectWriter` first needs an `ObjectMapper` to be available
   
   The `writer` in `AbstractPaginatedJsonQueryElasticsearch` is for newline-delimited output only; the `mapper` from `AbstractJsonQueryElasticsearch` is used for output of "JSON Array" format but also parsing the input Query JSON and Search After parameters... i.e. we need both read and write capability between the two classes




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676499452



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
##########
@@ -1,48 +0,0 @@
-<!DOCTYPE html>

Review comment:
       Haha, oh.







[GitHub] [nifi] ChrisSamo632 commented on pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#issuecomment-886595057


   > Are there any features you can think of in ScrollElasticsearchHttp and QueryElasticsearchHttp that aren't covered by these new processors? I'm considering marking them as deprecated in favor of these ones.
   
   I tried to cover everything I could see functionality-wise, so hopefully nothing has been missed. I decided not to go so far as marking the older processors as deprecated; should we do that here or in a separate PR, do you think, @gresockj?





[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676751424



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       Struggling to think of an alternative to `Consume`, suggestions welcome...
   
   Another reason to keep the processors separate is the use of `@TriggerSerially` on `Consume` - this logic can be added to the `onTrigger` method, but that's a bit dirty and confusing for maintainers







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676404004



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, requestParameters);
+            return buildSearchResponse(response);

Review comment:
       Makes sense to return the Elasticsearch warnings in the SearchResponse I think, good shout - I'll look at how much of a change that is and either include it or raise a separate Jira ticket (then probably the PR to match)







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676862387



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();

Review comment:
       I'll leave this functionality out of this PR then; if it's wanted in future, maybe it should be a follow-on ticket?







[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r675467678



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
##########
@@ -191,9 +191,45 @@
      * @param query A JSON string reprensenting the query.
      * @param index The index to target. Optional.
      * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch.
+     * @param requestParameters A collection of URL request parameters. Optional.
      * @return A SearchResponse object if successful.
      */
-    SearchResponse search(String query, String index, String type);
+    SearchResponse search(String query, String index, String type, Map<String, String> requestParameters);
+
+    /**
+     * Retrieve next page of results from a Scroll.
+     *
+     * @param scroll Scroll body containing scrollId and optional scroll (keep alive) retention period.

Review comment:
       To be consistent with the documentation on line 191, what about mentioning that the scroll body should be JSON?
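
   For illustration, a minimal sketch of what such a JSON scroll body could look like when built by a caller. The `scroll` and `scroll_id` field names follow the Elasticsearch Scroll API; the `ScrollBodyExample` class and its `scrollBody` method are hypothetical and not part of this PR, and the sketch assumes neither value contains characters that need JSON escaping.

```java
/** Hypothetical helper for illustration only; not part of the NiFi code base. */
final class ScrollBodyExample {
    private ScrollBodyExample() {
    }

    /**
     * Builds a JSON scroll body from a scroll id and a keep alive period.
     * Assumes both values are free of quotes and backslashes, so no escaping is done.
     */
    static String scrollBody(final String scrollId, final String keepAlive) {
        return String.format("{\"scroll\": \"%s\", \"scroll_id\": \"%s\"}", keepAlive, scrollId);
    }
}
```

   The resulting String is the kind of value that would be passed as the `scroll` argument, which is why stating "JSON" in the Javadoc seems worthwhile.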







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676403430



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public SearchResponse scroll(String scroll) {

Review comment:
       I like `final`s everywhere, surprised I missed them (maybe I left them off if they weren't in the original classes, but I've refactored things so much now that updating this would make sense)... will update to use `final` throughout where I can







[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676785700



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       Ok, that makes more sense, as it matches the scroll duration concept in Elasticsearch. What was confusing me was this line in `AbstractPaginatedJsonQueryElasticsearch`:
   `paginatedJsonQueryParameters.setExpirationTimestamp(
                   String.valueOf(Instant.now().plus(Duration.parse(keepAliveDuration)).toEpochMilli())
           );`
   which made me think the keep alive was intended to set the expiration of the query.
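
   As a rough sketch of how I now read that calculation: the expiration is the time of the last query plus the keep alive, and the stored pagination state counts as expired once the current time is later than that value. The `KeepAliveExpiry` class and its method names below are hypothetical, and the keep alive is assumed to already be an ISO-8601 duration such as `PT10M`, since that is what `Duration.parse` accepts.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for illustration only; not part of the NiFi code base. */
final class KeepAliveExpiry {
    private KeepAliveExpiry() {
    }

    /** Returns the query time plus the keep alive, as epoch milliseconds in a String. */
    static String expirationTimestamp(final Instant queryTime, final String keepAliveDuration) {
        // Assumption: keepAliveDuration is ISO-8601 text (e.g. "PT10M"), as required by Duration.parse
        return String.valueOf(queryTime.plus(Duration.parse(keepAliveDuration)).toEpochMilli());
    }

    /** True when "now" is later than the stored expiration timestamp. */
    static boolean isExpired(final String expirationTimestamp, final Instant now) {
        return now.toEpochMilli() > Long.parseLong(expirationTimestamp);
    }
}
```

   Under this reading, each successful page request pushes the expiration forward by one keep alive interval, rather than fixing a single deadline for the whole query.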







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676737715



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();

Review comment:
       So one problem with this... the input FlowFile (where it's used in a processor) will need to be transferred to the `original` output with the *first* batch so that the Session can be committed
   
   This *may* be a problem for anyone that's using a Wait/Notify kind of setup - if they want to batch the result outputs but then wait for all hits/pages of results to be processed before continuing with another part of their Flow, they're going to need to know how many hits/pages to expect beforehand and then wait for those *instead* of simply waiting for the `original` FlowFile to appear... is this a problem? Should we only have batching where there's no input FlowFile (sounds like a complex processor configuration then)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r689363718



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       [NIFI-9050](https://issues.apache.org/jira/browse/NIFI-9050) raised to cover a possible issue discovered with using the `ProcessSession#*State` methods in place of `ProcessContext#getStateManager#*State` (introduced via NIFI-8136).
   
   I think this was a timing issue with using the ProcessSession State methods - AbstractProcessor uses `commitAsync` (not `commit`) and the quick (1 sec) scheduling of the processor used here meant that the State hadn't actually been persisted, so the processor was doing the wrong thing when determining which page of data to try and obtain, etc.
   
   Also, there was no need to log an ERROR/WARNING for the fact that a `deleteScroll`/`deletePointInTime` function had been called with an Id that no longer existed in Elasticsearch (e.g. after an old query had expired and then the processor restarted). In fact there's no need to try and delete such resources from Elasticsearch because it does that itself automatically; instead we should just delete them once we know there's no more data to be pulled from Elasticsearch (while the Scroll/PiT are still active within Elasticsearch).
   
   
   ... changes made and pushed.
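
A minimal sketch of the kind of timing problem suspected above, assuming the state write only becomes visible some time after an asynchronous commit is requested. This is plain Java, not NiFi code: `AsyncStateRace`, its `commitAsync` method and the `pageCount` key are hypothetical stand-ins, and a latch is used to make the ordering deterministic.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class AsyncStateRace {
    // Hypothetical stand-in for persisted processor state; not the NiFi StateManager.
    private final Map<String, String> persisted = new ConcurrentHashMap<>();

    // Simulates an asynchronous commit: the new value is only stored once the gate opens.
    CompletableFuture<Void> commitAsync(String pageCount, CountDownLatch persistGate) {
        return CompletableFuture.runAsync(() -> {
            try {
                persistGate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            persisted.put("pageCount", pageCount);
        });
    }

    // What the next trigger of the processor would read to decide which page to fetch.
    String readPageCount() {
        return persisted.getOrDefault("pageCount", "0");
    }

    // Returns the page count seen before and after the asynchronous write has landed.
    static String[] demo() {
        AsyncStateRace state = new AsyncStateRace();
        CountDownLatch gate = new CountDownLatch(1);
        CompletableFuture<Void> commit = state.commitAsync("1", gate);
        String seenTooEarly = state.readPageCount(); // next run starts before persistence
        gate.countDown();                            // persistence is now allowed to finish
        commit.join();
        String seenAfterCommit = state.readPageCount();
        return new String[] {seenTooEarly, seenAfterCommit};
    }

    public static void main(String[] args) {
        String[] seen = demo();
        System.out.println("before persist: " + seen[0] + ", after persist: " + seen[1]);
    }
}
```

A run that starts before the write has landed sees the old page count and would request the wrong page, which would be consistent with the behaviour described above when the processor is scheduled every second.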







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r675490649



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
##########
@@ -191,9 +191,45 @@
      * @param query A JSON string reprensenting the query.
      * @param index The index to target. Optional.
      * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch.
+     * @param requestParameters A collection of URL request parameters. Optional.
      * @return A SearchResponse object if successful.
      */
-    SearchResponse search(String query, String index, String type);
+    SearchResponse search(String query, String index, String type, Map<String, String> requestParameters);
+
+    /**
+     * Retrieve next page of results from a Scroll.
+     *
+     * @param scroll Scroll body containing scrollId and optional scroll (keep alive) retention period.

Review comment:
       Done







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676665018



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
##########
@@ -159,62 +166,141 @@
             <artifactId>nifi-ssl-context-service</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-artifact</artifactId>
+            <version>3.6.3</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-json</artifactId>
             <version>${nifi.groovy.version}</version>
             <scope>test</scope>
-	</dependency>
-	<dependency>
+        </dependency>
+        <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
             <version>5.6.16</version>
         </dependency>
     </dependencies>
 
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-failsafe-plugin</artifactId>
+                    <version>3.0.0-M3</version>
+                </plugin>
+                <plugin>
+                    <groupId>com.github.alexcojocaru</groupId>
+                    <artifactId>elasticsearch-maven-plugin</artifactId>
+                    <version>6.19</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
     <profiles>
         <profile>
+            <!-- use with integration-tests only (-default can be used if x-pack-ml permissions fixed) -->
             <id>integration-6</id>

Review comment:
       Actually, I've just found that the `default` flavour of elasticsearch 7 wasn't running the tests (it spins up Elasticsearch but then doesn't run the `integration-tests`) so I've changed this to:
   ```bash
   mvnd -P integration-tests,elasticsearch-oss clean verify
   sleep 2s
   mvnd -P integration-tests,elasticsearch-oss,elasticsearch-6 clean verify
   sleep 2s
   mvnd -P integration-tests,elasticsearch-oss,integration-7 clean verify
   sleep 2s
   mvnd -P integration-tests,elasticsearch-default,integration-7 clean verify
   ```
   
   added in running ES 7 without X-Pack.
   
   Updated the new README to include these







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676875186



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       Better than nothing, or leaving it as `Consume`, so I'll make the change and we can come back to it later if needed







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676874673



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       The `Page Keep Alive` property tries to describe this already, but I'll add something to the `ConsumeElasticsearch` main docs too to try and make it clearer for users of the processor
   
   Also renamed the parameter to `pageExpirationTimestamp` to try and make it a bit clearer in the code
   
   Investigation of the state issue is ongoing







[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676782474



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();

Review comment:
       Yeah, good call with the `original` issue.  Perhaps if the query was expected to return that many results, you would naturally switch to using `ConsumeElasticsearch`, since successive pages can be run in separate processor runs.  Another good use case for the two separate processors.







[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676785700



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       OK, that makes more sense, as it matches the scroll duration concept in Elasticsearch. What was confusing me was this line in `AbstractPaginatedJsonQueryElasticsearch`:
   ```
   paginatedJsonQueryParameters.setExpirationTimestamp(
                   String.valueOf(Instant.now().plus(Duration.parse(keepAliveDuration)).toEpochMilli())
           );
   ```
   which made me think the keep alive was intended to set the expiration of the query.
   
   Yes, I'd definitely mention what the intended expiration behavior should be for `ConsumeElasticsearch`.
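
To make the point above concrete, here is a minimal sketch (not code from this PR) of reading the stored expiration timestamp as a sliding deadline for the next page request rather than as the lifetime of the whole query. The class and method names are hypothetical, and the keep alive is assumed to be already available as a `java.time.Duration`.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch, not code from this PR: the expiration timestamp is treated
// as a deadline for the *next* page request and is recomputed after every request.
final class PaginationExpirySketch {
    private PaginationExpirySketch() {
    }

    // Deadline for the next page request: time of the last request plus the keep alive.
    static long nextExpirationMillis(final Instant lastRequest, final Duration keepAlive) {
        return lastRequest.plus(keepAlive).toEpochMilli();
    }

    // The scroll/PiT is considered expired once "now" is later than the stored deadline.
    static boolean isExpired(final long expirationMillis, final Instant now) {
        return now.toEpochMilli() > expirationMillis;
    }

    public static void main(final String[] args) {
        final Duration keepAlive = Duration.ofMinutes(10);
        final Instant pageOne = Instant.parse("2021-07-26T10:00:00Z");
        final long deadline = nextExpirationMillis(pageOne, keepAlive);

        // 5 minutes after the last request: still inside the keep alive window
        System.out.println(isExpired(deadline, pageOne.plus(Duration.ofMinutes(5))));  // false
        // 11 minutes after the last request: the window has passed
        System.out.println(isExpired(deadline, pageOne.plus(Duration.ofMinutes(11)))); // true
    }
}
```

Under this reading, a query that needs many pages never expires as long as each page is requested within the keep alive of the previous one.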




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676750457



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       I need to reproduce this to investigate. I can't currently see why the state wouldn't be reset, but there must be a logic error somewhere. There absolutely shouldn't be any ERRORs output for a successful pass through the processor, so I will investigate that.
   
   Note that "Keep Alive" is the time allowed by Elasticsearch *between* page retrievals, **not** the overall time of the scroll/searchAfter/PIT query. So the processor running longer than the "Keep Alive" isn't an issue (and should be entirely possible). The question is what should happen if the Elasticsearch paginated query expires in between page retrievals. I think I've got a unit test for that in `ConsumeElasticsearch#testPaginationExpiration`: the query restarts by retrieving the first page, then should continue through any remaining pages, etc. Does this perhaps need to be made clearer in the processor documentation?
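
The restart behaviour described here can be sketched roughly as follows. This is a simplified, self-contained model rather than the processor's actual code: `KeepAliveRestartSketch` and `trigger` are hypothetical names, the state is held in plain fields instead of NiFi's state manager, and no Elasticsearch request is made.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of the restart-on-expiry behaviour, not the PR's implementation.
final class KeepAliveRestartSketch {
    private int pageCount = 0;          // pages retrieved for the current paginated query
    private Instant expiration = null;  // null means no paginated query is in progress

    // Simulates one processor invocation and returns the number of the page it retrieved.
    int trigger(final Instant now, final Duration keepAlive) {
        if (expiration != null && now.isAfter(expiration)) {
            // The cursor expired between page retrievals: discard state, restart at page one.
            pageCount = 0;
        }
        pageCount++;
        // The keep alive window is measured from this request, not from the first page.
        expiration = now.plus(keepAlive);
        return pageCount;
    }

    public static void main(final String[] args) {
        final KeepAliveRestartSketch sketch = new KeepAliveRestartSketch();
        final Duration keepAlive = Duration.ofMinutes(10);
        final Instant start = Instant.parse("2021-07-26T10:00:00Z");

        System.out.println(sketch.trigger(start, keepAlive));                               // 1
        System.out.println(sketch.trigger(start.plus(Duration.ofMinutes(5)), keepAlive));   // 2
        // 14 minutes overall, but only 9 minutes since the previous page: still valid
        System.out.println(sketch.trigger(start.plus(Duration.ofMinutes(14)), keepAlive));  // 3
        // 11 minutes since the previous page: expired, so the query restarts
        System.out.println(sketch.trigger(start.plus(Duration.ofMinutes(25)), keepAlive));  // 1
    }
}
```

The third call shows why total run time beyond the keep alive is not a problem in this model: only the gap since the previous page retrieval is compared against it.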







[GitHub] [nifi] ChrisSamo632 commented on pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#issuecomment-869955539


   This impacts https://github.com/apache/nifi/pull/4693 (NIFI-8001), which uses the same request parameters approach in the `ElasticSearchClientService` to allow arbitrary query parameters to be provided. This PR should be rebased if the other is merged first (or vice versa).





[GitHub] [nifi] asfgit closed pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #5193:
URL: https://github.com/apache/nifi/pull/5193


   





[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676386467



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+        splitUpAggregations = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);
+
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;

Review comment:
       (as above)

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+        splitUpAggregations = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);
+
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;
+
+    void populateCommonJsonQueryParameters(final Q queryJsonParameters, final FlowFile input, final ProcessContext context,
+                                           final ProcessSession session) throws IOException {
+        final String query = getQuery(input, context, session);
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+        final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
+                ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+                : null;
+
+        queryJsonParameters.setQuery(query);
+        queryJsonParameters.setIndex(index);
+        queryJsonParameters.setType(type);
+        queryJsonParameters.setQueryAttr(queryAttr);
+    }
+
+    abstract SearchResponse doQuery(final Q queryJsonParameters, final List<FlowFile> hitsFlowFiles, final ProcessSession session,

Review comment:
       (as above)

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
+    public static final AllowableValue SPLIT_UP_COMBINE = new AllowableValue(
+            "splitUp-combine",
+            "Combine",
+            "Combine results from all responses (one flowfile per entire paginated result set of hits). " +
+                    "Note that aggregations cannot be paged, they are generated across the entire result set and " +
+                    "returned as part of the first page. Results are output with one JSON object per line " +
+                    "(allowing hits to be combined from multiple pages without loading all results into memory)."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractJsonQueryElasticsearch.SPLIT_UP_HITS)
+            .description("Split up search results into one flowfile per result or combine all hits from all pages into a single flowfile.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES, SPLIT_UP_COMBINE)
+            .build();
+
+    public static final AllowableValue PAGINATION_SEARCH_AFTER = new AllowableValue(
+            "pagination-search_after",
+            "Search After",
+            "Use Elasticsearch \"search_after\" to page sorted results."
+    );
+    public static final AllowableValue PAGINATION_POINT_IN_TIME = new AllowableValue(
+            "pagination-pit",
+            "Point in Time",
+            "Use Elasticsearch (7.10+ with XPack) \"point in time\" to page sorted results."
+    );
+    public static final AllowableValue PAGINATION_SCROLL = new AllowableValue(
+            "pagination-scroll",
+            "Scroll",
+            "Use Elasticsearch \"scroll\" to page results."
+    );
+
+    public static final PropertyDescriptor PAGINATION_TYPE = new PropertyDescriptor.Builder()
+            .name("el-rest-pagination-type")
+            .displayName("Pagination Type")
+            .description("Pagination method to use. Not all types are available for all Elasticsearch versions, " +
+                    "check the Elasticsearch docs to confirm which are applicable and recommended for your service.")
+            .allowableValues(PAGINATION_SCROLL, PAGINATION_SEARCH_AFTER, PAGINATION_POINT_IN_TIME)
+            .defaultValue(PAGINATION_SCROLL.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor PAGINATION_KEEP_ALIVE = new PropertyDescriptor.Builder()
+            .name("el-rest-pagination-keep-alive")
+            .displayName("Pagination Keep Alive")
+            .description("Pagination \"keep_alive\" period. Period Elasticsearch will keep the scroll/pit cursor alive " +
+                    "in between requests (this is not the time expected for all pages to be returned, but the maximum " +
+                    "allowed time between requests for page retrieval).")
+            .required(true)
+            .defaultValue("10 mins")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, 24, TimeUnit.HOURS))
+            .build();
+
+    static final List<PropertyDescriptor> paginatedPropertyDescriptors;
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+        descriptors.add(PAGINATION_TYPE);
+        descriptors.add(PAGINATION_KEEP_ALIVE);
+
+        paginatedPropertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    // output as newline delimited JSON (allows for multiple pages of results to be appended to existing FlowFiles without retaining all hits in memory)
+    private final ObjectWriter writer = mapper.writer().withRootValueSeparator("\n");
+
+    String paginationType;

Review comment:
       (as above)







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676493528



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.ConsumeElasticsearch/additionalDetails.html
##########
@@ -0,0 +1,76 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ConsumeElasticsearch</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
+    to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster in a paginated manner.

Review comment:
       Copied from the existing `JsonQueryElasticsearch` docs, but I agree: I didn't like the wording when I first saw it and then forgot to change it.







[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r677530656



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
##########
@@ -116,7 +117,7 @@ default String getQuery(FlowFile input, ProcessContext context, ProcessSession s
             session.exportTo(input, out);
             out.close();
 
-            retVal = out.toString();
+            retVal = out.toString(StandardCharsets.UTF_8);

Review comment:
       Whoops, looks like `ByteArrayOutputStream.toString(Charset)` was only introduced in Java 10, so it isn't available on Java 8.
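
For reference, a minimal sketch of two ways to decode the buffered bytes as UTF-8 using only APIs that already exist in Java 8. The class and method names (`Utf8DecodeSketch`, `viaStringConstructor`, `viaCharsetName`) are made up for illustration and are not taken from the PR.

```java
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of the NiFi code base.
final class Utf8DecodeSketch {

    // Java 8-compatible: decode the buffered bytes with an explicit Charset.
    static String viaStringConstructor(final ByteArrayOutputStream out) {
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    // Also Java 8-compatible: toString(String charsetName) predates toString(Charset),
    // but it declares a checked exception for unknown charset names.
    static String viaCharsetName(final ByteArrayOutputStream out) {
        try {
            return out.toString(StandardCharsets.UTF_8.name());
        } catch (final UnsupportedEncodingException e) {
            // Every Java platform is required to support UTF-8, so this should not happen.
            throw new IllegalStateException("UTF-8 is not supported", e);
        }
    }

    public static void main(final String[] args) {
        final byte[] query = "{\"match\":{\"name\":\"caf\u00e9\"}}".getBytes(StandardCharsets.UTF_8);
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(query, 0, query.length);

        System.out.println(viaStringConstructor(out));
        System.out.println(viaCharsetName(out));
    }
}
```

Both variants return the same string; the constructor form avoids the checked exception, which is probably why it tends to be the simpler choice.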







[GitHub] [nifi] gresockj commented on pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#issuecomment-885136052


   Hi @ChrisSamo632, I'd like to test this one out.  Can you do me a favor and rebase against main so we get the 1.15.0-SNAPSHOT version?





[GitHub] [nifi] ChrisSamo632 removed a comment on pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 removed a comment on pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#issuecomment-869955539


   Impacts https://github.com/apache/nifi/pull/4693 (NIFI-8001), which uses the same request parameters approach in the ElasticsearchClientService to allow provision of arbitrary query parameters. This PR should be rebased if the other is merged first (or vice versa).





[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676406392



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
##########
@@ -159,62 +166,141 @@
             <artifactId>nifi-ssl-context-service</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-artifact</artifactId>
+            <version>3.6.3</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-json</artifactId>
             <version>${nifi.groovy.version}</version>
             <scope>test</scope>
-	</dependency>
-	<dependency>
+        </dependency>
+        <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
             <version>5.6.16</version>
         </dependency>
     </dependencies>
 
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-failsafe-plugin</artifactId>
+                    <version>3.0.0-M3</version>
+                </plugin>
+                <plugin>
+                    <groupId>com.github.alexcojocaru</groupId>
+                    <artifactId>elasticsearch-maven-plugin</artifactId>
+                    <version>6.19</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
     <profiles>
         <profile>
+            <!-- use with integration-tests only (-default can be used if x-pack-ml permissions fixed) -->
             <id>integration-6</id>

Review comment:
       I run:
   
   ```bash
   mvnd -P integration-tests clean verify
   sleep 2s
   mvnd -P integration-tests,integration-6 clean verify
   sleep 2s
   mvnd -P integration-tests,integration-7 clean verify
   ```
   
   These cover, in order:
   * ES 5
   * ES 6
   * ES 7 (includes x-pack with the `-default` ES artefact for testing the "Point in Time" functionality)
   
   Note that you need to `clean` for each run so that Maven deletes the Elasticsearch instance directory; otherwise it gets itself in a knot because of mixed libraries (e.g. trying to start an ES 6 instance with ES 5 libraries still in the `target` directory).
   
   I'll add something to a README or the `pom.xml` to help future developers run the same.







[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676863192



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();

Review comment:
       Agreed







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676402146



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
##########
@@ -111,11 +112,11 @@ default String getQuery(FlowFile input, ProcessContext context, ProcessSession s
         if (context.getProperty(QUERY).isSet()) {
             retVal = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
         } else if (input != null) {
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
             session.exportTo(input, out);
             out.close();
 
-            retVal = new String(out.toByteArray());
+            retVal = out.toString();

Review comment:
       I considered this but wasn't sure whether it would be better to keep using the system default (in case someone is using a non-UTF-8 Charset). The best option might really be to let users specify an output (and input?) Charset, but that seems like it should be a separate Jira/PR.
   
   I'll look at this again and consider whether UTF-8 would make sense here for now.
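
To make the trade-off concrete, here is a small sketch of why the choice of Charset matters when turning FlowFile bytes into a query string. `new String(bytes)` and `ByteArrayOutputStream.toString()` both decode with `Charset.defaultCharset()`, so the result depends on the JVM the processor runs in. The class `CharsetMismatchSketch` and its `decode` helper are hypothetical and only stand in for a possible user-configurable Charset.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of the NiFi code base.
final class CharsetMismatchSketch {

    // Decodes the bytes with whichever Charset the caller (or a future property) selects.
    static String decode(final byte[] bytes, final Charset charset) {
        return new String(bytes, charset);
    }

    public static void main(final String[] args) {
        // "café" encoded as UTF-8 occupies five bytes, because 'é' needs two.
        final byte[] utf8Bytes = "caf\u00e9".getBytes(StandardCharsets.UTF_8);

        System.out.println("JVM default charset: " + Charset.defaultCharset());

        // Decoding with the matching charset restores the four original characters.
        System.out.println(decode(utf8Bytes, StandardCharsets.UTF_8).length());

        // Decoding with a single-byte charset turns each byte into its own character.
        System.out.println(decode(utf8Bytes, StandardCharsets.ISO_8859_1).length());
    }
}
```

Hard-coding UTF-8 makes the behaviour independent of the host, at the cost of mis-decoding content that really was written in another encoding; a configurable Charset would cover both cases.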







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676490484



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
##########
@@ -1,48 +0,0 @@
-<!DOCTYPE html>

Review comment:
       There is no processor called `PutElasticsearchJson`, so this file made no sense.







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676518459



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();

Review comment:
       This probably makes sense for:
   * outputting a FlowFile per hit
   * outputting a FlowFile per Response (page) when using the Paginated Processors
   
   So I'll look into doing that.







[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r675468689



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
##########
@@ -25,11 +25,11 @@
  * covers all CRUD-related operations that can be executed against an Elasticsearch index with documents.
  */
 public class IndexOperationRequest {
-    private String index;
-    private String type;
-    private String id;
-    private Map<String, Object> fields;
-    private Operation operation;
+    private final String index;
+    private final String type;
+    private final String id;
+    private final Map<String, Object> fields;
+    private final Operation operation;
 
     public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields, Operation operation) {

Review comment:
       Perhaps also a good time to make these parameters final.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -165,10 +165,12 @@ private void setupClient(ConfigurationContext context) throws MalformedURLExcept
         this.client = builder.build();
     }
 
-    private Response runQuery(String endpoint, String query, String index, String type) {
-        StringBuilder sb = new StringBuilder()
-            .append("/").append(index);
-        if (type != null && !type.equals("")) {
+    private Response runQuery(String endpoint, String query, String index, String type, Map<String, String> requestParameters) {

Review comment:
       Consider final parameters and StringBuilder here.
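
A rough sketch of what that suggestion could look like for the path-building part of the method. Only the first two lines of the original body are visible in the hunk above, so everything after the `type` check is an assumption, and `EndpointPathSketch.buildPath` is a hypothetical stand-in rather than the actual `runQuery` implementation (the `requestParameters` handling is left out entirely).

```java
// Hypothetical illustration class; not part of the NiFi code base.
final class EndpointPathSketch {

    // Parameters and the local StringBuilder are final, so none of them can be reassigned by accident.
    static String buildPath(final String endpoint, final String index, final String type) {
        final StringBuilder sb = new StringBuilder()
                .append("/").append(index);
        // Only add the type segment when one was supplied (assumed behaviour).
        if (type != null && !type.isEmpty()) {
            sb.append("/").append(type);
        }
        sb.append("/").append(endpoint);
        return sb.toString();
    }

    public static void main(final String[] args) {
        System.out.println(buildPath("_search", "my-index", null));
        System.out.println(buildPath("_search", "my-index", "doc"));
    }
}
```

Note that `final` on the `StringBuilder` reference only prevents reassignment; the builder itself can still be appended to.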

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
##########
@@ -159,62 +166,141 @@
             <artifactId>nifi-ssl-context-service</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-artifact</artifactId>
+            <version>3.6.3</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-json</artifactId>
             <version>${nifi.groovy.version}</version>
             <scope>test</scope>
-	</dependency>
-	<dependency>
+        </dependency>
+        <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
             <version>5.6.16</version>
         </dependency>
     </dependencies>
 
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-failsafe-plugin</artifactId>
+                    <version>3.0.0-M3</version>
+                </plugin>
+                <plugin>
+                    <groupId>com.github.alexcojocaru</groupId>
+                    <artifactId>elasticsearch-maven-plugin</artifactId>
+                    <version>6.19</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
     <profiles>
         <profile>
+            <!-- use with integration-tests only (-default can be used if x-pack-ml permissions fixed) -->
             <id>integration-6</id>

Review comment:
       Can you provide some examples of correctly running the integration tests?
   
   When running `mvn clean install -Pintegration-tests -Pintegration-6` and `mvn clean install -Pintegration-test -Pintegration-7`, I get the following error:
   ```
   [ERROR] Failed to execute goal com.github.alexcojocaru:elasticsearch-maven-plugin:6.19:runforked (start-elasticsearch) on project nifi-elasticsearch-client-service: Elasticsearch [0]: Cannot execute command '[bin/elasticsearch-plugin, list]' in directory '/Users/jgresock/workspace/nifi.PR/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/target/elasticsearch0'
   [ERROR] Output:
   [ERROR]
   [ERROR] Error:
   [ERROR] Exception in thread "main" java.lang.NoSuchMethodError: org.elasticsearch.cli.MultiCommand.<init>(Ljava/lang/String;)V
   [ERROR] 	at org.elasticsearch.plugins.PluginCli.<init>(PluginCli.java:39)
   [ERROR] 	at org.elasticsearch.plugins.PluginCli.main(PluginCli.java:47): Process exited with an error: 1 (Exit value: 1)
   ```
   Not sure if this is user error, but I wanted to bring it up.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();

Review comment:
       What do you think about adding a BATCH_SIZE property similar to `ListS3`, which would commit the session after the configured batch size?  My concern is that on very large result sets, this processor could be running for a long time without emitting any flow files.  If BATCH_SIZE is unset, you could use the existing behavior.
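
To make the batching idea concrete, here is a minimal JDK-only sketch. It is not the NiFi API: `emitInBatches`, the `commit` callback (standing in for a session commit) and the convention that a non-positive batch size means "unset" are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class BatchCommitSketch {
    /**
     * Hands hits to the commit callback in groups of batchSize.
     * A batchSize of zero or less is treated as "unset": everything is
     * committed once at the end, mirroring the existing behaviour.
     * Returns the number of times the callback was invoked.
     */
    static int emitInBatches(final List<String> hits, final int batchSize,
                             final Consumer<List<String>> commit) {
        final List<String> pending = new ArrayList<>();
        int commits = 0;
        for (final String hit : hits) {
            pending.add(hit);
            if (batchSize > 0 && pending.size() >= batchSize) {
                // Pass a copy so the callback never sees the list being cleared.
                commit.accept(new ArrayList<>(pending));
                pending.clear();
                commits++;
            }
        }
        if (!pending.isEmpty()) {
            // Flush whatever is left over after the last full batch.
            commit.accept(new ArrayList<>(pending));
            commits++;
        }
        return commits;
    }

    public static void main(final String[] args) {
        final List<String> hits = Arrays.asList("a", "b", "c", "d", "e");
        final int commits = emitInBatches(hits, 2, batch -> System.out.println("commit " + batch));
        System.out.println("commits: " + commits);
    }
}
```

In the processor the callback would presumably transfer the pending flowfiles and commit the session, so downstream processors start receiving output before the whole result set has been paged through.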

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+        splitUpAggregations = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);
+
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;

Review comment:
       Why not protected here?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;

Review comment:
       Is there a reason you chose package scope instead of protected for clientService and splitUpHits?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.ConsumeElasticsearch/additionalDetails.html
##########
@@ -0,0 +1,76 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ConsumeElasticsearch</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
+    to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster in a paginated manner.

Review comment:
       Since the query doesn't have to be from Kibana, what about "be able to take a JSON query (e.g., from Kibana) and execute it..."?
   

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "read", "json"})
+@CapabilityDescription("A processor that allows the user to run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "It will use the flowfile's content for the query unless the QUERY attribute is populated. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class PaginatedJsonQueryElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.addAll(paginatedPropertyDescriptors);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    void finishQuery(final FlowFile input, final PaginatedJsonQueryParameters paginatedQueryJsonParameters,

Review comment:
       Consider making these methods protected.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();

Review comment:
       Also, I just noticed that you have an ObjectWriter in `AbstractPaginatedJsonQueryElasticsearch` -- any way you can combine these?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
##########
@@ -0,0 +1,67 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PaginatedJsonQueryElasticsearch</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
+    to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster in a paginated manner.
+    Like all processors in the "restapi" bundle, it uses the official Elastic client APIs, so it supports leader detection.</p>
+    <p>The query to execute can be provided either in the Query configuration property or in an attribute on a flowfile. In
+    the latter case, the name of the attribute (Expression Language is supported here) must be provided in the Query
+    property. If no Query property is provided and a flowfile is received, the content of the flowfile is used as the Query JSON.</p>

Review comment:
       This seems unclear to me based on the actual functionality.  How about:
   ```suggestion
       <p>The query JSON to execute can be provided either in the Query configuration property or in the content of the 
       flowfile.  If the Query Attribute property is configured, the executed query JSON will be placed in the attribute provided by this property.</p>
   ```

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public SearchResponse scroll(String scroll) {
+        try {
+            final HttpEntity scrollEntity = new NStringEntity(scroll, ContentType.APPLICATION_JSON);
+            final Response response = client.performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity);
+
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public String initialisePointInTime(String index, String keepAlive) {

Review comment:
       Consider declaring the parameters final.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public SearchResponse scroll(String scroll) {
+        try {
+            final HttpEntity scrollEntity = new NStringEntity(scroll, ContentType.APPLICATION_JSON);
+            final Response response = client.performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity);
+
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public String initialisePointInTime(String index, String keepAlive) {
+        try {
+            final Map<String, String> params = new HashMap<String, String>() {{
+                if (StringUtils.isNotBlank(keepAlive)) {
+                    put("keep_alive", keepAlive);
+                }
+            }};
+            final Response response = client.performRequest("POST", index + "/_pit", params);
+            final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);

Review comment:
       Consider declaring a Charset constant for UTF-8, since it's used in multiple places here.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, requestParameters);
+            return buildSearchResponse(response);

Review comment:
       One thing I noticed during testing was that if I provided a 'type' value in Elasticsearch 7, the query completes with 0 results.  I understand that this would be a user error, but I also noticed the following in the logs:
   ```
   2021-07-23 06:22:59,910 WARN [I/O dispatcher 1] org.elasticsearch.client.RestClient request [POST http://localhost:9200/my_index/test/_search?scroll=600s] returned 2 warnings: [299 Elasticsearch-7.13.4-c5f60e894ca0c61cdbae4f5a686d9f08bcefc942 "Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.13/security-minimal-setup.html to enable security."],[299 Elasticsearch-7.13.4-c5f60e894ca0c61cdbae4f5a686d9f08bcefc942 "[types removal] Specifying types in search requests is deprecated."]
   2021-07-23 06:22:59,919 WARN [I/O dispatcher 1] org.elasticsearch.client.RestClient request [DELETE http://localhost:9200/_search/scroll] returned 1 warnings: [299 Elasticsearch-7.13.4-c5f60e894ca0c61cdbae4f5a686d9f08bcefc942 "Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.13/security-minimal-setup.html to enable security."]
   ```
   It appears these warnings are pulled from the HTTP Response Headers, which are available in the Response object, so if you wanted, you could add these warnings to the SearchResponse in order to propagate the warnings to the processor's logger.  On the other hand, the user could view the nifi-app.log to find this warning.
   
   This PR is already quite substantial, so I'll leave it up to you as to whether to include this here (or not), or whether it could be covered in a future JIRA issue.
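
If it helps, here is a rough JDK-only sketch of pulling the readable text out of one `Warning` header value like those in the log above. `WarningHeaderSketch` and `warningText` are hypothetical names, and how the header values are read from the Response object is deliberately left out; the pattern assumes the `code agent "text"` layout seen in these log lines.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class WarningHeaderSketch {
    // Three-digit warn-code, warn-agent token, then a double-quoted warn-text
    // (backslash-escaped characters inside the quotes are tolerated).
    private static final Pattern WARNING =
            Pattern.compile("^(\\d{3}) (\\S+) \"((?:[^\"\\\\]|\\\\.)*)\"");

    /**
     * Returns the quoted warn-text of a single Warning header value,
     * or the raw value unchanged when it does not match the expected layout.
     * Escape sequences inside the text are returned as-is, not unescaped.
     */
    static String warningText(final String headerValue) {
        final Matcher matcher = WARNING.matcher(headerValue);
        return matcher.find() ? matcher.group(3) : headerValue;
    }

    public static void main(final String[] args) {
        final String header = "299 Elasticsearch-7.13.4-c5f60e894ca0c61cdbae4f5a686d9f08bcefc942 "
                + "\"[types removal] Specifying types in search requests is deprecated.\"";
        System.out.println(warningText(header));
    }
}
```

The extracted strings could then be carried on the SearchResponse and logged by the processor; whether that belongs in this PR or a follow-up is the open question above.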

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
##########
@@ -111,11 +112,11 @@ default String getQuery(FlowFile input, ProcessContext context, ProcessSession s
         if (context.getProperty(QUERY).isSet()) {
             retVal = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
         } else if (input != null) {
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
             session.exportTo(input, out);
             out.close();
 
-            retVal = new String(out.toByteArray());
+            retVal = out.toString();

Review comment:
       What do you think about passing the UTF-8 charset into toString()?
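   
   For illustration, a minimal sketch of an explicit UTF-8 decode (the class name is hypothetical).  It uses the `String(byte[], Charset)` constructor because, as far as I know, `ByteArrayOutputStream.toString(Charset)` is only available from Java 10, while the older `toString(String)` overload throws a checked exception.
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.nio.charset.StandardCharsets;
   
   // Hypothetical helper, for illustration only.
   final class Utf8ToStringSketch {
       // Decodes the buffered bytes as UTF-8 rather than relying on the
       // platform default charset, which can differ between hosts.
       static String decode(final ByteArrayOutputStream out) {
           return new String(out.toByteArray(), StandardCharsets.UTF_8);
       }
   }
   ```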

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
##########
@@ -40,7 +40,8 @@ public boolean hasErrors() {
         return hasErrors;
     }
 
-    public static IndexOperationResponse fromJsonResponse(String response) throws IOException {
+    @SuppressWarnings("unchecked")
+    public static IndexOperationResponse fromJsonResponse(final String response) throws IOException {
         ObjectMapper mapper = new ObjectMapper();

Review comment:
       I realize this wasn't part of your PR, but it could be a good time to improve the usage of ObjectMapper here.  I feel like this class could store a static final ObjectReader, since creating an ObjectMapper is a (relatively) expensive operation.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public SearchResponse scroll(String scroll) {

Review comment:
       Consider declaring the parameters as final.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();

Review comment:
       For clarity, since you only use write methods, what about making this an ObjectWriter?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {

Review comment:
       Not that it adds a huge amount of value here, but consider declaring the exception as final.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
+    public static final AllowableValue SPLIT_UP_COMBINE = new AllowableValue(

Review comment:
       The "Split up..." properties are the only part that doesn't feel natural to me.  What about the following scheme:
   **Search Results Split** property, allowable values:
   1. Flow file per result
   2. Flow file per page
   3. Flow file per query
   
   **Aggregation Results Split** property, allowable values:
   1. Flow file per result
   2. Flow file per query
   
   ... or some variant thereof.  I had to run it through the processor with each type to figure out the difference between Split = No vs. Split = Combine.
   
   Edit: I understand now that these properties already existed in JsonQueryElasticsearch and you're just trying to reuse them.  I wonder if it would be better to make the properties in your new processors more intuitive rather than tying them to JsonQueryElasticsearch.
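   
   As a rough sketch of the scheme proposed above, the options could be modelled as a single enum from which both properties draw their allowable values.  All names here are hypothetical and only mirror the wording in this comment; the sketch leaves out the NiFi AllowableValue and PropertyDescriptor wiring.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical enum, for illustration only (not part of the PR).
   enum ResultSplitSketch {
       PER_HIT("Flow file per result", true),
       PER_PAGE("Flow file per page", false),
       PER_QUERY("Flow file per query", true);
   
       private final String displayName;
       private final boolean validForAggregations;
   
       ResultSplitSketch(final String displayName, final boolean validForAggregations) {
           this.displayName = displayName;
           this.validForAggregations = validForAggregations;
       }
   
       String getDisplayName() {
           return displayName;
       }
   
       // Aggregations are not paged, so "per page" applies to search hits only.
       static List<ResultSplitSketch> aggregationOptions() {
           final List<ResultSplitSketch> options = new ArrayList<>();
           for (final ResultSplitSketch option : values()) {
               if (option.validForAggregations) {
                   options.add(option);
               }
           }
           return options;
       }
   }
   ```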

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+        splitUpAggregations = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);
+
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;
+
+    void populateCommonJsonQueryParameters(final Q queryJsonParameters, final FlowFile input, final ProcessContext context,
+                                           final ProcessSession session) throws IOException {
+        final String query = getQuery(input, context, session);
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+        final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
+                ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+                : null;
+
+        queryJsonParameters.setQuery(query);
+        queryJsonParameters.setIndex(index);
+        queryJsonParameters.setType(type);
+        queryJsonParameters.setQueryAttr(queryAttr);
+    }
+
+    abstract SearchResponse doQuery(final Q queryJsonParameters, final List<FlowFile> hitsFlowFiles, final ProcessSession session,

Review comment:
       Same question for all these methods

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       I found what looks like a bug: the state is not actually cleared when the expirationTimestamp is reached.  I configured a ConsumeElasticsearch processor with a Keep Alive time of 30 seconds and a Run Schedule of 1 second, and used a query that returned 15 pages of results (Split search/aggregation set to No).  With the Scroll setting, the processor ran 15 times, and from the 16th run onward it started reporting 404 errors.  Once the 30 seconds had expired, the processor started producing results again, but it emitted Page 1 on every run, over and over.  The state did not appear to have been cleared: pageCount was stuck at 15, expirationTimestamp remained static, etc.  The same thing happened for PiT and Search After, though Search After didn't output any errors once the result set was complete.
   
   Beyond the state clearing, we should consider how to handle the case where the result set completes, since we probably don't want to log an ERROR in what should be considered a successful case.
   
   Also, what is the intended behavior if the query runs longer than the Keep Alive time?  Should the processor start over on the query, or continue with the result set until the last page is reached, and then start over?
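   
   To make the state-clearing point concrete, here is a sketch of the kind of check I would expect before each run.  It is not the PR's implementation: the class and method are hypothetical, it assumes expirationTimestamp is stored as epoch milliseconds in the state map, and it treats an empty page as the end of the result set.
   
   ```java
   import java.time.Instant;
   import java.util.Map;
   
   // Hypothetical helper, for illustration only (not part of the PR).
   final class PaginationStateSketch {
       // Returns true when the stored pagination state should be discarded so
       // that the next run starts the query again from the first page.
       static boolean shouldClearState(final Map<String, String> state, final Instant now, final int hitsInLastPage) {
           // An empty page means the result set is complete: a normal outcome,
           // not an error.
           if (hitsInLastPage == 0) {
               return true;
           }
           // Assumption: the timestamp is kept as epoch milliseconds.
           final String expiration = state.get("expirationTimestamp");
           if (expiration == null) {
               return false;
           }
           return now.isAfter(Instant.ofEpochMilli(Long.parseLong(expiration)));
       }
   }
   ```
   
   With a check like this, the completed-result-set case could be logged at DEBUG or INFO rather than ERROR.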

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       What would you consider the compelling reason to have this processor in addition to PaginatedJsonQueryElasticsearch?  The differences I see are the convenience of having it be a source processor (whereas you'd have to put a GenerateFlowFile processor in front of PaginatedJsonQueryElasticsearch to achieve the same thing), and there is a query expiration for ConsumeElasticsearch.  I'm just wondering if the two processors could be combined to serve the same purpose.

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+        splitUpAggregations = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);
+
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;
+
+    void populateCommonJsonQueryParameters(final Q queryJsonParameters, final FlowFile input, final ProcessContext context,
+                                           final ProcessSession session) throws IOException {
+        final String query = getQuery(input, context, session);
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+        final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
+                ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+                : null;
+
+        queryJsonParameters.setQuery(query);
+        queryJsonParameters.setIndex(index);
+        queryJsonParameters.setType(type);
+        queryJsonParameters.setQueryAttr(queryAttr);
+    }
+
+    abstract SearchResponse doQuery(final Q queryJsonParameters, final List<FlowFile> hitsFlowFiles, final ProcessSession session,
+                                    final FlowFile input, final StopWatch stopWatch) throws IOException;
+
+    abstract void finishQuery(final FlowFile input, final Q queryParameters, final ProcessSession session,
+                              final SearchResponse response) throws IOException;
+
+    FlowFile createChildFlowFile(final ProcessSession session, final FlowFile parent) {
+        return parent != null ? session.create(parent) : session.create();
+    }
+
+    private FlowFile writeAggregationFlowFileContents(final String name, final Integer number, final String json,
+                                                      final ProcessSession session, final FlowFile aggFlowFile,
+                                                      final Map<String, String> attributes) {
+        FlowFile ff = session.write(aggFlowFile, out -> out.write(json.getBytes()));
+        ff = session.putAllAttributes(ff, new HashMap<String, String>(){{
+            if (name != null) {
+                put("aggregation.name", name);
+            }
+            if (number != null) {
+                put("aggregation.number", number.toString());
+            }
+        }});
+
+        return session.putAllAttributes(ff, attributes);
+    }
+
+    private void handleAggregations(final Map<String, Object> aggregations, final ProcessSession session,
+                                    final FlowFile parent, final Map<String, String> attributes,
+                                    final String transitUri, final StopWatch stopWatch) throws IOException {
+        if (aggregations != null && !aggregations.isEmpty()) {
+            final List<FlowFile> aggsFlowFiles = new ArrayList<>();
+            if (splitUpAggregations.equals(SPLIT_UP_YES.getValue())) {
+                int aggCount = 0;
+                for (final Map.Entry<String, Object> agg : aggregations.entrySet()) {
+                    final FlowFile aggFlowFile = createChildFlowFile(session, parent);
+                    final String aggJson = mapper.writeValueAsString(agg.getValue());
+                    aggsFlowFiles.add(writeAggregationFlowFileContents(agg.getKey(), ++aggCount, aggJson, session, aggFlowFile, attributes));
+                }
+            } else {
+                final FlowFile aggFlowFile = createChildFlowFile(session, parent);
+                final String json = mapper.writeValueAsString(aggregations);
+                aggsFlowFiles.add(writeAggregationFlowFileContents(null, null, json, session, aggFlowFile, attributes));
+            }
+
+            if (!aggsFlowFiles.isEmpty()) {
+                session.transfer(aggsFlowFiles, REL_AGGREGATIONS);
+                aggsFlowFiles.forEach(ff -> session.getProvenanceReporter().receive(ff, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)));
+            }
+        }
+    }
+
+    private FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
+                                      final FlowFile hitFlowFile, final Map<String, String> attributes) {
+        final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes()));
+        attributes.put("hit.count", Integer.toString(count));
+
+        return session.putAllAttributes(ff, attributes);
+    }
+
+    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final Q queryJsonParameters, final ProcessSession session,
+                              final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles,
+                              final String transitUri, final StopWatch stopWatch) throws IOException {
+        if (hits != null && !hits.isEmpty()) {
+            if (SPLIT_UP_YES.getValue().equals(splitUpHits)) {
+                for (final Map<String, Object> hit : hits) {
+                    final FlowFile hitFlowFile = createChildFlowFile(session, parent);
+                    final String json = mapper.writeValueAsString(hit);
+                    hitsFlowFiles.add(writeHitFlowFile(1, json, session, hitFlowFile, attributes));
+                }
+            } else {
+                final FlowFile hitFlowFile = createChildFlowFile(session, parent);
+                final String json = mapper.writeValueAsString(hits);
+                hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, hitFlowFile, attributes));
+            }
+        }
+
+        // output any results
+        if (!hitsFlowFiles.isEmpty()) {

Review comment:
       It looks like this method will always return an empty list, then?  Perhaps add some method comments to indicate the purpose of the returned `List<FlowFile>`.  Can you walk me through the reason for returning anything if it will always be empty?
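
To make the question concrete, here is a minimal sketch of the accumulate-then-flush pattern the comment seems to describe. It is not the PR's code: `HitBufferSketch`, the `keepOpen` flag, and the use of `String` in place of `FlowFile` are all hypothetical, and the sketch assumes the truncated method body transfers the buffered flowfiles and then clears the list. Under that assumption, the returned list is only non-empty when the caller keeps the buffer open across pages, which is what a "Combine"-style mode would presumably need.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the hit-handling flow; Strings play the role of FlowFiles. */
final class HitBufferSketch {
    // Records everything that has been "transferred" so the behaviour can be inspected.
    private final List<String> transferred = new ArrayList<>();

    List<String> transferred() {
        return transferred;
    }

    /**
     * Adds the new hits to the buffer. Unless the caller asks to keep the buffer open
     * (assumed here to model combining pages), the buffer is flushed and cleared,
     * so the returned list is empty.
     */
    List<String> handleHits(final List<String> hits, final List<String> buffer, final boolean keepOpen) {
        buffer.addAll(hits);
        if (!keepOpen && !buffer.isEmpty()) {
            transferred.addAll(buffer);
            buffer.clear();
        }
        return buffer;
    }

    public static void main(final String[] args) {
        final HitBufferSketch sketch = new HitBufferSketch();
        final List<String> pending = sketch.handleHits(Arrays.asList("hit-1", "hit-2"), new ArrayList<>(), false);
        System.out.println("pending=" + pending.size() + ", transferred=" + sketch.transferred().size());
    }
}
```

If the list really is always empty for the non-paginated processors, a `void` return, or a Javadoc line stating that only not-yet-transferred flowfiles are returned, would answer the question directly.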

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
+    public static final AllowableValue SPLIT_UP_COMBINE = new AllowableValue(
+            "splitUp-combine",
+            "Combine",
+            "Combine results from all responses (one flowfile per entire paginated result set of hits). " +
+                    "Note that aggregations cannot be paged, they are generated across the entire result set and " +
+                    "returned as part of the first page. Results are output with one JSON object per line " +
+                    "(allowing hits to be combined from multiple pages without loading all results into memory)."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractJsonQueryElasticsearch.SPLIT_UP_HITS)
+            .description("Split up search results into one flowfile per result or combine all hits from all pages into a single flowfile.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES, SPLIT_UP_COMBINE)
+            .build();
+
+    public static final AllowableValue PAGINATION_SEARCH_AFTER = new AllowableValue(
+            "pagination-search_after",
+            "Search After",
+            "Use Elasticsearch \"search_after\" to page sorted results."
+    );
+    public static final AllowableValue PAGINATION_POINT_IN_TIME = new AllowableValue(
+            "pagination-pit",
+            "Point in Time",
+            "Use Elasticsearch (7.10+ with XPack) \"point in time\" to page sorted results."
+    );
+    public static final AllowableValue PAGINATION_SCROLL = new AllowableValue(
+            "pagination-scroll",
+            "Scroll",
+            "Use Elasticsearch \"scroll\" to page results."
+    );
+
+    public static final PropertyDescriptor PAGINATION_TYPE = new PropertyDescriptor.Builder()
+            .name("el-rest-pagination-type")
+            .displayName("Pagination Type")
+            .description("Pagination method to use. Not all types are available for all Elasticsearch versions, " +
+                    "check the Elasticsearch docs to confirm which are applicable and recommended for your service.")
+            .allowableValues(PAGINATION_SCROLL, PAGINATION_SEARCH_AFTER, PAGINATION_POINT_IN_TIME)
+            .defaultValue(PAGINATION_SCROLL.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor PAGINATION_KEEP_ALIVE = new PropertyDescriptor.Builder()
+            .name("el-rest-pagination-keep-alive")
+            .displayName("Pagination Keep Alive")
+            .description("Pagination \"keep_alive\" period. Period Elasticsearch will keep the scroll/pit cursor alive " +
+                    "in between requests (this is not the time expected for all pages to be returned, but the maximum " +
+                    "allowed time between requests for page retrieval).")
+            .required(true)
+            .defaultValue("10 mins")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, 24, TimeUnit.HOURS))
+            .build();
+
+    static final List<PropertyDescriptor> paginatedPropertyDescriptors;
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+        descriptors.add(PAGINATION_TYPE);
+        descriptors.add(PAGINATION_KEEP_ALIVE);
+
+        paginatedPropertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    // output as newline delimited JSON (allows for multiple pages of results to be appended to existing FlowFiles without retaining all hits in memory)
+    private final ObjectWriter writer = mapper.writer().withRootValueSeparator("\n");
+
+    String paginationType;

Review comment:
       Could this be protected?
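
As background for the "Combine" option in the hunk above: its description says hits are written one JSON object per line so that later pages can be appended without holding the whole result set in memory. The following is a rough sketch of that idea only. The PR itself uses a Jackson `ObjectWriter` with `withRootValueSeparator("\n")`; this sketch instead assumes the hits are already serialised to JSON strings so that it needs nothing beyond the JDK, and `NdjsonAppendSketch` and `appendPage` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: appends pages of pre-serialised JSON hits as newline-delimited JSON. */
final class NdjsonAppendSketch {
    /**
     * Writes each hit on its own line, inserting a separator only between objects.
     *
     * @param hasContent whether earlier pages have already written to the stream
     * @return whether the stream holds any content after this page
     */
    static boolean appendPage(final OutputStream out, final List<String> jsonHits, final boolean hasContent) {
        boolean written = hasContent;
        try {
            for (final String json : jsonHits) {
                if (written) {
                    out.write('\n');
                }
                out.write(json.getBytes(StandardCharsets.UTF_8));
                written = true;
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return written;
    }

    public static void main(final String[] args) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        boolean hasContent = appendPage(out, Arrays.asList("{\"id\":1}", "{\"id\":2}"), false);
        hasContent = appendPage(out, Arrays.asList("{\"id\":3}"), hasContent);
        System.out.println(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

Only one page of hits is in memory at a time; the only state carried between pages is the `hasContent` flag.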

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
##########
@@ -1,64 +0,0 @@
-<!DOCTYPE html>

Review comment:
       Is there a reason this was removed?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       A comment about naming: the name `ConsumeElasticsearch` evokes the image of consuming a queue or topic, but that may be misleading because nothing in Elasticsearch actually disappears (is consumed) when the results are processed.  Instead, this is more of a standing query, right?
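
For context on the "standing query" reading: the `@Stateful` description in the hunk above says the pagination state is retained until the current time is later than the last query execution plus the Pagination Keep Alive interval. A minimal sketch of that check follows. `PaginationExpirySketch` and `isExpired` are hypothetical names, and the PR's own implementation may differ (the state it lists includes an `expirationTimestamp`, so it may store the deadline rather than recompute it).

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical illustration of the keep-alive expiry rule described in the @Stateful annotation. */
final class PaginationExpirySketch {
    /**
     * Returns true when "now" is strictly later than the last query time plus the keep-alive period,
     * i.e. when the scroll/PiT can no longer be assumed to exist and a fresh query should start.
     */
    static boolean isExpired(final Instant lastQuery, final Duration keepAlive, final Instant now) {
        return now.isAfter(lastQuery.plus(keepAlive));
    }

    public static void main(final String[] args) {
        final Instant lastQuery = Instant.parse("2021-06-28T16:00:00Z");
        final Duration keepAlive = Duration.ofMinutes(10); // mirrors the "10 mins" default in the hunk above
        System.out.println(isExpired(lastQuery, keepAlive, lastQuery.plus(Duration.ofMinutes(11))));
    }
}
```

Once the check returns true, the processor would start the query again from the first page, which is why the behaviour resembles a repeated standing query rather than consumption from a queue.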

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
##########
@@ -1,48 +0,0 @@
-<!DOCTYPE html>

Review comment:
       Is there a reason this was removed?

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
##########
@@ -0,0 +1,67 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PaginatedJsonQueryElasticsearch</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
+    to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster in a paginated manner.

Review comment:
       Same comment as above




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676471395



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
##########
@@ -40,7 +40,8 @@ public boolean hasErrors() {
         return hasErrors;
     }
 
-    public static IndexOperationResponse fromJsonResponse(String response) throws IOException {
+    @SuppressWarnings("unchecked")
+    public static IndexOperationResponse fromJsonResponse(final String response) throws IOException {
         ObjectMapper mapper = new ObjectMapper();

Review comment:
       Use of an `ObjectReader` would require an `ObjectMapper` to be created first (the `ObjectReader` constructors need either an `ObjectMapper` or `ObjectReader`)... but absolutely, this `ObjectMapper` should be a class-level constant, so that's easy to do







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676665018



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
##########
@@ -159,62 +166,141 @@
             <artifactId>nifi-ssl-context-service</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-artifact</artifactId>
+            <version>3.6.3</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-json</artifactId>
             <version>${nifi.groovy.version}</version>
             <scope>test</scope>
-	</dependency>
-	<dependency>
+        </dependency>
+        <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
             <version>5.6.16</version>
         </dependency>
     </dependencies>
 
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-failsafe-plugin</artifactId>
+                    <version>3.0.0-M3</version>
+                </plugin>
+                <plugin>
+                    <groupId>com.github.alexcojocaru</groupId>
+                    <artifactId>elasticsearch-maven-plugin</artifactId>
+                    <version>6.19</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
     <profiles>
         <profile>
+            <!-- use with integration-tests only (-default can be used if x-pack-ml permissions fixed) -->
             <id>integration-6</id>

Review comment:
       Actually, I've just found that the `default` flavour of Elasticsearch 7 wasn't running the tests (it spins up Elasticsearch but then doesn't run the `integration-tests`), so I've changed this to:
   ```bash
   mvnd -P integration-tests,elasticsearch-oss clean verify
   sleep 2s
   mvnd -P integration-tests,elasticsearch-oss,elasticsearch-6 clean verify
   sleep 2s
   mvnd -P integration-tests,elasticsearch-oss,integration-7 clean verify
   sleep 2s
   mvnd -P integration-tests,elasticsearch-default,integration-7 clean verify
   ```
   
   This also adds a run of ES 7 without X-Pack.
   
   I've updated the new README to include these commands.







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676386896



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "read", "json"})
+@CapabilityDescription("A processor that allows the user to run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "It will use the flowfile's content for the query unless the QUERY attribute is populated. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class PaginatedJsonQueryElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.addAll(paginatedPropertyDescriptors);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    void finishQuery(final FlowFile input, final PaginatedJsonQueryParameters paginatedQueryJsonParameters,

Review comment:
       (as above)







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676743667



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();

Review comment:
       And, actually, once the input FlowFile has been transferred in the first batch, it can't be used to create any further result FlowFiles for output either.
   
   So, basically, I don't think this will work where the processor accepts an incoming FlowFile for the query (unless you know of a way of achieving that). Do we want to retain this functionality at all?







[GitHub] [nifi] ChrisSamo632 commented on pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#issuecomment-885191515


   > Hi @ChrisSamo632, I'd like to test this one out. Can you do me a favor and rebase against main so we get the 1.15.0-SNAPSHOT version?
   
   @gresockj done





[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r693117718



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       Great updates, this worked perfectly for me.  Thanks!







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676480488



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+        splitUpAggregations = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);
+
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;
+
+    void populateCommonJsonQueryParameters(final Q queryJsonParameters, final FlowFile input, final ProcessContext context,
+                                           final ProcessSession session) throws IOException {
+        final String query = getQuery(input, context, session);
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+        final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
+                ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+                : null;
+
+        queryJsonParameters.setQuery(query);
+        queryJsonParameters.setIndex(index);
+        queryJsonParameters.setType(type);
+        queryJsonParameters.setQueryAttr(queryAttr);
+    }
+
+    abstract SearchResponse doQuery(final Q queryJsonParameters, final List<FlowFile> hitsFlowFiles, final ProcessSession session,
+                                    final FlowFile input, final StopWatch stopWatch) throws IOException;
+
+    abstract void finishQuery(final FlowFile input, final Q queryParameters, final ProcessSession session,
+                              final SearchResponse response) throws IOException;
+
+    FlowFile createChildFlowFile(final ProcessSession session, final FlowFile parent) {
+        return parent != null ? session.create(parent) : session.create();
+    }
+
+    private FlowFile writeAggregationFlowFileContents(final String name, final Integer number, final String json,
+                                                      final ProcessSession session, final FlowFile aggFlowFile,
+                                                      final Map<String, String> attributes) {
+        FlowFile ff = session.write(aggFlowFile, out -> out.write(json.getBytes()));
+        ff = session.putAllAttributes(ff, new HashMap<String, String>(){{
+            if (name != null) {
+                put("aggregation.name", name);
+            }
+            if (number != null) {
+                put("aggregation.number", number.toString());
+            }
+        }});
+
+        return session.putAllAttributes(ff, attributes);
+    }
+
+    private void handleAggregations(final Map<String, Object> aggregations, final ProcessSession session,
+                                    final FlowFile parent, final Map<String, String> attributes,
+                                    final String transitUri, final StopWatch stopWatch) throws IOException {
+        if (aggregations != null && !aggregations.isEmpty()) {
+            final List<FlowFile> aggsFlowFiles = new ArrayList<>();
+            if (splitUpAggregations.equals(SPLIT_UP_YES.getValue())) {
+                int aggCount = 0;
+                for (final Map.Entry<String, Object> agg : aggregations.entrySet()) {
+                    final FlowFile aggFlowFile = createChildFlowFile(session, parent);
+                    final String aggJson = mapper.writeValueAsString(agg.getValue());
+                    aggsFlowFiles.add(writeAggregationFlowFileContents(agg.getKey(), ++aggCount, aggJson, session, aggFlowFile, attributes));
+                }
+            } else {
+                final FlowFile aggFlowFile = createChildFlowFile(session, parent);
+                final String json = mapper.writeValueAsString(aggregations);
+                aggsFlowFiles.add(writeAggregationFlowFileContents(null, null, json, session, aggFlowFile, attributes));
+            }
+
+            if (!aggsFlowFiles.isEmpty()) {
+                session.transfer(aggsFlowFiles, REL_AGGREGATIONS);
+                aggsFlowFiles.forEach(ff -> session.getProvenanceReporter().receive(ff, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)));
+            }
+        }
+    }
+
+    private FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
+                                      final FlowFile hitFlowFile, final Map<String, String> attributes) {
+        final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes()));
+        attributes.put("hit.count", Integer.toString(count));
+
+        return session.putAllAttributes(ff, attributes);
+    }
+
+    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final Q queryJsonParameters, final ProcessSession session,
+                              final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles,
+                              final String transitUri, final StopWatch stopWatch) throws IOException {
+        if (hits != null && !hits.isEmpty()) {
+            if (SPLIT_UP_YES.getValue().equals(splitUpHits)) {
+                for (final Map<String, Object> hit : hits) {
+                    final FlowFile hitFlowFile = createChildFlowFile(session, parent);
+                    final String json = mapper.writeValueAsString(hit);
+                    hitsFlowFiles.add(writeHitFlowFile(1, json, session, hitFlowFile, attributes));
+                }
+            } else {
+                final FlowFile hitFlowFile = createChildFlowFile(session, parent);
+                final String json = mapper.writeValueAsString(hits);
+                hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, hitFlowFile, attributes));
+            }
+        }
+
+        // output any results
+        if (!hitsFlowFiles.isEmpty()) {

Review comment:
       It will always be empty when processing non-combined results of paginated queries.
   
   It's used for combining the hits between pages of results: the `List<FlowFile>` is returned by `handleHits` => `handleResponse` => `doQuery` and sent back through that chain on the next iteration through `doQuery`, so that the existing `FlowFile` can be reused to append new hits into.
   
   I agree this is a little confusing when reading the `AbstractJsonQueryElasticsearch#handleHits` method code on its own - maybe some comments here would be useful? Alternatively, I could refactor the method into separate methods for the `combined` vs. `non-combined` result handling and not return anything for those cases. I'm not sure how much complexity that might add (or alleviate), as the whole chain of method calls will be impacted, so I will give it a quick try and resort to some comments otherwise.
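   
   To make the carry-over idea concrete, here is a rough, self-contained sketch of the pattern described above. It is not the PR's code: `CombinedHitsSketch`, `handlePage` and `run` are hypothetical names, plain strings stand in for hits, and a `StringBuilder` stands in for the reusable `FlowFile`.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical, simplified stand-in for the handleHits => handleResponse => doQuery chain.
   final class CombinedHitsSketch {

       // When combining, append this page's hits to the single carried-over output
       // (created on the first page) and hand the carrier list back to the caller.
       // When not combining, emit the page straight away; the carrier list stays empty.
       static List<StringBuilder> handlePage(final List<String> pageHits, final boolean combine,
                                             final List<StringBuilder> carried, final List<String> emitted) {
           if (pageHits.isEmpty()) {
               return carried;
           }
           if (combine) {
               if (carried.isEmpty()) {
                   carried.add(new StringBuilder());
               }
               final StringBuilder output = carried.get(0);
               for (final String hit : pageHits) {
                   if (output.length() > 0) {
                       output.append('\n');
                   }
                   output.append(hit);
               }
           } else {
               emitted.add(String.join("\n", pageHits));
           }
           return carried;
       }

       // Stand-in for the paging loop: the list returned for one page is passed into the next.
       static List<String> run(final List<List<String>> pages, final boolean combine) {
           final List<String> emitted = new ArrayList<>();
           List<StringBuilder> carried = new ArrayList<>();
           for (final List<String> page : pages) {
               carried = handlePage(page, combine, carried, emitted);
           }
           // After the last page, anything still carried is output once.
           for (final StringBuilder output : carried) {
               emitted.add(output.toString());
           }
           return emitted;
       }
   }
   ```
   
   In this sketch the carrier list only ever holds something when results are being combined, which is the same reason the real list is empty for non-combined paginated results.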







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676492362



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
##########
@@ -1,64 +0,0 @@
-<!DOCTYPE html>

Review comment:
       File was in the wrong location - `additionalDetails.html` files should be under `src/main/resources/docs/...` *not* inside a `META-INF` directory
   
   The file exists in the correct location (only) in this PR
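   
   For illustration only, the layout convention can be written down as a small helper. This is a sketch based on the file paths quoted in this PR; `AdditionalDetailsPathSketch` and `expectedLocation` are hypothetical names and not part of NiFi.
   
   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;

   // Hypothetical helper: builds the location where a processor's additionalDetails.html
   // is expected, i.e. under src/main/resources/docs/<fully qualified class name>/.
   final class AdditionalDetailsPathSketch {

       static Path expectedLocation(final String moduleDir, final String processorClassName) {
           return Paths.get(moduleDir, "src", "main", "resources", "docs",
                   processorClassName, "additionalDetails.html");
       }
   }
   ```
   
   For example, passing `nifi-elasticsearch-restapi-processors` and `org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord` yields a path with no `META-INF` segment.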







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676497662



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
##########
@@ -0,0 +1,67 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PaginatedJsonQueryElasticsearch</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
+    to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster in a paginated manner.
+    Like all processors in the "restapi" bundle, it uses the official Elastic client APIs, so it supports leader detection.</p>
+    <p>The query to execute can be provided either in the Query configuration property or in an attribute on a flowfile. In
+    the latter case, the name of the attribute (Expression Language is supported here) must be provided in the Query
+    property. If no Query property is provided and a flowfile is received, the content of the flowfile is used as the Query JSON.</p>

Review comment:
       This is another bit of documentation copied from the existing `JsonQueryElasticsearch` processor docs, but it's not very clear and your suggestion reads better. I've updated both sets of docs and also corrected a grammatical error in the `ConsumeElasticsearch` docs.







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676506704



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public SearchResponse scroll(String scroll) {
+        try {
+            final HttpEntity scrollEntity = new NStringEntity(scroll, ContentType.APPLICATION_JSON);
+            final Response response = client.performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity);
+
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public String initialisePointInTime(String index, String keepAlive) {
+        try {
+            final Map<String, String> params = new HashMap<String, String>() {{
+                if (StringUtils.isNotBlank(keepAlive)) {
+                    put("keep_alive", keepAlive);
+                }
+            }};
+            final Response response = client.performRequest("POST", index + "/_pit", params);
+            final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);

Review comment:
       "Default" would suggest either the JVM default charset (which this isn't necessarily) or that the charset is configurable (which it isn't, at least currently). I'd be tempted to leave this as-is and, if necessary, consider the configurability of character sets as a whole in a separate Jira ticket/PR.
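
A minimal sketch of the distinction drawn above, assuming a hypothetical helper named `ResponseBodyReader.readUtf8` (it is not part of this PR). Decoding with an explicit `StandardCharsets.UTF_8` gives the same result on every JVM, whereas `new String(bytes)` without a charset depends on the platform default.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not code from the PR.
class ResponseBodyReader {
    // Reads the whole stream and decodes it as UTF-8,
    // independent of the JVM's default charset.
    static String readUtf8(final InputStream in) {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        final byte[] chunk = new byte[4096];
        try {
            int read;
            while ((read = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
        } catch (IOException ex) {
            // Wrapped so callers need not declare a checked exception.
            throw new UncheckedIOException(ex);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Because the charset is named at the call site, a separate local constant would only repeat what `StandardCharsets.UTF_8` already says.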







[GitHub] [nifi] ChrisSamo632 commented on pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#issuecomment-869955539


   Impacts https://github.com/apache/nifi/pull/4693 (NIFI-8001), which uses the same request parameters approach in the ElasticsearchClientService to allow provision of arbitrary query parameters. This PR should be rebased if the other is merged first (or vice versa).





[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676384536



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public SearchResponse scroll(String scroll) {
+        try {
+            final HttpEntity scrollEntity = new NStringEntity(scroll, ContentType.APPLICATION_JSON);
+            final Response response = client.performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity);
+
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public String initialisePointInTime(String index, String keepAlive) {
+        try {
+            final Map<String, String> params = new HashMap<String, String>() {{
+                if (StringUtils.isNotBlank(keepAlive)) {
+                    put("keep_alive", keepAlive);
+                }
+            }};
+            final Response response = client.performRequest("POST", index + "/_pit", params);
+            final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);

Review comment:
       `StandardCharsets.UTF_8` is already a constant, so it didn't seem worth creating a new local constant to hold the same value.







[GitHub] [nifi] ChrisSamo632 commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676388321



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
+    public static final AllowableValue SPLIT_UP_COMBINE = new AllowableValue(

Review comment:
       I renamed these several times during development. I guess it doesn't matter too much, so long as the original `PropertyDescriptor#name` values don't change from those already declared by `JsonQueryElasticsearch` (otherwise I'd break existing uses of that processor). I'll have another look at a more appropriate setup.
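
A rough sketch of why the descriptor `name` has to stay fixed while Java constant names and display names can change. The classes `DescriptorSketch` and `SavedConfigSketch` are hypothetical stand-ins, not NiFi classes; the only assumption carried over from NiFi is that saved configuration is keyed by the descriptor's name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a property descriptor; NOT the NiFi class.
final class DescriptorSketch {
    final String name;        // stable identifier that saved configuration is keyed by
    final String displayName; // label shown to users; safe to reword

    DescriptorSketch(final String name, final String displayName) {
        this.name = name;
        this.displayName = displayName;
    }
}

// Hypothetical stand-in for persisted processor configuration.
final class SavedConfigSketch {
    private final Map<String, String> valuesByName = new HashMap<>();

    void set(final DescriptorSketch descriptor, final String value) {
        valuesByName.put(descriptor.name, value);
    }

    // Returns null when nothing was saved under the descriptor's name.
    String get(final DescriptorSketch descriptor) {
        return valuesByName.get(descriptor.name);
    }
}
```

In this sketch, a descriptor that keeps the name `el-rest-split-up-hits` still finds its saved value after its display name is reworded, while one declared under a different name finds nothing.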







[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676497084



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+        splitUpAggregations = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);
+
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;
+
+    void populateCommonJsonQueryParameters(final Q queryJsonParameters, final FlowFile input, final ProcessContext context,
+                                           final ProcessSession session) throws IOException {
+        final String query = getQuery(input, context, session);
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+        final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
+                ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+                : null;
+
+        queryJsonParameters.setQuery(query);
+        queryJsonParameters.setIndex(index);
+        queryJsonParameters.setType(type);
+        queryJsonParameters.setQueryAttr(queryAttr);
+    }
+
+    abstract SearchResponse doQuery(final Q queryJsonParameters, final List<FlowFile> hitsFlowFiles, final ProcessSession session,
+                                    final FlowFile input, final StopWatch stopWatch) throws IOException;
+
+    abstract void finishQuery(final FlowFile input, final Q queryParameters, final ProcessSession session,
+                              final SearchResponse response) throws IOException;
+
+    FlowFile createChildFlowFile(final ProcessSession session, final FlowFile parent) {
+        return parent != null ? session.create(parent) : session.create();
+    }
+
+    private FlowFile writeAggregationFlowFileContents(final String name, final Integer number, final String json,
+                                                      final ProcessSession session, final FlowFile aggFlowFile,
+                                                      final Map<String, String> attributes) {
+        FlowFile ff = session.write(aggFlowFile, out -> out.write(json.getBytes()));
+        ff = session.putAllAttributes(ff, new HashMap<String, String>(){{
+            if (name != null) {
+                put("aggregation.name", name);
+            }
+            if (number != null) {
+                put("aggregation.number", number.toString());
+            }
+        }});
+
+        return session.putAllAttributes(ff, attributes);
+    }
+
+    private void handleAggregations(final Map<String, Object> aggregations, final ProcessSession session,
+                                    final FlowFile parent, final Map<String, String> attributes,
+                                    final String transitUri, final StopWatch stopWatch) throws IOException {
+        if (aggregations != null && !aggregations.isEmpty()) {
+            final List<FlowFile> aggsFlowFiles = new ArrayList<>();
+            if (splitUpAggregations.equals(SPLIT_UP_YES.getValue())) {
+                int aggCount = 0;
+                for (final Map.Entry<String, Object> agg : aggregations.entrySet()) {
+                    final FlowFile aggFlowFile = createChildFlowFile(session, parent);
+                    final String aggJson = mapper.writeValueAsString(agg.getValue());
+                    aggsFlowFiles.add(writeAggregationFlowFileContents(agg.getKey(), ++aggCount, aggJson, session, aggFlowFile, attributes));
+                }
+            } else {
+                final FlowFile aggFlowFile = createChildFlowFile(session, parent);
+                final String json = mapper.writeValueAsString(aggregations);
+                aggsFlowFiles.add(writeAggregationFlowFileContents(null, null, json, session, aggFlowFile, attributes));
+            }
+
+            if (!aggsFlowFiles.isEmpty()) {
+                session.transfer(aggsFlowFiles, REL_AGGREGATIONS);
+                aggsFlowFiles.forEach(ff -> session.getProvenanceReporter().receive(ff, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)));
+            }
+        }
+    }
+
+    private FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
+                                      final FlowFile hitFlowFile, final Map<String, String> attributes) {
+        final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes()));
+        attributes.put("hit.count", Integer.toString(count));
+
+        return session.putAllAttributes(ff, attributes);
+    }
+
+    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final Q queryJsonParameters, final ProcessSession session,
+                              final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles,
+                              final String transitUri, final StopWatch stopWatch) throws IOException {
+        if (hits != null && !hits.isEmpty()) {
+            if (SPLIT_UP_YES.getValue().equals(splitUpHits)) {
+                for (final Map<String, Object> hit : hits) {
+                    final FlowFile hitFlowFile = createChildFlowFile(session, parent);
+                    final String json = mapper.writeValueAsString(hit);
+                    hitsFlowFiles.add(writeHitFlowFile(1, json, session, hitFlowFile, attributes));
+                }
+            } else {
+                final FlowFile hitFlowFile = createChildFlowFile(session, parent);
+                final String json = mapper.writeValueAsString(hits);
+                hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, hitFlowFile, attributes));
+            }
+        }
+
+        // output any results
+        if (!hitsFlowFiles.isEmpty()) {

Review comment:
       I think just method comments in both AbstractJsonQueryElasticsearch and AbstractPaginatedJsonQueryElasticsearch should suffice.







[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r675468366



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
##########
@@ -25,11 +25,11 @@
  * covers all CRUD-related operations that can be executed against an Elasticsearch index with documents.
  */
 public class IndexOperationRequest {
-    private String index;
-    private String type;
-    private String id;
-    private Map<String, Object> fields;
-    private Operation operation;
+    private final String index;
+    private final String type;
+    private final String id;
+    private final Map<String, Object> fields;
+    private final Operation operation;
 
     public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields, Operation operation) {

Review comment:
       Perhaps also a good time to make these parameters final.
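
A minimal sketch of what that could look like, using a hypothetical, simplified class `ImmutableRequestSketch` rather than the real `IndexOperationRequest`. The defensive copy of the map is an extra assumption of this sketch and is not something the PR is known to do.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified request class; NOT the IndexOperationRequest from the PR.
final class ImmutableRequestSketch {
    private final String index;
    private final Map<String, Object> fields;

    // Final parameters cannot be reassigned inside the constructor body.
    ImmutableRequestSketch(final String index, final Map<String, Object> fields) {
        this.index = index;
        // Defensive, unmodifiable copy (an addition for this sketch):
        // later changes to the caller's map do not leak into the request.
        this.fields = Collections.unmodifiableMap(new HashMap<>(fields));
    }

    String getIndex() {
        return index;
    }

    Map<String, Object> getFields() {
        return fields;
    }
}
```

Final fields guarantee the references are assigned exactly once; the `final` parameters are a readability guard against accidental reassignment.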







[GitHub] [nifi] gresockj commented on a change in pull request #5193: NIFI-8002 Elasticsearch paginated query processors

Posted by GitBox <gi...@apache.org>.
gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676804024



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       Would `SearchElasticsearch` be too repetitive?






