You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/01/04 21:43:39 UTC

[nifi] branch main updated: NIFI-10844 Allow _source only output for GetElasticsearch and JsonQueryElasticsearch

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d9420afb60 NIFI-10844 Allow _source only output for GetElasticsearch and JsonQueryElasticsearch
d9420afb60 is described below

commit d9420afb600ffd9faf1707489ea8dc512fded2b5
Author: Chris Sampson <ch...@naimuri.com>
AuthorDate: Sat Nov 19 20:51:24 2022 +0000

    NIFI-10844 Allow _source only output for GetElasticsearch and JsonQueryElasticsearch
    
    This closes #6687
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../ElasticSearchClientServiceImpl.java            |   2 +-
 .../integration/ElasticSearchClientService_IT.java | 101 +++++++++-------
 .../src/test/resources/setup-6.script              |   3 +-
 .../src/test/resources/setup-7.script              |   3 +-
 .../src/test/resources/setup-8.script              |   3 +-
 .../AbstractJsonQueryElasticsearch.java            | 128 +++++++++++++++------
 .../AbstractPaginatedJsonQueryElasticsearch.java   |  64 ++++-------
 .../elasticsearch/SearchElasticsearch.java         |   5 +-
 .../api/AggregationResultsFormat.java              |  47 ++++++++
 .../elasticsearch/api/PaginationType.java          |  56 +++++++++
 .../elasticsearch/api/ResultOutputStrategy.java    |  59 ++++++++++
 .../elasticsearch/api/SearchResultsFormat.java     |  47 ++++++++
 .../AbstractJsonQueryElasticsearchTest.groovy      | 118 +++++++++++++++----
 ...tractPaginatedJsonQueryElasticsearchTest.groovy |  44 +++----
 .../PaginatedJsonQueryElasticsearchTest.groovy     |  12 +-
 .../elasticsearch/SearchElasticsearchTest.groovy   |  32 +++---
 .../org/apache/nifi/elasticsearch/MapBuilder.java  |   4 +-
 nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml |   4 +-
 18 files changed, 544 insertions(+), 188 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index ca7c2d306f..8fd9163131 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -604,7 +604,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
             final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
             parseResponseWarningHeaders(response);
 
-            return (Map<String, Object>) mapper.readValue(body, Map.class).get("_source");
+            return (Map<String, Object>) mapper.readValue(body, Map.class).getOrDefault("_source", Collections.emptyMap());
         } catch (final Exception ex) {
             throw new ElasticsearchException(ex);
         }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index cd1bd3ca40..d7f0d39874 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -55,6 +55,7 @@ import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -205,19 +206,29 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
 
     @Test
     void testBasicSearch() throws Exception {
+        assertBasicSearch(null);
+    }
+
+    @Test
+    void testBasicSearchRequestParameters() throws Exception {
+        assertBasicSearch(createParameters("preference", "_local"));
+    }
+
+    private void assertBasicSearch(final Map<String, String> requestParameters) throws JsonProcessingException {
         final Map<String, Object> temp = new MapBuilder()
-            .of("size", 10, "query", new MapBuilder().of("match_all", new HashMap<>()).build(),
-                    "aggs", new MapBuilder()
-                            .of("term_counts", new MapBuilder()
-                                    .of("terms", new MapBuilder()
-                                            .of("field", "msg", "size", 5)
-                                            .build())
-                                    .build())
-                            .build())
+                .of("size", 10, "query", new MapBuilder().of("match_all", new HashMap<>()).build(),
+                        "aggs", new MapBuilder()
+                                .of("term_counts", new MapBuilder()
+                                        .of("terms", new MapBuilder()
+                                                .of("field", "msg", "size", 5)
+                                                .build())
+                                        .build())
+                                .build())
                 .build();
         final String query = prettyJson(temp);
 
-        final SearchResponse response = service.search(query, INDEX, type, null);
+
+        final SearchResponse response = service.search(query, "messages", type, requestParameters);
         assertNotNull(response, "Response was null");
 
         assertEquals(15, response.getNumberOfHits(), "Wrong count");
@@ -226,9 +237,6 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
         assertEquals(10, response.getHits().size(), "Wrong number of hits");
         assertNotNull(response.getAggregations(), "Aggregations are missing");
         assertEquals(1, response.getAggregations().size(), "Aggregation count is wrong");
-        assertNull(response.getScrollId(), "Unexpected ScrollId");
-        assertNull(response.getSearchAfter(), "Unexpected Search_After");
-        assertNull(response.getPitId(), "Unexpected pitId");
 
         @SuppressWarnings("unchecked") final Map<String, Object> termCounts = (Map<String, Object>) response.getAggregations().get("term_counts");
         assertNotNull(termCounts, "Term counts was missing");
@@ -239,51 +247,51 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
                         "four", 4, "five", 5)
                 .build();
 
-        buckets.forEach( aggRes -> {
+        buckets.forEach( (aggRes) -> {
             final String key = (String) aggRes.get("key");
             final Integer docCount = (Integer) aggRes.get("doc_count");
-            assertEquals(expected.get(key), docCount, "${key} did not match.");
+            assertEquals(expected.get(key), docCount, String.format("%s did not match.", key));
         });
     }
 
+    @SuppressWarnings("unchecked")
     @Test
-    void testBasicSearchRequestParameters() throws Exception {
+    void testSearchEmptySource() throws Exception {
         final Map<String, Object> temp = new MapBuilder()
-                .of("size", 10, "query", new MapBuilder().of("match_all", new HashMap<>()).build(),
-                        "aggs", new MapBuilder()
-                        .of("term_counts", new MapBuilder()
-                                .of("terms", new MapBuilder()
-                                        .of("field", "msg", "size", 5)
-                                        .build())
-                                .build())
-                        .build())
+                .of("size", 2,
+                        "query", new MapBuilder().of("match_all", new HashMap<>()).build())
                 .build();
         final String query = prettyJson(temp);
 
 
-        final SearchResponse response = service.search(query, "messages", type, createParameters("preference", "_local"));
+        final SearchResponse response = service.search(query, "messages", type, createParameters("_source", "not_exists"));
         assertNotNull(response, "Response was null");
 
-        assertEquals(15, response.getNumberOfHits(), "Wrong count");
-        assertFalse(response.isTimedOut(), "Timed out");
         assertNotNull(response.getHits(), "Hits was null");
-        assertEquals(10, response.getHits().size(), "Wrong number of hits");
-        assertNotNull(response.getAggregations(), "Aggregations are missing");
-        assertEquals(1, response.getAggregations().size(), "Aggregation count is wrong");
+        assertEquals(2, response.getHits().size(), "Wrong number of hits");
+        response.getHits().forEach(h -> {
+            assertInstanceOf(Map.class, h.get("_source"), "Source not a Map");
+            assertTrue(((Map<String, Object>)h.get("_source")).isEmpty(), "Source not empty");
+        });
+    }
 
-        @SuppressWarnings("unchecked") final Map<String, Object> termCounts = (Map<String, Object>) response.getAggregations().get("term_counts");
-        assertNotNull(termCounts, "Term counts was missing");
-        @SuppressWarnings("unchecked") final List<Map<String, Object>> buckets = (List<Map<String, Object>>) termCounts.get("buckets");
-        assertNotNull(buckets, "Buckets branch was empty");
-        final Map<String, Object> expected = new MapBuilder()
-                .of("one", 1, "two", 2, "three", 3,
-                        "four", 4, "five", 5)
+    @Test
+    void testSearchNoSource() throws Exception {
+        final Map<String, Object> temp = new MapBuilder()
+                .of("size", 1,
+                        "query", new MapBuilder().of("match_all", new HashMap<>()).build())
                 .build();
+        final String query = prettyJson(temp);
 
-        buckets.forEach( (aggRes) -> {
-            final String key = (String) aggRes.get("key");
-            final Integer docCount = (Integer) aggRes.get("doc_count");
-            assertEquals(expected.get(key), docCount, String.format("%s did not match.", key));
+
+        final SearchResponse response = service.search(query, "no_source", type, null);
+        assertNotNull(response, "Response was null");
+
+        assertNotNull(response.getHits(), "Hits was null");
+        assertEquals(1, response.getHits().size(), "Wrong number of hits");
+        response.getHits().forEach(h -> {
+            assertFalse(h.isEmpty(), "Hit was empty");
+            assertFalse(h.containsKey("_source"), "Hit contained _source");
         });
     }
 
@@ -590,6 +598,19 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
         }
     }
 
+    @Test
+    void testGetEmptySource() {
+        final Map<String, Object> doc = service.get(INDEX, type, "1", Collections.singletonMap("_source", "not_exist"));
+        assertNotNull(doc, "Doc was null");
+        assertTrue(doc.isEmpty(), "Doc was not empty");
+    }
+    @Test
+    void testGetNoSource() {
+        final Map<String, Object> doc = service.get("no_source", type, "1", null);
+        assertNotNull(doc, "Doc was null");
+        assertTrue(doc.isEmpty(), "Doc was not empty");
+    }
+
     @Test
     void testGetNotFound() {
         final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.get(INDEX, type, "not_found", null));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script
index c3522f5b87..510498ebab 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script
@@ -22,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
 PUT:bulk_b/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
 PUT:bulk_c/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
 PUT:error_handler:{ "mappings": { "_doc": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}}
+PUT:no_source/:{ "mappings":{"_doc":{ "_source": { "enabled": false }, "properties":{ "msg":{"type":"keyword"}}}}}
 
 #add document
 PUT:messages/_doc/1:{ "msg":"one" }
@@ -45,4 +46,4 @@ PUT:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-432
 PUT:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
 PUT:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
 PUT:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
-
+PUT:no_source/_doc/1:{ "msg":"none" }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script
index d7cfcc9de1..36e04def1d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script
@@ -22,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
 PUT:bulk_b/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
 PUT:bulk_c/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
 PUT:error_handler:{ "mappings": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}
+PUT:no_source/:{ "mappings":{ "_source": { "enabled": false }, "properties":{ "msg":{"type":"keyword"}}}}
 
 #add document
 POST:messages/_doc/1:{ "msg":"one" }
@@ -45,4 +46,4 @@ POST:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-43
 POST:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
 POST:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
 POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
-
+PUT:no_source/_doc/1:{ "msg":"none" }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-8.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-8.script
index d7cfcc9de1..36e04def1d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-8.script
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-8.script
@@ -22,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
 PUT:bulk_b/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
 PUT:bulk_c/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
 PUT:error_handler:{ "mappings": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}
+PUT:no_source/:{ "mappings":{ "_source": { "enabled": false }, "properties":{ "msg":{"type":"keyword"}}}}
 
 #add document
 POST:messages/_doc/1:{ "msg":"one" }
@@ -45,4 +46,4 @@ POST:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-43
 POST:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
 POST:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
 POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
-
+PUT:no_source/_doc/1:{ "msg":"none" }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
index f154cf2790..a0d32779b0 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -19,7 +19,6 @@ package org.apache.nifi.processors.elasticsearch;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
 import org.apache.nifi.elasticsearch.ElasticsearchException;
@@ -32,7 +31,10 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.api.AggregationResultsFormat;
 import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy;
+import org.apache.nifi.processors.elasticsearch.api.SearchResultsFormat;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.StringUtils;
 
@@ -41,11 +43,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
     public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
@@ -57,23 +61,22 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
             .description("Aggregations are routed to this relationship.")
             .build();
 
-    public static final AllowableValue FLOWFILE_PER_HIT = new AllowableValue(
-            "splitUp-yes",
-            "Per Hit",
-            "Flowfile per hit."
-    );
-    public static final AllowableValue FLOWFILE_PER_RESPONSE = new AllowableValue(
-            "splitUp-no",
-            "Per Response",
-            "Flowfile per response."
-    );
-
     public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
             .name("el-rest-split-up-hits")
             .displayName("Search Results Split")
             .description("Output a flowfile containing all hits or one flowfile for each individual hit.")
-            .allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT)
-            .defaultValue(FLOWFILE_PER_RESPONSE.getValue())
+            .allowableValues(ResultOutputStrategy.PER_RESPONSE.getValue(), ResultOutputStrategy.PER_HIT.getValue())
+            .defaultValue(ResultOutputStrategy.PER_RESPONSE.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor SEARCH_RESULTS_FORMAT = new PropertyDescriptor.Builder()
+            .name("el-rest-format-hits")
+            .displayName("Search Results Format")
+            .description("Format of Hits output.")
+            .allowableValues(SearchResultsFormat.class)
+            .defaultValue(SearchResultsFormat.FULL.getValue())
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .build();
@@ -81,8 +84,18 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
             .name("el-rest-split-up-aggregations")
             .displayName("Aggregation Results Split")
             .description("Output a flowfile containing all aggregations or one flowfile for each individual aggregation.")
-            .allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT)
-            .defaultValue(FLOWFILE_PER_RESPONSE.getValue())
+            .allowableValues(ResultOutputStrategy.PER_RESPONSE.getValue(), ResultOutputStrategy.PER_HIT.getValue())
+            .defaultValue(ResultOutputStrategy.PER_RESPONSE.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor AGGREGATION_RESULTS_FORMAT = new PropertyDescriptor.Builder()
+            .name("el-rest-format-aggregations")
+            .displayName("Aggregation Results Format")
+            .description("Format of Aggregation output.")
+            .allowableValues(AggregationResultsFormat.class)
+            .defaultValue(AggregationResultsFormat.FULL.getValue())
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .build();
@@ -101,14 +114,12 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
     private static final Set<Relationship> relationships;
     private static final List<PropertyDescriptor> propertyDescriptors;
 
-    String splitUpHits;
-    private String splitUpAggregations;
+    ResultOutputStrategy hitStrategy;
+    private SearchResultsFormat hitFormat;
+    private ResultOutputStrategy aggregationStrategy;
+    private AggregationResultsFormat aggregationFormat;
     private boolean outputNoHits;
 
-    boolean getOutputNoHits() {
-        return outputNoHits;
-    }
-
     final ObjectMapper mapper = new ObjectMapper();
 
     final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
@@ -128,7 +139,9 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
         descriptors.add(TYPE);
         descriptors.add(CLIENT_SERVICE);
         descriptors.add(SEARCH_RESULTS_SPLIT);
+        descriptors.add(SEARCH_RESULTS_FORMAT);
         descriptors.add(AGGREGATION_RESULTS_SPLIT);
+        descriptors.add(AGGREGATION_RESULTS_FORMAT);
         descriptors.add(OUTPUT_NO_HITS);
 
         propertyDescriptors = Collections.unmodifiableList(descriptors);
@@ -160,12 +173,18 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
         return false;
     }
 
+    boolean isOutputNoHits() {
+        return outputNoHits;
+    }
+
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         clientService.set(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
 
-        splitUpHits = context.getProperty(SEARCH_RESULTS_SPLIT).getValue();
-        splitUpAggregations = context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue();
+        hitStrategy = ResultOutputStrategy.fromValue(context.getProperty(SEARCH_RESULTS_SPLIT).getValue());
+        hitFormat = SearchResultsFormat.valueOf(context.getProperty(SEARCH_RESULTS_FORMAT).getValue());
+        aggregationStrategy = ResultOutputStrategy.fromValue(context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue());
+        aggregationFormat = AggregationResultsFormat.valueOf(context.getProperty(AGGREGATION_RESULTS_FORMAT).getValue());
 
         outputNoHits = context.getProperty(OUTPUT_NO_HITS).asBoolean();
     }
@@ -203,7 +222,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
                 input = session.putAttribute(input, "elasticsearch.query.error", ese.getMessage());
                 session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
             }
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             getLogger().error("Could not query documents.", ex);
             if (input != null) {
                 input = session.putAttribute(input, "elasticsearch.query.error", ex.getMessage());
@@ -260,17 +279,19 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
                                     final FlowFile parent, final Map<String, String> attributes,
                                     final String transitUri, final StopWatch stopWatch) throws IOException {
         if (aggregations != null && !aggregations.isEmpty()) {
+            final Map<String, Object> formattedAggregations = formatAggregations(aggregations);
             final List<FlowFile> aggsFlowFiles = new ArrayList<>();
-            if (splitUpAggregations.equals(FLOWFILE_PER_HIT.getValue())) {
+
+            if (aggregationStrategy == ResultOutputStrategy.PER_HIT) {
                 int aggCount = 0;
-                for (final Map.Entry<String, Object> agg : aggregations.entrySet()) {
+                for (final Map.Entry<String, Object> agg : formattedAggregations.entrySet()) {
                     final FlowFile aggFlowFile = createChildFlowFile(session, parent);
                     final String aggJson = mapper.writeValueAsString(agg.getValue());
                     aggsFlowFiles.add(writeAggregationFlowFileContents(agg.getKey(), ++aggCount, aggJson, session, aggFlowFile, attributes));
                 }
             } else {
                 final FlowFile aggFlowFile = createChildFlowFile(session, parent);
-                final String json = mapper.writeValueAsString(aggregations);
+                final String json = mapper.writeValueAsString(formattedAggregations);
                 aggsFlowFiles.add(writeAggregationFlowFileContents(null, null, json, session, aggFlowFile, attributes));
             }
 
@@ -281,8 +302,29 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> formatAggregations(final Map<String, Object> aggregations) {
+        final Map<String, Object> formattedAggregations;
+
+        if (aggregationFormat == AggregationResultsFormat.METADATA_ONLY) {
+            formattedAggregations = new LinkedHashMap<>(aggregations);
+            formattedAggregations.forEach((k, v) -> ((Map<String, Object>)v).remove("buckets"));
+        } else if (aggregationFormat == AggregationResultsFormat.BUCKETS_ONLY) {
+            formattedAggregations = aggregations.entrySet().stream().collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> ((Map<String, Object>)e.getValue()).get("buckets"),
+                    (k1, k2) -> k1,
+                    LinkedHashMap::new
+            ));
+        } else {
+            formattedAggregations = aggregations;
+        }
+
+        return formattedAggregations;
+    }
+
     FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
-                              final FlowFile hitFlowFile, final Map<String, String> attributes) {
+                                      final FlowFile hitFlowFile, final Map<String, String> attributes) {
         final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes()));
         attributes.put("hit.count", Integer.toString(count));
 
@@ -301,16 +343,18 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
                               final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles,
                               final String transitUri, final StopWatch stopWatch) throws IOException {
         if (hits != null && !hits.isEmpty()) {
-            if (FLOWFILE_PER_HIT.getValue().equals(splitUpHits)) {
-                for (final Map<String, Object> hit : hits) {
+            final List<Map<String, Object>> formattedHits = formatHits(hits);
+
+            if (hitStrategy == ResultOutputStrategy.PER_HIT) {
+                for (final Map<String, Object> hit : formattedHits) {
                     final FlowFile hitFlowFile = createChildFlowFile(session, parent);
                     final String json = mapper.writeValueAsString(hit);
                     hitsFlowFiles.add(writeHitFlowFile(1, json, session, hitFlowFile, attributes));
                 }
             } else {
                 final FlowFile hitFlowFile = createChildFlowFile(session, parent);
-                final String json = mapper.writeValueAsString(hits);
-                hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, hitFlowFile, attributes));
+                final String json = mapper.writeValueAsString(formattedHits);
+                hitsFlowFiles.add(writeHitFlowFile(formattedHits.size(), json, session, hitFlowFile, attributes));
             }
         } else if (newQuery && outputNoHits) {
             final FlowFile hitFlowFile = createChildFlowFile(session, parent);
@@ -322,6 +366,24 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
         return hitsFlowFiles;
     }
 
+    @SuppressWarnings("unchecked")
+    private List<Map<String, Object>> formatHits(final List<Map<String, Object>> hits) {
+        final List<Map<String, Object>> formattedHits;
+
+        if (hitFormat == SearchResultsFormat.METADATA_ONLY) {
+            formattedHits = hits.stream().map(HashMap::new).collect(Collectors.toList());
+            formattedHits.forEach(h -> h.remove("_source"));
+        } else if (hitFormat == SearchResultsFormat.SOURCE_ONLY) {
+            formattedHits = hits.stream()
+                    .map(h -> (Map<String, Object>) h.getOrDefault("_source", Collections.emptyMap()))
+                    .collect(Collectors.toList());
+        } else {
+            formattedHits = hits;
+        }
+
+        return formattedHits;
+    }
+
     private void transferResultFlowFiles(final ProcessSession session, final List<FlowFile> hitsFlowFiles, final String transitUri,
                                          final StopWatch stopWatch) {
         // output any results
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
index ec1a020ad7..42ef9416fb 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.elasticsearch.SearchResponse;
 import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -30,6 +29,8 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.processors.elasticsearch.api.PaginationType;
+import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.StringUtils;
 
@@ -43,45 +44,20 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
-    public static final AllowableValue FLOWFILE_PER_QUERY = new AllowableValue(
-            "splitUp-query",
-            "Per Query",
-            "Combine results from all query responses (one flowfile per entire paginated result set of hits). " +
-                    "Note that aggregations cannot be paged, they are generated across the entire result set and " +
-                    "returned as part of the first page. Results are output with one JSON object per line " +
-                    "(allowing hits to be combined from multiple pages without loading all results into memory)."
-    );
-
     public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
             .fromPropertyDescriptor(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT)
             .description("Output a flowfile containing all hits or one flowfile for each individual hit " +
                     "or one flowfile containing all hits from all paged responses.")
-            .allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT, FLOWFILE_PER_QUERY)
+            .allowableValues(ResultOutputStrategy.class)
             .build();
 
-    public static final AllowableValue PAGINATION_SEARCH_AFTER = new AllowableValue(
-            "pagination-search_after",
-            "Search After",
-            "Use Elasticsearch \"search_after\" to page sorted results."
-    );
-    public static final AllowableValue PAGINATION_POINT_IN_TIME = new AllowableValue(
-            "pagination-pit",
-            "Point in Time",
-            "Use Elasticsearch (7.10+ with XPack) \"point in time\" to page sorted results."
-    );
-    public static final AllowableValue PAGINATION_SCROLL = new AllowableValue(
-            "pagination-scroll",
-            "Scroll",
-            "Use Elasticsearch \"scroll\" to page results."
-    );
-
     public static final PropertyDescriptor PAGINATION_TYPE = new PropertyDescriptor.Builder()
             .name("el-rest-pagination-type")
             .displayName("Pagination Type")
             .description("Pagination method to use. Not all types are available for all Elasticsearch versions, " +
                     "check the Elasticsearch docs to confirm which are applicable and recommended for your service.")
-            .allowableValues(PAGINATION_SCROLL, PAGINATION_SEARCH_AFTER, PAGINATION_POINT_IN_TIME)
-            .defaultValue(PAGINATION_SCROLL.getValue())
+            .allowableValues(PaginationType.class)
+            .defaultValue(PaginationType.SCROLL.getValue())
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .build();
@@ -106,7 +82,9 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
         descriptors.add(TYPE);
         descriptors.add(CLIENT_SERVICE);
         descriptors.add(SEARCH_RESULTS_SPLIT);
+        descriptors.add(SEARCH_RESULTS_FORMAT);
         descriptors.add(AGGREGATION_RESULTS_SPLIT);
+        descriptors.add(AGGREGATION_RESULTS_FORMAT);
         descriptors.add(PAGINATION_TYPE);
         descriptors.add(PAGINATION_KEEP_ALIVE);
         descriptors.add(OUTPUT_NO_HITS);
@@ -117,14 +95,14 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
     // output as newline delimited JSON (allows for multiple pages of results to be appended to existing FlowFiles without retaining all hits in memory)
     private final ObjectWriter writer = mapper.writer().withRootValueSeparator("\n");
 
-    String paginationType;
+    PaginationType paginationType;
 
     @Override
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         super.onScheduled(context);
 
-        paginationType = context.getProperty(PAGINATION_TYPE).getValue();
+        paginationType = PaginationType.fromValue(context.getProperty(PAGINATION_TYPE).getValue());
     }
 
     @Override
@@ -139,18 +117,18 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
 
             // execute query/scroll
             final String queryJson = updateQueryJson(newQuery, paginatedJsonQueryParameters);
-            if (!newQuery && PAGINATION_SCROLL.getValue().equals(paginationType)) {
+            if (!newQuery && paginationType == PaginationType.SCROLL) {
                 response = clientService.get().scroll(queryJson);
             } else {
                 final Map<String, String> requestParameters = getUrlQueryParameters(context, input);
-                if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
+                if (paginationType == PaginationType.SCROLL) {
                     requestParameters.put("scroll", paginatedJsonQueryParameters.getKeepAlive());
                 }
 
                 response = clientService.get().search(
                         queryJson,
                         // Point in Time uses general /_search API not /index/_search
-                        PAGINATION_POINT_IN_TIME.getValue().equals(paginationType) ? null : paginatedJsonQueryParameters.getIndex(),
+                        paginationType == PaginationType.POINT_IN_TIME ? null : paginatedJsonQueryParameters.getIndex(),
                         paginatedJsonQueryParameters.getType(),
                         requestParameters
                 );
@@ -170,7 +148,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
             updatePageExpirationTimestamp(paginatedJsonQueryParameters, !response.getHits().isEmpty());
 
             hitsFlowFiles = handleResponse(response, newQuery, paginatedJsonQueryParameters, hitsFlowFiles, session, input, stopWatch);
-        } while (!response.getHits().isEmpty() && (input != null || FLOWFILE_PER_QUERY.getValue().equals(splitUpHits)));
+        } while (!response.getHits().isEmpty() && (input != null || hitStrategy == ResultOutputStrategy.PER_QUERY));
 
         if (response.getHits().isEmpty()) {
             getLogger().debug("No more results for paginated query, clearing Elasticsearch resources");
@@ -199,7 +177,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
 
     private void prepareNextPageQuery(final ObjectNode queryJson, final PaginatedJsonQueryParameters paginatedJsonQueryParameters) throws IOException {
         // prepare to get next page of results (depending on pagination type)
-        if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
+        if (paginationType == PaginationType.SCROLL) {
             // overwrite query JSON with existing Scroll details
             queryJson.removeAll().put("scroll_id", paginatedJsonQueryParameters.getScrollId());
             if (StringUtils.isNotBlank(paginatedJsonQueryParameters.getKeepAlive())) {
@@ -222,13 +200,13 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
 
         if (!newQuery) {
             prepareNextPageQuery(queryJson, paginatedJsonQueryParameters);
-        } else if ((PAGINATION_POINT_IN_TIME.getValue().equals(paginationType) || PAGINATION_SEARCH_AFTER.getValue().equals(paginationType))
+        } else if ((paginationType == PaginationType.POINT_IN_TIME || paginationType == PaginationType.SEARCH_AFTER)
                 && !queryJson.has("sort")) {
             // verify query contains a "sort" field if pit/search_after requested
             throw new IllegalArgumentException("Query using pit/search_after must contain a \"sort\" field");
         }
 
-        if (PAGINATION_POINT_IN_TIME.getValue().equals(paginationType)) {
+        if (paginationType == PaginationType.POINT_IN_TIME) {
             // add pit_id to query JSON
             final String queryPitId = newQuery
                     ? clientService.get().initialisePointInTime(paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getKeepAlive())
@@ -273,7 +251,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
 
             hitsFlowFiles.add(writeCombinedHitFlowFile(paginatedJsonQueryParameters.getHitCount() + hits.size(),
                     hits, session, hitFlowFile, attributes, append));
-        } else if (getOutputNoHits()) {
+        } else if (isOutputNoHits()) {
             final FlowFile hitFlowFile = createChildFlowFile(session, parent);
             hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile, attributes));
         }
@@ -292,7 +270,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
         paginatedJsonQueryParameters.incrementPageCount();
         attributes.put("page.number", Integer.toString(paginatedJsonQueryParameters.getPageCount()));
 
-        if (FLOWFILE_PER_QUERY.getValue().equals(splitUpHits)) {
+        if (hitStrategy == ResultOutputStrategy.PER_QUERY) {
             combineHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles);
 
             // output results if it seems we've combined all available results (i.e. no hits in this page and therefore no more expected)
@@ -317,20 +295,20 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
 
     void clearElasticsearchState(final ProcessContext context, final SearchResponse response) {
         try {
-            if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
+            if (paginationType == PaginationType.SCROLL) {
                 final String scrollId = getScrollId(context, response);
 
                 if (StringUtils.isNotBlank(scrollId)) {
                     clientService.get().deleteScroll(scrollId);
                 }
-            } else if (PAGINATION_POINT_IN_TIME.getValue().equals(paginationType)) {
+            } else if (paginationType == PaginationType.POINT_IN_TIME) {
                 final String pitId = getPitId(context, response);
 
                 if (StringUtils.isNotBlank(pitId)) {
                     clientService.get().deletePointInTime(pitId);
                 }
             }
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             getLogger().warn("Error while cleaning up Elasticsearch pagination resources, ignoring", ex);
         }
     }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
index 9636526c7a..a7d39a1576 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
@@ -38,6 +38,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.processors.elasticsearch.api.PaginationType;
 import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
@@ -144,12 +145,12 @@ public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch
             getLogger().debug("Updating local state for next execution");
 
             final Map<String, String> newStateMap = new HashMap<>();
-            if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
+            if (paginationType == PaginationType.SCROLL) {
                 newStateMap.put(STATE_SCROLL_ID, response.getScrollId());
             } else {
                 newStateMap.put(STATE_SEARCH_AFTER, response.getSearchAfter());
 
-                if (PAGINATION_POINT_IN_TIME.getValue().equals(paginationType)) {
+                if (paginationType == PaginationType.POINT_IN_TIME) {
                     newStateMap.put(STATE_PIT_ID, response.getPitId());
                 }
             }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/AggregationResultsFormat.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/AggregationResultsFormat.java
new file mode 100644
index 0000000000..71df500521
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/AggregationResultsFormat.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum AggregationResultsFormat implements DescribedValue {
+    FULL("Contains full Elasticsearch Aggregation, including Buckets and Metadata."),
+    BUCKETS_ONLY("Bucket Content only."),
+    METADATA_ONLY("Aggregation Metadata only.");
+
+    private final String description;
+
+    AggregationResultsFormat(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return name();
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java
new file mode 100644
index 0000000000..1e33153cb5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.Arrays;
+
+public enum PaginationType implements DescribedValue {
+    SCROLL("pagination-scroll", "Use Elasticsearch \"scroll\" to page results."),
+    SEARCH_AFTER("pagination-search_after", "Use Elasticsearch \"search_after\" to page sorted results."),
+    POINT_IN_TIME("pagination-pit", "Use Elasticsearch (7.10+ with XPack) \"point in time\" to page sorted results.");
+
+    private final String value;
+    private final String description;
+
+    PaginationType(final String value, final String description) {
+        this.value = value;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return name();
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    public static PaginationType fromValue(final String value) {
+        return Arrays.stream(PaginationType.values()).filter(v -> v.getValue().equals(value)).findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(String.format("Unknown value %s", value)));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/ResultOutputStrategy.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/ResultOutputStrategy.java
new file mode 100644
index 0000000000..01c84cbdc1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/ResultOutputStrategy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.Arrays;
+
+public enum ResultOutputStrategy implements DescribedValue {
+    PER_HIT("splitUp-yes", "Flowfile per hit."),
+    PER_RESPONSE("splitUp-no", "Flowfile per response."),
+    PER_QUERY("splitUp-query", "Combine results from all query responses (one flowfile per entire paginated result set of hits). " +
+            "Note that aggregations cannot be paged, they are generated across the entire result set and " +
+            "returned as part of the first page. Results are output with one JSON object per line " +
+            "(allowing hits to be combined from multiple pages without loading all results into memory).");
+
+    private final String value;
+    private final String description;
+
+    ResultOutputStrategy(final String value, final String description) {
+        this.value = value;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return name();
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    public static ResultOutputStrategy fromValue(final String value) {
+        return Arrays.stream(ResultOutputStrategy.values()).filter(v -> v.getValue().equals(value)).findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(String.format("Unknown value %s", value)));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/SearchResultsFormat.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/SearchResultsFormat.java
new file mode 100644
index 0000000000..f41a101faa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/SearchResultsFormat.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum SearchResultsFormat implements DescribedValue {
+    FULL("Contains full Elasticsearch Hit, including Document Source and Metadata."),
+    SOURCE_ONLY("Document Source only (where present)."),
+    METADATA_ONLY("Hit Metadata only.");
+
+    private final String description;
+
+    SearchResultsFormat(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return name();
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
index a914b8e433..25707cd300 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
@@ -17,8 +17,12 @@
 
 package org.apache.nifi.processors.elasticsearch
 
+import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.nifi.components.state.Scope
 import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.processors.elasticsearch.api.AggregationResultsFormat
+import org.apache.nifi.processors.elasticsearch.api.SearchResultsFormat
+import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
 import org.apache.nifi.provenance.ProvenanceEventType
 import org.apache.nifi.util.MockFlowFile
 import org.apache.nifi.util.TestRunner
@@ -31,10 +35,15 @@ import static org.hamcrest.CoreMatchers.equalTo
 import static org.hamcrest.CoreMatchers.is
 import static org.hamcrest.MatcherAssert.assertThat
 import static org.junit.jupiter.api.Assertions.assertEquals
+import static org.junit.jupiter.api.Assertions.assertFalse
+import static org.junit.jupiter.api.Assertions.assertInstanceOf
 import static org.junit.jupiter.api.Assertions.assertNotNull
 import static org.junit.jupiter.api.Assertions.assertThrows
+import static org.junit.jupiter.api.Assertions.assertTrue
 
 abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryElasticsearch> {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+
     static final String INDEX_NAME = "messages"
 
     abstract P getProcessor()
@@ -87,8 +96,8 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
         runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "not-boolean")
 
         final String expectedAllowedSplitHits = processor instanceof AbstractPaginatedJsonQueryElasticsearch
-            ? [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY].join(", ")
-            : [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", ")
+            ? ResultOutputStrategy.values().collect {r -> r.getValue()}.join(", ")
+            : [ResultOutputStrategy.PER_RESPONSE.getValue(), ResultOutputStrategy.PER_HIT.getValue()].join(", ")
 
         final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
         assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 8 validation failures:\n" +
@@ -106,7 +115,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
                 AbstractJsonQueryElasticsearch.TYPE.getName(), AbstractJsonQueryElasticsearch.TYPE.getName(),
                 AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName(),
                 AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT.getName(), expectedAllowedSplitHits,
-                AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT.getName(), [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", "),
+                AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT.getName(), [ResultOutputStrategy.PER_RESPONSE.getValue(), ResultOutputStrategy.PER_HIT.getValue()].join(", "),
                 AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS.getName(),
                 AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName()
         )))
@@ -114,14 +123,22 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
 
     @Test
     void testBasicQuery() throws Exception {
-        // test hits (no splitting)
+        // test hits (no splitting) - full hit format
         final TestRunner runner = createRunner(false)
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.FULL.getValue())
         runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
         final FlowFile hits = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
         hits.assertAttributeEquals("hit.count", "10")
         assertOutputContent(hits.getContent(), 10, false)
+        final List<Map<String, Object>> result = OBJECT_MAPPER.readValue(hits.getContent(), List.class)
+        result.forEach({ hit ->
+            final Map<String, Object> h = ((Map<String, Object>)hit)
+            assertFalse(h.isEmpty())
+            assertTrue(h.containsKey("_source"))
+            assertTrue(h.containsKey("_index"))
+        })
         assertThat(
                 runner.getProvenanceEvents().stream().filter({ pe ->
                     pe.getEventType() == ProvenanceEventType.RECEIVE &&
@@ -132,14 +149,46 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
         reset(runner)
 
 
-        // test splitting hits
-        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+        // test splitting hits - _source only format
+        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
+        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY.getValue())
+        runOnce(runner)
+        testCounts(runner, isInput() ? 1 : 0, 10, 0, 0)
+        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
+            hit.assertAttributeEquals("hit.count", "1")
+            assertOutputContent(hit.getContent(), 1, false)
+            final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
+            assertFalse(h.isEmpty())
+            assertFalse(h.containsKey("_source"))
+            assertFalse(h.containsKey("_index"))
+            // should be the _source content only
+            assertTrue(h.containsKey("msg"))
+
+            assertThat(
+                    runner.getProvenanceEvents().stream().filter({ pe ->
+                        pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                pe.getAttribute("uuid") == hit.getAttribute("uuid")
+                    }).count(),
+                    is(1L)
+            )
+        })
+        reset(runner)
+
+
+        // test splitting hits - metadata only format
+        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
+        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.METADATA_ONLY.getValue())
         runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 10, 0, 0)
         runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
                 { hit ->
                     hit.assertAttributeEquals("hit.count", "1")
                     assertOutputContent(hit.getContent(), 1, false)
+                    final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
+                    assertFalse(h.isEmpty())
+                    assertFalse(h.containsKey("_source"))
+                    assertTrue(h.containsKey("_index"))
+
                     assertThat(
                             runner.getProvenanceEvents().stream().filter({ pe ->
                                 pe.getEventType() == ProvenanceEventType.RECEIVE &&
@@ -197,9 +246,10 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
                 aggs: [ term_agg: [ terms: [ field: "msg" ] ], term_agg2: [ terms: [ field: "msg" ] ] ]
         ]))
 
-        // test aggregations (no splitting)
+        // test aggregations (no splitting) - full aggregation format
         final TestRunner runner = createRunner(true)
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
+        runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue())
         runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 1, 0, 1)
         runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
@@ -208,30 +258,57 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
         aggregations.assertAttributeNotExists("aggregation.name")
         // count == 1 because aggregations is a single Map rather than a List of Maps, even when there are multiple aggs
         assertOutputContent(aggregations.getContent(), 1, false)
+        Map<String, Object> agg = OBJECT_MAPPER.readValue(aggregations.getContent(), Map.class)
+        // agg Map of 2 Maps (buckets and metadata)
+        assertThat(agg.size(), is(2))
+        agg.keySet().forEach({ aggName ->
+            final Map<String, Object> termAgg = agg.get(aggName) as Map<String, Object>
+            assertInstanceOf(List.class, termAgg.get("buckets"))
+            assertTrue(termAgg.containsKey("doc_count_error_upper_bound"))
+        })
         reset(runner)
 
 
-        // test with the query parameter and no incoming connection
+        // test with the query parameter and no incoming connection - buckets only aggregation format
         runner.setIncomingConnection(false)
+        runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.BUCKETS_ONLY.getValue())
         runner.run(1, true, true)
         testCounts(runner, 0, 1, 0, 1)
         runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0).assertAttributeNotExists("aggregation.number")
-        runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0).assertAttributeNotExists("aggregation.name")
+        final MockFlowFile singleAgg = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0)
+        singleAgg.assertAttributeNotExists("aggregation.number")
+        singleAgg.assertAttributeNotExists("aggregation.name")
+        agg = OBJECT_MAPPER.readValue(singleAgg.getContent(), Map.class)
+        // agg Map of 2 Lists (bucket contents only)
+        assertThat(agg.size(), is(2))
+        agg.keySet().forEach({ aggName ->
+            final List<Map<String, Object>> termAgg = agg.get(aggName) as List<Map<String, Object>>
+            assertThat(termAgg.size(), is(5))
+            termAgg.forEach({a ->
+                assertTrue(a.containsKey("key"))
+                assertTrue(a.containsKey("doc_count"))
+            })
+        })
         reset(runner)
 
 
-        // test splitting aggregations
-        runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+        // test splitting aggregations - metadata only aggregation format
+        runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
+        runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.METADATA_ONLY.getValue())
         runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 1, 0, 2)
         runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
         int a = 0
         runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).forEach(
-                { agg ->
-                    agg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
-                    agg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
-                    assertOutputContent(agg.getContent(), 1, false)
+                { termAgg ->
+                    termAgg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
+                    termAgg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
+                    assertOutputContent(termAgg.getContent(), 1, false)
+
+                    Map<String, Object> aggContent = OBJECT_MAPPER.readValue(termAgg.getContent(), Map.class)
+                    // agg Map (metadata, no buckets)
+                    assertTrue(aggContent.containsKey("doc_count_error_upper_bound"))
+                    assertFalse(aggContent.containsKey("buckets"))
                 }
         )
         reset(runner)
@@ -248,15 +325,16 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
         runner.setProperty(AbstractJsonQueryElasticsearch.INDEX, "\${es.index}")
         runner.setProperty(AbstractJsonQueryElasticsearch.TYPE, "\${es.type}")
+        runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue())
         runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 1, 0, 2)
         runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
         a = 0
         runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).forEach(
-                { agg ->
-                    agg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
-                    agg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
-                    assertOutputContent(agg.getContent(), 1, false)
+                { termAgg ->
+                    termAgg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
+                    termAgg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
+                    assertOutputContent(termAgg.getContent(), 1, false)
                 }
         )
     }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
index 0ec4470f76..4db5bccb79 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
@@ -17,8 +17,11 @@
 
 package org.apache.nifi.processors.elasticsearch
 
-import org.apache.nifi.components.AllowableValue
 import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.processors.elasticsearch.api.AggregationResultsFormat
+import org.apache.nifi.processors.elasticsearch.api.PaginationType
+import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
+import org.apache.nifi.processors.elasticsearch.api.SearchResultsFormat
 import org.apache.nifi.provenance.ProvenanceEventType
 import org.apache.nifi.util.MockFlowFile
 import org.apache.nifi.util.TestRunner
@@ -46,12 +49,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
                 "'%s' validated against 'not-enum' is invalid because Given value not found in allowed set '%s'\n" +
                 "'%s' validated against 'not-a-period' is invalid because Must be of format <duration> <TimeUnit> where <duration> " +
                 "is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days\n",
-                AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(),
-                [
-                        AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL,
-                        AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER,
-                        AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME
-                ].join(", "),
+                AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(), PaginationType.values().collect {p -> p.getValue()}.join(", "),
                 AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName(),
                 AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName()
         )))
@@ -80,7 +78,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
 
 
         // paged query hits splitting
-        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
         input = runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 10, 0, 0)
         runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
@@ -102,7 +100,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
 
 
         // paged query hits combined
-        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
         input = runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
         hits = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
@@ -124,7 +122,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
         service.setThrowErrorInDelete(true)
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL.getValue())
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
 
         // still expect "success" output for exception during final clean-up
@@ -146,7 +144,9 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
         service.setThrowErrorInDelete(true)
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+        runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.FULL.getValue())
+        runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue())
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue())
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
 
         // still expect "success" output for exception during final clean-up
@@ -168,7 +168,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
         service.setThrowErrorInPit(true)
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue())
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
 
         // expect "failure" output for exception during query setup
@@ -189,7 +189,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
     void testQuerySortError() {
         // test PiT without sort
         final TestRunner runner = createRunner(false)
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue())
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
 
         // expect "failure" output for exception during query setup
@@ -208,7 +208,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
 
 
         // test search_after without sort
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SEARCH_AFTER.getValue())
         runOnce(runner)
         testCounts(runner, 0, 0, isInput() ? 1 : 0, 0)
         assertThat(runner.getLogger().getErrorMessages().stream()
@@ -222,27 +222,27 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
 
 
         // test scroll without sort (should succeed)
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL.getValue())
         runMultiple(runner, 2)
         testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
     }
 
     @Test
     void testScroll() {
-        testPagination(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+        testPagination(PaginationType.SCROLL)
     }
 
     @Test
     void testPit() {
-        testPagination(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+        testPagination(PaginationType.POINT_IN_TIME)
     }
 
     @Test
     void testSearchAfter() {
-        testPagination(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER)
+        testPagination(PaginationType.SEARCH_AFTER)
     }
 
-    abstract void testPagination(final AllowableValue paginationType)
+    abstract void testPagination(final PaginationType paginationType)
 
     private void runMultiple(final TestRunner runner, final int maxIterations) {
         if (isInput()) {
@@ -278,7 +278,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         service.setMaxPages(0)
 
         // test that an empty flow file is produced for a per query setup
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
         runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
 
@@ -288,7 +288,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         reset(runner)
 
         // test that an empty flow file is produced for a per hit setup
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
         runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
 
@@ -298,7 +298,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         reset(runner)
 
         // test that an empty flow file is produced for a per response setup
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_RESPONSE.getValue())
         runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
 
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
index 3b2d4fe323..992b548b94 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
@@ -17,7 +17,9 @@
 
 package org.apache.nifi.processors.elasticsearch
 
-import org.apache.nifi.components.AllowableValue
+
+import org.apache.nifi.processors.elasticsearch.api.PaginationType
+import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
 import org.apache.nifi.util.TestRunner
 
 import static groovy.json.JsonOutput.prettyPrint
@@ -38,12 +40,12 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
         return true
     }
 
-    void testPagination(final AllowableValue paginationType) {
+    void testPagination(final PaginationType paginationType) {
         // test flowfile per page
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
         service.setMaxPages(2)
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
 
         runOnce(runner)
@@ -60,7 +62,7 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
 
 
         // test hits splitting
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
         runOnce(runner)
         testCounts(runner, 1, 20, 0, 0)
         int count = 0
@@ -76,7 +78,7 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
 
 
         // test hits combined
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
         runOnce(runner)
         testCounts(runner, 1, 1, 0, 0)
         runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "20")
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
index ea60ce8b93..e3bbbff42e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
@@ -17,8 +17,10 @@
 
 package org.apache.nifi.processors.elasticsearch
 
-import org.apache.nifi.components.AllowableValue
+
 import org.apache.nifi.components.state.Scope
+import org.apache.nifi.processors.elasticsearch.api.PaginationType
+import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
 import org.apache.nifi.state.MockStateManager
 import org.apache.nifi.util.TestRunner
 import org.junit.Test
@@ -50,7 +52,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
         final TestElasticsearchClientService service = getService(runner)
         service.setMaxPages(2)
         service.setThrowErrorInSearch(false)
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL.getValue())
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
 
         // initialise search
@@ -73,25 +75,25 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
 
     @Test
     void testScrollExpiration() {
-        testPaginationExpiration(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+        testPaginationExpiration(PaginationType.SCROLL)
     }
 
     @Test
     void testPitExpiration() {
-        testPaginationExpiration(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+        testPaginationExpiration(PaginationType.POINT_IN_TIME)
     }
 
     @Test
     void testSearchAfterExpiration() {
-        testPaginationExpiration(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER)
+        testPaginationExpiration(PaginationType.SEARCH_AFTER)
     }
 
-    private void testPaginationExpiration(final AllowableValue paginationType) {
+    private void testPaginationExpiration(final PaginationType paginationType) {
         // test flowfile per page
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
         service.setMaxPages(2)
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
         runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE, "1 sec")
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
 
@@ -131,12 +133,12 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
         runner.clearTransferState()
     }
 
-    void testPagination(final AllowableValue paginationType) {
+    void testPagination(final PaginationType paginationType) {
         // test flowfile per page
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
         service.setMaxPages(2)
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
 
         // first page
@@ -163,7 +165,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
 
 
         // test hits splitting
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
 
         // first page
         runOnce(runner)
@@ -197,7 +199,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
 
 
         // test hits combined
-        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+        runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
         // hits are combined from all pages within a single trigger of the processor
         runOnce(runner)
         testCounts(runner, 0, 1, 0, 0)
@@ -211,7 +213,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
         assertThat(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty(), is(true))
     }
 
-    private static void assertState(final MockStateManager stateManager, final AllowableValue paginationType,
+    private static void assertState(final MockStateManager stateManager, final PaginationType paginationType,
                                     final int hitCount, final int pageCount) {
         stateManager.assertStateEquals(SearchElasticsearch.STATE_HIT_COUNT, Integer.toString(hitCount), Scope.LOCAL)
         stateManager.assertStateEquals(SearchElasticsearch.STATE_PAGE_COUNT, Integer.toString(pageCount), Scope.LOCAL)
@@ -220,17 +222,17 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
         assertThat(Long.parseLong(pageExpirationTimestamp) > Instant.now().toEpochMilli(), is(true))
 
         switch (paginationType) {
-            case AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL:
+            case PaginationType.SCROLL:
                 stateManager.assertStateEquals(SearchElasticsearch.STATE_SCROLL_ID, "scrollId-${pageCount}", Scope.LOCAL)
                 stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, Scope.LOCAL)
                 stateManager.assertStateNotSet(SearchElasticsearch.STATE_SEARCH_AFTER, Scope.LOCAL)
                 break
-            case AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME:
+            case PaginationType.POINT_IN_TIME:
                 stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, Scope.LOCAL)
                 stateManager.assertStateEquals(SearchElasticsearch.STATE_PIT_ID, "pitId-${pageCount}", Scope.LOCAL)
                 stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER, "[\"searchAfter-${pageCount}\"]", Scope.LOCAL)
                 break
-            case AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER:
+            case PaginationType.SEARCH_AFTER:
                 stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, Scope.LOCAL)
                 stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, Scope.LOCAL)
                 stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER, "[\"searchAfter-${pageCount}\"]", Scope.LOCAL)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/MapBuilder.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/MapBuilder.java
index 7aea00b772..80f80a1ba3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/MapBuilder.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/MapBuilder.java
@@ -17,14 +17,14 @@
 package org.apache.nifi.elasticsearch;
 
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 public class MapBuilder {
     private final Map<String, Object> toBuild;
 
     public MapBuilder() {
-        toBuild = new HashMap<>();
+        toBuild = new LinkedHashMap<>();
     }
 
     public MapBuilder of(final String key, final Object value) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index da145818c5..1c0cd7b10b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -87,7 +87,7 @@ language governing permissions and limitations under the License. -->
             </activation>
             <properties>
                 <!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
-                <elasticsearch_docker_image>8.5.0</elasticsearch_docker_image>
+                <elasticsearch_docker_image>8.5.3</elasticsearch_docker_image>
                 <elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
             </properties>
             <build>
@@ -118,7 +118,7 @@ language governing permissions and limitations under the License. -->
         <profile>
             <id>elasticsearch7</id>
             <properties>
-                <elasticsearch_docker_image>7.17.7</elasticsearch_docker_image>
+                <elasticsearch_docker_image>7.17.8</elasticsearch_docker_image>
             </properties>
         </profile>
     </profiles>