You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2021/10/05 01:34:21 UTC

[nifi] branch main updated: NIFI-8002: - Creation of a PaginatedJsonQueryElasticsearch and ConsumeElasticsearch processors - Integration Tests for ES Client Service with version and flavour-specific tests for applicable functionality

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a63bd5  NIFI-8002: - Creation of a PaginatedJsonQueryElasticsearch and ConsumeElasticsearch processors - Integration Tests for ES Client Service with version and flavour-specific tests for applicable functionality
3a63bd5 is described below

commit 3a63bd579e059763c21c0b8355d8686ec5c222d4
Author: Chris Sampson <ch...@gmail.com>
AuthorDate: Mon Oct 4 21:59:07 2021 +0100

    NIFI-8002:
    - Creation of a PaginatedJsonQueryElasticsearch and ConsumeElasticsearch processors
    - Integration Tests for ES Client Service with version and flavour-specific tests for applicable functionality
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5193.
---
 .github/PULL_REQUEST_TEMPLATE.md                   |   2 +-
 .../elasticsearch/DeleteOperationResponse.java     |  11 +-
 .../elasticsearch/ElasticSearchClientService.java  |  38 +-
 .../nifi/elasticsearch/ElasticsearchError.java     |   2 +-
 .../nifi/elasticsearch/IndexOperationRequest.java  |  23 +-
 .../nifi/elasticsearch/IndexOperationResponse.java |  29 +-
 .../apache/nifi/elasticsearch/SearchResponse.java  |  45 +-
 .../nifi-elasticsearch-client-service/README.md    |  60 +++
 .../nifi-elasticsearch-client-service/pom.xml      | 151 +++++-
 .../ElasticSearchClientServiceImpl.java            | 339 ++++++++----
 .../elasticsearch/ElasticSearchLookupService.java  |   4 +-
 .../nifi/elasticsearch/SearchResponseTest.groovy   |  15 +-
 .../ElasticSearchClientService_IT.groovy           | 566 +++++++++++++++++++++
 .../ElasticSearchLookupService_IT.groovy           |  32 +-
 .../TestElasticSearchClientService.groovy          |  24 +-
 .../nifi-elasticsearch-restapi-processors/pom.xml  |   8 +-
 .../AbstractJsonQueryElasticsearch.java            | 305 +++++++++++
 .../AbstractPaginatedJsonQueryElasticsearch.java   | 330 ++++++++++++
 .../elasticsearch/ElasticsearchRestProcessor.java  |   9 +-
 .../elasticsearch/JsonQueryElasticsearch.java      | 245 ++-------
 .../PaginatedJsonQueryElasticsearch.java           |  91 ++++
 .../elasticsearch/SearchElasticsearch.java         | 194 +++++++
 .../elasticsearch/api/JsonQueryParameters.java     |  70 +++
 .../api/PaginatedJsonQueryParameters.java          |  79 +++
 .../additionalDetails.html                         |  48 --
 .../additionalDetails.html                         |  64 ---
 .../services/org.apache.nifi.processor.Processor   |   2 +
 .../additionalDetails.html                         |   7 +-
 .../additionalDetails.html                         |  24 +-
 .../additionalDetails.html                         |  76 +++
 .../AbstractJsonQueryElasticsearchTest.groovy      | 333 ++++++++++++
 ...tractPaginatedJsonQueryElasticsearchTest.groovy | 271 ++++++++++
 .../DeleteByQueryElasticsearchTest.groovy          |   2 +-
 .../JsonQueryElasticsearchNoInputTest.groovy}      |  17 +-
 .../JsonQueryElasticsearchTest.groovy              | 237 +--------
 .../PaginatedJsonQueryElasticsearchTest.groovy     |  91 ++++
 .../elasticsearch/SearchElasticsearchTest.groovy   | 238 +++++++++
 .../TestElasticsearchClientService.groovy          |  75 ++-
 .../mock/AbstractMockElasticsearchClient.groovy    |  28 +-
 39 files changed, 3435 insertions(+), 750 deletions(-)

diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index f227552..ba5f049 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -38,7 +38,7 @@ to ensure the following steps have been taken:
 - [ ] Have you written or updated unit tests to verify your changes?
 - [ ] Have you verified that the full build is successful on JDK 8?
 - [ ] Have you verified that the full build is successful on JDK 11?
-- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
+- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
 - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
 - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
 - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java
index 1fd83d7..13e2aa3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java
@@ -18,13 +18,20 @@
 package org.apache.nifi.elasticsearch;
 
 public class DeleteOperationResponse {
-    private long took;
+    private final long took;
 
-    public DeleteOperationResponse(long took) {
+    public DeleteOperationResponse(final long took) {
         this.took = took;
     }
 
     public long getTook() {
         return took;
     }
+
+    @Override
+    public String toString() {
+        return "DeleteOperationResponse{" +
+                "took=" + took +
+                '}';
+    }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 236d70c..d232f2b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -191,9 +191,45 @@ public interface ElasticSearchClientService extends ControllerService {
      * @param query A JSON string reprensenting the query.
      * @param index The index to target. Optional.
      * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch.
+     * @param requestParameters A collection of URL request parameters. Optional.
      * @return A SearchResponse object if successful.
      */
-    SearchResponse search(String query, String index, String type);
+    SearchResponse search(String query, String index, String type, Map<String, String> requestParameters);
+
+    /**
+     * Retrieve next page of results from a Scroll.
+     *
+     * @param scroll A JSON string containing scrollId and optional scroll (keep alive) retention period.
+     * @return A SearchResponse object if successful.
+     */
+    SearchResponse scroll(String scroll);
+
+    /**
+     * Initialise a Point in Time for paginated queries.
+     * Requires Elasticsearch 7.10+ and XPack features.
+     *
+     * @param index Index targeted.
+     * @param keepAlive Point in Time's retention period (maximum time Elasticsearch will retain the PiT between requests). Optional.
+     * @return the Point in Time Id (pit_id)
+     */
+    String initialisePointInTime(String index, String keepAlive);
+
+    /**
+     * Delete a Point in Time.
+     * Requires Elasticsearch 7.10+ and XPack features.
+     *
+     * @param pitId Point in Time Id to be deleted.
+     * @return A DeleteOperationResponse object if successful.
+     */
+    DeleteOperationResponse deletePointInTime(String pitId);
+
+    /**
+     * Delete a Scroll.
+     *
+     * @param scrollId Scroll Id to be deleted.
+     * @return A DeleteOperationResponse object if successful.
+     */
+    DeleteOperationResponse deleteScroll(String scrollId);
 
     /**
      * Build a transit URL to use with the provenance reporter.
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
index 0973567..65e11ad 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
@@ -34,7 +34,7 @@ public class ElasticsearchError extends RuntimeException {
 
     protected boolean isElastic;
 
-    public ElasticsearchError(Exception ex) {
+    public ElasticsearchError(final Exception ex) {
         super(ex);
         final boolean isKnownException = ELASTIC_ERROR_NAMES.contains(ex.getClass().getSimpleName());
         final boolean isServiceUnavailable = "ResponseException".equals(ex.getClass().getSimpleName())
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
index 5ab6632..7de8807 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
@@ -25,13 +25,13 @@ import java.util.Map;
  * covers all CRUD-related operations that can be executed against an Elasticsearch index with documents.
  */
 public class IndexOperationRequest {
-    private String index;
-    private String type;
-    private String id;
-    private Map<String, Object> fields;
-    private Operation operation;
+    private final String index;
+    private final String type;
+    private final String id;
+    private final Map<String, Object> fields;
+    private final Operation operation;
 
-    public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields, Operation operation) {
+    public IndexOperationRequest(final String index, final String type, final String id, final Map<String, Object> fields, final Operation operation) {
         this.index = index;
         this.type = type;
         this.id = id;
@@ -81,4 +81,15 @@ public class IndexOperationRequest {
                     .orElseThrow(() -> new IllegalArgumentException(String.format("Unknown Index Operation %s", value)));
         }
     }
+
+    @Override
+    public String toString() {
+        return "IndexOperationRequest{" +
+                "index='" + index + '\'' +
+                ", type='" + type + '\'' +
+                ", id='" + id + '\'' +
+                ", fields=" + fields +
+                ", operation=" + operation +
+                '}';
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
index c001f69..8d19ca8 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
@@ -24,11 +24,13 @@ import java.util.List;
 import java.util.Map;
 
 public class IndexOperationResponse {
-    private long took;
+    private final long took;
     private boolean hasErrors;
     private List<Map<String, Object>> items;
 
-    public IndexOperationResponse(long took) {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public IndexOperationResponse(final long took) {
         this.took = took;
     }
 
@@ -40,14 +42,14 @@ public class IndexOperationResponse {
         return hasErrors;
     }
 
-    public static IndexOperationResponse fromJsonResponse(String response) throws IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        Map<String, Object> parsedResponse = mapper.readValue(response, Map.class);
-        int took = (int) parsedResponse.get("took");
-        boolean hasErrors = (boolean) parsedResponse.get("errors");
-        List<Map<String, Object>> items = (List<Map<String, Object>>)parsedResponse.get("items");
+    @SuppressWarnings("unchecked")
+    public static IndexOperationResponse fromJsonResponse(final String response) throws IOException {
+        final Map<String, Object> parsedResponse = OBJECT_MAPPER.readValue(response, Map.class);
+        final int took = (int) parsedResponse.get("took");
+        final boolean hasErrors = (boolean) parsedResponse.get("errors");
+        final List<Map<String, Object>> items = (List<Map<String, Object>>)parsedResponse.get("items");
 
-        IndexOperationResponse retVal = new IndexOperationResponse(took);
+        final IndexOperationResponse retVal = new IndexOperationResponse(took);
         retVal.hasErrors = hasErrors;
         retVal.items = items;
 
@@ -57,4 +59,13 @@ public class IndexOperationResponse {
     public List<Map<String, Object>> getItems() {
         return items;
     }
+
+    @Override
+    public String toString() {
+        return "IndexOperationResponse{" +
+                "took=" + took +
+                ", hasErrors=" + hasErrors +
+                ", items=" + items +
+                '}';
+    }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/SearchResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/SearchResponse.java
index 725ad41..e6bfb15 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/SearchResponse.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/SearchResponse.java
@@ -21,19 +21,28 @@ import java.util.List;
 import java.util.Map;
 
 public class SearchResponse {
-    private List<Map<String, Object>> hits;
-    private Map<String, Object> aggregations;
-    private long numberOfHits;
-    private int took;
-    private boolean timedOut;
+    private final List<Map<String, Object>> hits;
+    private final Map<String, Object> aggregations;
+    private final long numberOfHits;
+    private final int took;
+    private final boolean timedOut;
+    private final String pitId;
+    private final String scrollId;
+    private final String searchAfter;
+    private final List<String> warnings;
 
-    public SearchResponse(List<Map<String, Object>> hits, Map<String, Object> aggregations,
-                          int numberOfHits, int took, boolean timedOut) {
+    public SearchResponse(final List<Map<String, Object>> hits, final Map<String, Object> aggregations, final String pitId,
+                          final String scrollId, final String searchAfter, final int numberOfHits, final int took, final boolean timedOut,
+                          final List<String> warnings) {
         this.hits = hits;
         this.aggregations = aggregations;
+        this.pitId = pitId;
+        this.scrollId = scrollId;
         this.numberOfHits = numberOfHits;
         this.took = took;
         this.timedOut = timedOut;
+        this.searchAfter = searchAfter;
+        this.warnings = warnings;
     }
 
     public Map<String, Object> getAggregations() {
@@ -44,6 +53,18 @@ public class SearchResponse {
         return hits;
     }
 
+    public String getPitId() {
+        return pitId;
+    }
+
+    public String getScrollId() {
+        return scrollId;
+    }
+
+    public String getSearchAfter() {
+        return searchAfter;
+    }
+
     public long getNumberOfHits() {
         return numberOfHits;
     }
@@ -56,12 +77,22 @@ public class SearchResponse {
         return took;
     }
 
+    public List<String> getWarnings() {
+        return this.warnings;
+    }
+
     @Override
     public String toString() {
         return "SearchResponse{" +
                 "hits=" + hits +
                 ", aggregations=" + aggregations +
                 ", numberOfHits=" + numberOfHits +
+                ", took=" + took +
+                ", timedOut=" + timedOut +
+                ", pitId='" + pitId + '\'' +
+                ", scrollId='" + scrollId + '\'' +
+                ", searchAfter='" + searchAfter + '\'' +
+                ", warnings='" + warnings + '\'' +
                 '}';
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/README.md b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/README.md
new file mode 100644
index 0000000..5ca1e4b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/README.md
@@ -0,0 +1,60 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# Elasticsearch Client Service
+
+## Integration Tests
+
+The `nifi-elasticsearch-client-service` component build allows for optional Integration Tests to be executed to verify
+additional functionality.
+
+The Integration Tests start a local (forked) instance of Elasticsearch using the `elasticsearch-maven-plugin`, populate it
+with known data, perform operations upon the instance and verify the results.
+
+These can be activated by running the following build commands:
+
+### Elasticsearch 5
+
+Test integration with Elasticsearch 5.x:
+
+```bash
+mvn -P integration-tests,elasticsearch-oss clean verify
+```
+
+### Elasticsearch 6
+
+Test integration with Elasticsearch 6.x:
+
+```bash
+mvn -P integration-tests,elasticsearch-oss,elasticsearch-6 clean verify
+```
+
+### Elasticsearch 7
+
+Test integration with Elasticsearch 7.x:
+
+#### With X-Pack
+
+Allows for testing of some X-Pack only features such as "Point in Time" querying:
+
+```bash
+mvn -P integration-tests,elasticsearch-default,elasticsearch-7 clean verify
+```
+
+#### Without X-Pack
+
+```bash
+mvn -P integration-tests,elasticsearch-oss,elasticsearch-7 clean verify
+```
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index 3f3ed11..d1a02b3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -26,9 +26,15 @@
     <packaging>jar</packaging>
 
     <properties>
+        <!-- use with integration-tests only -->
         <es.int.version>5.6.16</es.int.version>
-        <script.name>setup-5.script</script.name>
-        <type.name>faketype</type.name>
+        <es.int.script.name>setup-5.script</es.int.script.name>
+        <es.int.type.name>faketype</es.int.type.name>
+        <es.int.clusterName>testCluster</es.int.clusterName>
+        <es.int.transportPort>9500</es.int.transportPort>
+        <es.int.httpPort>9400</es.int.httpPort>
+        <es.int.timeout>90</es.int.timeout>
+        <es.int.logLevel>ERROR</es.int.logLevel>
     </properties>
 
     <dependencies>
@@ -105,6 +111,18 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-web-test-utils</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-utils</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-elasticsearch-client-service-api</artifactId>
             <version>1.15.0-SNAPSHOT</version>
             <scope>provided</scope>
@@ -160,61 +178,152 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-artifact</artifactId>
+            <version>3.6.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-json</artifactId>
             <version>${nifi.groovy.version}</version>
             <scope>test</scope>
-	</dependency>
-	<dependency>
+        </dependency>
+        <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
             <version>5.6.16</version>
         </dependency>
     </dependencies>
 
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-failsafe-plugin</artifactId>
+                    <version>3.0.0-M3</version>
+                </plugin>
+                <plugin>
+                    <groupId>com.github.alexcojocaru</groupId>
+                    <artifactId>elasticsearch-maven-plugin</artifactId>
+                    <version>6.19</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
     <profiles>
         <profile>
-            <id>integration-6</id>
+            <!-- use with elasticsearch-oss only (elasticsearch-default can be used once the x-pack-ml permission problems are fixed) -->
+            <id>elasticsearch-6</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
             <properties>
                 <es.int.version>6.8.13</es.int.version>
-                <type.name>_doc</type.name>
-                <script.name>setup-6.script</script.name>
+                <es.int.type.name>_doc</es.int.type.name>
+                <es.int.script.name>setup-6.script</es.int.script.name>
             </properties>
         </profile>
         <profile>
-            <id>integration-7</id>
+            <!-- use with elasticsearch-oss or elasticsearch-default -->
+            <id>elasticsearch-7</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
             <properties>
-                <es.int.version>7.10.0</es.int.version>
-                <script.name>setup-7.script</script.name>
-                <type.name>_doc</type.name>
+                <es.int.version>7.10.2</es.int.version>
+                <es.int.script.name>setup-7.script</es.int.script.name>
+                <es.int.type.name/>
             </properties>
         </profile>
+
+        <profile>
+            <!-- OSS Elasticsearch (no XPack features); required for ES 5.x and for 6.x versions before 6.3 -->
+            <id>elasticsearch-oss</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-failsafe-plugin</artifactId>
+                        <configuration>
+                            <systemPropertyVariables>
+                                <type_name>${es.int.type.name}</type_name>
+                                <es_version>${es.int.version}</es_version>
+                                <es_flavour>oss</es_flavour>
+                            </systemPropertyVariables>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>com.github.alexcojocaru</groupId>
+                        <artifactId>elasticsearch-maven-plugin</artifactId>
+                        <configuration>
+                            <clusterName>${es.int.clusterName}</clusterName>
+                            <transportPort>${es.int.transportPort}</transportPort>
+                            <httpPort>${es.int.httpPort}</httpPort>
+                            <version>${es.int.version}</version>
+                            <timeout>${es.int.timeout}</timeout>
+                            <logLevel>${es.int.logLevel}</logLevel>
+                            <pathInitScript>${project.basedir}/src/test/resources/${es.int.script.name}</pathInitScript>
+                            <keepExistingData>false</keepExistingData>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <id>start-elasticsearch</id>
+                                <phase>pre-integration-test</phase>
+                                <goals>
+                                    <goal>runforked</goal>
+                                </goals>
+                            </execution>
+                            <execution>
+                                <id>stop-elasticsearch</id>
+                                <phase>post-integration-test</phase>
+                                <goals>
+                                    <goal>stop</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+
         <profile>
-            <id>integration-tests</id>
+            <!-- Elasticsearch (default) with XPack (only for ES 6.3+ although there are XPack permission problems in 6.x startup) -->
+            <id>elasticsearch-default</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
             <build>
                 <plugins>
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-failsafe-plugin</artifactId>
-                        <version>3.0.0-M3</version>
                         <configuration>
                             <systemPropertyVariables>
-                                <type_name>${type.name}</type_name>
+                                <type_name>${es.int.type.name}</type_name>
+                                <es_version>${es.int.version}</es_version>
+                                <es_flavour>default</es_flavour>
                             </systemPropertyVariables>
                         </configuration>
                     </plugin>
                     <plugin>
                         <groupId>com.github.alexcojocaru</groupId>
                         <artifactId>elasticsearch-maven-plugin</artifactId>
-                        <version>6.19</version>
                         <configuration>
-                            <clusterName>testCluster</clusterName>
-                            <transportPort>9500</transportPort>
-                            <httpPort>9400</httpPort>
+                            <flavour>default</flavour>
+                            <clusterName>${es.int.clusterName}</clusterName>
+                            <transportPort>${es.int.transportPort}</transportPort>
+                            <httpPort>${es.int.httpPort}</httpPort>
                             <version>${es.int.version}</version>
-                            <timeout>90</timeout>
-                            <logLevel>ERROR</logLevel>
-                            <pathInitScript>${project.basedir}/src/test/resources/${script.name}</pathInitScript>
+                            <timeout>${es.int.timeout}</timeout>
+                            <logLevel>${es.int.logLevel}</logLevel>
+                            <pathInitScript>${project.basedir}/src/test/resources/${es.int.script.name}</pathInitScript>
+                            <keepExistingData>false</keepExistingData>
                         </configuration>
                         <executions>
                             <execution>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index cf2db2a..eb46102 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -20,21 +20,8 @@ package org.apache.nifi.elasticsearch;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import javax.net.ssl.SSLContext;
-
 import org.apache.commons.io.IOUtils;
+import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
@@ -55,9 +42,26 @@ import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.StringUtils;
 import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
 public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
     private ObjectMapper mapper;
 
@@ -112,7 +116,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         this.url = null;
     }
 
-    private void setupClient(ConfigurationContext context) throws MalformedURLException, InitializationException {
+    private void setupClient(final ConfigurationContext context) throws MalformedURLException, InitializationException {
         final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
         String[] hostsSplit = hosts.split(",[\\s]*");
         this.url = hostsSplit[0];
@@ -125,9 +129,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         final Integer readTimeout    = context.getProperty(SOCKET_TIMEOUT).asInteger();
         final Integer retryTimeout   = context.getProperty(RETRY_TIMEOUT).asInteger();
 
-        HttpHost[] hh = new HttpHost[hostsSplit.length];
+        final HttpHost[] hh = new HttpHost[hostsSplit.length];
         for (int x = 0; x < hh.length; x++) {
-            URL u = new URL(hostsSplit[x]);
+            final URL u = new URL(hostsSplit[x]);
             hh[x] = new HttpHost(u.getHost(), u.getPort(), u.getProtocol());
         }
 
@@ -140,7 +144,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
             throw new InitializationException(e);
         }
 
-        RestClientBuilder builder = RestClient.builder(hh)
+        final RestClientBuilder builder = RestClient.builder(hh)
                 .setHttpClientConfigCallback(httpClientBuilder -> {
                     if (sslContext != null) {
                         httpClientBuilder.setSSLContext(sslContext);
@@ -165,61 +169,71 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         this.client = builder.build();
     }
 
-    private Response runQuery(String endpoint, String query, String index, String type) {
-        StringBuilder sb = new StringBuilder()
-            .append("/").append(index);
-        if (type != null && !type.equals("")) {
+    private Response runQuery(final String endpoint, final String query, final String index, final String type, final Map<String, String> requestParameters) {
+        final StringBuilder sb = new StringBuilder();
+        if (StringUtils.isNotBlank(index)) {
+            sb.append("/").append(index);
+        }
+        if (StringUtils.isNotBlank(type)) {
             sb.append("/").append(type);
         }
 
         sb.append(String.format("/%s", endpoint));
 
-        HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
-
+        final HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
         try {
-            return client.performRequest("POST", sb.toString(), Collections.emptyMap(), queryEntity);
-        } catch (Exception e) {
+            return client.performRequest("POST", sb.toString(), requestParameters != null ? requestParameters : Collections.emptyMap(), queryEntity);
+        } catch (final Exception e) {
             throw new ElasticsearchError(e);
         }
     }
 
-    private Map<String, Object> parseResponse(Response response) {
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> parseResponse(final Response response) {
         final int code = response.getStatusLine().getStatusCode();
 
         try {
             if (code >= 200 && code < 300) {
-                InputStream inputStream = response.getEntity().getContent();
-                byte[] result = IOUtils.toByteArray(inputStream);
+                final InputStream inputStream = response.getEntity().getContent();
+                final byte[] result = IOUtils.toByteArray(inputStream);
                 inputStream.close();
                 return (Map<String, Object>) mapper.readValue(new String(result, responseCharset), Map.class);
             } else {
-                String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s",
+                final String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s",
                         response.getStatusLine().getReasonPhrase());
                 throw new IOException(errorMessage);
             }
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             throw new ElasticsearchError(ex);
         }
     }
 
+    private List<String> parseResponseWarningHeaders(final Response response) {
+        return Arrays.stream(response.getHeaders())
+                .filter(h -> "Warning".equalsIgnoreCase(h.getName()))
+                .map(Header::getValue)
+                .peek(h -> getLogger().warn("Elasticsearch Warning: {}", h))
+                .collect(Collectors.toList());
+    }
+
     @Override
-    public IndexOperationResponse add(IndexOperationRequest operation) {
+    public IndexOperationResponse add(final IndexOperationRequest operation) {
         return bulk(Collections.singletonList(operation));
     }
 
-    private String flatten(String str) {
+    private String flatten(final String str) {
         return str.replaceAll("[\\n\\r]", "\\\\n");
     }
 
-    private String buildBulkHeader(IndexOperationRequest request) throws JsonProcessingException {
-        String operation = request.getOperation().equals(IndexOperationRequest.Operation.Upsert)
+    private String buildBulkHeader(final IndexOperationRequest request) throws JsonProcessingException {
+        final String operation = request.getOperation().equals(IndexOperationRequest.Operation.Upsert)
                 ? "update"
                 : request.getOperation().getValue();
         return buildBulkHeader(operation, request.getIndex(), request.getType(), request.getId());
     }
 
-    private String buildBulkHeader(String operation, String index, String type, String id) throws JsonProcessingException {
-        Map<String, Object> header = new HashMap<String, Object>() {{
+    private String buildBulkHeader(final String operation, final String index, final String type, final String id) throws JsonProcessingException {
+        final Map<String, Object> header = new HashMap<String, Object>() {{
             put(operation, new HashMap<String, Object>() {{
                 put("_index", index);
                 if (StringUtils.isNotBlank(id)) {
@@ -234,24 +248,24 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         return flatten(mapper.writeValueAsString(header));
     }
 
-    protected void buildRequest(IndexOperationRequest request, StringBuilder builder) throws JsonProcessingException {
-        String header = buildBulkHeader(request);
+    protected void buildRequest(final IndexOperationRequest request, final StringBuilder builder) throws JsonProcessingException {
+        final String header = buildBulkHeader(request);
         builder.append(header).append("\n");
         switch (request.getOperation()) {
             case Index:
             case Create:
-                String indexDocument = mapper.writeValueAsString(request.getFields());
+                final String indexDocument = mapper.writeValueAsString(request.getFields());
                 builder.append(indexDocument).append("\n");
                 break;
             case Update:
             case Upsert:
-                Map<String, Object> doc = new HashMap<String, Object>() {{
+                final Map<String, Object> doc = new HashMap<String, Object>() {{
                     put("doc", request.getFields());
                     if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
                         put("doc_as_upsert", true);
                     }
                 }};
-                String update = flatten(mapper.writeValueAsString(doc)).trim();
+                final String update = flatten(mapper.writeValueAsString(doc)).trim();
                 builder.append(update).append("\n");
                 break;
             case Delete:
@@ -263,9 +277,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     }
 
     @Override
-    public IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
+    public IndexOperationResponse bulk(final List<IndexOperationRequest> operations) {
         try {
-            StringBuilder payload = new StringBuilder();
+            final StringBuilder payload = new StringBuilder();
             for (final IndexOperationRequest or : operations) {
                 buildRequest(or, payload);
             }
@@ -273,49 +287,50 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
             if (getLogger().isDebugEnabled()) {
                 getLogger().debug(payload.toString());
             }
-            HttpEntity entity = new NStringEntity(payload.toString(), ContentType.APPLICATION_JSON);
-            StopWatch watch = new StopWatch();
+            final HttpEntity entity = new NStringEntity(payload.toString(), ContentType.APPLICATION_JSON);
+            final StopWatch watch = new StopWatch();
             watch.start();
-            Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
+            final Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
             watch.stop();
 
-            String rawResponse = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+            final String rawResponse = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+            parseResponseWarningHeaders(response);
 
             if (getLogger().isDebugEnabled()) {
                 getLogger().debug(String.format("Response was: %s", rawResponse));
             }
 
             return IndexOperationResponse.fromJsonResponse(rawResponse);
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             throw new ElasticsearchError(ex);
         }
     }
 
     @Override
-    public Long count(String query, String index, String type) {
-        Response response = runQuery("_count", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public Long count(final String query, final String index, final String type) {
+        final Response response = runQuery("_count", query, index, type, null);
+        final Map<String, Object> parsed = parseResponse(response);
 
         return ((Integer)parsed.get("count")).longValue();
     }
 
     @Override
-    public DeleteOperationResponse deleteById(String index, String type, String id) {
+    public DeleteOperationResponse deleteById(final String index, final String type, final String id) {
         return deleteById(index, type, Collections.singletonList(id));
     }
 
     @Override
-    public DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
+    public DeleteOperationResponse deleteById(final String index, final String type, final List<String> ids) {
         try {
-            StringBuilder sb = new StringBuilder();
+            final StringBuilder sb = new StringBuilder();
             for (final String id : ids) {
-                String header = buildBulkHeader("delete", index, type, id);
+                final String header = buildBulkHeader("delete", index, type, id);
                 sb.append(header).append("\n");
             }
-            HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
-            StopWatch watch = new StopWatch();
+            final HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
+            final StopWatch watch = new StopWatch();
             watch.start();
-            Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
+            final Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
             watch.stop();
 
             if (getLogger().isDebugEnabled()) {
@@ -323,39 +338,46 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
                         IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8)));
             }
 
+            parseResponseWarningHeaders(response);
             return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             throw new RuntimeException(ex);
         }
     }
 
     @Override
-    public DeleteOperationResponse deleteByQuery(String query, String index, String type) {
-        long start = System.currentTimeMillis();
-        Response response = runQuery("_delete_by_query", query, index, type);
-        long end   = System.currentTimeMillis();
+    public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type) {
+        final StopWatch watch = new StopWatch();
+        watch.start();
+        final Response response = runQuery("_delete_by_query", query, index, type, null);
+        watch.stop();
 
         // check for errors in response
         parseResponse(response);
+        parseResponseWarningHeaders(response);
 
-        return new DeleteOperationResponse(end - start);
+        return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public Map<String, Object> get(String index, String type, String id) {
+    public Map<String, Object> get(final String index, final String type, final String id) {
         try {
-            StringBuilder endpoint = new StringBuilder();
+            final StringBuilder endpoint = new StringBuilder();
             endpoint.append(index);
-            if (!StringUtils.isEmpty(type)) {
+            if (StringUtils.isNotBlank(type)) {
                 endpoint.append("/").append(type);
+            } else {
+                endpoint.append("/_doc");
             }
             endpoint.append("/").append(id);
-            Response response = client.performRequest("GET", endpoint.toString(), new BasicHeader("Content-Type", "application/json"));
+            final Response response = client.performRequest("GET", endpoint.toString(), new BasicHeader("Content-Type", "application/json"));
 
-            String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+            final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+            parseResponseWarningHeaders(response);
 
             return (Map<String, Object>) mapper.readValue(body, Map.class).get("_source");
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             getLogger().error("", ex);
             return null;
         }
@@ -364,7 +386,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     /*
      * In pre-7.X ElasticSearch, it should return just a number. 7.X and after they are returning a map.
      */
-    private int handleSearchCount(Object raw) {
+    @SuppressWarnings("unchecked")
+    private int handleSearchCount(final Object raw) {
         if (raw instanceof Number) {
             return Integer.parseInt(raw.toString());
         } else if (raw instanceof Map) {
@@ -375,44 +398,160 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(final String query, final String index, final String type, final Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public SearchResponse scroll(final String scroll) {
+        try {
+            final HttpEntity scrollEntity = new NStringEntity(scroll, ContentType.APPLICATION_JSON);
+            final Response response = client.performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity);
 
-        int took = (Integer)parsed.get("took");
-        boolean timedOut = (Boolean)parsed.get("timed_out");
-        Map<String, Object> aggregations = parsed.get("aggregations") != null
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public String initialisePointInTime(final String index, final String keepAlive) {
+        try {
+            final Map<String, String> params = new HashMap<String, String>() {{
+                if (StringUtils.isNotBlank(keepAlive)) {
+                    put("keep_alive", keepAlive);
+                }
+            }};
+            final Response response = client.performRequest("POST", index + "/_pit", params);
+            final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+            parseResponseWarningHeaders(response);
+
+            if (getLogger().isDebugEnabled()) {
+                getLogger().debug(String.format("Response for initialising Point in Time: %s", body));
+            }
+
+            return (String) mapper.readValue(body, Map.class).get("id");
+        } catch (final Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public DeleteOperationResponse deletePointInTime(final String pitId) {
+        try {
+            final HttpEntity pitEntity = new NStringEntity(String.format("{\"id\": \"%s\"}", pitId), ContentType.APPLICATION_JSON);
+
+            final StopWatch watch = new StopWatch(true);
+            final Response response = client.performRequest("DELETE", "/_pit", Collections.emptyMap(), pitEntity);
+            watch.stop();
+
+            if (getLogger().isDebugEnabled()) {
+                getLogger().debug(String.format("Response for deleting Point in Time: %s",
+                        IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8))
+                );
+            }
+
+            parseResponseWarningHeaders(response);
+            return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
+        } catch (final ResponseException re) {
+            if (404 == re.getResponse().getStatusLine().getStatusCode()) {
+                getLogger().debug("Point in Time {} not found in Elasticsearch for deletion, ignoring", pitId);
+                return new DeleteOperationResponse(0);
+            } else {
+                throw new RuntimeException(re);
+            }
+        } catch (final Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public DeleteOperationResponse deleteScroll(final String scrollId) {
+        try {
+            final HttpEntity scrollBody = new NStringEntity(String.format("{\"scroll_id\": \"%s\"}", scrollId), ContentType.APPLICATION_JSON);
+
+            final StopWatch watch = new StopWatch(true);
+            final Response response = client.performRequest("DELETE", "/_search/scroll", Collections.emptyMap(), scrollBody);
+            watch.stop();
+
+            if (getLogger().isDebugEnabled()) {
+                getLogger().debug(String.format("Response for deleting Scroll: %s",
+                        IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8))
+                );
+            }
+
+            parseResponseWarningHeaders(response);
+            return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
+        } catch (final ResponseException re) {
+            if (404 == re.getResponse().getStatusLine().getStatusCode()) {
+                getLogger().debug("Scroll Id {} not found in Elasticsearch for deletion, ignoring", scrollId);
+                return new DeleteOperationResponse(0);
+            } else {
+                throw new RuntimeException(re);
+            }
+        } catch (final Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private SearchResponse buildSearchResponse(final Response response) throws JsonProcessingException {
+        final Map<String, Object> parsed = parseResponse(response);
+        final List<String> warnings = parseResponseWarningHeaders(response);
+
+        final int took = (Integer)parsed.get("took");
+        final boolean timedOut = (Boolean)parsed.get("timed_out");
+        final String pitId = parsed.get("pit_id") != null ? (String)parsed.get("pit_id") : null;
+        final String scrollId = parsed.get("_scroll_id") != null ? (String)parsed.get("_scroll_id") : null;
+        final Map<String, Object> aggregations = parsed.get("aggregations") != null
                 ? (Map<String, Object>)parsed.get("aggregations") : new HashMap<>();
-        Map<String, Object> hitsParent = (Map<String, Object>)parsed.get("hits");
-        int count = handleSearchCount(hitsParent.get("total"));
-        List<Map<String, Object>> hits = (List<Map<String, Object>>)hitsParent.get("hits");
+        final Map<String, Object> hitsParent = (Map<String, Object>)parsed.get("hits");
+        final int count = handleSearchCount(hitsParent.get("total"));
+        final List<Map<String, Object>> hits = (List<Map<String, Object>>)hitsParent.get("hits");
+        final String searchAfter = getSearchAfter(hits);
 
-        SearchResponse esr = new SearchResponse(hits, aggregations, count, took, timedOut);
+        final SearchResponse esr = new SearchResponse(hits, aggregations, pitId, scrollId, searchAfter, count, took, timedOut, warnings);
 
         if (getLogger().isDebugEnabled()) {
-            StringBuilder sb = new StringBuilder();
-            sb.append("******************");
-            sb.append(String.format("Took: %d", took));
-            sb.append(String.format("Timed out: %s", timedOut));
-            sb.append(String.format("Aggregation count: %d", aggregations.size()));
-            sb.append(String.format("Hit count: %d", hits.size()));
-            sb.append(String.format("Total found: %d", count));
-            sb.append("******************");
-
-            getLogger().debug(sb.toString());
+            final String searchSummary = "******************" +
+                    String.format("Took: %d", took) +
+                    String.format("Timed out: %s", timedOut) +
+                    String.format("Aggregation count: %d", aggregations.size()) +
+                    String.format("Hit count: %d", hits.size()) +
+                    String.format("PIT Id: %s", pitId) +
+                    String.format("Scroll Id: %s", scrollId) +
+                    String.format("Search After: %s", searchAfter) +
+                    String.format("Total found: %d", count) +
+                    String.format("Warnings: %s", warnings) +
+                    "******************";
+            getLogger().debug(searchSummary);
         }
 
         return esr;
     }
 
+    private String getSearchAfter(final List<Map<String, Object>> hits) throws JsonProcessingException {
+        String searchAfter = null;
+        if (!hits.isEmpty()) {
+            final Object lastHitSort = hits.get(hits.size() - 1).get("sort");
+            if (lastHitSort != null && !"null".equalsIgnoreCase(lastHitSort.toString())) {
+                searchAfter = mapper.writeValueAsString(lastHitSort);
+            }
+        }
+        return searchAfter;
+    }
+
     @Override
-    public String getTransitUrl(String index, String type) {
-        return new StringBuilder()
-                .append(this.url)
-                .append(StringUtils.isNotBlank(index) ? "/" : "")
-                .append(StringUtils.isNotBlank(index) ? index : "")
-                .append(StringUtils.isNotBlank(type) ? "/" : "")
-                .append(StringUtils.isNotBlank(type) ? type : "")
-                .toString();
+    public String getTransitUrl(final String index, final String type) {
+        return this.url +
+                (StringUtils.isNotBlank(index) ? "/" : "") +
+                (StringUtils.isNotBlank(index) ? index : "") +
+                (StringUtils.isNotBlank(type) ? "/" : "") +
+                (StringUtils.isNotBlank(type) ? type : "");
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
index fd5a3e3..5b476de 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
@@ -204,7 +204,7 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
 
         String json = mapper.writeValueAsString(query);
 
-        SearchResponse response = clientService.search(json, index, type);
+        SearchResponse response = clientService.search(json, index, type, null);
 
         if (response.getNumberOfHits() > 1) {
             throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s",
@@ -268,7 +268,7 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
         try {
             final String json = mapper.writeValueAsString(buildQuery(query));
 
-            SearchResponse response = clientService.search(json, index, type);
+            SearchResponse response = clientService.search(json, index, type, null);
 
             if (response.getNumberOfHits() == 0) {
                 return null;
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy
index 78e641d..2fe9a6b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy
@@ -25,18 +25,31 @@ class SearchResponseTest {
     void test() {
         def results = []
         def aggs    = [:]
+        def pitId = "pitId"
+        def scrollId = "scrollId"
+        def searchAfter = "searchAfter"
         def num     = 10
         def took    = 100
         def timeout = false
-        def response = new SearchResponse(results, aggs, num, took, timeout)
+        def warnings = ["auth"]
+        def response = new SearchResponse(results, aggs as Map<String, Object>, pitId, scrollId, searchAfter, num, took, timeout, warnings)
         def str = response.toString()
         Assert.assertEquals(results, response.hits)
         Assert.assertEquals(aggs, response.aggregations)
+        Assert.assertEquals(pitId, response.pitId)
+        Assert.assertEquals(scrollId, response.scrollId)
         Assert.assertEquals(num, response.numberOfHits)
         Assert.assertEquals(took, response.took)
         Assert.assertEquals(timeout, response.timedOut)
+        Assert.assertEquals(warnings, response.warnings)
         Assert.assertTrue(str.contains("aggregations"))
         Assert.assertTrue(str.contains("hits"))
+        Assert.assertTrue(str.contains("pitId"))
+        Assert.assertTrue(str.contains("scrollId"))
+        Assert.assertTrue(str.contains("searchAfter"))
         Assert.assertTrue(str.contains("numberOfHits"))
+        Assert.assertTrue(str.contains("took"))
+        Assert.assertTrue(str.contains("timedOut"))
+        Assert.assertTrue(str.contains("warnings"))
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy
new file mode 100644
index 0000000..0ac8cd9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch.integration
+
+import groovy.json.JsonSlurper
+import org.apache.maven.artifact.versioning.ComparableVersion
+import org.apache.nifi.elasticsearch.DeleteOperationResponse
+import org.apache.nifi.elasticsearch.ElasticSearchClientService
+import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+import org.apache.nifi.elasticsearch.SearchResponse
+import org.apache.nifi.security.util.StandardTlsConfiguration
+import org.apache.nifi.security.util.TemporaryKeyStoreBuilder
+import org.apache.nifi.security.util.TlsConfiguration
+import org.apache.nifi.ssl.SSLContextService
+import org.apache.nifi.util.StringUtils
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.apache.nifi.web.util.ssl.SslContextUtils
+import org.junit.After
+import org.junit.Assert
+import org.junit.Assume
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+
+import javax.net.ssl.SSLContext
+
+import static groovy.json.JsonOutput.prettyPrint
+import static groovy.json.JsonOutput.toJson
+import static org.hamcrest.CoreMatchers.is
+import static org.mockito.Mockito.mock
+import static org.mockito.Mockito.when
+
+class ElasticSearchClientService_IT {
+    private TestRunner runner
+    private ElasticSearchClientServiceImpl service
+
+    static final String INDEX = "messages"
+    static final String TYPE  = StringUtils.isBlank(System.getProperty("type_name")) ? null : System.getProperty("type_name")
+
+    static final ComparableVersion VERSION = new ComparableVersion(System.getProperty("es_version", "0.0.0"))
+    static final ComparableVersion ES_7_10 = new ComparableVersion("7.10")
+
+    static final String FLAVOUR = System.getProperty("es_flavour")
+    static final String DEFAULT = "default"
+
+    private static TlsConfiguration generatedTlsConfiguration
+    private static TlsConfiguration truststoreTlsConfiguration
+
+    // Reports whether both the "es_version" and "es_flavour" system properties are non-blank, logging each missing one.
+    static boolean isElasticsearchSetup() {
+        final boolean versionMissing = StringUtils.isBlank(System.getProperty("es_version"))
+        final boolean flavourMissing = StringUtils.isBlank(System.getProperty("es_flavour"))
+
+        if (versionMissing) {
+            System.err.println("Cannot run Elasticsearch integration-tests: Elasticsearch version (5, 6, 7) not specified")
+        }
+        if (flavourMissing) {
+            System.err.println("Cannot run Elasticsearch integration-tests: Elasticsearch flavour (oss, default) not specified")
+        }
+
+        return !(versionMissing || flavourMissing)
+    }
+
+    // Skips the whole class unless the Elasticsearch system properties are set, prints the test
+    // configuration, then derives a truststore-only TLS configuration from a temporary generated one.
+    @BeforeClass
+    static void beforeAll() throws Exception {
+        Assume.assumeTrue("Elasticsearch integration-tests not setup", isElasticsearchSetup())
+
+        final String banner = String.format(
+                "%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nVERSION: %s%nFLAVOUR %s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n",
+                TYPE, VERSION, FLAVOUR)
+        System.out.println(banner)
+
+        final TlsConfiguration generated = new TemporaryKeyStoreBuilder().build()
+        generatedTlsConfiguration = generated
+        // NOTE(review): the three leading nulls are presumably the keystore slots, left unset on purpose - confirm
+        truststoreTlsConfiguration = new StandardTlsConfiguration(
+                null, null, null,
+                generated.getTruststorePath(), generated.getTruststorePassword(), generated.getTruststoreType()
+        )
+    }
+
+    // Builds a fresh test runner and client service for every test, points the service at
+    // http://localhost:9400 with always-suppress-nulls, and enables it.
+    @Before
+    void before() throws Exception {
+        runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class)
+        service = new ElasticSearchClientServiceImpl()
+        runner.addControllerService("Client Service", service)
+
+        runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400")
+        runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000")
+        runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000")
+        runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000")
+        runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS.getValue())
+
+        // no catch-print-rethrow here: a failure propagates unchanged and JUnit reports its stack trace
+        runner.enableControllerService(service)
+    }
+
+    @After
+    void after() throws Exception {
+        service?.onDisabled() // null-safe: @After still runs when before() failed before creating the service
+    }
+
+    // Runs a match-all search with a terms aggregation on "msg" and checks hit counts, paging ids and per-term counts.
+    @Test
+    void testBasicSearch() throws Exception {
+        String query = prettyPrint(toJson([
+            size: 10,
+            query: [ match_all: [:] ],
+            aggs: [
+                term_counts: [
+                    terms: [
+                        field: "msg",
+                        size: 5
+                    ]
+                ]
+            ]
+        ]))
+
+        SearchResponse response = service.search(query, INDEX, TYPE, null)
+        Assert.assertNotNull("Response was null", response)
+
+        Assert.assertEquals("Wrong count", 15, response.numberOfHits)
+        Assert.assertFalse("Timed out", response.isTimedOut())
+        Assert.assertNotNull("Hits was null", response.getHits())
+        Assert.assertEquals("Wrong number of hits", 10, response.hits.size())
+        Assert.assertNotNull("Aggregations are missing", response.aggregations)
+        Assert.assertEquals("Aggregation count is wrong", 1, response.aggregations.size())
+        Assert.assertNull("Unexpected ScrollId", response.scrollId)
+        Assert.assertNull("Unexpected Search_After", response.searchAfter)
+        Assert.assertNull("Unexpected pitId", response.pitId)
+
+        Map termCounts = response.aggregations.get("term_counts") as Map
+        Assert.assertNotNull("Term counts was missing", termCounts)
+        def buckets = termCounts.get("buckets")
+        Assert.assertNotNull("Buckets branch was empty", buckets)
+        // without this check an empty bucket list would let the per-bucket loop below pass vacuously
+        Assert.assertEquals("Buckets count is wrong", 5, (buckets as List).size())
+        def expected = [
+            "one": 1,
+            "two": 2,
+            "three": 3,
+            "four": 4,
+            "five": 5
+        ]
+
+        buckets.each { aggRes ->
+            def key = aggRes["key"]
+            def docCount = aggRes["doc_count"]
+            Assert.assertEquals("${key} did not match.", expected[key as String], docCount)
+        }
+    }
+
+    @Test
+    void testSearchWarnings() {
+        final String majorVersion = VERSION.toString().split("\\.")[0] // so a bare "7" selects the same query as "7.10.2"
+        final String type = majorVersion == "7" ? "a-type" : TYPE
+        String query
+        if (majorVersion == "7") {
+            // querying with _type in ES 7.x is deprecated (hence the explicit "a-type" chosen above)
+            query = prettyPrint(toJson([size: 1, query: [match_all: [:]]]))
+        } else if (majorVersion == "6") {
+            // "query_string" query option "all_fields" in ES 6.x is deprecated
+            query = prettyPrint(toJson([size: 1, query: [query_string: [query: 1, all_fields: true]]]))
+        } else {
+            // "mlt" query in ES 5.x is deprecated
+            query = prettyPrint(toJson([size: 1, query: [mlt: [fields: ["msg"], like: 1]]]))
+        }
+        final SearchResponse response = service.search(query, INDEX, type, null)
+        Assert.assertFalse("Missing warnings", response.warnings.isEmpty())
+    }
+
+    // Searches with a 10s scroll, fetches the next page, then deletes the scroll twice (the repeat hits an unknown id).
+    @Test
+    void testScroll() {
+        final String query = prettyPrint(toJson([
+                size: 2,
+                query: [ match_all: [:] ],
+                aggs: [ term_counts: [ terms: [ field: "msg", size: 5 ] ] ]
+        ]))
+        final Map<String, String> scrollParameter = Collections.singletonMap("scroll", "10s")
+
+        // initiate the scroll
+        final SearchResponse firstPage = service.search(query, INDEX, TYPE, scrollParameter)
+        Assert.assertNotNull("Response was null", firstPage)
+        Assert.assertEquals("Wrong count", 15, firstPage.numberOfHits)
+        Assert.assertFalse("Timed out", firstPage.timedOut)
+        Assert.assertNotNull("Hits was null", firstPage.hits)
+        Assert.assertEquals("Wrong number of hits", 2, firstPage.hits.size())
+        Assert.assertNotNull("Aggregations are missing", firstPage.aggregations)
+        Assert.assertEquals("Aggregation count is wrong", 1, firstPage.aggregations.size())
+        Assert.assertNotNull("ScrollId missing", firstPage.scrollId)
+        Assert.assertNull("Unexpected Search_After", firstPage.searchAfter)
+        Assert.assertNull("Unexpected pitId", firstPage.pitId)
+
+        final Map termCounts = firstPage.aggregations.get("term_counts") as Map
+        Assert.assertNotNull("Term counts was missing", termCounts)
+        Assert.assertEquals("Buckets count is wrong", 5, (termCounts["buckets"] as List).size())
+
+        // scroll the next page
+        final String scrollRequest = prettyPrint(toJson([scroll_id: firstPage.scrollId, scroll: "10s"]))
+        final SearchResponse nextPage = service.scroll(scrollRequest)
+        Assert.assertNotNull("Scroll Response was null", nextPage)
+        Assert.assertEquals("Wrong count", 15, nextPage.numberOfHits)
+        Assert.assertFalse("Timed out", nextPage.timedOut)
+        Assert.assertNotNull("Hits was null", nextPage.hits)
+        Assert.assertEquals("Wrong number of hits", 2, nextPage.hits.size())
+        Assert.assertNotNull("Aggregations missing", nextPage.aggregations)
+        Assert.assertEquals("Aggregation count is wrong", 0, nextPage.aggregations.size())
+        Assert.assertNotNull("ScrollId missing", nextPage.scrollId)
+        Assert.assertNull("Unexpected Search_After", nextPage.searchAfter)
+        Assert.assertNull("Unexpected pitId", nextPage.pitId)
+        Assert.assertNotEquals("Same results", nextPage.hits, firstPage.hits)
+
+        // delete the scroll
+        final DeleteOperationResponse firstDelete = service.deleteScroll(nextPage.scrollId)
+        Assert.assertNotNull("Delete Response was null", firstDelete)
+        Assert.assertTrue(firstDelete.took > 0)
+
+        // delete scroll again (should now be unknown but the 404 caught and ignored)
+        final DeleteOperationResponse repeatDelete = service.deleteScroll(nextPage.scrollId)
+        Assert.assertNotNull("Delete Response was null", repeatDelete)
+        Assert.assertEquals(0L, repeatDelete.took)
+    }
+
+    // Pages through sorted results by feeding the first response's searchAfter value back as "search_after".
+    @Test
+    void testSearchAfter() {
+        final Map<String, Object> queryMap = [
+                size: 2,
+                query: [ match_all: [:] ],
+                aggs: [ term_counts: [ terms: [ field: "msg", size: 5 ] ] ],
+                sort: [[ msg: "desc" ]]
+        ]
+        final String query = prettyPrint(toJson(queryMap))
+        // search first page
+        final SearchResponse response = service.search(query, INDEX, TYPE, null)
+        Assert.assertNotNull("Response was null", response)
+
+        Assert.assertEquals("Wrong count", 15, response.numberOfHits)
+        Assert.assertFalse("Timed out", response.isTimedOut())
+        Assert.assertNotNull("Hits was null", response.getHits())
+        Assert.assertEquals("Wrong number of hits", 2, response.hits.size())
+        Assert.assertNotNull("Aggregations missing", response.aggregations)
+        Assert.assertEquals("Aggregation count is wrong", 1, response.aggregations.size())
+        Assert.assertNull("Unexpected ScrollId", response.scrollId)
+        Assert.assertNotNull("Search_After missing", response.searchAfter)
+        Assert.assertNull("Unexpected pitId", response.pitId)
+
+        final Map termCounts = response.aggregations.get("term_counts") as Map
+        Assert.assertNotNull("Term counts was missing", termCounts)
+        Assert.assertEquals("Buckets count is wrong", 5, (termCounts.get("buckets") as List).size())
+
+        // search the next page
+        queryMap.search_after = new JsonSlurper().parseText(response.searchAfter) as Serializable
+        queryMap.remove("aggs")
+        final String secondPage = prettyPrint(toJson(queryMap))
+        final SearchResponse secondResponse = service.search(secondPage, INDEX, TYPE, null)
+        Assert.assertNotNull("Second Response was null", secondResponse)
+
+        Assert.assertEquals("Wrong count", 15, secondResponse.numberOfHits)
+        Assert.assertFalse("Timed out", secondResponse.isTimedOut())
+        Assert.assertNotNull("Hits was null", secondResponse.getHits())
+        Assert.assertEquals("Wrong number of hits", 2, secondResponse.hits.size())
+        Assert.assertNotNull("Aggregations missing", secondResponse.aggregations)
+        Assert.assertEquals("Aggregation count is wrong", 0, secondResponse.aggregations.size())
+        Assert.assertNull("Unexpected ScrollId", secondResponse.scrollId)
+        Assert.assertNotNull("Search_After missing", secondResponse.searchAfter) // message now matches the not-null check
+        Assert.assertNull("Unexpected pitId", secondResponse.pitId)
+
+        Assert.assertNotEquals("Same results", secondResponse.hits, response.hits)
+    }
+
+    // Opens a point in time, pages through it with search_after, then deletes it twice (the repeat hits an unknown id).
+    @Test
+    void testPointInTime() {
+        // Point in Time only available in 7.10+ with XPack enabled
+        Assume.assumeTrue("Requires version 7.10+", VERSION >= ES_7_10)
+        Assume.assumeThat("Requires XPack features", FLAVOUR, is(DEFAULT))
+
+        // initialise
+        final String pitId = service.initialisePointInTime(INDEX, "10s")
+
+        final Map<String, Object> queryMap = [
+                size: 2,
+                query: [ match_all: [:] ],
+                aggs: [ term_counts: [ terms: [ field: "msg", size: 5 ] ] ],
+                sort: [[ msg: "desc" ]],
+                pit: [ id: pitId, keep_alive: "10s" ]
+        ]
+        final String query = prettyPrint(toJson(queryMap))
+        // search first page
+        final SearchResponse response = service.search(query, null, TYPE, null)
+        Assert.assertNotNull("Response was null", response)
+
+        Assert.assertEquals("Wrong count", 15, response.numberOfHits)
+        Assert.assertFalse("Timed out", response.isTimedOut())
+        Assert.assertNotNull("Hits was null", response.getHits())
+        Assert.assertEquals("Wrong number of hits", 2, response.hits.size())
+        Assert.assertNotNull("Aggregations missing", response.aggregations)
+        Assert.assertEquals("Aggregation count is wrong", 1, response.aggregations.size())
+        Assert.assertNull("Unexpected ScrollId", response.scrollId)
+        Assert.assertNotNull("Search_After missing", response.searchAfter) // message now matches the not-null check
+        Assert.assertNotNull("pitId missing", response.pitId)
+
+        final Map termCounts = response.aggregations.get("term_counts") as Map
+        Assert.assertNotNull("Term counts was missing", termCounts)
+        Assert.assertEquals("Buckets count is wrong", 5, (termCounts.get("buckets") as List).size())
+
+        // search the next page
+        queryMap.search_after = new JsonSlurper().parseText(response.searchAfter) as Serializable
+        queryMap.remove("aggs")
+        final String secondPage = prettyPrint(toJson(queryMap))
+        final SearchResponse secondResponse = service.search(secondPage, null, TYPE, null)
+        Assert.assertNotNull("Second Response was null", secondResponse)
+
+        Assert.assertEquals("Wrong count", 15, secondResponse.numberOfHits)
+        Assert.assertFalse("Timed out", secondResponse.isTimedOut())
+        Assert.assertNotNull("Hits was null", secondResponse.getHits())
+        Assert.assertEquals("Wrong number of hits", 2, secondResponse.hits.size())
+        Assert.assertNotNull("Aggregations missing", secondResponse.aggregations)
+        Assert.assertEquals("Aggregation count is wrong", 0, secondResponse.aggregations.size())
+        Assert.assertNull("Unexpected ScrollId", secondResponse.scrollId)
+        Assert.assertNotNull("Search_After missing", secondResponse.searchAfter)
+        Assert.assertNotNull("pitId missing", secondResponse.pitId)
+
+        Assert.assertNotEquals("Same results", secondResponse.hits, response.hits)
+
+        // delete pitId
+        DeleteOperationResponse deleteResponse = service.deletePointInTime(pitId)
+        Assert.assertNotNull("Delete Response was null", deleteResponse)
+        Assert.assertTrue(deleteResponse.took > 0)
+
+        // delete pitId again (should now be unknown but the 404 caught and ignored)
+        deleteResponse = service.deletePointInTime(pitId)
+        Assert.assertNotNull("Delete Response was null", deleteResponse)
+        Assert.assertEquals(0L, deleteResponse.took)
+    }
+
+    // Deletes every document whose "msg" matches "five" and checks the operation reported a positive duration.
+    // NOTE(review): the deleted documents are not re-indexed here - confirm tests expecting 15 docs are unaffected.
+    @Test
+    void testDeleteByQuery() throws Exception {
+        final Map<String, Object> deleteBody = [
+            query: [ match: [ msg: "five" ] ]
+        ]
+        final String query = prettyPrint(toJson(deleteBody))
+
+        final DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE)
+        Assert.assertNotNull(response)
+        Assert.assertTrue(response.took > 0)
+    }
+
+    @Test
+    void testDeleteById() throws Exception {
+        final String ID = "1"
+        final def originalDoc = service.get(INDEX, TYPE, ID)
+        try {
+            DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID)
+            Assert.assertNotNull(response)
+            Assert.assertTrue(response.getTook() > 0)
+            Assert.assertNull(service.get(INDEX, TYPE, ID))
+            Assert.assertNotNull(service.get(INDEX, TYPE, "2"))
+        } finally {
+            // always replace the deleted doc, even when an assertion above fails, so later tests still find it
+            service.add(new IndexOperationRequest(INDEX, TYPE, ID, originalDoc, IndexOperationRequest.Operation.Index))
+            waitForIndexRefresh() // (affects later tests using _search or _bulk)
+        }
+    }
+
+    // Fetches documents "1" to "15" by id and checks that each exists and carries a "msg" field.
+    @Test
+    void testGet() throws IOException {
+        (1..15).each { index ->
+            final String id = String.valueOf(index)
+            final Map<String, Object> doc = service.get(INDEX, TYPE, id)
+
+            Assert.assertNotNull("Doc was null", doc)
+            Assert.assertNotNull("${doc.toString()}\t${doc.keySet().toString()}", doc.get("msg"))
+        }
+    }
+
+    // Re-enables the client service with a mocked SSLContextService built on the truststore configuration and checks validity.
+    @Test
+    void testSSL() {
+        final String sslServiceId = SSLContextService.class.getName()
+        final SSLContext truststoreContext = SslContextUtils.createSslContext(truststoreTlsConfiguration)
+
+        final SSLContextService sslService = mock(SSLContextService.class)
+        when(sslService.getIdentifier()).thenReturn(sslServiceId)
+        when(sslService.createContext()).thenReturn(truststoreContext)
+        when(sslService.createTlsConfiguration()).thenReturn(truststoreTlsConfiguration)
+        runner.addControllerService(sslServiceId, sslService)
+        runner.enableControllerService(sslService)
+
+        // swap the SSL service in between a disable/enable cycle of the client service
+        runner.disableControllerService(service)
+        runner.setProperty(service, ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE, sslServiceId)
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+    }
+
+    // Indexes the same document with null suppression off and then on, checking which fields are stored each time.
+    @Test
+    void testNullSuppression() {
+        // Groovy map literal instead of double-brace initialisation (which creates an anonymous HashMap subclass)
+        Map<String, Object> doc = [
+            msg: "test",
+            is_null: null,
+            is_empty: "", is_blank: " ",
+            empty_nested: Collections.emptyMap(), empty_array: Collections.emptyList()
+        ]
+
+        // index with nulls
+        suppressNulls(false)
+        IndexOperationResponse response = service.bulk([new IndexOperationRequest("nulls", TYPE, "1", doc, IndexOperationRequest.Operation.Index)])
+        Assert.assertNotNull(response)
+        Assert.assertTrue(response.getTook() > 0)
+        waitForIndexRefresh()
+
+        Map<String, Object> result = service.get("nulls", TYPE, "1")
+        Assert.assertEquals(doc, result)
+
+        // suppress nulls
+        suppressNulls(true)
+        response = service.bulk([new IndexOperationRequest("nulls", TYPE, "2", doc, IndexOperationRequest.Operation.Index)])
+        Assert.assertNotNull(response)
+        Assert.assertTrue(response.getTook() > 0)
+        waitForIndexRefresh()
+
+        result = service.get("nulls", TYPE, "2")
+        Assert.assertTrue("Non-nulls (present): " + result.toString(), result.keySet().containsAll(["msg", "is_blank"]))
+        Assert.assertFalse("is_null (should be omitted): " + result.toString(), result.keySet().contains("is_null"))
+        Assert.assertFalse("is_empty (should be omitted): " + result.toString(), result.keySet().contains("is_empty"))
+        Assert.assertFalse("empty_nested (should be omitted): " + result.toString(), result.keySet().contains("empty_nested"))
+        Assert.assertFalse("empty_array (should be omitted): " + result.toString(), result.keySet().contains("empty_array"))
+    }
+
+    // Disables the client service, sets SUPPRESS_NULLS to the always/never value for the flag, and re-enables it.
+    private void suppressNulls(final boolean suppressNulls) {
+        final def mode = suppressNulls ? ElasticSearchClientService.ALWAYS_SUPPRESS : ElasticSearchClientService.NEVER_SUPPRESS
+        runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service")
+        runner.disableControllerService(service)
+        runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, mode.getValue())
+        runner.enableControllerService(service)
+        runner.assertValid()
+    }
+
+    // Bulk-indexes 10 docs into each of "bulk_a"/"bulk_b" and 5 into "bulk_c", then checks per-index and wildcard counts.
+    @Test
+    void testBulkAddTwoIndexes() throws Exception {
+        List<IndexOperationRequest> payload = new ArrayList<>()
+        for (int x = 0; x < 20; x++) {
+            String index = x % 2 == 0 ? "bulk_a": "bulk_b"
+            // map literal rather than double-brace initialisation, which creates an anonymous HashMap subclass
+            Map<String, Object> doc = [msg: "test"]
+            payload.add(new IndexOperationRequest(index, TYPE, String.valueOf(x), doc, IndexOperationRequest.Operation.Index))
+        }
+        for (int x = 0; x < 5; x++) {
+            Map<String, Object> doc = [msg: "test"]
+            payload.add(new IndexOperationRequest("bulk_c", TYPE, String.valueOf(x), doc, IndexOperationRequest.Operation.Index))
+        }
+        IndexOperationResponse response = service.bulk(payload)
+        Assert.assertNotNull(response)
+        Assert.assertTrue(response.getTook() > 0)
+        waitForIndexRefresh()
+
+        /*
+         * Now, check to ensure that all three indexes got populated appropriately.
+         */
+        String query = "{ \"query\": { \"match_all\": {}}}"
+        Long indexA = service.count(query, "bulk_a", TYPE)
+        Long indexB = service.count(query, "bulk_b", TYPE)
+        Long indexC = service.count(query, "bulk_c", TYPE)
+
+        Assert.assertNotNull(indexA)
+        Assert.assertNotNull(indexB)
+        Assert.assertNotNull(indexC)
+        Assert.assertEquals(indexA, indexB)
+        Assert.assertEquals(10, indexA.intValue())
+        Assert.assertEquals(10, indexB.intValue())
+        Assert.assertEquals(5, indexC.intValue())
+
+        Long total = service.count(query, "bulk_*", TYPE)
+        Assert.assertNotNull(total)
+        Assert.assertEquals(25, total.intValue())
+    }
+
+    // Exercises Index, Update and Upsert through add(), then removes both test documents with a bulk Delete.
+    @Test
+    void testUpdateAndUpsert() {
+        final String TEST_ID = "update-test"
+        final Map<String, Object> doc = [msg: "Buongiorno, mondo"]
+        service.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, doc, IndexOperationRequest.Operation.Index))
+        Map<String, Object> result = service.get(INDEX, TYPE, TEST_ID)
+        Assert.assertEquals("Not the same", doc, result)
+
+        final Map<String, Object> updates = [from: "john.smith"]
+        // expected document after the partial update: the updated fields overlaid with the original ones
+        final Map<String, Object> merged = updates + doc
+        IndexOperationRequest request = new IndexOperationRequest(INDEX, TYPE, TEST_ID, updates, IndexOperationRequest.Operation.Update)
+        service.add(request)
+        result = service.get(INDEX, TYPE, TEST_ID)
+        Assert.assertTrue(result.containsKey("from"))
+        Assert.assertTrue(result.containsKey("msg"))
+        Assert.assertEquals("Not the same after update.", merged, result)
+
+        final String UPSERTED_ID = "upsert-ftw"
+        final Map<String, Object> upsertItems = [
+            upsert_1: "hello",
+            upsert_2: 1,
+            upsert_3: true
+        ]
+        request = new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, upsertItems, IndexOperationRequest.Operation.Upsert)
+        service.add(request)
+        result = service.get(INDEX, TYPE, UPSERTED_ID)
+        Assert.assertEquals(upsertItems, result)
+
+        // remove both documents again in a single bulk request
+        final List<IndexOperationRequest> deletes = [
+            new IndexOperationRequest(INDEX, TYPE, TEST_ID, null, IndexOperationRequest.Operation.Delete),
+            new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, null, IndexOperationRequest.Operation.Delete)
+        ]
+        Assert.assertFalse(service.bulk(deletes).hasErrors())
+        waitForIndexRefresh() // wait 1s for index refresh (doesn't prevent GET but affects later tests using _search or _bulk)
+        Assert.assertNull(service.get(INDEX, TYPE, TEST_ID))
+        Assert.assertNull(service.get(INDEX, TYPE, UPSERTED_ID))
+    }
+
+    // Sends one valid and two failing operations in a single bulk request and expects exactly two items with an error.
+    @Test
+    void testGetBulkResponsesWithErrors() {
+        final List<IndexOperationRequest> ops = [
+                new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "one", intField: 1], IndexOperationRequest.Operation.Index), // OK
+                new IndexOperationRequest(INDEX, TYPE, "2", [ "msg": "two", intField: 1], IndexOperationRequest.Operation.Create), // already exists
+                new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "one", intField: "notaninteger"], IndexOperationRequest.Operation.Index) // can't parse int field
+        ]
+        final IndexOperationResponse response = service.bulk(ops)
+        assert response.hasErrors()
+
+        // look at the value under each item's first key and count those that contain an "error"
+        assert response.items.count { item -> item.values().first().containsKey("error") } == 2
+    }
+
+    private static void waitForIndexRefresh() {
+        Thread.sleep(1000L) // fixed one-second pause; callers use it to let an index refresh happen
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy
index f8d4ef5..2b6c060 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy
@@ -31,7 +31,9 @@ import org.apache.nifi.serialization.record.type.RecordDataType
 import org.apache.nifi.util.TestRunner
 import org.apache.nifi.util.TestRunners
 import org.junit.Assert
+import org.junit.Assume
 import org.junit.Before
+import org.junit.BeforeClass
 import org.junit.Test
 
 class ElasticSearchLookupService_IT {
@@ -39,7 +41,15 @@ class ElasticSearchLookupService_IT {
     private ElasticSearchClientService service
     private ElasticSearchLookupService lookupService
 
-    static String TYPE  = System.getProperty("type_name")
+    @BeforeClass
+    static void beforeAll() throws Exception {
+        Assume.assumeTrue("Elasticsearch integration-tests not setup", ElasticSearchClientService_IT.isElasticsearchSetup())
+
+        // print the type/version/flavour in use, padded with blank lines by the format string
+        final String banner = String.format("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nVERSION: %s%nFLAVOUR %s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n",
+                ElasticSearchClientService_IT.TYPE, ElasticSearchClientService_IT.VERSION, ElasticSearchClientService_IT.FLAVOUR)
+        System.out.println(banner)
+    }
 
     @Before
     void before() throws Exception {
@@ -56,7 +66,7 @@ class ElasticSearchLookupService_IT {
         runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "Lookup Service")
         runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "Client Service")
         runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "user_details")
-        runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
+        setTypeOnLookupService()
 
         try {
             runner.enableControllerService(service)
@@ -67,6 +77,14 @@ class ElasticSearchLookupService_IT {
         }
     }
 
+    void setTypeOnLookupService() {
+        if (ElasticSearchClientService_IT.TYPE == null) {
+            runner.removeProperty(lookupService, ElasticSearchLookupService.TYPE) // no type configured: clear the property
+        } else {
+            runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, ElasticSearchClientService_IT.TYPE)
+        }
+    }
+
     @Test
     void testValidity() throws Exception {
         setDefaultSchema()
@@ -107,7 +125,7 @@ class ElasticSearchLookupService_IT {
         ]
 
         coordinates.each { coordinate ->
-            def exception
+            def exception = null
 
             try {
                 lookupService.lookup(coordinate)
@@ -143,7 +161,7 @@ class ElasticSearchLookupService_IT {
 
         runner.disableControllerService(lookupService)
         runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested")
-        runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
+        setTypeOnLookupService()
         runner.enableControllerService(lookupService)
 
         Optional<Record> response = lookupService.lookup(coordinates)
@@ -164,7 +182,7 @@ class ElasticSearchLookupService_IT {
     void testDetectedSchema() throws LookupFailureException {
         runner.disableControllerService(lookupService)
         runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "complex")
-        runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
+        setTypeOnLookupService()
         runner.enableControllerService(lookupService)
         def coordinates = ["_id": "1" ]
 
@@ -186,7 +204,7 @@ class ElasticSearchLookupService_IT {
         Assert.assertEquals("2018-04-10T12:18:05Z", subRec.getValue("dateField"))
     }
 
-    Record getSubRecord(Record rec, String fieldName) {
+    static Record getSubRecord(Record rec, String fieldName) {
         RecordSchema schema = rec.schema
         RecordSchema subSchema = ((RecordDataType)schema.getField(fieldName).get().dataType).childSchema
         rec.getAsRecord(fieldName, subSchema)
@@ -198,7 +216,7 @@ class ElasticSearchLookupService_IT {
         runner.setProperty(lookupService, "\$.subField.longField", "/longField2")
         runner.setProperty(lookupService, '$.subField.dateField', '/dateField2')
         runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested")
-        runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
+        setTypeOnLookupService()
         runner.enableControllerService(lookupService)
 
         def coordinates = ["msg": "Hello, world"]
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
index 6dfc5e4..81d36ab 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
@@ -68,11 +68,31 @@ class TestElasticSearchClientService extends AbstractControllerService implement
     }
 
     @Override
-    SearchResponse search(String query, String index, String type) {
+    SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
         List hits = [[
             "_source": data
         ]]
-        return new SearchResponse(hits, null, 1, 100, false)
+        return new SearchResponse(hits, null, null, null, null, 1, 100, false, null)
+    }
+
+    @Override
+    SearchResponse scroll(String scroll) {
+        return search(null, null, null, null)
+    }
+
+    @Override
+    String initialisePointInTime(String index, String keepAlive) {
+        return null
+    }
+
+    @Override
+    DeleteOperationResponse deletePointInTime(String pitId) {
+        return null
+    }
+
+    @Override
+    DeleteOperationResponse deleteScroll(String scrollId) {
+        return null
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
index 83e0aef..fd0df9d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
@@ -139,6 +139,12 @@ language governing permissions and limitations under the License. -->
             <version>${nifi.groovy.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-elasticsearch-client-service</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -154,4 +160,4 @@ language governing permissions and limitations under the License. -->
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
new file mode 100644
index 0000000..705fcd8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue FLOWFILE_PER_HIT = new AllowableValue(
+            "splitUp-yes",
+            "Per Hit",
+            "Flowfile per hit."
+    );
+    public static final AllowableValue FLOWFILE_PER_RESPONSE = new AllowableValue(
+            "splitUp-no",
+            "Per Response",
+            "Flowfile per response."
+    );
+
+    public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Search Results Split")
+            .description("Output a flowfile containing all hits or one flowfile for each individual hit.")
+            .allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT)
+            .defaultValue(FLOWFILE_PER_RESPONSE.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor AGGREGATION_RESULTS_SPLIT = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Aggregation Results Split")
+            .description("Output a flowfile containing all aggregations or one flowfile for each individual aggregation.")
+            .allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT)
+            .defaultValue(FLOWFILE_PER_RESPONSE.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    // Builds the unmodifiable relationship set and property descriptor list once for the class.
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        // insertion order of this list is the order in which the descriptors are returned by getSupportedPropertyDescriptors()
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SEARCH_RESULTS_SPLIT);
+        descriptors.add(AGGREGATION_RESULTS_SPLIT);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    /**
+     * Returns the relationships of this processor: "original", failure, "hits" and "aggregations".
+     *
+     * @return the unmodifiable relationship set built in the static initializer
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Returns the property descriptors supported by this processor.
+     *
+     * @return the unmodifiable descriptor list built in the static initializer
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    /**
+     * Captures the configured client service and the hit/aggregation split settings when the processor is scheduled,
+     * so that they do not need to be looked up again on every trigger.
+     *
+     * @param context the process context providing the configured property values
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SEARCH_RESULTS_SPLIT).getValue();
+        splitUpAggregations = context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue();
+    }
+
+    /**
+     * Discards the client service holder when the processor is stopped; it is re-created by {@code onScheduled}.
+     */
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    /**
+     * Builds the query parameters, runs the query via {@code doQuery} and completes it via {@code finishQuery}.
+     * Any exception is logged, the input flowfile (if present) is routed to failure and the processor yields.
+     *
+     * @param context the process context
+     * @param session the process session used to obtain the input and to create/transfer result flowfiles
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            // nothing to do yet: there is a non-loop upstream connection but no flowfile was available
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);
+
+            // the list starts empty and is handed through doQuery so that paginated implementations can carry flowfiles between pages
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, context, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, context, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            // NOTE(review): only the input flowfile is routed here; result flowfiles that doQuery may already have created
+            // in this session are neither transferred nor removed - confirm the session is still committable in that case
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    /**
+     * Creates the query parameter object for one invocation of {@code onTrigger}.
+     *
+     * @param input   the incoming flowfile, or {@code null} when the processor was triggered without one
+     * @param context the process context
+     * @param session the process session
+     * @return the parameters that are subsequently passed to {@code doQuery} and {@code finishQuery}
+     * @throws IOException if the parameters cannot be built
+     */
+    abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;
+
+    /**
+     * Fills in the settings shared by all query processors: the query itself, the index, the type and the
+     * (optional) name of the attribute in which the query should be recorded.
+     *
+     * @param queryJsonParameters the parameter object to populate
+     * @param input               the incoming flowfile used for expression evaluation, may be {@code null}
+     * @param context             the process context providing the property values
+     * @param session             the process session, passed through to {@code getQuery}
+     * @throws IOException if the query cannot be obtained
+     */
+    void populateCommonJsonQueryParameters(final Q queryJsonParameters, final FlowFile input, final ProcessContext context,
+                                           final ProcessSession session) throws IOException {
+        final String queryJson = getQuery(input, context, session);
+        final String indexName = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String typeName = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+
+        // the query attribute name is only evaluated when the property has been configured
+        final String queryAttributeName;
+        if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
+            queryAttributeName = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
+        } else {
+            queryAttributeName = null;
+        }
+
+        queryJsonParameters.setQuery(queryJson);
+        queryJsonParameters.setIndex(indexName);
+        queryJsonParameters.setType(typeName);
+        queryJsonParameters.setQueryAttr(queryAttributeName);
+    }
+
+    /**
+     * Executes the query and emits its results. Called once per {@code onTrigger} with an initially empty
+     * {@code hitsFlowFiles} list and an already started {@code stopWatch}.
+     *
+     * @return the (last) response obtained, which is then handed to {@code finishQuery}
+     * @throws IOException if the query cannot be executed or its results cannot be written
+     */
+    abstract SearchResponse doQuery(final Q queryJsonParameters, final List<FlowFile> hitsFlowFiles, final ProcessSession session,
+                                    final ProcessContext context, final FlowFile input, final StopWatch stopWatch) throws IOException;
+
+    /**
+     * Completes processing after {@code doQuery} has returned; invoked by {@code onTrigger} with the response returned by {@code doQuery}.
+     *
+     * @throws IOException if finishing the query fails
+     */
+    abstract void finishQuery(final FlowFile input, final Q queryParameters, final ProcessSession session, final ProcessContext context,
+                              final SearchResponse response) throws IOException;
+
+    /**
+     * Creates a new flowfile for query results.
+     *
+     * @param session the session in which to create the flowfile
+     * @param parent  the flowfile to derive the new one from, or {@code null} to create a flowfile without a parent
+     * @return the newly created flowfile
+     */
+    FlowFile createChildFlowFile(final ProcessSession session, final FlowFile parent) {
+        if (parent == null) {
+            return session.create();
+        }
+        return session.create(parent);
+    }
+
+    private FlowFile writeAggregationFlowFileContents(final String name, final Integer number, final String json,
+                                                      final ProcessSession session, final FlowFile aggFlowFile,
+                                                      final Map<String, String> attributes) {
+        FlowFile ff = session.write(aggFlowFile, out -> out.write(json.getBytes()));
+        ff = session.putAllAttributes(ff, new HashMap<String, String>(){{
+            if (name != null) {
+                put("aggregation.name", name);
+            }
+            if (number != null) {
+                put("aggregation.number", number.toString());
+            }
+        }});
+
+        return session.putAllAttributes(ff, attributes);
+    }
+
+    /**
+     * Emits the aggregations of a response to the "aggregations" relationship, either as one flowfile per
+     * aggregation or as a single flowfile containing all of them, depending on the configured split setting.
+     * Does nothing when there are no aggregations.
+     *
+     * @param aggregations the aggregations keyed by name, may be {@code null} or empty
+     * @param session      the process session
+     * @param parent       the flowfile to derive result flowfiles from, may be {@code null}
+     * @param attributes   attributes to apply to every result flowfile
+     * @param transitUri   the URI recorded in the provenance receive events
+     * @param stopWatch    the stop watch providing the elapsed time for the provenance events
+     * @throws IOException if an aggregation cannot be serialised to JSON
+     */
+    private void handleAggregations(final Map<String, Object> aggregations, final ProcessSession session,
+                                    final FlowFile parent, final Map<String, String> attributes,
+                                    final String transitUri, final StopWatch stopWatch) throws IOException {
+        if (aggregations == null || aggregations.isEmpty()) {
+            return;
+        }
+
+        final List<FlowFile> results = new ArrayList<>();
+        if (!splitUpAggregations.equals(FLOWFILE_PER_HIT.getValue())) {
+            // all aggregations combined in a single flowfile, without name/number attributes
+            final FlowFile combined = createChildFlowFile(session, parent);
+            final String combinedJson = mapper.writeValueAsString(aggregations);
+            results.add(writeAggregationFlowFileContents(null, null, combinedJson, session, combined, attributes));
+        } else {
+            // one flowfile per aggregation, numbered from 1 in iteration order
+            int position = 0;
+            for (final Map.Entry<String, Object> entry : aggregations.entrySet()) {
+                final FlowFile child = createChildFlowFile(session, parent);
+                final String entryJson = mapper.writeValueAsString(entry.getValue());
+                position++;
+                results.add(writeAggregationFlowFileContents(entry.getKey(), position, entryJson, session, child, attributes));
+            }
+        }
+
+        if (!results.isEmpty()) {
+            session.transfer(results, REL_AGGREGATIONS);
+            for (final FlowFile result : results) {
+                session.getProvenanceReporter().receive(result, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+        }
+    }
+
+    /**
+     * Writes hit JSON into the given flowfile and applies the attributes together with the "hit.count" attribute.
+     * Note that "hit.count" is put into the supplied {@code attributes} map, i.e. the caller's map is modified.
+     *
+     * @param count       the number of hits contained in {@code json}
+     * @param json        the JSON content to write
+     * @param session     the session owning the flowfile
+     * @param hitFlowFile the flowfile to write to
+     * @param attributes  attributes to apply; receives the "hit.count" entry
+     * @return the updated flowfile
+     */
+    private FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
+                                      final FlowFile hitFlowFile, final Map<String, String> attributes) {
+        // JSON content is always encoded as UTF-8 rather than with the platform default charset
+        final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes(StandardCharsets.UTF_8)));
+        attributes.put("hit.count", Integer.toString(count));
+
+        return session.putAllAttributes(ff, attributes);
+    }
+
+    /*
+     * Writes the given hits to new FlowFiles (one per hit, or one for all hits of the response, depending on the
+     * configured split setting) and transfers them to the "hits" relationship.
+     *
+     * The List<FlowFile> hitsFlowFiles parameter and return value are used in order to allow pagination of query results
+     * in AbstractPaginatedJsonQueryElasticsearch. The List is created in onTrigger and passed to doQuery => handleResponse => handleHits,
+     * for non-paginated queries the return value will always be an empty List as the FlowFiles will have been transferred
+     * (transferResultFlowFiles clears the List after transferring its content);
+     * for paginated queries, the List could contain one (or more) FlowFiles, to which further hits may be appended when the next
+     * SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries.
+     *
+     * The queryJsonParameters argument is not used by this implementation.
+     */
+    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final Q queryJsonParameters, final ProcessSession session,
+                              final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles,
+                              final String transitUri, final StopWatch stopWatch) throws IOException {
+        if (hits != null && !hits.isEmpty()) {
+            if (FLOWFILE_PER_HIT.getValue().equals(splitUpHits)) {
+                // one FlowFile per hit, each with a "hit.count" of 1
+                for (final Map<String, Object> hit : hits) {
+                    final FlowFile hitFlowFile = createChildFlowFile(session, parent);
+                    final String json = mapper.writeValueAsString(hit);
+                    hitsFlowFiles.add(writeHitFlowFile(1, json, session, hitFlowFile, attributes));
+                }
+            } else {
+                // a single FlowFile containing all hits of this response as one JSON array
+                final FlowFile hitFlowFile = createChildFlowFile(session, parent);
+                final String json = mapper.writeValueAsString(hits);
+                hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, hitFlowFile, attributes));
+            }
+        }
+
+        // also transfers any FlowFiles that were already in the List when this method was called
+        transferResultFlowFiles(session, hitsFlowFiles, transitUri, stopWatch);
+
+        return hitsFlowFiles;
+    }
+
+    /**
+     * Transfers all collected hit flowfiles to the "hits" relationship, records a provenance receive event
+     * for each of them and then empties the list. Does nothing when the list is empty.
+     *
+     * @param session       the process session
+     * @param hitsFlowFiles the flowfiles to output; cleared once they have been transferred
+     * @param transitUri    the URI recorded in the provenance receive events
+     * @param stopWatch     the stop watch providing the elapsed time for the provenance events
+     */
+    private void transferResultFlowFiles(final ProcessSession session, final List<FlowFile> hitsFlowFiles, final String transitUri,
+                                         final StopWatch stopWatch) {
+        if (hitsFlowFiles.isEmpty()) {
+            // no results to output
+            return;
+        }
+
+        session.transfer(hitsFlowFiles, REL_HITS);
+        for (final FlowFile result : hitsFlowFiles) {
+            session.getProvenanceReporter().receive(result, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        }
+        hitsFlowFiles.clear();
+    }
+
+    /**
+     * Processes one search response: outputs the aggregations (for a new query only) and the hits, then adds
+     * the number of hits of this response to the running hit count of the query parameters.
+     *
+     * @param response            the response to process
+     * @param newQuery            {@code true} if the response belongs to the initial request of a query
+     * @param queryJsonParameters the parameters of the query; its hit count is increased
+     * @param hitsFlowFiles       flowfiles carried over from previous responses (see {@code handleHits})
+     * @param session             the process session
+     * @param input               the incoming flowfile, may be {@code null}
+     * @param stopWatch           the stop watch providing elapsed times for provenance events
+     * @return the list returned by {@code handleHits}
+     * @throws IOException if results cannot be serialised to JSON
+     */
+    List<FlowFile> handleResponse(final SearchResponse response, final boolean newQuery, final Q queryJsonParameters,
+                                  final List<FlowFile> hitsFlowFiles, final ProcessSession session, final FlowFile input,
+                                  final StopWatch stopWatch) throws IOException {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+        if (StringUtils.isNotBlank(queryJsonParameters.getQueryAttr())) {
+            attributes.put(queryJsonParameters.getQueryAttr(), queryJsonParameters.getQuery());
+        }
+
+        final String transitUri = clientService.get().getTransitUrl(queryJsonParameters.getIndex(), queryJsonParameters.getType());
+        if (newQuery) {
+            // only output aggregations from initial query
+            // (omitted from subsequent pages as aggs are calculated across the entire result set, not per page)
+            handleAggregations(response.getAggregations(), session, input, attributes, transitUri, stopWatch);
+        }
+
+        final List<Map<String, Object>> hits = response.getHits();
+        final List<FlowFile> resultFlowFiles = handleHits(hits, queryJsonParameters, session, input,
+                attributes, hitsFlowFiles, transitUri, stopWatch);
+        // handleHits tolerates a null hit list, so the hit count must do the same instead of failing with a NullPointerException
+        queryJsonParameters.addHitCount(hits == null ? 0 : hits.size());
+
+        return resultFlowFiles;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
new file mode 100644
index 0000000..b955877
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
+    public static final AllowableValue FLOWFILE_PER_QUERY = new AllowableValue(
+            "splitUp-query",
+            "Per Query",
+            "Combine results from all query responses (one flowfile per entire paginated result set of hits). " +
+                    "Note that aggregations cannot be paged, they are generated across the entire result set and " +
+                    "returned as part of the first page. Results are output with one JSON object per line " +
+                    "(allowing hits to be combined from multiple pages without loading all results into memory)."
+    );
+
+    public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT)
+            .description("Output a flowfile containing all hits or one flowfile for each individual hit " +
+                    "or one flowfile containing all hits from all paged responses.")
+            .allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT, FLOWFILE_PER_QUERY)
+            .build();
+
+    public static final AllowableValue PAGINATION_SEARCH_AFTER = new AllowableValue(
+            "pagination-search_after",
+            "Search After",
+            "Use Elasticsearch \"search_after\" to page sorted results."
+    );
+    public static final AllowableValue PAGINATION_POINT_IN_TIME = new AllowableValue(
+            "pagination-pit",
+            "Point in Time",
+            "Use Elasticsearch (7.10+ with XPack) \"point in time\" to page sorted results."
+    );
+    public static final AllowableValue PAGINATION_SCROLL = new AllowableValue(
+            "pagination-scroll",
+            "Scroll",
+            "Use Elasticsearch \"scroll\" to page results."
+    );
+
+    public static final PropertyDescriptor PAGINATION_TYPE = new PropertyDescriptor.Builder()
+            .name("el-rest-pagination-type")
+            .displayName("Pagination Type")
+            .description("Pagination method to use. Not all types are available for all Elasticsearch versions, " +
+                    "check the Elasticsearch docs to confirm which are applicable and recommended for your service.")
+            .allowableValues(PAGINATION_SCROLL, PAGINATION_SEARCH_AFTER, PAGINATION_POINT_IN_TIME)
+            .defaultValue(PAGINATION_SCROLL.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor PAGINATION_KEEP_ALIVE = new PropertyDescriptor.Builder()
+            .name("el-rest-pagination-keep-alive")
+            .displayName("Pagination Keep Alive")
+            .description("Pagination \"keep_alive\" period. Period Elasticsearch will keep the scroll/pit cursor alive " +
+                    "in between requests (this is not the time expected for all pages to be returned, but the maximum " +
+                    "allowed time for requests between page retrievals).")
+            .required(true)
+            .defaultValue("10 mins")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, 24, TimeUnit.HOURS))
+            .build();
+
+    // Property descriptors shared by paginated query processors; note that QUERY is not part of this list.
+    // NOTE(review): presumably each concrete processor adds its own query property - confirm against the subclasses
+    static final List<PropertyDescriptor> paginatedPropertyDescriptors;
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        // SEARCH_RESULTS_SPLIT is the descriptor declared in this class (it additionally allows "Per Query")
+        descriptors.add(SEARCH_RESULTS_SPLIT);
+        descriptors.add(AGGREGATION_RESULTS_SPLIT);
+        descriptors.add(PAGINATION_TYPE);
+        descriptors.add(PAGINATION_KEEP_ALIVE);
+
+        paginatedPropertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    // output as newline delimited JSON (allows for multiple pages of results to be appended to existing FlowFiles without retaining all hits in memory)
+    private final ObjectWriter writer = mapper.writer().withRootValueSeparator("\n");
+
+    String paginationType;
+
+    /**
+     * Performs the scheduling work of the superclass and additionally captures the configured pagination type.
+     *
+     * @param context the process context providing the configured property values
+     */
+    @Override
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+
+        paginationType = context.getProperty(PAGINATION_TYPE).getValue();
+    }
+
+    /**
+     * Runs a paginated query. Each loop iteration retrieves and outputs one page; the loop continues while pages
+     * contain hits and either an input flowfile is present or the results are combined per query. When the last
+     * retrieved page has no hits, the Elasticsearch pagination resources are cleared.
+     *
+     * @param paginatedJsonQueryParameters the query parameters; updated with the scroll id, pit id and search_after values of the responses
+     * @param hitsFlowFiles                flowfiles carried between pages; reassigned to the result of {@code handleResponse} after every page
+     * @param session                      the process session
+     * @param context                      the process context
+     * @param input                        the incoming flowfile, may be {@code null}
+     * @param stopWatch                    the stop watch providing elapsed times for provenance events
+     * @return the last response retrieved
+     * @throws IOException if a query cannot be built or its results cannot be written
+     */
+    @Override
+    SearchResponse doQuery(final PaginatedJsonQueryParameters paginatedJsonQueryParameters, List<FlowFile> hitsFlowFiles,
+                           final ProcessSession session, final ProcessContext context, final FlowFile input,
+                           final StopWatch stopWatch) throws IOException {
+        // null on the first iteration, afterwards the response of the previous page
+        SearchResponse response = null;
+        do {
+            // check any previously started query hasn't expired
+            final boolean expiredQuery = isExpired(paginatedJsonQueryParameters, context, response);
+            final boolean newQuery = StringUtils.isBlank(paginatedJsonQueryParameters.getPageExpirationTimestamp()) || expiredQuery;
+
+            // execute query/scroll
+            final String queryJson = updateQueryJson(newQuery, paginatedJsonQueryParameters);
+            if (!newQuery && PAGINATION_SCROLL.getValue().equals(paginationType)) {
+                // follow-up page of a scroll: queryJson only holds the scroll details at this point
+                response = clientService.get().scroll(queryJson);
+            } else {
+                response = clientService.get().search(
+                        queryJson,
+                        // Point in Time uses general /_search API not /index/_search
+                        PAGINATION_POINT_IN_TIME.getValue().equals(paginationType) ? null : paginatedJsonQueryParameters.getIndex(),
+                        paginatedJsonQueryParameters.getType(),
+                        // the "scroll" request parameter is only sent when paginating via scroll
+                        PAGINATION_SCROLL.getValue().equals(paginationType)
+                                ? Collections.singletonMap("scroll", paginatedJsonQueryParameters.getKeepAlive())
+                                : null
+                );
+                paginatedJsonQueryParameters.setPitId(response.getPitId());
+                paginatedJsonQueryParameters.setSearchAfter(response.getSearchAfter());
+            }
+            // the scroll id is taken from the response of both the search and the scroll call
+            paginatedJsonQueryParameters.setScrollId(response.getScrollId());
+            if (newQuery && input != null) {
+                // a send event is only recorded for the initial request of a query
+                session.getProvenanceReporter().send(
+                        input,
+                        clientService.get().getTransitUrl(paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getType()),
+                        stopWatch.getElapsed(TimeUnit.MILLISECONDS)
+                );
+            }
+
+            // mark the paginated query for expiry if there are no hits (no more pages to obtain so stop looping on this query)
+            updatePageExpirationTimestamp(paginatedJsonQueryParameters, !response.getHits().isEmpty());
+
+            hitsFlowFiles = handleResponse(response, newQuery, paginatedJsonQueryParameters, hitsFlowFiles, session, input, stopWatch);
+        } while (!response.getHits().isEmpty() && (input != null || FLOWFILE_PER_QUERY.getValue().equals(splitUpHits)));
+
+        if (response.getHits().isEmpty()) {
+            getLogger().debug("No more results for paginated query, clearing Elasticsearch resources");
+            clearElasticsearchState(context, response);
+        }
+
+        return response;
+    }
+
+    /**
+     * Creates the parameters for a paginated query: the common query settings plus the pagination keep alive,
+     * which is expressed in whole seconds with an "s" suffix.
+     *
+     * @param input   the incoming flowfile, may be {@code null}
+     * @param context the process context providing the property values
+     * @param session the process session
+     * @return the populated query parameters
+     * @throws IOException if the common parameters cannot be populated
+     */
+    @Override
+    PaginatedJsonQueryParameters buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException {
+        final PaginatedJsonQueryParameters parameters = new PaginatedJsonQueryParameters();
+        populateCommonJsonQueryParameters(parameters, input, context, session);
+
+        final Long keepAliveSeconds = context.getProperty(PAGINATION_KEEP_ALIVE).asTimePeriod(TimeUnit.SECONDS);
+        parameters.setKeepAlive(keepAliveSeconds + "s");
+
+        return parameters;
+    }
+
+    /**
+     * Determines whether a previously started paginated query has expired. Called by {@code doQuery} at the start
+     * of every loop iteration; {@code response} is the response of the previous page and is {@code null} on the
+     * first iteration. A {@code true} result makes {@code doQuery} treat the iteration as a new query.
+     */
+    abstract boolean isExpired(final PaginatedJsonQueryParameters paginatedQueryParameters, final ProcessContext context,
+                               final SearchResponse response) throws IOException;
+
+    // NOTE(review): not called within the part of the class visible here; presumably used when clearing Elasticsearch state - confirm
+    abstract String getScrollId(final ProcessContext context, final SearchResponse response) throws IOException;
+
+    // NOTE(review): not called within the part of the class visible here; presumably used when clearing Elasticsearch state - confirm
+    abstract String getPitId(final ProcessContext context, final SearchResponse response) throws IOException;
+
+    /**
+     * Modifies the query JSON so that it requests the next page of an already started query.
+     * For scroll pagination the whole query is replaced by the scroll details; otherwise "search_after" is
+     * added and any aggregation request is removed.
+     *
+     * @param queryJson                    the query JSON to modify in place
+     * @param paginatedJsonQueryParameters the parameters holding the scroll id, keep alive and search_after values
+     * @throws IOException if the stored search_after value cannot be parsed as a JSON array
+     */
+    private void prepareNextPageQuery(final ObjectNode queryJson, final PaginatedJsonQueryParameters paginatedJsonQueryParameters) throws IOException {
+        // prepare to get next page of results (depending on pagination type)
+        if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
+            // overwrite query JSON with existing Scroll details
+            queryJson.removeAll().put("scroll_id", paginatedJsonQueryParameters.getScrollId());
+            if (StringUtils.isNotBlank(paginatedJsonQueryParameters.getKeepAlive())) {
+                queryJson.put("scroll", paginatedJsonQueryParameters.getKeepAlive());
+            }
+        } else {
+            // add search_after to query JSON
+            queryJson.set("search_after", mapper.readValue(paginatedJsonQueryParameters.getSearchAfter(), ArrayNode.class));
+
+            // remove any request for aggregations as they were dealt with in the first page;
+            // Elasticsearch accepts both the short ("aggs") and the long ("aggregations") key
+            for (final String aggregationsKey : new String[] {"aggs", "aggregations"}) {
+                if (queryJson.has(aggregationsKey)) {
+                    getLogger().debug("Removing \"{}\" from non-initial paged query", aggregationsKey);
+                    queryJson.remove(aggregationsKey);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the JSON to send for the current page: the configured query is parsed, adjusted for follow-up pages
+     * and, for Point in Time pagination, extended with the "pit" element.
+     *
+     * @param newQuery                     {@code true} if this is the initial request of a query
+     * @param paginatedJsonQueryParameters the parameters holding the query and the current pagination state
+     * @return the query JSON as a String
+     * @throws IOException              if the query cannot be parsed or serialised
+     * @throws IllegalArgumentException if a new pit/search_after query does not contain a "sort" field
+     */
+    private String updateQueryJson(final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters) throws IOException {
+        final ObjectNode queryJson = mapper.readValue(paginatedJsonQueryParameters.getQuery(), ObjectNode.class);
+
+        if (!newQuery) {
+            prepareNextPageQuery(queryJson, paginatedJsonQueryParameters);
+        } else if ((PAGINATION_POINT_IN_TIME.getValue().equals(paginationType) || PAGINATION_SEARCH_AFTER.getValue().equals(paginationType))
+                && !queryJson.has("sort")) {
+            // verify query contains a "sort" field if pit/search_after requested
+            throw new IllegalArgumentException("Query using pit/search_after must contain a \"sort\" field");
+        }
+
+        if (PAGINATION_POINT_IN_TIME.getValue().equals(paginationType)) {
+            // add pit_id to query JSON
+            // (a new point in time is initialised for a new query, otherwise the id stored from the previous search is reused)
+            final String queryPitId = newQuery
+                    ? clientService.get().initialisePointInTime(paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getKeepAlive())
+                    : paginatedJsonQueryParameters.getPitId();
+
+            final ObjectNode pit = JsonNodeFactory.instance.objectNode().put("id", queryPitId);
+            if (StringUtils.isNotBlank(paginatedJsonQueryParameters.getKeepAlive())) {
+                pit.put("keep_alive", paginatedJsonQueryParameters.getKeepAlive());
+            }
+            queryJson.set("pit", pit);
+        }
+
+        return mapper.writeValueAsString(queryJson);
+    }
+
+    /** Writes the hits to the FlowFile (newline-separated from existing content when appending) and sets "hit.count". */
+    private FlowFile writeCombinedHitFlowFile(final int count, final List<Map<String, Object>> hits, final ProcessSession session,
+                                              final FlowFile hitFlowFile, final Map<String, String> attributes, final boolean append) {
+        final FlowFile updated;
+        if (!append) {
+            updated = session.write(hitFlowFile, out -> writer.writeValues(out).writeAll(hits));
+        } else {
+            // a newline is appended first to separate the new hits from the existing ones
+            final FlowFile separated = session.append(hitFlowFile, out -> out.write('\n'));
+            updated = session.append(separated, out -> writer.writeValues(out).writeAll(hits));
+        }
+        attributes.put("hit.count", Integer.toString(count));
+        return session.putAllAttributes(updated, attributes);
+    }
+
+    /** Adds this page's hits to the combined FlowFile held in hitsFlowFiles, creating it from the parent when the list is empty. */
+    private void combineHits(final List<Map<String, Object>> hits, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
+                             final ProcessSession session, final FlowFile parent,
+                             final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles) {
+        if (hits == null || hits.isEmpty()) {
+            // nothing to combine for this page
+            return;
+        }
+
+        // an existing FlowFile is taken out of the list, appended to, and its updated version is added back
+        final boolean append = !hitsFlowFiles.isEmpty();
+        final FlowFile hitFlowFile = append ? hitsFlowFiles.remove(0) : createChildFlowFile(session, parent);
+        final int combinedHitCount = paginatedJsonQueryParameters.getHitCount() + hits.size();
+
+        hitsFlowFiles.add(writeCombinedHitFlowFile(combinedHitCount, hits, session, hitFlowFile, attributes, append));
+    }
+
+    /*
+     * Records the page number and, when splitUpHits is FLOWFILE_PER_QUERY, combines each page's hits into one FlowFile that
+     * is only transferred once a page returns no hits (i.e. no more are expected); otherwise defers to the superclass.
+     * The List<FlowFile> hitsFlowFiles parameter and return value allow pagination of query results: the List is created in
+     * AbstractJsonQueryElasticsearch#onTrigger and passed to doQuery => handleResponse => handleHits, so that further hits
+     * may be appended when the next SearchResponse is processed (unnecessary for single-response queries).
+     */
+    @Override
+    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
+                              final ProcessSession session, final FlowFile parent, final Map<String, String> attributes,
+                              final List<FlowFile> hitsFlowFiles, final String transitUri, final StopWatch stopWatch) throws IOException {
+        paginatedJsonQueryParameters.incrementPageCount();
+        attributes.put("page.number", Integer.toString(paginatedJsonQueryParameters.getPageCount()));
+        if (!FLOWFILE_PER_QUERY.getValue().equals(splitUpHits)) {
+            super.handleHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, transitUri, stopWatch);
+            return hitsFlowFiles;
+        }
+        combineHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles);
+        final boolean noHitsInPage = hits == null || hits.isEmpty();
+        if (noHitsInPage && !hitsFlowFiles.isEmpty()) {
+            session.transfer(hitsFlowFiles, REL_HITS);
+            for (final FlowFile ff : hitsFlowFiles) {
+                session.getProvenanceReporter().receive(ff, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+            hitsFlowFiles.clear();
+        }
+        return hitsFlowFiles;
+    }
+
+    /** Sets the page expiration (epoch millis, as a String) to now plus the keep alive, or to now when the page had no hits. */
+    private void updatePageExpirationTimestamp(final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final boolean hasHits) {
+        final Duration keepAlive = Duration.parse("PT" + (hasHits ? paginatedJsonQueryParameters.getKeepAlive() : "0s"));
+        final long expirationMillis = Instant.now().plus(keepAlive).toEpochMilli();
+        paginatedJsonQueryParameters.setPageExpirationTimestamp(String.valueOf(expirationMillis));
+    }
+
+    /** Best-effort deletion of the scroll or Point in Time used by the query; any failure is logged as a warning and ignored. */
+    void clearElasticsearchState(final ProcessContext context, final SearchResponse response) {
+        try {
+            final boolean scroll = PAGINATION_SCROLL.getValue().equals(paginationType);
+            final boolean pointInTime = !scroll && PAGINATION_POINT_IN_TIME.getValue().equals(paginationType);
+
+            // only the id for the configured pagination type is looked up; a blank id leaves nothing to delete
+            final String scrollId = scroll ? getScrollId(context, response) : null;
+            final String pitId = pointInTime ? getPitId(context, response) : null;
+
+            if (StringUtils.isNotBlank(scrollId)) {
+                clientService.get().deleteScroll(scrollId);
+            } else if (StringUtils.isNotBlank(pitId)) {
+                clientService.get().deletePointInTime(pitId);
+            }
+        } catch (Exception ex) {
+            getLogger().warn("Error while cleaning up Elasticsearch pagination resources, ignoring", ex);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
index a52d2ca..75f1fc3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
@@ -24,8 +24,8 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.util.JsonValidator;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.JsonValidator;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.ByteArrayOutputStream;
@@ -61,6 +61,7 @@ public interface ElasticsearchRestProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(JsonValidator.INSTANCE)
             .build();
+
     PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder()
             .name("el-query-attribute")
             .displayName("Query Attribute")
@@ -106,16 +107,16 @@ public interface ElasticsearchRestProcessor {
                     "configured will be sent to this relationship as part of a failed record record set.")
             .autoTerminateDefault(true).build();
 
-    default String getQuery(FlowFile input, ProcessContext context, ProcessSession session) throws IOException {
+    default String getQuery(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException {
         String retVal = null;
         if (context.getProperty(QUERY).isSet()) {
             retVal = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
         } else if (input != null) {
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
             session.exportTo(input, out);
             out.close();
 
-            retVal = new String(out.toByteArray());
+            retVal = out.toString();
         }
 
         return retVal;
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
index 903a124..6f35037 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
@@ -16,41 +16,28 @@
  */
 package org.apache.nifi.processors.elasticsearch;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.elasticsearch.ElasticSearchClientService;
 import org.apache.nifi.elasticsearch.SearchResponse;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 @WritesAttributes({
     @WritesAttribute(attribute = "mime.type", description = "application/json"),
-    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile")
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
 @EventDriven
@@ -59,208 +46,44 @@ import java.util.Set;
         "Elasticsearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " +
         "processor, it will use the flowfile's content for the query. Care should be taken on the size of the query because the entire response " +
         "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
-public class JsonQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
-    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
-            .description("All original flowfiles that don't cause an error to occur go to this relationship. " +
-                    "This applies even if you select the \"split up hits\" option to send individual hits to the " +
-                    "\"hits\" relationship.").build();
-
-    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
-            .description("Search hits are routed to this relationship.")
-            .build();
-
-    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
-            .description("Aggregations are routed to this relationship.")
-            .build();
-    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
-        "splitUp-yes",
-        "Yes",
-        "Split up results."
-    );
-    public static final AllowableValue SPLIT_UP_HITS_NO = new AllowableValue(
-        "splitUp-no",
-        "No",
-        "Don't split up results."
-    );
-
-    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
-            .name("el-rest-split-up-hits")
-            .displayName("Split up search results")
-            .description("Split up search results into one flowfile per result.")
-            .allowableValues(SPLIT_UP_HITS_NO, SPLIT_UP_YES)
-            .defaultValue(SPLIT_UP_HITS_NO.getValue())
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .build();
-    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
-            .name("el-rest-split-up-aggregations")
-            .displayName("Split up aggregation results")
-            .description("Split up aggregation results into one flowfile per result.")
-            .allowableValues(SPLIT_UP_HITS_NO, SPLIT_UP_YES)
-            .defaultValue(SPLIT_UP_HITS_NO.getValue())
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .build();
-
-    private static final Set<Relationship> relationships;
-    private static final List<PropertyDescriptor> propertyDescriptors;
-
-    private volatile ElasticSearchClientService clientService;
-
-    static {
-        final Set<Relationship> _rels = new HashSet<>();
-        _rels.add(REL_ORIGINAL);
-        _rels.add(REL_FAILURE);
-        _rels.add(REL_HITS);
-        _rels.add(REL_AGGREGATIONS);
-        relationships = Collections.unmodifiableSet(_rels);
-
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(QUERY);
-        descriptors.add(QUERY_ATTRIBUTE);
-        descriptors.add(INDEX);
-        descriptors.add(TYPE);
-        descriptors.add(CLIENT_SERVICE);
-        descriptors.add(SPLIT_UP_HITS);
-        descriptors.add(SPLIT_UP_AGGREGATIONS);
-
-        propertyDescriptors = Collections.unmodifiableList(descriptors);
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
+public class JsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<JsonQueryParameters> {
     @Override
-    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propertyDescriptors;
-    }
+    JsonQueryParameters buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session)
+            throws IOException {
 
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) {
-        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+        final JsonQueryParameters jsonQueryParameters = new JsonQueryParameters();
+        populateCommonJsonQueryParameters(jsonQueryParameters, input, context, session);
+        return jsonQueryParameters;
     }
 
-    @OnStopped
-    public void onStopped() {
-        this.clientService = null;
-    }
-
-
-    private final ObjectMapper mapper = new ObjectMapper();
-
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        FlowFile input = null;
-        if (context.hasIncomingConnection()) {
-            input = session.get();
-
-            if (input == null && context.hasNonLoopConnection()) {
-                return;
-            }
+    SearchResponse doQuery(final JsonQueryParameters queryJsonParameters, final List<FlowFile> hitsFlowFiles,
+                           final ProcessSession session, final ProcessContext context, final FlowFile input,
+                           final StopWatch stopWatch) throws IOException {
+        final SearchResponse response = clientService.get().search(
+                queryJsonParameters.getQuery(),
+                queryJsonParameters.getIndex(),
+                queryJsonParameters.getType(),
+                null
+        );
+        if (input != null) {
+            session.getProvenanceReporter().send(
+                    input,
+                    clientService.get().getTransitUrl(queryJsonParameters.getIndex(), queryJsonParameters.getType()),
+                    stopWatch.getElapsed(TimeUnit.MILLISECONDS)
+            );
         }
 
-        try {
-
-            final String query = getQuery(input, context, session);
-            final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
-            final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
-            final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
-                ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
-                : null;
-
-            SearchResponse response = clientService.search(query, index, type);
-
-            Map<String, String> attributes = new HashMap<>();
-            attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
-            if (!StringUtils.isBlank(queryAttr)) {
-                attributes.put(queryAttr, query);
-            }
+        handleResponse(response, true, queryJsonParameters, hitsFlowFiles, session, input, stopWatch);
 
-            List<FlowFile> hitsFlowFiles = handleHits(response.getHits(), context, session, input, attributes);
-            List<FlowFile> aggsFlowFiles = handleAggregations(response.getAggregations(), context, session, input, attributes);
-
-            final String transitUri = clientService.getTransitUrl(index, type);
-
-            if (hitsFlowFiles.size() > 0) {
-                session.transfer(hitsFlowFiles, REL_HITS);
-                for (FlowFile ff : hitsFlowFiles) {
-                    session.getProvenanceReporter().send(ff, transitUri);
-                }
-            }
-
-            if (aggsFlowFiles.size() > 0) {
-                session.transfer(aggsFlowFiles, REL_AGGREGATIONS);
-                for (FlowFile ff : aggsFlowFiles) {
-                    session.getProvenanceReporter().send(ff, transitUri);
-                }
-            }
-
-            if (input != null) {
-                session.transfer(input, REL_ORIGINAL);
-            }
-        } catch (Exception ex) {
-            getLogger().error("Error processing flowfile.", ex);
-            if (input != null) {
-                session.transfer(input, REL_FAILURE);
-            }
-            context.yield();
-        }
+        return response;
     }
 
-    private FlowFile writeAggregationFlowFileContents(String name, String json, ProcessSession session, FlowFile aggFlowFile, Map<String, String> attributes) {
-        aggFlowFile = session.write(aggFlowFile, out -> out.write(json.getBytes()));
-        if (name != null) {
-            aggFlowFile = session.putAttribute(aggFlowFile, "aggregation.name", name);
-        }
-
-        return session.putAllAttributes(aggFlowFile, attributes);
-    }
-
-    private List<FlowFile> handleAggregations(Map<String, Object> aggregations, ProcessContext context, ProcessSession session, FlowFile parent, Map<String, String> attributes) throws IOException {
-        List<FlowFile> retVal = new ArrayList<>();
-        if (aggregations == null) {
-            return retVal;
-        }
-        String splitUpValue = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
-
-        if (splitUpValue.equals(SPLIT_UP_YES.getValue())) {
-            for (Map.Entry<String, Object> agg : aggregations.entrySet()) {
-                FlowFile aggFlowFile = parent != null ? session.create(parent) : session.create();
-                String aggJson = mapper.writeValueAsString(agg.getValue());
-                retVal.add(writeAggregationFlowFileContents(agg.getKey(), aggJson, session, aggFlowFile, attributes));
-            }
-        } else {
-            String json = mapper.writeValueAsString(aggregations);
-            retVal.add(writeAggregationFlowFileContents(null, json, session, parent != null ? session.create(parent) : session.create(), attributes));
-        }
-
-        return retVal;
-    }
-
-    private FlowFile writeHitFlowFile(String json, ProcessSession session, FlowFile hitFlowFile, Map<String, String> attributes) {
-        hitFlowFile = session.write(hitFlowFile, out -> out.write(json.getBytes()));
-
-        return session.putAllAttributes(hitFlowFile, attributes);
-    }
-
-    private List<FlowFile> handleHits(List<Map<String, Object>> hits, ProcessContext context, ProcessSession session, FlowFile parent, Map<String, String> attributes) throws IOException {
-        String splitUpValue = context.getProperty(SPLIT_UP_HITS).getValue();
-        List<FlowFile> retVal = new ArrayList<>();
-        if (splitUpValue.equals(SPLIT_UP_YES.getValue())) {
-            for (Map<String, Object> hit : hits) {
-                FlowFile hitFlowFile = parent != null ? session.create(parent) : session.create();
-                String json = mapper.writeValueAsString(hit);
-
-                retVal.add(writeHitFlowFile(json, session, hitFlowFile, attributes));
-            }
-        } else {
-            FlowFile hitFlowFile = parent != null ? session.create(parent) : session.create();
-            String json = mapper.writeValueAsString(hits);
-            retVal.add(writeHitFlowFile(json, session, hitFlowFile, attributes));
+    @Override
+    void finishQuery(final FlowFile input, final JsonQueryParameters jsonQueryParameters, final ProcessSession session,
+                     final ProcessContext context, final SearchResponse response) {
+        if (input != null) {
+            session.transfer(input, REL_ORIGINAL);
         }
-
-        return retVal;
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
new file mode 100644
index 0000000..ddd12cf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "read", "json"})
+@CapabilityDescription("A processor that allows the user to run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "It will use the flowfile's content for the query unless the QUERY attribute is populated. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class PaginatedJsonQueryElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
+    private static final List<PropertyDescriptor> propertyDescriptors;
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.addAll(paginatedPropertyDescriptors);
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    void finishQuery(final FlowFile input, final PaginatedJsonQueryParameters paginatedQueryJsonParameters,
+                     final ProcessSession session, final ProcessContext context, final SearchResponse response) {
+        // skip the transfer when there is no input FlowFile, matching the null guard in JsonQueryElasticsearch#finishQuery
+        if (input != null) {
+            session.transfer(input, REL_ORIGINAL);
+        }
+    }
+
+    @Override
+    boolean isExpired(final PaginatedJsonQueryParameters paginatedQueryJsonParameters, final ProcessContext context, final SearchResponse response) {
+        // queries using input FlowFiles don't expire, they run until completion
+        return false;
+    }
+
+    @Override
+    String getScrollId(final ProcessContext context, final SearchResponse response) {
+        return response != null ? response.getScrollId() : null;
+    }
+
+    @Override
+    String getPitId(final ProcessContext context, final SearchResponse response) {
+        return response != null ? response.getPitId() : null;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
new file mode 100644
index 0000000..40976c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@PrimaryNodeOnly
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "search", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field. The processor will retrieve multiple pages of results " +
+        "until either no more results are available or the Pagination Keep Alive expiration is reached, after which the query will " +
+        "restart with the first page of results being retrieved.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, pageExpirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
+    // keys under which the pagination progress is stored in the processor's LOCAL state
+    static final String STATE_SCROLL_ID = "scrollId";
+    static final String STATE_PIT_ID = "pitId";
+    static final String STATE_SEARCH_AFTER = "searchAfter";
+    static final String STATE_PAGE_EXPIRATION_TIMESTAMP = "pageExpirationTimestamp";
+    static final String STATE_PAGE_COUNT = "pageCount";
+    static final String STATE_HIT_COUNT = "hitCount";
+
+    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder().fromPropertyDescriptor(ElasticsearchRestProcessor.QUERY)
+            .name("el-rest-query")
+            .description("A query in JSON syntax, not Lucene syntax. Ex: {\"query\":{\"match\":{\"somefield\":\"somevalue\"}}}.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(SearchElasticsearch.QUERY);
+        descriptors.addAll(paginatedPropertyDescriptors);
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    PaginatedJsonQueryParameters buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException {
+        final PaginatedJsonQueryParameters paginatedQueryJsonParameters = super.buildJsonQueryParameters(input, context, session);
+        // overlay the pagination progress held in local state; counters default to 0 when nothing is stored
+        final StateMap stateMap = context.getStateManager().getState(Scope.LOCAL);
+        paginatedQueryJsonParameters.setHitCount(stateMap.get(STATE_HIT_COUNT) == null ? 0 : Integer.parseInt(stateMap.get(STATE_HIT_COUNT)));
+        paginatedQueryJsonParameters.setPageCount(stateMap.get(STATE_PAGE_COUNT) == null ? 0 : Integer.parseInt(stateMap.get(STATE_PAGE_COUNT)));
+        paginatedQueryJsonParameters.setScrollId(stateMap.get(STATE_SCROLL_ID));
+        paginatedQueryJsonParameters.setSearchAfter(stateMap.get(STATE_SEARCH_AFTER));
+        paginatedQueryJsonParameters.setPitId(stateMap.get(STATE_PIT_ID));
+        paginatedQueryJsonParameters.setPageExpirationTimestamp(stateMap.get(STATE_PAGE_EXPIRATION_TIMESTAMP));
+
+        return paginatedQueryJsonParameters;
+    }
+
+    @Override
+    void finishQuery(final FlowFile input, final PaginatedJsonQueryParameters paginatedQueryJsonParameters,
+                     final ProcessSession session, final ProcessContext context, final SearchResponse response) throws IOException {
+        if (response.getHits().isEmpty()) {
+            getLogger().debug("No more results for paginated query, resetting local state for future queries");
+            resetProcessorState(context);
+        } else {
+            getLogger().debug("Updating local state for next execution");
+            // which identifiers are stored depends on the configured pagination type
+            final Map<String, String> newStateMap = new HashMap<>();
+            if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
+                newStateMap.put(STATE_SCROLL_ID, response.getScrollId());
+            } else {
+                newStateMap.put(STATE_SEARCH_AFTER, response.getSearchAfter());
+                // a Point in Time query stores searchAfter as well, plus its PiT id
+                if (PAGINATION_POINT_IN_TIME.getValue().equals(paginationType)) {
+                    newStateMap.put(STATE_PIT_ID, response.getPitId());
+                }
+            }
+            newStateMap.put(STATE_HIT_COUNT, Integer.toString(paginatedQueryJsonParameters.getHitCount()));
+            newStateMap.put(STATE_PAGE_COUNT, Integer.toString(paginatedQueryJsonParameters.getPageCount()));
+            newStateMap.put(STATE_PAGE_EXPIRATION_TIMESTAMP, paginatedQueryJsonParameters.getPageExpirationTimestamp());
+            context.getStateManager().setState(newStateMap, Scope.LOCAL);
+        }
+    }
+
+    @Override
+    boolean isExpired(final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final ProcessContext context,
+                      final SearchResponse response) throws IOException {
+        final boolean expiredQuery = StringUtils.isNotEmpty(paginatedJsonQueryParameters.getPageExpirationTimestamp())
+                && Instant.ofEpochMilli(Long.parseLong(paginatedJsonQueryParameters.getPageExpirationTimestamp())).isBefore(Instant.now());
+        if (expiredQuery) {
+            getLogger().debug("Existing paginated query has expired, resetting for new query");
+            // clear the persisted state as well as the values already copied from it into the parameters
+            resetProcessorState(context);
+
+            paginatedJsonQueryParameters.setPageCount(0);
+            paginatedJsonQueryParameters.setHitCount(0);
+            paginatedJsonQueryParameters.setPageExpirationTimestamp(null);
+            paginatedJsonQueryParameters.setPitId(null);
+            paginatedJsonQueryParameters.setScrollId(null);
+            paginatedJsonQueryParameters.setSearchAfter(null);
+        }
+        return expiredQuery;
+    }
+
+    @Override
+    String getScrollId(final ProcessContext context, final SearchResponse response) throws IOException {
+        // prefer the Scroll id carried by the response, otherwise use the one saved in local state
+        final String responseScrollId = response == null ? null : response.getScrollId();
+        return StringUtils.isBlank(responseScrollId) ? context.getStateManager().getState(Scope.LOCAL).get(STATE_SCROLL_ID) : responseScrollId;
+    }
+
+    @Override
+    String getPitId(final ProcessContext context, final SearchResponse response) throws IOException {
+        // prefer the PiT id carried by the response (test the PiT id itself, not the Scroll id), otherwise use the one saved in local state
+        final String responsePitId = response == null ? null : response.getPitId();
+        return StringUtils.isBlank(responsePitId) ? context.getStateManager().getState(Scope.LOCAL).get(STATE_PIT_ID) : responsePitId;
+    }
+
+    private void resetProcessorState(final ProcessContext context) throws IOException {
+        // using ProcessContext#stateManager instead of ProcessSession#*State methods because the latter don't
+        // seem to persist things properly between sessions if the processor is scheduled to run very quickly, e.g. every second (NIFI-9050)
+        context.getStateManager().clear(Scope.LOCAL);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/JsonQueryParameters.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/JsonQueryParameters.java
new file mode 100644
index 0000000..69e81f1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/JsonQueryParameters.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+public class JsonQueryParameters {
+    private String index; // the String fields stay null until their setter is called
+    private String type;
+    private String query;
+    private String queryAttr;
+    private int hitCount = 0; // running total: replaced by setHitCount, grown by addHitCount
+
+    public String getIndex() {
+        return this.index;
+    }
+
+    public void setIndex(final String index) {
+        this.index = index;
+    }
+
+    public String getType() {
+        return this.type;
+    }
+
+    public void setType(final String type) {
+        this.type = type;
+    }
+
+    public String getQuery() {
+        return this.query;
+    }
+
+    public void setQuery(final String query) {
+        this.query = query;
+    }
+
+    public String getQueryAttr() {
+        return this.queryAttr;
+    }
+
+    public void setQueryAttr(final String queryAttr) {
+        this.queryAttr = queryAttr;
+    }
+
+    public int getHitCount() {
+        return this.hitCount;
+    }
+
+    public void setHitCount(final int hitCount) {
+        this.hitCount = hitCount; // overwrites the total
+    }
+
+    public void addHitCount(final int hitCount) {
+        this.hitCount = this.hitCount + hitCount; // accumulates onto the total
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
new file mode 100644
index 0000000..8e97f21
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+public class PaginatedJsonQueryParameters extends JsonQueryParameters {
+    private int pageCount; // starts at 0, see incrementPageCount
+    private String scrollId; // the String fields stay null until their setter is called
+    private String searchAfter;
+    private String pitId;
+    private String pageExpirationTimestamp; // held as a String, not as a numeric or date type
+    private String keepAlive;
+
+    public String getKeepAlive() {
+        return this.keepAlive;
+    }
+
+    public void setKeepAlive(final String keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    public String getPageExpirationTimestamp() {
+        return this.pageExpirationTimestamp;
+    }
+
+    public void setPageExpirationTimestamp(final String pageExpirationTimestamp) {
+        this.pageExpirationTimestamp = pageExpirationTimestamp;
+    }
+
+    public String getScrollId() {
+        return this.scrollId;
+    }
+
+    public void setScrollId(final String scrollId) {
+        this.scrollId = scrollId;
+    }
+
+    public String getSearchAfter() {
+        return this.searchAfter;
+    }
+
+    public void setSearchAfter(final String searchAfter) {
+        this.searchAfter = searchAfter;
+    }
+
+    public String getPitId() {
+        return this.pitId;
+    }
+
+    public void setPitId(final String pitId) {
+        this.pitId = pitId;
+    }
+
+    public int getPageCount() {
+        return this.pageCount;
+    }
+
+    public void setPageCount(final int pageCount) {
+        this.pageCount = pageCount;
+    }
+
+    public void incrementPageCount() {
+        this.pageCount += 1; // one more page counted
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
deleted file mode 100644
index 2034d64..0000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
+++ /dev/null
@@ -1,48 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<head>
-    <meta charset="utf-8" />
-    <title>PutElasticsearchJson</title>
-    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
-</head>
-<body>
-    <p>This processor is designed to take any valid JSON and index it into Elasticsearch 5.x or newer. Unlike the record-aware
-    alternative, it only does index operations, not updates, upserts or deletes.</p>
-    <p>The ID for each document can be set in one of three ways:</p>
-    <ul>
-        <li>FlowFile attribute - only possible if the flowfile contains only a single document. Arrays of documents
-        will cause an error to be raised.</li>
-        <li>JsonPath operation - searches within the document for a particular field.</li>
-        <li>Default - Elasticsearch will generate one. This will be used when neither the attribute nor JsonPath configuration
-        fields are used.</li>
-    </ul>
-    <p>Example document:</p>
-    <pre>
-        {
-            "index": "messages",
-            "type": "message",
-            "message": "Hello, world",
-            "from": "john.smith"
-        }
-    </pre>
-    <p>The following JsonPath operations will extract the index and type:</p>
-    <ul>
-        <li>$.index</li>
-        <li>$.type</li>
-    </ul>
-</body>
-</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
deleted file mode 100644
index 9206785..0000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
+++ /dev/null
@@ -1,64 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<head>
-    <meta charset="utf-8" />
-    <title>PutElasticsearchRecord</title>
-    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
-</head>
-<body>
-    <p>This is a record-aware processor built for Elasticsearch 5 and later. The index and type fields are configured
-    with default values that can be overridden using record path operations that find an index or type value in the
-    record set. The ID and operation type (index, update, upsert or delete) can also be extracted in a similar fashion from
-    the record set. The following is an example of a document exercising all of these features:</p>
-    <pre>
-        {
-            "metadata": {
-                "id": "12345",
-                "index": "test",
-                "type": "message",
-                "operation": "index"
-            },
-            "message": "Hello, world",
-            "from": "john.smith"
-        }
-    </pre>
-    <pre>
-        {
-            "metadata": {
-                "id": "12345",
-                "index": "test",
-                "type": "message",
-                "operation": "delete"
-            }
-        }
-    </pre>
-    <p>The record path operations below would extract the relevant data:</p>
-    <ul>
-        <li>/metadata/id</li>
-        <li>/metadata/index</li>
-        <li>metadata/type</li>
-        <li>metadata/operation</li>
-    </ul>
-    <p>Valid values for "operation" are:</p>
-    <ul>
-        <li>delete</li>
-        <li>index</li>
-        <li>update</li>
-        <li>upsert</li>
-    </ul>
-</body>
-</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 84e00b4..eafd001 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,4 +15,6 @@
 
 org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch
 org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch
+org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch
+org.apache.nifi.processors.elasticsearch.SearchElasticsearch
 org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html
index 52f5f3d..e9330ea 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html
@@ -21,11 +21,10 @@
 </head>
 <body>
     <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
-    to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster. Like all processors in the
+    to be able to take a JSON query (e.g. from Kibana) and execute it as-is against an Elasticsearch cluster. Like all processors in the
     "restapi" bundle, it uses the official Elastic client APIs, so it supports leader detection.</p>
-    <p>The query to execute can be provided either in the Query configuration property or in an attribute on a flowfile. In
-    the latter case, the name of the attribute (Expression Language is supported here) must be provided in the Query Attribute
-    property.</p>
+    <p>The query JSON to execute can be provided either in the Query configuration property or in the content of the flowfile.
+    If the Query Attribute property is configured, the executed query JSON will be placed in the attribute provided by this property.</p>
     <p>Additionally, search results and aggregation results can be split up into multiple flowfiles. Aggregation results
     will only be split at the top level because nested aggregations lose their context (and thus lose their value) if
     separated from their parent aggregation. The following is an example query that would be accepted:</p>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
similarity index 55%
copy from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html
copy to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
index 52f5f3d..1276325 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
@@ -16,22 +16,29 @@
 -->
 <head>
     <meta charset="utf-8" />
-    <title>JsonQueryElasticsearch</title>
+    <title>PaginatedJsonQueryElasticsearch</title>
     <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
 </head>
 <body>
     <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
-    to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster. Like all processors in the
-    "restapi" bundle, it uses the official Elastic client APIs, so it supports leader detection.</p>
-    <p>The query to execute can be provided either in the Query configuration property or in an attribute on a flowfile. In
-    the latter case, the name of the attribute (Expression Language is supported here) must be provided in the Query Attribute
-    property.</p>
-    <p>Additionally, search results and aggregation results can be split up into multiple flowfiles. Aggregation results
+    to be able to take a JSON query (e.g. from Kibana) and execute it as-is against an Elasticsearch cluster in a paginated manner.
+    Like all processors in the "restapi" bundle, it uses the official Elastic client APIs, so it supports leader detection.</p>
+    <p>The query JSON to execute can be provided either in the Query configuration property or in the content of the flowfile.
+    If the Query Attribute property is configured, the executed query JSON will be placed in the attribute provided by this property.</p>
+    <p>The query is paginated in Elasticsearch using one of the available methods - "Scroll" or "Search After" (optionally
+    with a "Point in Time" for Elasticsearch 7.10+ with XPack enabled). The number of results per page can be controlled using
+        the <em>size</em> parameter in the Query JSON. For Search After functionality, a <em>sort</em> parameter <strong>must</strong>
+    be present within the Query JSON.</p>
+    <p>Search results and aggregation results can be split up into multiple flowfiles. Aggregation results
     will only be split at the top level because nested aggregations lose their context (and thus lose their value) if
-    separated from their parent aggregation. The following is an example query that would be accepted:</p>
+    separated from their parent aggregation. Additionally, the results from all pages can be combined into a single
+    flowfile (but the processor will only load each page of data into memory at any one time).</p>
+    <p>The following is an example query that would be accepted:</p>
     <pre>
         {
+            "size": 10000,
+            "sort": {"product": "desc"},
             "query": {
                 "match": {
                     "restaurant.keyword": "Local Pizzaz FTW Inc"
                 }
@@ -54,5 +61,6 @@
             }
         }
     </pre>
+
 </body>
 </html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.SearchElasticsearch/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.SearchElasticsearch/additionalDetails.html
new file mode 100644
index 0000000..d4a0530
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.SearchElasticsearch/additionalDetails.html
@@ -0,0 +1,76 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>SearchElasticsearch</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
+    to be able to take a JSON query (e.g. from Kibana) and execute it as-is against an Elasticsearch cluster in a paginated manner.
+    Like all processors in the "restapi" bundle, it uses the official Elastic client APIs, so it supports leader detection.</p>
+    <p>The query to execute must be provided in the Query configuration property.</p>
+    <p>The query is paginated in Elasticsearch using one of the available methods - "Scroll" or "Search After" (optionally
+    with a "Point in Time" for Elasticsearch 7.10+ with XPack enabled). The number of results per page can be controlled using
+    the <em>size</em> parameter in the Query JSON. For Search After functionality, a <em>sort</em> parameter <strong>must</strong>
+    be present within the Query JSON.</p>
+    <p>Search results and aggregation results can be split up into multiple flowfiles. Aggregation results
+    will only be split at the top level because nested aggregations lose their context (and thus lose their value) if
+    separated from their parent aggregation. Additionally, the results from all pages can be combined into a single
+    flowfile (but the processor will only load one page of data into memory at any one time).</p>
+    <p>The following is an example query that would be accepted:</p>
+    <pre>
+        {
+            "size": 10000,
+            "sort": {"product": "desc"},
+            "query": {
+                "match": {
+                    "restaurant.keyword": "Local Pizzaz FTW Inc"
+                }
+            },
+            "aggs": {
+                "weekly_sales": {
+                  "date_histogram": {
+                    "field": "date",
+                    "interval": "week"
+                  },
+                  "aggs": {
+                    "items": {
+                      "terms": {
+                        "field": "product",
+                        "size": 10
+                      }
+                    }
+                  }
+                }
+            }
+        }
+    </pre>
+    <p>This processor runs on a schedule in order to execute the same query repeatedly. Once a paginated query has been
+    initiated within Elasticsearch, this processor will continue to retrieve results for that same query until no
+    further results are available. After that point, a new paginated query will be initiated using the same Query JSON.
+    This processor does not attempt to de-duplicate results between queries; for example, if the same query runs twice
+    and (some or all of) the results are identical, the output will contain these same results for both invocations.</p>
+    <p>If the results are "Combined" from this processor, then the paginated query will run continually within a
+    single invocation until no more results are available (then the processor will start a new paginated query upon its
+    next invocation). If the results are "Split" or "Per Page", then each invocation of this processor will retrieve the
+    next page of results until either there are no more results or the paginated query expires within Elasticsearch.</p>
+    <p>Local State is used to track the progress of a paginated query within this processor. If there is a need to restart
+    the query completely or change the processor configuration after a paginated query has already been started,
+    be sure to "Clear State" of the processor once it has been stopped and before restarting.</p>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
new file mode 100644
index 0000000..b114eb4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.components.state.Scope
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.provenance.ProvenanceEventType
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Test
+
+import static groovy.json.JsonOutput.prettyPrint
+import static groovy.json.JsonOutput.toJson
+import static org.hamcrest.CoreMatchers.equalTo
+import static org.hamcrest.MatcherAssert.assertThat
+import static org.hamcrest.CoreMatchers.is
+import static org.junit.Assert.assertThrows
+
+abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryElasticsearch> {
+    static final String INDEX_NAME = "messages"
+
+    abstract P getProcessor()
+
+    abstract boolean isStateUsed()
+
+    abstract boolean isInput()
+
+    @Test
+    void testMandatoryProperties() {
+        final TestRunner runner = createRunner(false)
+        runner.removeProperty(AbstractJsonQueryElasticsearch.CLIENT_SERVICE)
+        runner.removeProperty(AbstractJsonQueryElasticsearch.INDEX)
+        runner.removeProperty(AbstractJsonQueryElasticsearch.TYPE)
+        runner.removeProperty(AbstractJsonQueryElasticsearch.QUERY)
+        runner.removeProperty(AbstractJsonQueryElasticsearch.QUERY_ATTRIBUTE)
+        runner.removeProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT)
+        runner.removeProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT)
+
+        final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
+        if (processor instanceof SearchElasticsearch) {
+            assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 3 validation failures:\n" +
+                    "'%s' is invalid because %s is required\n" +
+                    "'%s' is invalid because %s is required\n" +
+                    "'%s' is invalid because %s is required\n",
+                    AbstractJsonQueryElasticsearch.QUERY.getDisplayName(), AbstractJsonQueryElasticsearch.QUERY.getDisplayName(),
+                    AbstractJsonQueryElasticsearch.INDEX.getDisplayName(), AbstractJsonQueryElasticsearch.INDEX.getDisplayName(),
+                    AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName(), AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName()
+            )))
+        } else {
+            assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 2 validation failures:\n" +
+                    "'%s' is invalid because %s is required\n" +
+                    "'%s' is invalid because %s is required\n",
+                    AbstractJsonQueryElasticsearch.INDEX.getDisplayName(), AbstractJsonQueryElasticsearch.INDEX.getDisplayName(),
+                    AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName(), AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName()
+            )))
+        }
+    }
+
+    @Test
+    void testInvalidProperties() {
+        final TestRunner runner = createRunner(false)
+        runner.setProperty(AbstractJsonQueryElasticsearch.CLIENT_SERVICE, "not-a-service")
+        runner.setProperty(AbstractJsonQueryElasticsearch.INDEX, "")
+        runner.setProperty(AbstractJsonQueryElasticsearch.TYPE, "")
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, "not-json")
+        runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, "not-enum")
+        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, "not-enum2")
+
+        final String expectedAllowedSplitHits = processor instanceof AbstractPaginatedJsonQueryElasticsearch
+            ? [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY].join(", ")
+            : [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", ")
+
+        final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
+        assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 7 validation failures:\n" +
+                "'%s' validated against 'not-json' is invalid because %s is not a valid JSON representation due to Unrecognized token 'not': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n" +
+                " at [Source: (String)\"not-json\"; line: 1, column: 4]\n" +
+                "'%s' validated against '' is invalid because %s cannot be empty\n" +
+                "'%s' validated against '' is invalid because %s cannot be empty\n" +
+                "'%s' validated against 'not-a-service' is invalid because Property references a Controller Service that does not exist\n" +
+                "'%s' validated against 'not-enum2' is invalid because Given value not found in allowed set '%s'\n" +
+                "'%s' validated against 'not-enum' is invalid because Given value not found in allowed set '%s'\n" +
+                "'%s' validated against 'not-a-service' is invalid because Invalid Controller Service: not-a-service is not a valid Controller Service Identifier\n",
+                AbstractJsonQueryElasticsearch.QUERY.getName(), AbstractJsonQueryElasticsearch.QUERY.getName(),
+                AbstractJsonQueryElasticsearch.INDEX.getName(), AbstractJsonQueryElasticsearch.INDEX.getName(),
+                AbstractJsonQueryElasticsearch.TYPE.getName(), AbstractJsonQueryElasticsearch.TYPE.getName(),
+                AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName(),
+                AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT.getName(), expectedAllowedSplitHits,
+                AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT.getName(), [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", "),
+                AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName()
+        )))
+    }
+
+    @Test
+    void testBasicQuery() throws Exception {
+        // test hits (no splitting)
+        final TestRunner runner = createRunner(false)
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
+        runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+        final FlowFile hits = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
+        hits.assertAttributeEquals("hit.count", "10")
+        assertOutputContent(hits.getContent(), 10, false)
+        assertThat(
+                runner.getProvenanceEvents().stream().filter({ pe ->
+                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                            pe.getAttribute("uuid") == hits.getAttribute("uuid")
+                }).count(),
+                is(1L)
+        )
+        reset(runner)
+
+
+        // test splitting hits
+        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+        runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 10, 0, 0)
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
+                { hit ->
+                    hit.assertAttributeEquals("hit.count", "1")
+                    assertOutputContent(hit.getContent(), 1, false)
+                    assertThat(
+                            runner.getProvenanceEvents().stream().filter({ pe ->
+                                pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                        pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                            }).count(),
+                            is(1L)
+                    )
+                }
+        )
+    }
+
+    @Test
+    void testAggregations() throws Exception {
+        String query = prettyPrint(toJson([
+                query: [ match_all: [:] ],
+                aggs: [ term_agg: [ terms: [ field: "msg" ] ], term_agg2: [ terms: [ field: "msg" ] ] ]
+        ]))
+
+        // test aggregations (no splitting)
+        final TestRunner runner = createRunner(true)
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
+        runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 1)
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
+        FlowFile aggregations = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0)
+        aggregations.assertAttributeNotExists("aggregation.number")
+        aggregations.assertAttributeNotExists("aggregation.name")
+        // count == 1 because aggregations is a single Map rather than a List of Maps, even when there are multiple aggs
+        assertOutputContent(aggregations.getContent(), 1, false)
+        reset(runner)
+
+
+        // test with the query parameter and no incoming connection
+        runner.setIncomingConnection(false)
+        runner.run(1, true, true)
+        testCounts(runner, 0, 1, 0, 1)
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0).assertAttributeNotExists("aggregation.number")
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0).assertAttributeNotExists("aggregation.name")
+        reset(runner)
+
+
+        // test splitting aggregations
+        runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+        runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 2)
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
+        int a = 0
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).forEach(
+                { agg ->
+                    agg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
+                    agg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
+                    assertOutputContent(agg.getContent(), 1, false)
+                }
+        )
+        reset(runner)
+
+
+        // test using Expression Language (index, type, query)
+        query = prettyPrint(toJson([
+                query: [ match_all: [:] ],
+                aggs: [ term_agg: [ terms: [ field: "\${fieldValue}" ] ], term_agg2: [ terms: [ field: "\${fieldValue}" ] ] ]
+        ]))
+        runner.setVariable("fieldValue", "msg")
+        runner.setVariable("es.index", INDEX_NAME)
+        runner.setVariable("es.type", "msg")
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
+        runner.setProperty(AbstractJsonQueryElasticsearch.INDEX, "\${es.index}")
+        runner.setProperty(AbstractJsonQueryElasticsearch.TYPE, "\${es.type}")
+        runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 2)
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
+        a = 0
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).forEach(
+                { agg ->
+                    agg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
+                    agg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
+                    assertOutputContent(agg.getContent(), 1, false)
+                }
+        )
+    }
+
+    @Test
+    void testErrorDuringSearch() throws Exception {
+        String query = prettyPrint(toJson([
+                query: [ match_all: [:] ],
+                aggs: [ term_agg: [ terms: [ field: "msg" ] ], term_agg2: [ terms: [ field: "msg" ] ] ]
+        ]))
+
+        final TestRunner runner = createRunner(true)
+        getService(runner).setThrowErrorInSearch(true)
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
+        runOnce(runner)
+        testCounts(runner, 0, 0, isInput() ? 1 : 0, 0)
+    }
+
+    @Test
+    void testQueryAttribute() throws Exception {
+        String query = prettyPrint(toJson([
+                query: [ match_all: [:] ],
+                aggs: [ term_agg: [ terms: [ field: "msg" ] ], term_agg2: [ terms: [ field: "msg" ] ] ]
+        ]))
+        final String queryAttr = "es.query"
+
+        final TestRunner runner = createRunner(true)
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr)
+        runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 1)
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS)
+        flowFiles.addAll(runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS))
+
+        for (final MockFlowFile mockFlowFile : flowFiles) {
+            final String attr = mockFlowFile.getAttribute(queryAttr)
+            Assert.assertNotNull("Missing query attribute", attr)
+            Assert.assertEquals("Query had wrong value.", query, attr)
+        }
+    }
+
+    @Test
+    void testInputHandling() {
+        final TestRunner runner = createRunner(false)
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
+
+        runner.setIncomingConnection(true)
+        runner.run()
+        testCounts(runner, 0, 0, 0, 0)
+        reset(runner)
+
+        runner.setIncomingConnection(false)
+        runner.run()
+        testCounts(runner, 0, 1, 0, 0)
+    }
+
+    static void testCounts(TestRunner runner, int original, int hits, int failure, int aggregations) {
+        runner.assertTransferCount(AbstractJsonQueryElasticsearch.REL_ORIGINAL, original)
+        runner.assertTransferCount(AbstractJsonQueryElasticsearch.REL_HITS, hits)
+        runner.assertTransferCount(AbstractJsonQueryElasticsearch.REL_FAILURE, failure)
+        runner.assertTransferCount(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS, aggregations)
+        runner.assertTransferCount(AbstractJsonQueryElasticsearch.REL_RETRY, 0)
+    }
+
+    static void assertOutputContent(final String content, final int count, final boolean ndjson) {
+        if (ndjson) {
+            assertThat(content.split("\n").length, is(count))
+        } else {
+            if (count == 1) {
+                assertThat(content.startsWith("{") && content.endsWith("}"), is(true))
+            } else {
+                assertThat(content.startsWith("[") && content.endsWith("]"), is(true))
+            }
+        }
+    }
+
+    TestRunner createRunner(final boolean returnAggs) {
+        final P processor = getProcessor()
+        final TestRunner runner = TestRunners.newTestRunner(processor)
+        final TestElasticsearchClientService service = new TestElasticsearchClientService(returnAggs)
+        runner.addControllerService("esService", service)
+        runner.enableControllerService(service)
+        runner.setProperty(AbstractJsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+        runner.setProperty(AbstractJsonQueryElasticsearch.INDEX, INDEX_NAME)
+        runner.setProperty(AbstractJsonQueryElasticsearch.TYPE, "message")
+        runner.setValidateExpressionUsage(true)
+
+        return runner
+    }
+
+    MockFlowFile runOnce(final TestRunner runner) {
+        final MockFlowFile ff
+        if (isInput()) {
+            runner.setIncomingConnection(true)
+            ff = runner.enqueue("test")
+        } else {
+            runner.setIncomingConnection(false)
+            ff = null
+        }
+
+        runner.run(1, true, true)
+        return ff
+    }
+
+    static TestElasticsearchClientService getService(final TestRunner runner) {
+        return runner.getControllerService("esService", TestElasticsearchClientService.class)
+    }
+
+    void reset(final TestRunner runner) {
+        runner.clearProvenanceEvents()
+        runner.clearTransferState()
+        if (isStateUsed()) {
+            runner.getStateManager().clear(Scope.LOCAL)
+        }
+
+        getService(runner).resetPageCount()
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
new file mode 100644
index 0000000..afffcb2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.components.AllowableValue
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.provenance.ProvenanceEventType
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.junit.Test
+
+import static groovy.json.JsonOutput.prettyPrint
+import static groovy.json.JsonOutput.toJson
+import static org.hamcrest.CoreMatchers.equalTo
+import static org.hamcrest.CoreMatchers.is
+import static org.hamcrest.MatcherAssert.assertThat
+import static org.junit.Assert.assertThrows
+
+abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQueryElasticsearchTest<AbstractPaginatedJsonQueryElasticsearch> {
+    abstract boolean isInput()
+
+    @Test
+    void testInvalidPaginationProperties() {
+        final TestRunner runner = createRunner(false)
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE, "not-a-period")
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, "not-enum")
+
+        final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
+        assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 2 validation failures:\n" +
+                "'%s' validated against 'not-enum' is invalid because Given value not found in allowed set '%s'\n" +
+                "'%s' validated against 'not-a-period' is invalid because Must be of format <duration> <TimeUnit> where <duration> " +
+                "is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days\n",
+                AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(),
+                [
+                        AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL,
+                        AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER,
+                        AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME
+                ].join(", "),
+                AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName(),
+                AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName()
+        )))
+    }
+
+    @Test
+    void testSinglePage() {
+        // paged query hits (no splitting)
+        final TestRunner runner = createRunner(false)
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
+        MockFlowFile input = runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+        FlowFile hits = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
+        hits.assertAttributeEquals("hit.count", "10")
+        hits.assertAttributeEquals("page.number", "1")
+        assertOutputContent(hits.getContent(), 10, false)
+        assertThat(
+                runner.getProvenanceEvents().stream().filter({ pe ->
+                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                            pe.getAttribute("uuid") == runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getAttribute("uuid")
+                }).count(),
+                is(1L)
+        )
+        assertSendEvent(runner, input)
+        reset(runner)
+
+
+        // paged query hits splitting
+        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+        input = runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 10, 0, 0)
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
+                { hit ->
+                    hit.assertAttributeEquals("hit.count", "1")
+                    hit.assertAttributeEquals("page.number", "1")
+                    assertOutputContent(hit.getContent(), 1, false)
+                    assertThat(
+                            runner.getProvenanceEvents().stream().filter({ pe ->
+                                pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                        pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                            }).count(),
+                            is(1L)
+                    )
+                }
+        )
+        assertSendEvent(runner, input)
+        reset(runner)
+
+
+        // paged query hits combined
+        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+        input = runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+        hits = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
+        hits.assertAttributeEquals("hit.count", "10")
+        hits.assertAttributeEquals("page.number", "1")
+        assertOutputContent(hits.getContent(), 10, true)
+        assertThat(
+                runner.getProvenanceEvents().stream().filter({ pe ->
+                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                            pe.getAttribute("uuid") == hits.getAttribute("uuid")
+                }).count(),
+                is(1L)
+        )
+        assertSendEvent(runner, input)
+    }
+
+    @Test
+    void testScrollError() {
+        final TestRunner runner = createRunner(false)
+        final TestElasticsearchClientService service = getService(runner)
+        service.setThrowErrorInDelete(true)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
+
+        // still expect "success" output for exception during final clean-up
+        runMultiple(runner, 2)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+
+        // check error was caught and logged
+        assertThat(runner.getLogger().getWarnMessages().stream()
+                .anyMatch({ logMessage ->
+                    logMessage.getMsg().contains("Error while cleaning up Elasticsearch pagination resources") &&
+                            logMessage.getMsg().contains("Simulated IOException - deleteScroll")
+                }),
+                is(true)
+        )
+    }
+
+    @Test
+    void testDeletePitError() {
+        final TestRunner runner = createRunner(false)
+        final TestElasticsearchClientService service = getService(runner)
+        service.setThrowErrorInDelete(true)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
+
+        // still expect "success" output for exception during final clean-up
+        runMultiple(runner, 2)
+        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+
+        // check error was caught and logged
+        assertThat(runner.getLogger().getWarnMessages().stream()
+                .anyMatch({ logMessage ->
+                    logMessage.getMsg().contains("Error while cleaning up Elasticsearch pagination resources") &&
+                            logMessage.getMsg().contains("Simulated IOException - deletePointInTime")
+                }),
+                is(true)
+        )
+    }
+
+    @Test
+    void testInitialisePitError() {
+        // make the mock client service fail when the Point in Time is initialised
+        final TestRunner testRunner = createRunner(false)
+        getService(testRunner).setThrowErrorInPit(true)
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+        final String query = prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]]))
+        testRunner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
+
+        // expect "failure" output because the exception occurs during query set-up
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 0, isInput() ? 1 : 0, 0)
+
+        // the simulated exception should have been caught and logged as an error
+        final String expectedCause = "Simulated IOException - initialisePointInTime"
+        final boolean logged = testRunner.getLogger().getErrorMessages().any { error ->
+            error.getMsg().contains("Error processing flowfile") &&
+                    error.getThrowable().getMessage() == expectedCause
+        }
+        assertThat(logged, is(true))
+    }
+
+    @Test
+    void testQuerySortError() {
+        // Point in Time pagination with a query that has no "sort" field
+        final TestRunner testRunner = createRunner(false)
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+        testRunner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
+        final String expectedError = "Query using pit/search_after must contain a \"sort\" field"
+
+        // expect "failure" output because the query is rejected during set-up
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 0, isInput() ? 1 : 0, 0)
+
+        // the rejection should have been caught and logged as an error
+        assertThat(
+                testRunner.getLogger().getErrorMessages().any { error ->
+                    error.getMsg().contains("Error processing flowfile") &&
+                            error.getThrowable().getMessage() == expectedError
+                },
+                is(true)
+        )
+        reset(testRunner)
+
+        // search_after pagination with the same query (still no "sort" field)
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER)
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 0, isInput() ? 1 : 0, 0)
+        // the same error message is expected as for Point in Time
+        assertThat(
+                testRunner.getLogger().getErrorMessages().any { error ->
+                    error.getMsg().contains("Error processing flowfile") &&
+                            error.getThrowable().getMessage() == expectedError
+                },
+                is(true)
+        )
+        reset(testRunner)
+
+        // scroll pagination with the same query (should succeed without a "sort" field)
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+        runMultiple(testRunner, 2)
+        testCounts(testRunner, isInput() ? 1 : 0, 1, 0, 0)
+    }
+
+    @Test
+    void testScroll() {
+        testPagination(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL) // scenario body is supplied by the abstract testPagination
+    }
+
+    @Test
+    void testPit() {
+        testPagination(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME) // scenario body is supplied by the abstract testPagination
+    }
+
+    @Test
+    void testSearchAfter() {
+        testPagination(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER) // scenario body is supplied by the abstract testPagination
+    }
+
+    abstract void testPagination(final AllowableValue paginationType) // called by testScroll/testPit/testSearchAfter with the pagination type to exercise
+
+    private void runMultiple(final TestRunner runner, final int maxIterations) {
+        if (!isInput()) {
+            // with no input, the processor executes multiple times and tracks progress using STATE.Local
+            runner.setIncomingConnection(false)
+            runner.run(maxIterations, true, true)
+            return
+        }
+        // with an input FlowFile, a single trigger completes all processing, so maxIterations is not used
+        runOnce(runner)
+    }
+
+    private void assertSendEvent(final TestRunner runner, final MockFlowFile input) {
+        // collect the SEND provenance events recorded by the runner
+        def sendEvents = runner.getProvenanceEvents().findAll { event ->
+            event.getEventType() == ProvenanceEventType.SEND
+        }
+        if (!isInput()) {
+            // without an input FlowFile no SEND event is expected at all
+            assertThat(sendEvents.size() as long, is(0L))
+            return
+        }
+        // with an input FlowFile exactly one SEND event should carry its uuid
+        assertThat((sendEvents.count { event -> event.getAttribute("uuid") == input.getAttribute("uuid") } as long), is(1L))
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy
index 8e913ff..721b8b5 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy
@@ -38,7 +38,7 @@ class DeleteByQueryElasticsearchTest {
         runner.setProperty(DeleteByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME)
     }
 
-    private void postTest(TestRunner runner, String queryParam) {
+    private static void postTest(TestRunner runner, String queryParam) {
         runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 0)
         runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 1)
 
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchNoInputTest.groovy
similarity index 70%
copy from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java
copy to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchNoInputTest.groovy
index 1fd83d7..3268884 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchNoInputTest.groovy
@@ -3,7 +3,7 @@
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License") you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
@@ -15,16 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.elasticsearch;
+package org.apache.nifi.processors.elasticsearch
 
-public class DeleteOperationResponse {
-    private long took;
-
-    public DeleteOperationResponse(long took) {
-        this.took = took;
-    }
-
-    public long getTook() {
-        return took;
+class JsonQueryElasticsearchNoInputTest extends JsonQueryElasticsearchTest { // overrides only isInput(); everything else comes from the parent
+    @Override
+    boolean isInput() {
+        return false // the parent class returns true here
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy
index 33a26de..4815132 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy
@@ -17,236 +17,19 @@
 
 package org.apache.nifi.processors.elasticsearch
 
-import org.apache.nifi.util.MockFlowFile
-import org.apache.nifi.util.TestRunner
-import org.apache.nifi.util.TestRunners
-import org.junit.Assert
-import org.junit.Test
-
-import java.util.List
-
-class JsonQueryElasticsearchTest {
-    private static final String INDEX_NAME = "messages"
-
-    void testCounts(TestRunner runner, int success, int hits, int failure, int aggregations) {
-        runner.assertTransferCount(JsonQueryElasticsearch.REL_ORIGINAL, success)
-        runner.assertTransferCount(JsonQueryElasticsearch.REL_HITS, hits)
-        runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, failure)
-        runner.assertTransferCount(JsonQueryElasticsearch.REL_AGGREGATIONS, aggregations)
-    }
-
-    @Test
-    void testBasicQuery() throws Exception {
-
-        JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
-        TestRunner runner = TestRunners.newTestRunner(processor)
-        TestElasticsearchClientService service = new TestElasticsearchClientService(false)
-        runner.addControllerService("esService", service)
-        runner.enableControllerService(service)
-        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
-        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
-        runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
-        runner.setValidateExpressionUsage(true)
-        runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}")
-
-        runner.enqueue("test")
-        runner.run(1, true, true)
-        testCounts(runner, 1, 1, 0, 0)
-
-        runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_HITS, JsonQueryElasticsearch.SPLIT_UP_YES)
-        runner.clearProvenanceEvents()
-        runner.clearTransferState()
-        runner.enqueue("test")
-        runner.run(1, true, true)
-        testCounts(runner, 1, 10, 0, 0)
-    }
-
-    @Test
-    void testAggregations() throws Exception {
-        String query = "{\n" +
-                "\t\"query\": {\n" +
-                "\t\t\"match_all\": {}\n" +
-                "\t},\n" +
-                "\t\"aggs\": {\n" +
-                "\t\t\"test_agg\": {\n" +
-                "\t\t\t\"terms\": {\n" +
-                "\t\t\t\t\"field\": \"msg\"\n" +
-                "\t\t\t}\n" +
-                "\t\t},\n" +
-                "\t\t\"test_agg2\": {\n" +
-                "\t\t\t\"terms\": {\n" +
-                "\t\t\t\t\"field\": \"msg\"\n" +
-                "\t\t\t}\n" +
-                "\t\t}\n" +
-                "\t}\n" +
-                "}"
-
-
-        JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
-        TestRunner runner = TestRunners.newTestRunner(processor)
-        TestElasticsearchClientService service = new TestElasticsearchClientService(true)
-        runner.addControllerService("esService", service)
-        runner.enableControllerService(service)
-        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
-        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
-        runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
-        runner.setValidateExpressionUsage(true)
-        runner.setProperty(JsonQueryElasticsearch.QUERY, query)
-
-        runner.enqueue("test")
-        runner.run(1, true, true)
-        testCounts(runner, 1, 1, 0, 1)
-
-        runner.clearTransferState()
-
-        //Test with the query parameter and no incoming connection
-        runner.setIncomingConnection(false)
-        runner.run(1, true, true)
-        testCounts(runner, 0, 1, 0, 1)
-        runner.setIncomingConnection(true)
-
-
-        runner.clearTransferState()
-        runner.clearProvenanceEvents()
-        runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_AGGREGATIONS, JsonQueryElasticsearch.SPLIT_UP_YES)
-        runner.enqueue("test")
-        runner.run(1, true, true)
-        testCounts(runner, 1, 1, 0, 2)
-
-        runner.clearProvenanceEvents()
-        runner.clearTransferState()
-
-        query = "{\n" +
-                "\t\"query\": {\n" +
-                "\t\t\"match_all\": {}\n" +
-                "\t},\n" +
-                "\t\"aggs\": {\n" +
-                "\t\t\"test_agg\": {\n" +
-                "\t\t\t\"terms\": {\n" +
-                "\t\t\t\t\"field\": \"\${fieldValue}\"\n" +
-                "\t\t\t}\n" +
-                "\t\t},\n" +
-                "\t\t\"test_agg2\": {\n" +
-                "\t\t\t\"terms\": {\n" +
-                "\t\t\t\t\"field\": \"\${fieldValue}\"\n" +
-                "\t\t\t}\n" +
-                "\t\t}\n" +
-                "\t}\n" +
-                "}"
-        runner.setVariable("fieldValue", "msg")
-        runner.setVariable("es.index", INDEX_NAME)
-        runner.setVariable("es.type", "msg")
-        runner.setProperty(JsonQueryElasticsearch.QUERY, query)
-        runner.setProperty(JsonQueryElasticsearch.INDEX, "\${es.index}")
-        runner.setProperty(JsonQueryElasticsearch.TYPE, "\${es.type}")
-        runner.setValidateExpressionUsage(true)
-        runner.enqueue("test")
-        runner.run(1, true, true)
-        testCounts(runner, 1, 1, 0, 2)
+class JsonQueryElasticsearchTest extends AbstractJsonQueryElasticsearchTest<AbstractJsonQueryElasticsearch> {
+    @Override
+    AbstractJsonQueryElasticsearch getProcessor() {
+        return new JsonQueryElasticsearch() // a new processor instance on every call
     }
 
-    @Test
-    void testErrorDuringSearch() throws Exception {
-        String query = "{\n" +
-                "\t\"query\": {\n" +
-                "\t\t\"match_all\": {}\n" +
-                "\t},\n" +
-                "\t\"aggs\": {\n" +
-                "\t\t\"test_agg\": {\n" +
-                "\t\t\t\"terms\": {\n" +
-                "\t\t\t\t\"field\": \"msg\"\n" +
-                "\t\t\t}\n" +
-                "\t\t},\n" +
-                "\t\t\"test_agg2\": {\n" +
-                "\t\t\t\"terms\": {\n" +
-                "\t\t\t\t\"field\": \"msg\"\n" +
-                "\t\t\t}\n" +
-                "\t\t}\n" +
-                "\t}\n" +
-                "}"
-
-
-        JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
-        TestRunner runner = TestRunners.newTestRunner(processor)
-        TestElasticsearchClientService service = new TestElasticsearchClientService(true)
-        service.setThrowErrorInSearch(true)
-        runner.addControllerService("esService", service)
-        runner.enableControllerService(service)
-        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
-        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
-        runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
-        runner.setValidateExpressionUsage(true)
-        runner.setProperty(JsonQueryElasticsearch.QUERY, query)
-
-        runner.enqueue("test")
-        runner.run(1, true, true)
-        testCounts(runner, 0, 0, 1, 0)
+    @Override
+    boolean isStateUsed() {
+        return false // NOTE(review): presumably tells the shared base tests not to expect stored state - confirm in the abstract class
     }
 
-    @Test
-    void testQueryAttribute() throws Exception {
-        final String query = "{\n" +
-                "\t\"query\": {\n" +
-                "\t\t\"match_all\": {}\n" +
-                "\t},\n" +
-                "\t\"aggs\": {\n" +
-                "\t\t\"test_agg\": {\n" +
-                "\t\t\t\"terms\": {\n" +
-                "\t\t\t\t\"field\": \"msg\"\n" +
-                "\t\t\t}\n" +
-                "\t\t},\n" +
-                "\t\t\"test_agg2\": {\n" +
-                "\t\t\t\"terms\": {\n" +
-                "\t\t\t\t\"field\": \"msg\"\n" +
-                "\t\t\t}\n" +
-                "\t\t}\n" +
-                "\t}\n" +
-                "}"
-        final String queryAttr = "es.query"
-
-
-        JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
-        TestRunner runner = TestRunners.newTestRunner(processor)
-        TestElasticsearchClientService service = new TestElasticsearchClientService(true)
-        runner.addControllerService("esService", service)
-        runner.enableControllerService(service)
-        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
-        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
-        runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
-        runner.setValidateExpressionUsage(true)
-        runner.setProperty(JsonQueryElasticsearch.QUERY, query)
-        runner.setProperty(JsonQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr)
-
-        runner.enqueue("test")
-        runner.run(1, true, true)
-        testCounts(runner, 1, 1, 0, 1)
-        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_AGGREGATIONS)
-        flowFiles.addAll(runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_HITS))
-
-        for (MockFlowFile mockFlowFile : flowFiles) {
-            String attr = mockFlowFile.getAttribute(queryAttr)
-            Assert.assertNotNull("Missing query attribute", attr)
-            Assert.assertEquals("Query had wrong value.", query, attr)
-        }
-    }
-
-    @Test
-    void testInputHandling() {
-        JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
-        TestRunner runner = TestRunners.newTestRunner(processor)
-        TestElasticsearchClientService service = new TestElasticsearchClientService(false)
-        runner.addControllerService("esService", service)
-        runner.enableControllerService(service)
-        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
-        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
-        runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
-        runner.setValidateExpressionUsage(true)
-        runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}")
-
-        runner.run()
-
-        runner.assertTransferCount(JsonQueryElasticsearch.REL_SUCCESS, 0)
-        runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, 0)
-        runner.assertTransferCount(JsonQueryElasticsearch.REL_RETRY, 0)
+    @Override
+    boolean isInput() {
+        return true // NOTE(review): presumably makes the shared base tests supply an input FlowFile - confirm in the abstract class
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
new file mode 100644
index 0000000..3b2d4fe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.components.AllowableValue
+import org.apache.nifi.util.TestRunner
+
+import static groovy.json.JsonOutput.prettyPrint
+import static groovy.json.JsonOutput.toJson
+import static org.hamcrest.CoreMatchers.is
+import static org.hamcrest.MatcherAssert.assertThat
+
+class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTest {
+    AbstractPaginatedJsonQueryElasticsearch getProcessor() {
+        return new PaginatedJsonQueryElasticsearch()
+    }
+
+    boolean isStateUsed() {
+        return false
+    }
+
+    boolean isInput() {
+        return true
+    }
+
+    void testPagination(final AllowableValue paginationType) {
+        // test flowfile per page
+        final TestRunner testRunner = createRunner(false)
+        final TestElasticsearchClientService clientService = getService(testRunner)
+        clientService.setMaxPages(2)
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType)
+        final String query = prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]]))
+        testRunner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
+
+        // both pages are retrieved by a single trigger, giving one FlowFile per page
+        runOnce(testRunner)
+        testCounts(testRunner, 1, 2, 0, 0)
+        def pages = testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS)
+        pages.eachWithIndex { pageHits, index ->
+            pageHits.assertAttributeEquals("hit.count", "10")
+            // page.number is expected to start at 1 and follow the FlowFile order
+            pageHits.assertAttributeEquals("page.number", Integer.toString(index + 1))
+        }
+        testRunner.getStateManager().assertStateNotSet()
+        reset(testRunner)
+
+
+        // test hits splitting
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+        runOnce(testRunner)
+        testCounts(testRunner, 1, 20, 0, 0)
+        def splitHits = testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS)
+        splitHits.eachWithIndex { hit, index ->
+            hit.assertAttributeEquals("hit.count", "1")
+            // 10 hits per page, so first 10 flowfiles should be page.number 1, the rest page.number 2
+            final int expectedPage = Math.ceil((index + 1) / 10) as int
+            hit.assertAttributeEquals("page.number", Integer.toString(expectedPage))
+        }
+        testRunner.getStateManager().assertStateNotSet()
+        reset(testRunner)
+
+
+        // test hits combined
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+        runOnce(testRunner)
+        testCounts(testRunner, 1, 1, 0, 0)
+        // a single FlowFile is expected, holding the hits from both pages (one per line)
+        def combined = testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
+        combined.assertAttributeEquals("hit.count", "20")
+        // the "last" page.number is used, so 2 here because there were 2 pages of hits
+        combined.assertAttributeEquals("page.number", "2")
+        assertThat(combined.getContent().split("\n").length, is(20))
+        // no state should have been stored at any point in this test
+        testRunner.getStateManager().assertStateNotSet()
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
new file mode 100644
index 0000000..68b1463
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.components.AllowableValue
+import org.apache.nifi.components.state.Scope
+import org.apache.nifi.state.MockStateManager
+import org.apache.nifi.util.TestRunner
+import org.junit.Test
+
+import java.time.Instant
+
+import static groovy.json.JsonOutput.prettyPrint
+import static groovy.json.JsonOutput.toJson
+import static org.hamcrest.MatcherAssert.assertThat
+import static org.hamcrest.CoreMatchers.is
+import static org.junit.Assert.fail
+
+class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTest {
+    AbstractPaginatedJsonQueryElasticsearch getProcessor() {
+        return new SearchElasticsearch() // a new processor instance on every call
+    }
+
+    boolean isStateUsed() {
+        return true // tests in this class read Scope.LOCAL state from the runner's state manager
+    }
+
+    boolean isInput() {
+        return false // every testCounts(...) call in this class passes 0 as its first count
+    }
+
+    @Test
+    void testScrollError() {
+        final TestRunner testRunner = createRunner(false)
+        final TestElasticsearchClientService clientService = getService(testRunner)
+        clientService.setMaxPages(2)
+        clientService.setThrowErrorInSearch(false)
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+        final String query = prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]]))
+        testRunner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
+
+        // first trigger: the search is initialised without error and one FlowFile of hits is output
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 1, 0, 0)
+        testRunner.clearTransferState()
+
+        // second trigger: the mock service now throws, so nothing should be transferred
+        clientService.setThrowErrorInSearch(true)
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 0, 0, 0)
+        // the simulated scroll failure should have been logged as an error
+        final boolean logged = testRunner.getLogger().getErrorMessages().any { error ->
+            error.getMsg().contains("Error processing flowfile") &&
+                    error.getThrowable().getMessage() == "Simulated IOException - scroll"
+        }
+        assertThat(logged, is(true))
+    }
+
+    @Test
+    void testScrollExpiration() {
+        testPaginationExpiration(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL) // shared expiry scenario, private to this class
+    }
+
+    @Test
+    void testPitExpiration() {
+        testPaginationExpiration(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME) // shared expiry scenario, private to this class
+    }
+
+    @Test
+    void testSearchAfterExpiration() {
+        testPaginationExpiration(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER) // shared expiry scenario, private to this class
+    }
+
+    private void testPaginationExpiration(final AllowableValue paginationType) {
+        // one page per trigger, with a 1 second keep-alive so the first query can expire
+        final TestRunner testRunner = createRunner(false)
+        final TestElasticsearchClientService clientService = getService(testRunner)
+        clientService.setMaxPages(2)
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType)
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE, "1 sec")
+        testRunner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
+
+        // first trigger: page 1 of the original query
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 1, 0, 0)
+        testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
+        testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
+        assertState(testRunner.getStateManager(), paginationType, 10, 1)
+
+        // block until the expiration timestamp stored in local state has passed
+        final Instant expiration = Instant.ofEpochMilli(Long.parseLong(testRunner.getStateManager().getState(Scope.LOCAL).get(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP)))
+        while (Instant.now().isBefore(expiration)) {
+            Thread.sleep(10)
+        }
+        clientService.resetPageCount()
+        testRunner.clearTransferState()
+
+        // second trigger: page 1 again, because a new query starts after the first one expired
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 1, 0, 0)
+        testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
+        testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
+        assertState(testRunner.getStateManager(), paginationType, 10, 1)
+        testRunner.clearTransferState()
+
+        // third trigger: page 2 of the new query
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 1, 0, 0)
+        testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
+        testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "2")
+        assertState(testRunner.getStateManager(), paginationType, 20, 2)
+        testRunner.clearTransferState()
+    }
+
+    void testPagination(final AllowableValue paginationType) {
+        // test flowfile per page (one page is output per trigger)
+        final TestRunner testRunner = createRunner(false)
+        final TestElasticsearchClientService clientService = getService(testRunner)
+        clientService.setMaxPages(2)
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType)
+        testRunner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
+
+        // first trigger: page 1
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 1, 0, 0)
+        testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
+        testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
+        assertState(testRunner.getStateManager(), paginationType, 10, 1)
+        testRunner.clearTransferState()
+
+        // second trigger: page 2
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 1, 0, 0)
+        testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
+        testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "2")
+        assertState(testRunner.getStateManager(), paginationType, 20, 2)
+        testRunner.clearTransferState()
+
+        // third trigger: no more hits, so nothing is output and local state is empty
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 0, 0, 0)
+        assertThat(testRunner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty(), is(true))
+        reset(testRunner)
+
+
+        // test hits splitting (one FlowFile per hit)
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+
+        // first trigger: page 1
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 10, 0, 0)
+        def firstPageHits = testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS)
+        // each FlowFile should hold exactly one hit from page 1
+        firstPageHits.each { hit ->
+            hit.assertAttributeEquals("hit.count", "1")
+            hit.assertAttributeEquals("page.number", "1")
+        }
+        assertState(testRunner.getStateManager(), paginationType, 10, 1)
+        testRunner.clearTransferState()
+
+        // second trigger: page 2
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 10, 0, 0)
+        def secondPageHits = testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS)
+        // each FlowFile should hold exactly one hit from page 2
+        secondPageHits.each { hit ->
+            hit.assertAttributeEquals("hit.count", "1")
+            hit.assertAttributeEquals("page.number", "2")
+        }
+        assertState(testRunner.getStateManager(), paginationType, 20, 2)
+        testRunner.clearTransferState()
+
+        // third trigger: no more hits, so nothing is output and local state is empty
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 0, 0, 0)
+        assertThat(testRunner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty(), is(true))
+        reset(testRunner)
+
+
+        // test hits combined
+        testRunner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+        // hits are combined from all pages within a single trigger of the processor
+        runOnce(testRunner)
+        testCounts(testRunner, 0, 1, 0, 0)
+        // a single FlowFile is expected, holding the hits from both pages (one per line)
+        def combined = testRunner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
+        combined.assertAttributeEquals("hit.count", "20")
+        // the "last" page.number is used, so 2 here because there were 2 pages of hits
+        combined.assertAttributeEquals("page.number", "2")
+        assertThat(combined.getContent().split("\n").length, is(20))
+        // local state should be empty once the combined query has completed
+        assertThat(testRunner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty(), is(true))
+    }
+
+    private static void assertState(final MockStateManager stateManager, final AllowableValue paginationType,
+                                    final int hitCount, final int pageCount) { // verifies the LOCAL-scope pagination state expected after pageCount pages / hitCount hits
+        stateManager.assertStateEquals(SearchElasticsearch.STATE_HIT_COUNT, Integer.toString(hitCount), Scope.LOCAL)
+        stateManager.assertStateEquals(SearchElasticsearch.STATE_PAGE_COUNT, Integer.toString(pageCount), Scope.LOCAL)
+
+        final String pageExpirationTimestamp = stateManager.getState(Scope.LOCAL).get(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP)
+        assertThat(Long.parseLong(pageExpirationTimestamp) > Instant.now().toEpochMilli(), is(true)) // stored expiration (epoch millis) must still lie in the future
+
+        switch (paginationType) { // only the state keys of the active pagination type may be set; the other keys must be absent
+            case AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL:
+                stateManager.assertStateEquals(SearchElasticsearch.STATE_SCROLL_ID, "scrollId-${pageCount}", Scope.LOCAL)
+                stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, Scope.LOCAL)
+                stateManager.assertStateNotSet(SearchElasticsearch.STATE_SEARCH_AFTER, Scope.LOCAL)
+                break
+            case AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME: // point-in-time expects both a PIT id and a search-after marker
+                stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, Scope.LOCAL)
+                stateManager.assertStateEquals(SearchElasticsearch.STATE_PIT_ID, "pitId-${pageCount}", Scope.LOCAL)
+                stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER, "[\"searchAfter-${pageCount}\"]", Scope.LOCAL)
+                break
+            case AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER:
+                stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, Scope.LOCAL)
+                stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, Scope.LOCAL)
+                stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER, "[\"searchAfter-${pageCount}\"]", Scope.LOCAL)
+                break
+            default: // any other pagination type fails the test outright
+                fail("Unknown paginationType: ${paginationType}")
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
index b1d559b..e4e5c7e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
@@ -29,6 +29,10 @@ class TestElasticsearchClientService extends AbstractControllerService implement
     private boolean returnAggs
     private boolean throwErrorInSearch
     private boolean throwErrorInDelete
+    private boolean throwErrorInPit // when true, initialisePointInTime throws a simulated IOException
+    private int pageCount = 0 // incremented on every search() call (scroll() delegates to search()); reset by initialisePointInTime and resetPageCount
+    private int maxPages = 1 // number of search() calls that return hits before empty responses are returned
+    private String query // most recent query passed to search(); scroll() passes null
 
     TestElasticsearchClientService(boolean returnAggs) {
         this.returnAggs = returnAggs
@@ -36,12 +40,12 @@ class TestElasticsearchClientService extends AbstractControllerService implement
 
     @Override
     IndexOperationResponse add(IndexOperationRequest operation) {
-        return add(Arrays.asList(operation))
+        return bulk(Arrays.asList(operation) as List<IndexOperationRequest>) // a single operation is handled as a one-element bulk request
     }
 
     @Override
     IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
-        return new IndexOperationResponse(100L, 100L)
+        return new IndexOperationResponse(100L) // canned response; the operations argument is not inspected
     }
 
     @Override
@@ -73,25 +77,66 @@ class TestElasticsearchClientService extends AbstractControllerService implement
     }
 
     @Override
-    SearchResponse search(String query, String index, String type) {
+    SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) { // serves canned pages of hits; index, type and requestParameters are not used
         if (throwErrorInSearch) {
             throw new IOException("Simulated IOException")
         }
 
-        def mapper = new JsonSlurper()
-        def hits = mapper.parseText(HITS_RESULT)
-        def aggs = returnAggs ?  mapper.parseText(AGGS_RESULT) :  null
-        SearchResponse response = new SearchResponse(hits, aggs, 15, 5, false)
+        this.query = query // remember the most recent query
+        final SearchResponse response
+        if (pageCount++ < maxPages) { // compares the pre-increment value; inside both branches pageCount already counts this call
+            def mapper = new JsonSlurper()
+            def hits = mapper.parseText(HITS_RESULT)
+            def aggs = returnAggs && pageCount == 1 ? mapper.parseText(AGGS_RESULT) :  null // aggregations accompany the first page only
+            response = new SearchResponse((hits as List<Map<String, Object>>), aggs as Map<String, Object>, "pitId-${pageCount}", "scrollId-${pageCount}", "[\"searchAfter-${pageCount}\"]", 15, 5, false, null)
+        } else { // maxPages already served: respond with empty hits and empty aggregations
+            response = new SearchResponse([], [:], "pitId-${pageCount}", "scrollId-${pageCount}", "[\"searchAfter-${pageCount}\"]", 0, 1, false, null)
+        }
         return response
     }
 
     @Override
+    SearchResponse scroll(String scroll) { // the scroll argument is ignored; the next canned page comes from search()
+        if (throwErrorInSearch) {
+            throw new IOException("Simulated IOException - scroll")
+        }
+
+        return search(null, null, null, null) // advances pageCount like any search call and sets the stored query to null
+    }
+
+    @Override
+    String initialisePointInTime(String index, String keepAlive) { // index and keepAlive are ignored
+        if (throwErrorInPit) {
+            throw new IOException("Simulated IOException - initialisePointInTime")
+        }
+        pageCount = 0 // opening a point in time restarts paging from the first page
+
+        return "123" // fixed dummy point-in-time id
+    }
+
+    @Override
+    DeleteOperationResponse deletePointInTime(String pitId) { // pitId is ignored; shares the throwErrorInDelete switch with deleteScroll
+        if (throwErrorInDelete) {
+            throw new IOException("Simulated IOException - deletePointInTime")
+        }
+        return new DeleteOperationResponse(100L) // canned response
+    }
+
+    @Override
+    DeleteOperationResponse deleteScroll(String scrollId) { // scrollId is ignored; shares the throwErrorInDelete switch with deletePointInTime
+        if (throwErrorInDelete) {
+            throw new IOException("Simulated IOException - deleteScroll")
+        }
+        return new DeleteOperationResponse(100L) // canned response
+    }
+
+    @Override // getTransitUrl: returns a fixed http://localhost:9400 URL built from index and type
     String getTransitUrl(String index, String type) {
         "http://localhost:9400/${index}/${type}"
     }
 
     private static final String AGGS_RESULT = "{\n" +
-            "    \"term_agg2\": {\n" +
+            "    \"term_agg\": {\n" +
             "      \"doc_count_error_upper_bound\": 0,\n" +
             "      \"sum_other_doc_count\": 0,\n" +
             "      \"buckets\": [\n" +
@@ -117,7 +162,7 @@ class TestElasticsearchClientService extends AbstractControllerService implement
             "        }\n" +
             "      ]\n" +
             "    },\n" +
-            "    \"term_agg\": {\n" +
+            "    \"term_agg2\": {\n" +
             "      \"doc_count_error_upper_bound\": 0,\n" +
             "      \"sum_other_doc_count\": 0,\n" +
             "      \"buckets\": [\n" +
@@ -245,4 +290,16 @@ class TestElasticsearchClientService extends AbstractControllerService implement
     void setThrowErrorInDelete(boolean throwErrorInDelete) {
         this.throwErrorInDelete = throwErrorInDelete
     }
+
+    void setThrowErrorInPit(boolean throwErrorInPit) { // toggles the simulated IOException in initialisePointInTime
+        this.throwErrorInPit = throwErrorInPit
+    }
+
+    void resetPageCount() { // restarts paging so the next search() call is treated as the first page
+        this.pageCount = 0
+    }
+
+    void setMaxPages(int maxPages) { // number of search() calls that return hits before responses become empty
+        this.maxPages = maxPages
+    }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy
index fdbdac1..33c62b2 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy
@@ -18,7 +18,11 @@
 package org.apache.nifi.processors.elasticsearch.mock
 
 import org.apache.nifi.controller.AbstractControllerService
-import org.apache.nifi.elasticsearch.*
+import org.apache.nifi.elasticsearch.DeleteOperationResponse
+import org.apache.nifi.elasticsearch.ElasticSearchClientService
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+import org.apache.nifi.elasticsearch.SearchResponse
 
 class AbstractMockElasticsearchClient extends AbstractControllerService implements ElasticSearchClientService {
     boolean throwRetriableError
@@ -60,7 +64,27 @@ class AbstractMockElasticsearchClient extends AbstractControllerService implemen
     }
 
     @Override
-    SearchResponse search(String query, String index, String type) {
+    SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) { // default stub: ignores its arguments and returns null
+        return null
+    }
+
+    @Override
+    SearchResponse scroll(String scroll) { // default stub: ignores its argument and returns null
+        return null
+    }
+
+    @Override
+    String initialisePointInTime(String index, String keepAlive) { // default stub: ignores its arguments and returns null
+        return null
+    }
+
+    @Override
+    DeleteOperationResponse deletePointInTime(String pitId) { // default stub: ignores its argument and returns null
+        return null
+    }
+
+    @Override
+    DeleteOperationResponse deleteScroll(String scrollId) { // default stub: ignores its argument and returns null
         return null
     }