You are viewing a plain text version of this content. (The canonical link to the original archived message was not preserved in this plain-text copy.)
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/01/04 21:43:39 UTC
[nifi] branch main updated: NIFI-10844 Allow _source only output for GetElasticsearch and JsonQueryElasticsearch
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d9420afb60 NIFI-10844 Allow _source only output for GetElasticsearch and JsonQueryElasticsearch
d9420afb60 is described below
commit d9420afb600ffd9faf1707489ea8dc512fded2b5
Author: Chris Sampson <ch...@naimuri.com>
AuthorDate: Sat Nov 19 20:51:24 2022 +0000
NIFI-10844 Allow _source only output for GetElasticsearch and JsonQueryElasticsearch
This closes #6687
Signed-off-by: David Handermann <ex...@apache.org>
---
.../ElasticSearchClientServiceImpl.java | 2 +-
.../integration/ElasticSearchClientService_IT.java | 101 +++++++++-------
.../src/test/resources/setup-6.script | 3 +-
.../src/test/resources/setup-7.script | 3 +-
.../src/test/resources/setup-8.script | 3 +-
.../AbstractJsonQueryElasticsearch.java | 128 +++++++++++++++------
.../AbstractPaginatedJsonQueryElasticsearch.java | 64 ++++-------
.../elasticsearch/SearchElasticsearch.java | 5 +-
.../api/AggregationResultsFormat.java | 47 ++++++++
.../elasticsearch/api/PaginationType.java | 56 +++++++++
.../elasticsearch/api/ResultOutputStrategy.java | 59 ++++++++++
.../elasticsearch/api/SearchResultsFormat.java | 47 ++++++++
.../AbstractJsonQueryElasticsearchTest.groovy | 118 +++++++++++++++----
...tractPaginatedJsonQueryElasticsearchTest.groovy | 44 +++----
.../PaginatedJsonQueryElasticsearchTest.groovy | 12 +-
.../elasticsearch/SearchElasticsearchTest.groovy | 32 +++---
.../org/apache/nifi/elasticsearch/MapBuilder.java | 4 +-
nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml | 4 +-
18 files changed, 544 insertions(+), 188 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index ca7c2d306f..8fd9163131 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -604,7 +604,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
parseResponseWarningHeaders(response);
- return (Map<String, Object>) mapper.readValue(body, Map.class).get("_source");
+ return (Map<String, Object>) mapper.readValue(body, Map.class).getOrDefault("_source", Collections.emptyMap());
} catch (final Exception ex) {
throw new ElasticsearchException(ex);
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index cd1bd3ca40..d7f0d39874 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -55,6 +55,7 @@ import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -205,19 +206,29 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
@Test
void testBasicSearch() throws Exception {
+ assertBasicSearch(null);
+ }
+
+ @Test
+ void testBasicSearchRequestParameters() throws Exception {
+ assertBasicSearch(createParameters("preference", "_local"));
+ }
+
+ private void assertBasicSearch(final Map<String, String> requestParameters) throws JsonProcessingException {
final Map<String, Object> temp = new MapBuilder()
- .of("size", 10, "query", new MapBuilder().of("match_all", new HashMap<>()).build(),
- "aggs", new MapBuilder()
- .of("term_counts", new MapBuilder()
- .of("terms", new MapBuilder()
- .of("field", "msg", "size", 5)
- .build())
- .build())
- .build())
+ .of("size", 10, "query", new MapBuilder().of("match_all", new HashMap<>()).build(),
+ "aggs", new MapBuilder()
+ .of("term_counts", new MapBuilder()
+ .of("terms", new MapBuilder()
+ .of("field", "msg", "size", 5)
+ .build())
+ .build())
+ .build())
.build();
final String query = prettyJson(temp);
- final SearchResponse response = service.search(query, INDEX, type, null);
+
+ final SearchResponse response = service.search(query, "messages", type, requestParameters);
assertNotNull(response, "Response was null");
assertEquals(15, response.getNumberOfHits(), "Wrong count");
@@ -226,9 +237,6 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
assertEquals(10, response.getHits().size(), "Wrong number of hits");
assertNotNull(response.getAggregations(), "Aggregations are missing");
assertEquals(1, response.getAggregations().size(), "Aggregation count is wrong");
- assertNull(response.getScrollId(), "Unexpected ScrollId");
- assertNull(response.getSearchAfter(), "Unexpected Search_After");
- assertNull(response.getPitId(), "Unexpected pitId");
@SuppressWarnings("unchecked") final Map<String, Object> termCounts = (Map<String, Object>) response.getAggregations().get("term_counts");
assertNotNull(termCounts, "Term counts was missing");
@@ -239,51 +247,51 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
"four", 4, "five", 5)
.build();
- buckets.forEach( aggRes -> {
+ buckets.forEach( (aggRes) -> {
final String key = (String) aggRes.get("key");
final Integer docCount = (Integer) aggRes.get("doc_count");
- assertEquals(expected.get(key), docCount, "${key} did not match.");
+ assertEquals(expected.get(key), docCount, String.format("%s did not match.", key));
});
}
+ @SuppressWarnings("unchecked")
@Test
- void testBasicSearchRequestParameters() throws Exception {
+ void testSearchEmptySource() throws Exception {
final Map<String, Object> temp = new MapBuilder()
- .of("size", 10, "query", new MapBuilder().of("match_all", new HashMap<>()).build(),
- "aggs", new MapBuilder()
- .of("term_counts", new MapBuilder()
- .of("terms", new MapBuilder()
- .of("field", "msg", "size", 5)
- .build())
- .build())
- .build())
+ .of("size", 2,
+ "query", new MapBuilder().of("match_all", new HashMap<>()).build())
.build();
final String query = prettyJson(temp);
- final SearchResponse response = service.search(query, "messages", type, createParameters("preference", "_local"));
+ final SearchResponse response = service.search(query, "messages", type, createParameters("_source", "not_exists"));
assertNotNull(response, "Response was null");
- assertEquals(15, response.getNumberOfHits(), "Wrong count");
- assertFalse(response.isTimedOut(), "Timed out");
assertNotNull(response.getHits(), "Hits was null");
- assertEquals(10, response.getHits().size(), "Wrong number of hits");
- assertNotNull(response.getAggregations(), "Aggregations are missing");
- assertEquals(1, response.getAggregations().size(), "Aggregation count is wrong");
+ assertEquals(2, response.getHits().size(), "Wrong number of hits");
+ response.getHits().forEach(h -> {
+ assertInstanceOf(Map.class, h.get("_source"), "Source not a Map");
+ assertTrue(((Map<String, Object>)h.get("_source")).isEmpty(), "Source not empty");
+ });
+ }
- @SuppressWarnings("unchecked") final Map<String, Object> termCounts = (Map<String, Object>) response.getAggregations().get("term_counts");
- assertNotNull(termCounts, "Term counts was missing");
- @SuppressWarnings("unchecked") final List<Map<String, Object>> buckets = (List<Map<String, Object>>) termCounts.get("buckets");
- assertNotNull(buckets, "Buckets branch was empty");
- final Map<String, Object> expected = new MapBuilder()
- .of("one", 1, "two", 2, "three", 3,
- "four", 4, "five", 5)
+ @Test
+ void testSearchNoSource() throws Exception {
+ final Map<String, Object> temp = new MapBuilder()
+ .of("size", 1,
+ "query", new MapBuilder().of("match_all", new HashMap<>()).build())
.build();
+ final String query = prettyJson(temp);
- buckets.forEach( (aggRes) -> {
- final String key = (String) aggRes.get("key");
- final Integer docCount = (Integer) aggRes.get("doc_count");
- assertEquals(expected.get(key), docCount, String.format("%s did not match.", key));
+
+ final SearchResponse response = service.search(query, "no_source", type, null);
+ assertNotNull(response, "Response was null");
+
+ assertNotNull(response.getHits(), "Hits was null");
+ assertEquals(1, response.getHits().size(), "Wrong number of hits");
+ response.getHits().forEach(h -> {
+ assertFalse(h.isEmpty(), "Hit was empty");
+ assertFalse(h.containsKey("_source"), "Hit contained _source");
});
}
@@ -590,6 +598,19 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
}
}
+ @Test
+ void testGetEmptySource() {
+ final Map<String, Object> doc = service.get(INDEX, type, "1", Collections.singletonMap("_source", "not_exist"));
+ assertNotNull(doc, "Doc was null");
+ assertTrue(doc.isEmpty(), "Doc was not empty");
+ }
+ @Test
+ void testGetNoSource() {
+ final Map<String, Object> doc = service.get("no_source", type, "1", null);
+ assertNotNull(doc, "Doc was null");
+ assertTrue(doc.isEmpty(), "Doc was not empty");
+ }
+
@Test
void testGetNotFound() {
final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.get(INDEX, type, "not_found", null));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script
index c3522f5b87..510498ebab 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script
@@ -22,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:bulk_b/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:bulk_c/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:error_handler:{ "mappings": { "_doc": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}}
+PUT:no_source/:{ "mappings":{"_doc":{ "_source": { "enabled": false }, "properties":{ "msg":{"type":"keyword"}}}}}
#add document
PUT:messages/_doc/1:{ "msg":"one" }
@@ -45,4 +46,4 @@ PUT:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-432
PUT:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
PUT:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
PUT:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
-
+PUT:no_source/_doc/1:{ "msg":"none" }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script
index d7cfcc9de1..36e04def1d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script
@@ -22,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:bulk_b/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:bulk_c/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:error_handler:{ "mappings": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}
+PUT:no_source/:{ "mappings":{ "_source": { "enabled": false }, "properties":{ "msg":{"type":"keyword"}}}}
#add document
POST:messages/_doc/1:{ "msg":"one" }
@@ -45,4 +46,4 @@ POST:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-43
POST:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
POST:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
-
+PUT:no_source/_doc/1:{ "msg":"none" }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-8.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-8.script
index d7cfcc9de1..36e04def1d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-8.script
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-8.script
@@ -22,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:bulk_b/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:bulk_c/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:error_handler:{ "mappings": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}
+PUT:no_source/:{ "mappings":{ "_source": { "enabled": false }, "properties":{ "msg":{"type":"keyword"}}}}
#add document
POST:messages/_doc/1:{ "msg":"one" }
@@ -45,4 +46,4 @@ POST:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-43
POST:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
POST:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
-
+PUT:no_source/_doc/1:{ "msg":"none" }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
index f154cf2790..a0d32779b0 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -19,7 +19,6 @@ package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchException;
@@ -32,7 +31,10 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.api.AggregationResultsFormat;
import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy;
+import org.apache.nifi.processors.elasticsearch.api.SearchResultsFormat;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
@@ -41,11 +43,13 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
@@ -57,23 +61,22 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
.description("Aggregations are routed to this relationship.")
.build();
- public static final AllowableValue FLOWFILE_PER_HIT = new AllowableValue(
- "splitUp-yes",
- "Per Hit",
- "Flowfile per hit."
- );
- public static final AllowableValue FLOWFILE_PER_RESPONSE = new AllowableValue(
- "splitUp-no",
- "Per Response",
- "Flowfile per response."
- );
-
public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
.name("el-rest-split-up-hits")
.displayName("Search Results Split")
.description("Output a flowfile containing all hits or one flowfile for each individual hit.")
- .allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT)
- .defaultValue(FLOWFILE_PER_RESPONSE.getValue())
+ .allowableValues(ResultOutputStrategy.PER_RESPONSE.getValue(), ResultOutputStrategy.PER_HIT.getValue())
+ .defaultValue(ResultOutputStrategy.PER_RESPONSE.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ public static final PropertyDescriptor SEARCH_RESULTS_FORMAT = new PropertyDescriptor.Builder()
+ .name("el-rest-format-hits")
+ .displayName("Search Results Format")
+ .description("Format of Hits output.")
+ .allowableValues(SearchResultsFormat.class)
+ .defaultValue(SearchResultsFormat.FULL.getValue())
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
@@ -81,8 +84,18 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
.name("el-rest-split-up-aggregations")
.displayName("Aggregation Results Split")
.description("Output a flowfile containing all aggregations or one flowfile for each individual aggregation.")
- .allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT)
- .defaultValue(FLOWFILE_PER_RESPONSE.getValue())
+ .allowableValues(ResultOutputStrategy.PER_RESPONSE.getValue(), ResultOutputStrategy.PER_HIT.getValue())
+ .defaultValue(ResultOutputStrategy.PER_RESPONSE.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ public static final PropertyDescriptor AGGREGATION_RESULTS_FORMAT = new PropertyDescriptor.Builder()
+ .name("el-rest-format-aggregations")
+ .displayName("Aggregation Results Format")
+ .description("Format of Aggregation output.")
+ .allowableValues(AggregationResultsFormat.class)
+ .defaultValue(AggregationResultsFormat.FULL.getValue())
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
@@ -101,14 +114,12 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
- String splitUpHits;
- private String splitUpAggregations;
+ ResultOutputStrategy hitStrategy;
+ private SearchResultsFormat hitFormat;
+ private ResultOutputStrategy aggregationStrategy;
+ private AggregationResultsFormat aggregationFormat;
private boolean outputNoHits;
- boolean getOutputNoHits() {
- return outputNoHits;
- }
-
final ObjectMapper mapper = new ObjectMapper();
final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
@@ -128,7 +139,9 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
descriptors.add(TYPE);
descriptors.add(CLIENT_SERVICE);
descriptors.add(SEARCH_RESULTS_SPLIT);
+ descriptors.add(SEARCH_RESULTS_FORMAT);
descriptors.add(AGGREGATION_RESULTS_SPLIT);
+ descriptors.add(AGGREGATION_RESULTS_FORMAT);
descriptors.add(OUTPUT_NO_HITS);
propertyDescriptors = Collections.unmodifiableList(descriptors);
@@ -160,12 +173,18 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
return false;
}
+ boolean isOutputNoHits() {
+ return outputNoHits;
+ }
+
@OnScheduled
public void onScheduled(final ProcessContext context) {
clientService.set(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
- splitUpHits = context.getProperty(SEARCH_RESULTS_SPLIT).getValue();
- splitUpAggregations = context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue();
+ hitStrategy = ResultOutputStrategy.fromValue(context.getProperty(SEARCH_RESULTS_SPLIT).getValue());
+ hitFormat = SearchResultsFormat.valueOf(context.getProperty(SEARCH_RESULTS_FORMAT).getValue());
+ aggregationStrategy = ResultOutputStrategy.fromValue(context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue());
+ aggregationFormat = AggregationResultsFormat.valueOf(context.getProperty(AGGREGATION_RESULTS_FORMAT).getValue());
outputNoHits = context.getProperty(OUTPUT_NO_HITS).asBoolean();
}
@@ -203,7 +222,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
input = session.putAttribute(input, "elasticsearch.query.error", ese.getMessage());
session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
}
- } catch (Exception ex) {
+ } catch (final Exception ex) {
getLogger().error("Could not query documents.", ex);
if (input != null) {
input = session.putAttribute(input, "elasticsearch.query.error", ex.getMessage());
@@ -260,17 +279,19 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
final FlowFile parent, final Map<String, String> attributes,
final String transitUri, final StopWatch stopWatch) throws IOException {
if (aggregations != null && !aggregations.isEmpty()) {
+ final Map<String, Object> formattedAggregations = formatAggregations(aggregations);
final List<FlowFile> aggsFlowFiles = new ArrayList<>();
- if (splitUpAggregations.equals(FLOWFILE_PER_HIT.getValue())) {
+
+ if (aggregationStrategy == ResultOutputStrategy.PER_HIT) {
int aggCount = 0;
- for (final Map.Entry<String, Object> agg : aggregations.entrySet()) {
+ for (final Map.Entry<String, Object> agg : formattedAggregations.entrySet()) {
final FlowFile aggFlowFile = createChildFlowFile(session, parent);
final String aggJson = mapper.writeValueAsString(agg.getValue());
aggsFlowFiles.add(writeAggregationFlowFileContents(agg.getKey(), ++aggCount, aggJson, session, aggFlowFile, attributes));
}
} else {
final FlowFile aggFlowFile = createChildFlowFile(session, parent);
- final String json = mapper.writeValueAsString(aggregations);
+ final String json = mapper.writeValueAsString(formattedAggregations);
aggsFlowFiles.add(writeAggregationFlowFileContents(null, null, json, session, aggFlowFile, attributes));
}
@@ -281,8 +302,29 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
}
}
+ @SuppressWarnings("unchecked")
+ private Map<String, Object> formatAggregations(final Map<String, Object> aggregations) {
+ final Map<String, Object> formattedAggregations;
+
+ if (aggregationFormat == AggregationResultsFormat.METADATA_ONLY) {
+ formattedAggregations = new LinkedHashMap<>(aggregations);
+ formattedAggregations.forEach((k, v) -> ((Map<String, Object>)v).remove("buckets"));
+ } else if (aggregationFormat == AggregationResultsFormat.BUCKETS_ONLY) {
+ formattedAggregations = aggregations.entrySet().stream().collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> ((Map<String, Object>)e.getValue()).get("buckets"),
+ (k1, k2) -> k1,
+ LinkedHashMap::new
+ ));
+ } else {
+ formattedAggregations = aggregations;
+ }
+
+ return formattedAggregations;
+ }
+
FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
- final FlowFile hitFlowFile, final Map<String, String> attributes) {
+ final FlowFile hitFlowFile, final Map<String, String> attributes) {
final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes()));
attributes.put("hit.count", Integer.toString(count));
@@ -301,16 +343,18 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles,
final String transitUri, final StopWatch stopWatch) throws IOException {
if (hits != null && !hits.isEmpty()) {
- if (FLOWFILE_PER_HIT.getValue().equals(splitUpHits)) {
- for (final Map<String, Object> hit : hits) {
+ final List<Map<String, Object>> formattedHits = formatHits(hits);
+
+ if (hitStrategy == ResultOutputStrategy.PER_HIT) {
+ for (final Map<String, Object> hit : formattedHits) {
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
final String json = mapper.writeValueAsString(hit);
hitsFlowFiles.add(writeHitFlowFile(1, json, session, hitFlowFile, attributes));
}
} else {
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
- final String json = mapper.writeValueAsString(hits);
- hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, hitFlowFile, attributes));
+ final String json = mapper.writeValueAsString(formattedHits);
+ hitsFlowFiles.add(writeHitFlowFile(formattedHits.size(), json, session, hitFlowFile, attributes));
}
} else if (newQuery && outputNoHits) {
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
@@ -322,6 +366,24 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
return hitsFlowFiles;
}
+ @SuppressWarnings("unchecked")
+ private List<Map<String, Object>> formatHits(final List<Map<String, Object>> hits) {
+ final List<Map<String, Object>> formattedHits;
+
+ if (hitFormat == SearchResultsFormat.METADATA_ONLY) {
+ formattedHits = hits.stream().map(HashMap::new).collect(Collectors.toList());
+ formattedHits.forEach(h -> h.remove("_source"));
+ } else if (hitFormat == SearchResultsFormat.SOURCE_ONLY) {
+ formattedHits = hits.stream()
+ .map(h -> (Map<String, Object>) h.getOrDefault("_source", Collections.emptyMap()))
+ .collect(Collectors.toList());
+ } else {
+ formattedHits = hits;
+ }
+
+ return formattedHits;
+ }
+
private void transferResultFlowFiles(final ProcessSession session, final List<FlowFile> hitsFlowFiles, final String transitUri,
final StopWatch stopWatch) {
// output any results
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
index ec1a020ad7..42ef9416fb 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -30,6 +29,8 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.processors.elasticsearch.api.PaginationType;
+import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
@@ -43,45 +44,20 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
- public static final AllowableValue FLOWFILE_PER_QUERY = new AllowableValue(
- "splitUp-query",
- "Per Query",
- "Combine results from all query responses (one flowfile per entire paginated result set of hits). " +
- "Note that aggregations cannot be paged, they are generated across the entire result set and " +
- "returned as part of the first page. Results are output with one JSON object per line " +
- "(allowing hits to be combined from multiple pages without loading all results into memory)."
- );
-
public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT)
.description("Output a flowfile containing all hits or one flowfile for each individual hit " +
"or one flowfile containing all hits from all paged responses.")
- .allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT, FLOWFILE_PER_QUERY)
+ .allowableValues(ResultOutputStrategy.class)
.build();
- public static final AllowableValue PAGINATION_SEARCH_AFTER = new AllowableValue(
- "pagination-search_after",
- "Search After",
- "Use Elasticsearch \"search_after\" to page sorted results."
- );
- public static final AllowableValue PAGINATION_POINT_IN_TIME = new AllowableValue(
- "pagination-pit",
- "Point in Time",
- "Use Elasticsearch (7.10+ with XPack) \"point in time\" to page sorted results."
- );
- public static final AllowableValue PAGINATION_SCROLL = new AllowableValue(
- "pagination-scroll",
- "Scroll",
- "Use Elasticsearch \"scroll\" to page results."
- );
-
public static final PropertyDescriptor PAGINATION_TYPE = new PropertyDescriptor.Builder()
.name("el-rest-pagination-type")
.displayName("Pagination Type")
.description("Pagination method to use. Not all types are available for all Elasticsearch versions, " +
"check the Elasticsearch docs to confirm which are applicable and recommended for your service.")
- .allowableValues(PAGINATION_SCROLL, PAGINATION_SEARCH_AFTER, PAGINATION_POINT_IN_TIME)
- .defaultValue(PAGINATION_SCROLL.getValue())
+ .allowableValues(PaginationType.class)
+ .defaultValue(PaginationType.SCROLL.getValue())
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
@@ -106,7 +82,9 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
descriptors.add(TYPE);
descriptors.add(CLIENT_SERVICE);
descriptors.add(SEARCH_RESULTS_SPLIT);
+ descriptors.add(SEARCH_RESULTS_FORMAT);
descriptors.add(AGGREGATION_RESULTS_SPLIT);
+ descriptors.add(AGGREGATION_RESULTS_FORMAT);
descriptors.add(PAGINATION_TYPE);
descriptors.add(PAGINATION_KEEP_ALIVE);
descriptors.add(OUTPUT_NO_HITS);
@@ -117,14 +95,14 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
// output as newline delimited JSON (allows for multiple pages of results to be appended to existing FlowFiles without retaining all hits in memory)
private final ObjectWriter writer = mapper.writer().withRootValueSeparator("\n");
- String paginationType;
+ PaginationType paginationType;
@Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
- paginationType = context.getProperty(PAGINATION_TYPE).getValue();
+ paginationType = PaginationType.fromValue(context.getProperty(PAGINATION_TYPE).getValue());
}
@Override
@@ -139,18 +117,18 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
// execute query/scroll
final String queryJson = updateQueryJson(newQuery, paginatedJsonQueryParameters);
- if (!newQuery && PAGINATION_SCROLL.getValue().equals(paginationType)) {
+ if (!newQuery && paginationType == PaginationType.SCROLL) {
response = clientService.get().scroll(queryJson);
} else {
final Map<String, String> requestParameters = getUrlQueryParameters(context, input);
- if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
+ if (paginationType == PaginationType.SCROLL) {
requestParameters.put("scroll", paginatedJsonQueryParameters.getKeepAlive());
}
response = clientService.get().search(
queryJson,
// Point in Time uses general /_search API not /index/_search
- PAGINATION_POINT_IN_TIME.getValue().equals(paginationType) ? null : paginatedJsonQueryParameters.getIndex(),
+ paginationType == PaginationType.POINT_IN_TIME ? null : paginatedJsonQueryParameters.getIndex(),
paginatedJsonQueryParameters.getType(),
requestParameters
);
@@ -170,7 +148,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
updatePageExpirationTimestamp(paginatedJsonQueryParameters, !response.getHits().isEmpty());
hitsFlowFiles = handleResponse(response, newQuery, paginatedJsonQueryParameters, hitsFlowFiles, session, input, stopWatch);
- } while (!response.getHits().isEmpty() && (input != null || FLOWFILE_PER_QUERY.getValue().equals(splitUpHits)));
+ } while (!response.getHits().isEmpty() && (input != null || hitStrategy == ResultOutputStrategy.PER_QUERY));
if (response.getHits().isEmpty()) {
getLogger().debug("No more results for paginated query, clearing Elasticsearch resources");
@@ -199,7 +177,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
private void prepareNextPageQuery(final ObjectNode queryJson, final PaginatedJsonQueryParameters paginatedJsonQueryParameters) throws IOException {
// prepare to get next page of results (depending on pagination type)
- if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
+ if (paginationType == PaginationType.SCROLL) {
// overwrite query JSON with existing Scroll details
queryJson.removeAll().put("scroll_id", paginatedJsonQueryParameters.getScrollId());
if (StringUtils.isNotBlank(paginatedJsonQueryParameters.getKeepAlive())) {
@@ -222,13 +200,13 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
if (!newQuery) {
prepareNextPageQuery(queryJson, paginatedJsonQueryParameters);
- } else if ((PAGINATION_POINT_IN_TIME.getValue().equals(paginationType) || PAGINATION_SEARCH_AFTER.getValue().equals(paginationType))
+ } else if ((paginationType == PaginationType.POINT_IN_TIME || paginationType == PaginationType.SEARCH_AFTER)
&& !queryJson.has("sort")) {
// verify query contains a "sort" field if pit/search_after requested
throw new IllegalArgumentException("Query using pit/search_after must contain a \"sort\" field");
}
- if (PAGINATION_POINT_IN_TIME.getValue().equals(paginationType)) {
+ if (paginationType == PaginationType.POINT_IN_TIME) {
// add pit_id to query JSON
final String queryPitId = newQuery
? clientService.get().initialisePointInTime(paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getKeepAlive())
@@ -273,7 +251,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
hitsFlowFiles.add(writeCombinedHitFlowFile(paginatedJsonQueryParameters.getHitCount() + hits.size(),
hits, session, hitFlowFile, attributes, append));
- } else if (getOutputNoHits()) {
+ } else if (isOutputNoHits()) {
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile, attributes));
}
@@ -292,7 +270,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
paginatedJsonQueryParameters.incrementPageCount();
attributes.put("page.number", Integer.toString(paginatedJsonQueryParameters.getPageCount()));
- if (FLOWFILE_PER_QUERY.getValue().equals(splitUpHits)) {
+ if (hitStrategy == ResultOutputStrategy.PER_QUERY) {
combineHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles);
// output results if it seems we've combined all available results (i.e. no hits in this page and therefore no more expected)
@@ -317,20 +295,20 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
void clearElasticsearchState(final ProcessContext context, final SearchResponse response) {
try {
- if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
+ if (paginationType == PaginationType.SCROLL) {
final String scrollId = getScrollId(context, response);
if (StringUtils.isNotBlank(scrollId)) {
clientService.get().deleteScroll(scrollId);
}
- } else if (PAGINATION_POINT_IN_TIME.getValue().equals(paginationType)) {
+ } else if (paginationType == PaginationType.POINT_IN_TIME) {
final String pitId = getPitId(context, response);
if (StringUtils.isNotBlank(pitId)) {
clientService.get().deletePointInTime(pitId);
}
}
- } catch (Exception ex) {
+ } catch (final Exception ex) {
getLogger().warn("Error while cleaning up Elasticsearch pagination resources, ignoring", ex);
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
index 9636526c7a..a7d39a1576 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
@@ -38,6 +38,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.processors.elasticsearch.api.PaginationType;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
@@ -144,12 +145,12 @@ public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch
getLogger().debug("Updating local state for next execution");
final Map<String, String> newStateMap = new HashMap<>();
- if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
+ if (paginationType == PaginationType.SCROLL) {
newStateMap.put(STATE_SCROLL_ID, response.getScrollId());
} else {
newStateMap.put(STATE_SEARCH_AFTER, response.getSearchAfter());
- if (PAGINATION_POINT_IN_TIME.getValue().equals(paginationType)) {
+ if (paginationType == PaginationType.POINT_IN_TIME) {
newStateMap.put(STATE_PIT_ID, response.getPitId());
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/AggregationResultsFormat.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/AggregationResultsFormat.java
new file mode 100644
index 0000000000..71df500521
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/AggregationResultsFormat.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.Arrays;
+
+/**
+ * Content written to FlowFiles for Elasticsearch aggregation results.
+ * <p>
+ * The {@link #getValue() value} and display name of each constant are its {@link #name() name}.
+ */
+public enum AggregationResultsFormat implements DescribedValue {
+    FULL("Contains full Elasticsearch Aggregation, including Buckets and Metadata."),
+    BUCKETS_ONLY("Bucket Content only."),
+    METADATA_ONLY("Aggregation Metadata only.");
+
+    private final String description;
+
+    AggregationResultsFormat(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return name();
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    /**
+     * Look up the format whose {@link #getValue() value} matches the given property value.
+     *
+     * @param value the configured property value; may be {@code null}
+     * @return the matching format
+     * @throws IllegalArgumentException if no format has the given value
+     */
+    public static AggregationResultsFormat fromValue(final String value) {
+        return Arrays.stream(values()).filter(format -> format.getValue().equals(value)).findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(String.format("Unknown %s value [%s]", AggregationResultsFormat.class.getSimpleName(), value)));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java
new file mode 100644
index 0000000000..1e33153cb5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.Arrays;
+
+/**
+ * Strategies for paging through Elasticsearch query results.
+ * <p>
+ * Values and display names match the pagination options this enum replaced, keeping existing configurations valid.
+ */
+public enum PaginationType implements DescribedValue {
+    SCROLL("pagination-scroll", "Scroll", "Use Elasticsearch \"scroll\" to page results."),
+    SEARCH_AFTER("pagination-search_after", "Search After", "Use Elasticsearch \"search_after\" to page sorted results."),
+    POINT_IN_TIME("pagination-pit", "Point in Time", "Use Elasticsearch (7.10+ with XPack) \"point in time\" to page sorted results.");
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    PaginationType(final String value, final String displayName, final String description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    /**
+     * Look up the pagination type whose {@link #getValue() value} matches the given property value.
+     *
+     * @param value the configured Pagination Type property value; may be {@code null}
+     * @return the matching pagination type
+     * @throws IllegalArgumentException if no pagination type has the given value
+     */
+    public static PaginationType fromValue(final String value) {
+        return Arrays.stream(values()).filter(type -> type.getValue().equals(value)).findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(String.format("Unknown %s value [%s]", PaginationType.class.getSimpleName(), value)));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/ResultOutputStrategy.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/ResultOutputStrategy.java
new file mode 100644
index 0000000000..01c84cbdc1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/ResultOutputStrategy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.Arrays;
+
+/**
+ * Strategies for splitting search hits or aggregations into output FlowFiles.
+ * <p>
+ * {@link #PER_QUERY} combines hits across result pages, so it is only meaningful for paginated queries.
+ */
+public enum ResultOutputStrategy implements DescribedValue {
+    PER_HIT("splitUp-yes", "Per Hit", "Flowfile per hit."),
+    PER_RESPONSE("splitUp-no", "Per Response", "Flowfile per response."),
+    PER_QUERY("splitUp-query", "Per Query", "Combine results from all query responses (one flowfile per entire paginated result set of hits). " +
+            "Note that aggregations cannot be paged, they are generated across the entire result set and " +
+            "returned as part of the first page. Results are output with one JSON object per line " +
+            "(allowing hits to be combined from multiple pages without loading all results into memory).");
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    ResultOutputStrategy(final String value, final String displayName, final String description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    /**
+     * Look up the output strategy whose {@link #getValue() value} matches the given property value.
+     *
+     * @param value the configured split property value; may be {@code null}
+     * @return the matching output strategy
+     * @throws IllegalArgumentException if no output strategy has the given value
+     */
+    public static ResultOutputStrategy fromValue(final String value) {
+        return Arrays.stream(values()).filter(strategy -> strategy.getValue().equals(value)).findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(String.format("Unknown %s value [%s]", ResultOutputStrategy.class.getSimpleName(), value)));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/SearchResultsFormat.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/SearchResultsFormat.java
new file mode 100644
index 0000000000..f41a101faa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/SearchResultsFormat.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.Arrays;
+
+/**
+ * Content written to FlowFiles for each Elasticsearch search hit.
+ * <p>
+ * The {@link #getValue() value} and display name of each constant are its {@link #name() name}.
+ */
+public enum SearchResultsFormat implements DescribedValue {
+    FULL("Contains full Elasticsearch Hit, including Document Source and Metadata."),
+    SOURCE_ONLY("Document Source only (where present)."),
+    METADATA_ONLY("Hit Metadata only.");
+
+    private final String description;
+
+    SearchResultsFormat(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return name();
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    /**
+     * Look up the format whose {@link #getValue() value} matches the given property value.
+     *
+     * @param value the configured property value; may be {@code null}
+     * @return the matching format
+     * @throws IllegalArgumentException if no format has the given value
+     */
+    public static SearchResultsFormat fromValue(final String value) {
+        return Arrays.stream(values()).filter(format -> format.getValue().equals(value)).findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(String.format("Unknown %s value [%s]", SearchResultsFormat.class.getSimpleName(), value)));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
index a914b8e433..25707cd300 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
@@ -17,8 +17,12 @@
package org.apache.nifi.processors.elasticsearch
+import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.nifi.components.state.Scope
import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.processors.elasticsearch.api.AggregationResultsFormat
+import org.apache.nifi.processors.elasticsearch.api.SearchResultsFormat
+import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
import org.apache.nifi.provenance.ProvenanceEventType
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner
@@ -31,10 +35,15 @@ import static org.hamcrest.CoreMatchers.equalTo
import static org.hamcrest.CoreMatchers.is
import static org.hamcrest.MatcherAssert.assertThat
import static org.junit.jupiter.api.Assertions.assertEquals
+import static org.junit.jupiter.api.Assertions.assertFalse
+import static org.junit.jupiter.api.Assertions.assertInstanceOf
import static org.junit.jupiter.api.Assertions.assertNotNull
import static org.junit.jupiter.api.Assertions.assertThrows
+import static org.junit.jupiter.api.Assertions.assertTrue
abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryElasticsearch> {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+
static final String INDEX_NAME = "messages"
abstract P getProcessor()
@@ -87,8 +96,8 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "not-boolean")
final String expectedAllowedSplitHits = processor instanceof AbstractPaginatedJsonQueryElasticsearch
- ? [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY].join(", ")
- : [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", ")
+ ? ResultOutputStrategy.values().collect {r -> r.getValue()}.join(", ")
+ : [ResultOutputStrategy.PER_RESPONSE.getValue(), ResultOutputStrategy.PER_HIT.getValue()].join(", ")
final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 8 validation failures:\n" +
@@ -106,7 +115,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
AbstractJsonQueryElasticsearch.TYPE.getName(), AbstractJsonQueryElasticsearch.TYPE.getName(),
AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName(),
AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT.getName(), expectedAllowedSplitHits,
- AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT.getName(), [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", "),
+ AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT.getName(), [ResultOutputStrategy.PER_RESPONSE.getValue(), ResultOutputStrategy.PER_HIT.getValue()].join(", "),
AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS.getName(),
AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName()
)))
@@ -114,14 +123,22 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
@Test
void testBasicQuery() throws Exception {
- // test hits (no splitting)
+ // test hits (no splitting) - full hit format
final TestRunner runner = createRunner(false)
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
+ runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.FULL.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
final FlowFile hits = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
hits.assertAttributeEquals("hit.count", "10")
assertOutputContent(hits.getContent(), 10, false)
+ final List<Map<String, Object>> result = OBJECT_MAPPER.readValue(hits.getContent(), List.class)
+ result.forEach({ hit ->
+ final Map<String, Object> h = ((Map<String, Object>)hit)
+ assertFalse(h.isEmpty())
+ assertTrue(h.containsKey("_source"))
+ assertTrue(h.containsKey("_index"))
+ })
assertThat(
runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.RECEIVE &&
@@ -132,14 +149,46 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
reset(runner)
- // test splitting hits
- runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+ // test splitting hits - _source only format
+ runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
+ runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY.getValue())
+ runOnce(runner)
+ testCounts(runner, isInput() ? 1 : 0, 10, 0, 0)
+ runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
+ hit.assertAttributeEquals("hit.count", "1")
+ assertOutputContent(hit.getContent(), 1, false)
+ final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
+ assertFalse(h.isEmpty())
+ assertFalse(h.containsKey("_source"))
+ assertFalse(h.containsKey("_index"))
+ // should be the _source content only
+ assertTrue(h.containsKey("msg"))
+
+ assertThat(
+ runner.getProvenanceEvents().stream().filter({ pe ->
+ pe.getEventType() == ProvenanceEventType.RECEIVE &&
+ pe.getAttribute("uuid") == hit.getAttribute("uuid")
+ }).count(),
+ is(1L)
+ )
+ })
+ reset(runner)
+
+
+ // test splitting hits - metadata only format
+ runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
+ runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.METADATA_ONLY.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 10, 0, 0)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
{ hit ->
hit.assertAttributeEquals("hit.count", "1")
assertOutputContent(hit.getContent(), 1, false)
+ final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
+ assertFalse(h.isEmpty())
+ assertFalse(h.containsKey("_source"))
+ assertTrue(h.containsKey("_index"))
+
assertThat(
runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.RECEIVE &&
@@ -197,9 +246,10 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
aggs: [ term_agg: [ terms: [ field: "msg" ] ], term_agg2: [ terms: [ field: "msg" ] ] ]
]))
- // test aggregations (no splitting)
+ // test aggregations (no splitting) - full aggregation format
final TestRunner runner = createRunner(true)
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
+ runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 1)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
@@ -208,30 +258,57 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
aggregations.assertAttributeNotExists("aggregation.name")
// count == 1 because aggregations is a single Map rather than a List of Maps, even when there are multiple aggs
assertOutputContent(aggregations.getContent(), 1, false)
+ Map<String, Object> agg = OBJECT_MAPPER.readValue(aggregations.getContent(), Map.class)
+ // agg Map of 2 Maps (buckets and metadata)
+ assertThat(agg.size(), is(2))
+ agg.keySet().forEach({ aggName ->
+ final Map<String, Object> termAgg = agg.get(aggName) as Map<String, Object>
+ assertInstanceOf(List.class, termAgg.get("buckets"))
+ assertTrue(termAgg.containsKey("doc_count_error_upper_bound"))
+ })
reset(runner)
- // test with the query parameter and no incoming connection
+ // test with the query parameter and no incoming connection - buckets only aggregation format
runner.setIncomingConnection(false)
+ runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.BUCKETS_ONLY.getValue())
runner.run(1, true, true)
testCounts(runner, 0, 1, 0, 1)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
- runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0).assertAttributeNotExists("aggregation.number")
- runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0).assertAttributeNotExists("aggregation.name")
+ final MockFlowFile singleAgg = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0)
+ singleAgg.assertAttributeNotExists("aggregation.number")
+ singleAgg.assertAttributeNotExists("aggregation.name")
+ agg = OBJECT_MAPPER.readValue(singleAgg.getContent(), Map.class)
+ // agg Map of 2 Lists (bucket contents only)
+ assertThat(agg.size(), is(2))
+ agg.keySet().forEach({ aggName ->
+ final List<Map<String, Object>> termAgg = agg.get(aggName) as List<Map<String, Object>>
+ assertThat(termAgg.size(), is(5))
+ termAgg.forEach({a ->
+ assertTrue(a.containsKey("key"))
+ assertTrue(a.containsKey("doc_count"))
+ })
+ })
reset(runner)
- // test splitting aggregations
- runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+ // test splitting aggregations - metadata only aggregation format
+ runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
+ runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.METADATA_ONLY.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 2)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
int a = 0
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).forEach(
- { agg ->
- agg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
- agg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
- assertOutputContent(agg.getContent(), 1, false)
+ { termAgg ->
+ termAgg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
+ termAgg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
+ assertOutputContent(termAgg.getContent(), 1, false)
+
+ Map<String, Object> aggContent = OBJECT_MAPPER.readValue(termAgg.getContent(), Map.class)
+ // agg Map (metadata, no buckets)
+ assertTrue(aggContent.containsKey("doc_count_error_upper_bound"))
+ assertFalse(aggContent.containsKey("buckets"))
}
)
reset(runner)
@@ -248,15 +325,16 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
runner.setProperty(AbstractJsonQueryElasticsearch.INDEX, "\${es.index}")
runner.setProperty(AbstractJsonQueryElasticsearch.TYPE, "\${es.type}")
+ runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 2)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
a = 0
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).forEach(
- { agg ->
- agg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
- agg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
- assertOutputContent(agg.getContent(), 1, false)
+ { termAgg ->
+ termAgg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
+ termAgg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
+ assertOutputContent(termAgg.getContent(), 1, false)
}
)
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
index 0ec4470f76..4db5bccb79 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
@@ -17,8 +17,11 @@
package org.apache.nifi.processors.elasticsearch
-import org.apache.nifi.components.AllowableValue
import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.processors.elasticsearch.api.AggregationResultsFormat
+import org.apache.nifi.processors.elasticsearch.api.PaginationType
+import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
+import org.apache.nifi.processors.elasticsearch.api.SearchResultsFormat
import org.apache.nifi.provenance.ProvenanceEventType
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner
@@ -46,12 +49,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
"'%s' validated against 'not-enum' is invalid because Given value not found in allowed set '%s'\n" +
"'%s' validated against 'not-a-period' is invalid because Must be of format <duration> <TimeUnit> where <duration> " +
"is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days\n",
- AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(),
- [
- AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL,
- AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER,
- AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME
- ].join(", "),
+ AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(), PaginationType.values().collect {p -> p.getValue()}.join(", "),
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName(),
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName()
)))
@@ -80,7 +78,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
// paged query hits splitting
- runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+ runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
input = runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 10, 0, 0)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
@@ -102,7 +100,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
// paged query hits combined
- runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+ runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
input = runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
hits = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
@@ -124,7 +122,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setThrowErrorInDelete(true)
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
// still expect "success" output for exception during final clean-up
@@ -146,7 +144,9 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setThrowErrorInDelete(true)
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+ runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.FULL.getValue())
+ runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue())
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
// still expect "success" output for exception during final clean-up
@@ -168,7 +168,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setThrowErrorInPit(true)
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
// expect "failure" output for exception during query setup
@@ -189,7 +189,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
void testQuerySortError() {
// test PiT without sort
final TestRunner runner = createRunner(false)
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
// expect "failure" output for exception during query setup
@@ -208,7 +208,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
// test search_after without sort
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SEARCH_AFTER.getValue())
runOnce(runner)
testCounts(runner, 0, 0, isInput() ? 1 : 0, 0)
assertThat(runner.getLogger().getErrorMessages().stream()
@@ -222,27 +222,27 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
// test scroll without sort (should succeed)
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL.getValue())
runMultiple(runner, 2)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
}
@Test
void testScroll() {
- testPagination(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+ testPagination(PaginationType.SCROLL)
}
@Test
void testPit() {
- testPagination(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+ testPagination(PaginationType.POINT_IN_TIME)
}
@Test
void testSearchAfter() {
- testPagination(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER)
+ testPagination(PaginationType.SEARCH_AFTER)
}
- abstract void testPagination(final AllowableValue paginationType)
+ abstract void testPagination(final PaginationType paginationType)
private void runMultiple(final TestRunner runner, final int maxIterations) {
if (isInput()) {
@@ -278,7 +278,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
service.setMaxPages(0)
// test that an empty flow file is produced for a per query setup
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
@@ -288,7 +288,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
reset(runner)
// test that an empty flow file is produced for a per hit setup
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
@@ -298,7 +298,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
reset(runner)
// test that an empty flow file is produced for a per response setup
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_RESPONSE.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
index 3b2d4fe323..992b548b94 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
@@ -17,7 +17,9 @@
package org.apache.nifi.processors.elasticsearch
-import org.apache.nifi.components.AllowableValue
+
+import org.apache.nifi.processors.elasticsearch.api.PaginationType
+import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
import org.apache.nifi.util.TestRunner
import static groovy.json.JsonOutput.prettyPrint
@@ -38,12 +40,12 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
return true
}
- void testPagination(final AllowableValue paginationType) {
+ void testPagination(final PaginationType paginationType) {
// test flowfile per page
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(2)
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
runOnce(runner)
@@ -60,7 +62,7 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
// test hits splitting
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
runOnce(runner)
testCounts(runner, 1, 20, 0, 0)
int count = 0
@@ -76,7 +78,7 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
// test hits combined
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
runOnce(runner)
testCounts(runner, 1, 1, 0, 0)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "20")
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
index ea60ce8b93..e3bbbff42e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy
@@ -17,8 +17,10 @@
package org.apache.nifi.processors.elasticsearch
-import org.apache.nifi.components.AllowableValue
+
import org.apache.nifi.components.state.Scope
+import org.apache.nifi.processors.elasticsearch.api.PaginationType
+import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
import org.apache.nifi.state.MockStateManager
import org.apache.nifi.util.TestRunner
import org.junit.Test
@@ -50,7 +52,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(2)
service.setThrowErrorInSearch(false)
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
// initialise search
@@ -73,25 +75,25 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
@Test
void testScrollExpiration() {
- testPaginationExpiration(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
+ testPaginationExpiration(PaginationType.SCROLL)
}
@Test
void testPitExpiration() {
- testPaginationExpiration(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
+ testPaginationExpiration(PaginationType.POINT_IN_TIME)
}
@Test
void testSearchAfterExpiration() {
- testPaginationExpiration(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER)
+ testPaginationExpiration(PaginationType.SEARCH_AFTER)
}
- private void testPaginationExpiration(final AllowableValue paginationType) {
+ private void testPaginationExpiration(final PaginationType paginationType) {
// test flowfile per page
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(2)
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE, "1 sec")
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
@@ -131,12 +133,12 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
runner.clearTransferState()
}
- void testPagination(final AllowableValue paginationType) {
+ void testPagination(final PaginationType paginationType) {
// test flowfile per page
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(2)
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
// first page
@@ -163,7 +165,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
// test hits splitting
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
// first page
runOnce(runner)
@@ -197,7 +199,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
// test hits combined
- runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+ runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
// hits are combined from all pages within a single trigger of the processor
runOnce(runner)
testCounts(runner, 0, 1, 0, 0)
@@ -211,7 +213,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
assertThat(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty(), is(true))
}
- private static void assertState(final MockStateManager stateManager, final AllowableValue paginationType,
+ private static void assertState(final MockStateManager stateManager, final PaginationType paginationType,
final int hitCount, final int pageCount) {
stateManager.assertStateEquals(SearchElasticsearch.STATE_HIT_COUNT, Integer.toString(hitCount), Scope.LOCAL)
stateManager.assertStateEquals(SearchElasticsearch.STATE_PAGE_COUNT, Integer.toString(pageCount), Scope.LOCAL)
@@ -220,17 +222,17 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
assertThat(Long.parseLong(pageExpirationTimestamp) > Instant.now().toEpochMilli(), is(true))
switch (paginationType) {
- case AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL:
+ case PaginationType.SCROLL:
stateManager.assertStateEquals(SearchElasticsearch.STATE_SCROLL_ID, "scrollId-${pageCount}", Scope.LOCAL)
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, Scope.LOCAL)
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SEARCH_AFTER, Scope.LOCAL)
break
- case AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME:
+ case PaginationType.POINT_IN_TIME:
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, Scope.LOCAL)
stateManager.assertStateEquals(SearchElasticsearch.STATE_PIT_ID, "pitId-${pageCount}", Scope.LOCAL)
stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER, "[\"searchAfter-${pageCount}\"]", Scope.LOCAL)
break
- case AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER:
+ case PaginationType.SEARCH_AFTER:
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, Scope.LOCAL)
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, Scope.LOCAL)
stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER, "[\"searchAfter-${pageCount}\"]", Scope.LOCAL)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/MapBuilder.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/MapBuilder.java
index 7aea00b772..80f80a1ba3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/MapBuilder.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/MapBuilder.java
@@ -17,14 +17,14 @@
package org.apache.nifi.elasticsearch;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
public class MapBuilder {
private final Map<String, Object> toBuild;
public MapBuilder() {
- toBuild = new HashMap<>();
+ toBuild = new LinkedHashMap<>();
}
public MapBuilder of(final String key, final Object value) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index da145818c5..1c0cd7b10b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -87,7 +87,7 @@ language governing permissions and limitations under the License. -->
</activation>
<properties>
<!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
- <elasticsearch_docker_image>8.5.0</elasticsearch_docker_image>
+ <elasticsearch_docker_image>8.5.3</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties>
<build>
@@ -118,7 +118,7 @@ language governing permissions and limitations under the License. -->
<profile>
<id>elasticsearch7</id>
<properties>
- <elasticsearch_docker_image>7.17.7</elasticsearch_docker_image>
+ <elasticsearch_docker_image>7.17.8</elasticsearch_docker_image>
</properties>
</profile>
</profiles>