You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2020/11/26 06:09:44 UTC
[nifi] 01/02: NIFI-6403: Adding Elasticsearch7 support to HTTP
processors
This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit a3d845a38f93fed1f65ec85c851ca845ef186556
Author: Joe Gresock <jo...@lmco.com>
AuthorDate: Thu Mar 19 17:17:04 2020 +0000
NIFI-6403: Adding Elasticsearch7 support to HTTP processors
This closes #4153.
Signed-off-by: Koji Kawamura <ij...@apache.org>
---
.../AbstractElasticsearchHttpProcessor.java | 84 +++++++++++++--
.../elasticsearch/FetchElasticsearchHttp.java | 19 ++--
.../elasticsearch/PutElasticsearchHttp.java | 6 +-
.../elasticsearch/PutElasticsearchHttpRecord.java | 6 +-
.../elasticsearch/QueryElasticsearchHttp.java | 20 ++--
.../elasticsearch/ScrollElasticsearchHttp.java | 23 ++--
.../elasticsearch/ITQueryElasticsearchHttp.java | 3 +
.../elasticsearch/ITScrollElasticsearchHttp.java | 2 +
.../PutElasticsearchHttpRecordIT.java | 7 ++
.../elasticsearch/TestFetchElasticsearchHttp.java | 16 ++-
.../elasticsearch/TestPutElasticsearchHttp.java | 39 ++++++-
.../TestPutElasticsearchHttpRecord.java | 117 ++++++++++++++++++++-
.../elasticsearch/TestQueryElasticsearchHttp.java | 64 ++++++++++-
.../elasticsearch/TestScrollElasticsearchHttp.java | 50 ++++++++-
14 files changed, 405 insertions(+), 51 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index ab211c4..291d99b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -36,9 +36,11 @@ import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.Route;
import org.apache.commons.text.StringEscapeUtils;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
@@ -55,8 +57,13 @@ import org.apache.nifi.util.StringUtils;
* A base class for Elasticsearch processors that use the HTTP API
*/
public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor {
+ enum ElasticsearchVersion {
+ ES_7,
+ ES_LESS_THAN_7
+ }
static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
+ static final String FIELD_INCLUDE_QUERY_PARAM_ES7 = "_source_includes";
static final String QUERY_QUERY_PARAM = "q";
static final String SORT_QUERY_PARAM = "sort";
static final String SIZE_QUERY_PARAM = "size";
@@ -127,6 +134,18 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor ES_VERSION = new PropertyDescriptor.Builder()
+ .name("elasticsearch-http-version")
+ .displayName("Elasticsearch Version")
+ .description("The major version of elasticsearch (this affects some HTTP query parameters and the way responses are parsed).")
+ .required(true)
+ .allowableValues(
+ new AllowableValue(ElasticsearchVersion.ES_LESS_THAN_7.name(), "< 7.0", "Any version of Elasticsearch less than 7.0"),
+ new AllowableValue(ElasticsearchVersion.ES_7.name(), "7.x", "Elasticsearch version 7.x"))
+ .defaultValue(ElasticsearchVersion.ES_LESS_THAN_7.name())
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
@Override
@@ -148,6 +167,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
static {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ES_URL);
+ properties.add(ES_VERSION);
properties.add(PROP_SSL_CONTEXT_SERVICE);
properties.add(CHARSET);
properties.add(USERNAME);
@@ -287,9 +307,12 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
sb.append(indexOp.toLowerCase());
sb.append("\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
- sb.append("\", \"_type\": \"");
- sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
+ if (!(StringUtils.isEmpty(docType) | docType == null)){
+ sb.append(", \"_type\": \"");
+ sb.append(StringEscapeUtils.escapeJson(docType));
+ sb.append("\"");
+ }
if (!StringUtils.isEmpty(id)) {
sb.append(", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
@@ -301,11 +324,15 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
} else if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
sb.append("{\"update\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
- sb.append("\", \"_type\": \"");
- sb.append(StringEscapeUtils.escapeJson(docType));
- sb.append("\", \"_id\": \"");
+ sb.append("\"");
+ if (!(StringUtils.isEmpty(docType) | docType == null)){
+ sb.append(", \"_type\": \"");
+ sb.append(StringEscapeUtils.escapeJson(docType));
+ sb.append("\"");
+ }
+ sb.append(", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
- sb.append("\" }\n");
+ sb.append("\" } }\n");
sb.append("{\"doc\": ");
sb.append(jsonString);
sb.append(", \"doc_as_upsert\": ");
@@ -314,11 +341,48 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
} else if (indexOp.equalsIgnoreCase("delete")) {
sb.append("{\"delete\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
- sb.append("\", \"_type\": \"");
- sb.append(StringEscapeUtils.escapeJson(docType));
- sb.append("\", \"_id\": \"");
+ sb.append("\"");
+ if (!(StringUtils.isEmpty(docType) | docType == null)){
+ sb.append(", \"_type\": \"");
+ sb.append(StringEscapeUtils.escapeJson(docType));
+ sb.append("\"");
+ }
+ sb.append(", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
- sb.append("\" }\n");
+ sb.append("\" }}\n");
+ }
+ }
+
+ protected String getFieldIncludeParameter(ElasticsearchVersion esVersion) {
+ return esVersion.equals(ElasticsearchVersion.ES_LESS_THAN_7)
+ ? FIELD_INCLUDE_QUERY_PARAM : FIELD_INCLUDE_QUERY_PARAM_ES7;
+ }
+
+ static class ElasticsearchTypeValidator implements Validator {
+ private final boolean pre7TypeRequired;
+
+ /**
+ * Creates a validator for an ES type
+ * @param pre7TypeRequired If true, 'type' will be required for ES
+ * before version 7.0.
+ */
+ public ElasticsearchTypeValidator(boolean pre7TypeRequired) {
+ this.pre7TypeRequired = pre7TypeRequired;
+ }
+
+ @Override
+ public ValidationResult validate(String subject, String input, ValidationContext context) {
+ ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context
+ .getProperty(ES_VERSION).getValue());
+ if (esVersion == ElasticsearchVersion.ES_7) {
+ return new ValidationResult.Builder().valid(org.apache.commons.lang3.StringUtils.isBlank(input) || "_doc".equals(input))
+ .explanation("Elasticsearch no longer supports 'type' as of version 7.0. Please use '_doc' or leave blank.")
+ .build();
+ } else {
+ return new ValidationResult.Builder().valid(!pre7TypeRequired || org.apache.commons.lang3.StringUtils.isNotBlank(input))
+ .explanation("Elasticsearch prior to version 7.0 requires a 'type' to be set.")
+ .build();
+ }
}
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
index fcdc9e3..50cad68 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
@@ -40,6 +40,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -120,11 +121,11 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("fetch-es-type")
.displayName("Type")
- .description("The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty, "
- + "the first document matching the identifier across all types will be retrieved.")
- .required(false)
+ .description("The type of this document (if empty, the first document matching the identifier across all types will be retrieved). "
+ + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
+ .required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(new ElasticsearchTypeValidator(false))
.build();
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -149,6 +150,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
+ descriptors.add(ES_VERSION);
descriptors.add(DOC_ID);
descriptors.add(INDEX);
descriptors.add(TYPE);
@@ -199,6 +201,8 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
final String fields = context.getProperty(FIELDS).isSet()
? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue()
: null;
+ final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
+ .getValue());
// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -214,7 +218,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
// read the url property from the context
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
- final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context);
+ final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context, esVersion);
final long startNanos = System.nanoTime();
getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
@@ -306,7 +310,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
}
}
- private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context) throws MalformedURLException {
+ private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
@@ -316,7 +320,8 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
builder.addPathSegment(docId);
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
- builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+ final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
+ builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
}
// Find the user-added properties and set them as query parameters on the URL
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 4e60e0a..b427b8e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -111,10 +111,11 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("put-es-type")
.displayName("Type")
- .description("The type of this document (used by Elasticsearch for indexing and searching)")
+ .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+ + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .addValidator(new ElasticsearchTypeValidator(true))
.build();
public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -152,6 +153,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
+ descriptors.add(ES_VERSION);
descriptors.add(ID_ATTRIBUTE);
descriptors.add(INDEX);
descriptors.add(TYPE);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 97ae8ca..130bde8 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -173,10 +173,11 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("put-es-record-type")
.displayName("Type")
- .description("The type of this document (used by Elasticsearch for indexing and searching)")
+ .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+ + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .addValidator(new ElasticsearchTypeValidator(true))
.build();
static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -260,6 +261,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
+ descriptors.add(ES_VERSION);
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(LOG_ALL_ERRORS);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index ea350ad..58bab07 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -155,12 +155,11 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("query-es-type")
.displayName("Type")
- .description(
- "The (optional) type of this query, used by Elasticsearch for indexing and searching. If the property is empty, "
- + "the the query will match across all types.")
- .required(false)
+ .description("The type of this document (if empty, searches across all types). "
+ + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
+ .required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(new ElasticsearchTypeValidator(false))
.build();
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -236,6 +235,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
static {
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(QUERY);
+ descriptors.add(ES_VERSION);
descriptors.add(PAGE_SIZE);
descriptors.add(INDEX);
descriptors.add(TYPE);
@@ -316,7 +316,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.evaluateAttributeExpressions(flowFile).getValue() : null;
final boolean targetIsContent = context.getProperty(TARGET).getValue()
.equals(TARGET_FLOW_FILE_CONTENT);
-
+ final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
+ .getValue());
// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
@@ -344,7 +345,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
}
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
- mPageSize, fromIndex, context);
+ mPageSize, fromIndex, context, esVersion);
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
username, password, "GET", null);
@@ -505,7 +506,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
}
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
- String sort, int pageSize, int fromIndex, ProcessContext context) throws MalformedURLException {
+ String sort, int pageSize, int fromIndex, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
@@ -520,7 +521,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex));
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
- builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+ final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
+ builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
}
if (!StringUtils.isEmpty(sort)) {
String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
index 528f88c..322f82f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -46,6 +46,8 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchTypeValidator;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -135,12 +137,11 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("scroll-es-type")
.displayName("Type")
- .description(
- "The (optional) type of this query, used by Elasticsearch for indexing and searching. If the property is empty, "
- + "the the query will match across all types.")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(false)
+ .description("The type of this document (if empty, searches across all types). "
+ + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
+ .required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(new ElasticsearchTypeValidator(false))
.build();
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -185,6 +186,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
+ descriptors.add(ES_VERSION);
descriptors.add(QUERY);
descriptors.add(SCROLL_DURATION);
descriptors.add(PAGE_SIZE);
@@ -244,6 +246,8 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
.evaluateAttributeExpressions(flowFile).getValue() : null;
final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
.getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
+ final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
+ .getValue());
// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
@@ -260,7 +264,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
.getValue());
if (scrollId != null) {
final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
- scrollId, pageSize, scroll, context);
+ scrollId, pageSize, scroll, context, esVersion);
final long startNanos = System.nanoTime();
final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", scroll,
@@ -278,7 +282,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
// read the url property from the context
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
- scrollId, pageSize, scroll, context);
+ scrollId, pageSize, scroll, context, esVersion);
final long startNanos = System.nanoTime();
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
@@ -415,7 +419,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
}
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
- String sort, String scrollId, int pageSize, String scroll, ProcessContext context) throws MalformedURLException {
+ String sort, String scrollId, int pageSize, String scroll, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
@@ -433,7 +437,8 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
- builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+ final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
+ builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
}
if (!StringUtils.isEmpty(sort)) {
String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
index f60e2f9..e3ace24 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -42,6 +43,7 @@ public class ITQueryElasticsearchHttp {
runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://localhost.internal:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
runner.assertNotValid();
@@ -68,6 +70,7 @@ public class ITQueryElasticsearchHttp {
runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://localhost.internal:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
index 57ee242..b25e3e2 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertNotNull;
import java.io.IOException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -40,6 +41,7 @@ public class ITScrollElasticsearchHttp {
runner = TestRunners.newTestRunner(ScrollElasticsearchHttp.class); // all docs are found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://ip-172-31-49-152.ec2.internal:9200");
+ runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(ScrollElasticsearchHttp.INDEX, "prod-accounting");
runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
index fbab1cc..c13bf40 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
@@ -18,6 +18,8 @@
package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
@@ -61,6 +63,7 @@ public class PutElasticsearchHttpRecordIT {
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.INDEX, "people_test");
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.TYPE, "person");
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+ FETCH_RUNNER.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
FETCH_RUNNER.assertValid();
}
@@ -74,6 +77,7 @@ public class PutElasticsearchHttpRecordIT {
runner = TestRunners.newTestRunner(PutElasticsearchHttpRecord.class);
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
+ runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader");
runner.setProperty(PutElasticsearchHttpRecord.ES_URL, "http://localhost:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test");
@@ -209,6 +213,7 @@ public class PutElasticsearchHttpRecordIT {
// Undo some stuff from setup()
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people\"test");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
+ runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
put("name", "John Doe");
put("age", 48);
@@ -232,6 +237,7 @@ public class PutElasticsearchHttpRecordIT {
// Undo some stuff from setup()
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people}test");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
+ runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
put("name", "John Doe");
put("age", 48);
@@ -255,6 +261,7 @@ public class PutElasticsearchHttpRecordIT {
// Undo some stuff from setup()
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test2");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "per\"son");
+ runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
put("name", "John Doe");
put("age", 48);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
index 9309d02..fb8b164 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -73,10 +74,18 @@ public class TestFetchElasticsearchHttp {
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
- runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
- runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+ runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+ runner.assertValid();
+ runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+ runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+ runner.assertValid(); // Valid because type is not required prior to 7.0
+ runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
runner.assertValid();
+ runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+ runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+ runner.setProperty(FetchElasticsearchHttp.TYPE, "_doc");
+ runner.assertValid(); // Valid because type is not required prior to 7.0
runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
runner.assertValid();
@@ -126,6 +135,7 @@ public class TestFetchElasticsearchHttp {
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, ES_URL);
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+ runner.setProperty(FetchElasticsearchHttp.TYPE, "");
runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.assertValid();
@@ -174,6 +184,7 @@ public class TestFetchElasticsearchHttp {
runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+ runner.setProperty(FetchElasticsearchHttp.TYPE, "");
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.setIncomingConnection(true);
@@ -273,6 +284,7 @@ public class TestFetchElasticsearchHttp {
runner.setProperty(FetchElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+ runner.setProperty(FetchElasticsearchHttp.TYPE, "");
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
// Allow time for the controller service to fully initialize
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index b5a072c..896b981 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -108,6 +109,28 @@ public class TestPutElasticsearchHttp {
}
@Test
+ public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException {
+ runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+
+ runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+ runner.setProperty(PutElasticsearchHttp.TYPE, "");
+ runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+ runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+
+ runner.enqueue(docExample, new HashMap<String, String>() {{
+ put("doc_id", "28039652140");
+ }});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0);
+ assertNotNull(out);
+ out.assertAttributeEquals("doc_id", "28039652140");
+ }
+
+ @Test
public void testPutElasticSearchOnTriggerUpdate() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
@@ -201,8 +224,8 @@ public class TestPutElasticsearchHttp {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
+ runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
runner.assertValid();
@@ -315,9 +338,21 @@ public class TestPutElasticsearchHttp {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
+ runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+ runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+ runner.setProperty(PutElasticsearchHttp.TYPE, "");
+ runner.assertNotValid(); // Not valid because type is required prior to 7.0
+ runner.setProperty(PutElasticsearchHttp.TYPE, " ");
+ runner.assertNotValid(); // Not valid because type is required prior to 7.0
+ runner.removeProperty(PutElasticsearchHttp.TYPE);
+ runner.assertNotValid(); // Not valid because type is required prior to 7.0
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+ runner.assertValid();
+ runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+ runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+ runner.setProperty(PutElasticsearchHttp.TYPE, "_doc");
+ runner.assertValid();
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
runner.assertValid();
runner.setProperty(PutElasticsearchHttp.INDEX_OP, "index");
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 5d52978..a651aef 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -28,6 +28,7 @@ import okio.Buffer;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
@@ -142,6 +143,7 @@ public class TestPutElasticsearchHttpRecord {
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "create");
+
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
@@ -151,6 +153,73 @@ public class TestPutElasticsearchHttpRecord {
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
+ out.assertAttributeEquals("record.count", "4");
+ List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
+ assertNotNull(provEvents);
+ assertEquals(1, provEvents.size());
+ assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
+ }
+
+ @Test
+ public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException {
+ PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+ DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("h:m a");
+ DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("d/M/YYYY h:m a");
+ processor.setRecordChecks(record -> {
+ assertEquals(1, record.get("id"));
+ assertEquals("reç1", record.get("name"));
+ assertEquals(101, record.get("code"));
+ assertEquals("20/12/2018", record.get("date"));
+ assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
+ assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
+ }, record -> {
+ assertEquals(2, record.get("id"));
+ assertEquals("reç2", record.get("name"));
+ assertEquals(102, record.get("code"));
+ assertEquals("20/12/2018", record.get("date"));
+ assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
+ assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
+ }, record -> {
+ assertEquals(3, record.get("id"));
+ assertEquals("reç3", record.get("name"));
+ assertEquals(103, record.get("code"));
+ assertEquals("20/12/2018", record.get("date"));
+ assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
+ assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
+ }, record -> {
+ assertEquals(4, record.get("id"));
+ assertEquals("reç4", record.get("name"));
+ assertEquals(104, record.get("code"));
+ assertEquals("20/12/2018", record.get("date"));
+ assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
+ assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
+ });
+ runner = TestRunners.newTestRunner(processor); // no failures
+ generateTestData();
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
+
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+ runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
+ runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+ runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy");
+ runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
+ runner.setProperty(PutElasticsearchHttpRecord.TIMESTAMP_FORMAT, "d/M/yyyy h:m a");
+
+ runner.enqueue(new byte[0], new HashMap<String, String>() {{
+ put("doc_id", "28039652140");
+ }});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+ assertNotNull(out);
+ out.assertAttributeEquals("doc_id", "28039652140");
+ out.assertAttributeEquals("record.count", "4");
+ List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
+ assertNotNull(provEvents);
+ assertEquals(1, provEvents.size());
+ assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
}
@Test
@@ -175,6 +244,28 @@ public class TestPutElasticsearchHttpRecord {
}
@Test
+ public void testPutElasticSearchOnTriggerUpdate_withoutType() throws IOException {
+ runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+ generateTestData();
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
+
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+ runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
+ runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update");
+ runner.enqueue(new byte[0], new HashMap<String, String>() {{
+ put("doc_id", "28039652140");
+ }});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+ assertNotNull(out);
+ out.assertAttributeEquals("doc_id", "28039652140");
+ }
+
+ @Test
public void testPutElasticSearchOnTriggerDelete() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
@@ -196,6 +287,28 @@ public class TestPutElasticsearchHttpRecord {
}
@Test
+ public void testPutElasticSearchOnTriggerDelete_withoutType() throws IOException {
+ runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+ generateTestData();
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
+
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+ runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
+ runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE");
+ runner.enqueue(new byte[0], new HashMap<String, String>() {{
+ put("doc_id", "28039652140");
+ }});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+ assertNotNull(out);
+ out.assertAttributeEquals("doc_id", "28039652140");
+ }
+
+ @Test
public void testPutElasticSearchOnTriggerEL() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
@@ -248,8 +361,8 @@ public class TestPutElasticsearchHttpRecord {
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.assertNotValid();
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.assertValid();
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "");
@@ -379,8 +492,8 @@ public class TestPutElasticsearchHttpRecord {
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.assertNotValid();
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.assertValid();
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
index a6e3220..0f6b6e4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
@@ -32,6 +32,7 @@ import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -64,6 +65,7 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTrigger_withInput() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@@ -83,6 +85,7 @@ public class TestQueryElasticsearchHttp {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@@ -104,10 +107,22 @@ public class TestQueryElasticsearchHttp {
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
- runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
- runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.QUERY,
"source:Twitter AND identifier:\"${identifier}\"");
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+ runner.assertValid(); // Valid because type is not required prior to 7.0
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+ runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc");
+ runner.assertValid();
+ runner.removeProperty(QueryElasticsearchHttp.TYPE);
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
runner.assertValid();
@@ -123,6 +138,7 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@@ -149,6 +165,7 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTrigger_withNoInput() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@@ -194,6 +211,7 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTriggerWithFields() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@@ -213,6 +231,7 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTriggerWithLimit() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@@ -235,6 +254,7 @@ public class TestQueryElasticsearchHttp {
processor.setStatus(500, "Server error");
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -260,6 +280,7 @@ public class TestQueryElasticsearchHttp {
processor.setStatus(100, "Should fail");
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -285,6 +306,7 @@ public class TestQueryElasticsearchHttp {
processor.setExceptionToThrow(new IOException("Error reading from disk"));
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -310,6 +332,7 @@ public class TestQueryElasticsearchHttp {
processor.setStatus(100, "Should fail", 2);
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -336,6 +359,7 @@ public class TestQueryElasticsearchHttp {
processor.setStatus(100, "Should fail", 1);
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -358,7 +382,9 @@ public class TestQueryElasticsearchHttp {
runner.enableControllerService(sslService);
runner.setProperty(QueryElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
// Allow time for the controller service to fully initialize
@@ -395,6 +421,7 @@ public class TestQueryElasticsearchHttp {
runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost");
runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3228");
runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("doc_id", "28039652140");
@@ -422,6 +449,7 @@ public class TestQueryElasticsearchHttp {
runner.setProperty(QueryElasticsearchHttp.PROXY_USERNAME, "squid");
runner.setProperty(QueryElasticsearchHttp.PROXY_PASSWORD, "changeme");
runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("doc_id", "28039652140");
@@ -437,6 +465,7 @@ public class TestQueryElasticsearchHttp {
p.setExpectedParam("myparam=myvalue");
runner = TestRunners.newTestRunner(p);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
@@ -446,6 +475,35 @@ public class TestQueryElasticsearchHttp {
runAndVerifySuccess(true);
}
+ @Test
+ public void testQueryElasticsearchOnTrigger_sourceIncludes() throws IOException {
+ QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor();
+ p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param
+ runner = TestRunners.newTestRunner(p);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+
+ runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+ runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
+ runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
+ runAndVerifySuccess(true);
+
+ // Now test with ES 7.x
+
+ p = new QueryElasticsearchHttpTestProcessor();
+ p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param
+ runner = TestRunners.newTestRunner(p);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+
+ runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+ runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
+ runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
+ runAndVerifySuccess(true);
+ }
+
/**
* A Test class that extends the processor in order to inject/mock behavior
*/
@@ -529,7 +587,7 @@ public class TestQueryElasticsearchHttp {
@Override
public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
Request realRequest = (Request) invocationOnMock.getArguments()[0];
- assertTrue((expectedParam == null) || (realRequest.url().toString().endsWith(expectedParam)));
+ assertTrue((expectedParam == null) || (realRequest.url().toString().contains(expectedParam)));
Response mockResponse = new Response.Builder()
.request(realRequest)
.protocol(Protocol.HTTP_1_1)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
index eab33af..74ac031 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -80,6 +81,35 @@ public class TestScrollElasticsearchHttp {
}
@Test
+ public void testScrollElasticsearchOnTrigger_sourceIncludes() throws IOException {
+ ScrollElasticsearchHttpTestProcessor p = new ScrollElasticsearchHttpTestProcessor();
+ p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param
+ runner = TestRunners.newTestRunner(p);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+
+ runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
+ runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
+ runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
+ runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
+ runAndVerifySuccess();
+
+ // Now test with ES 7.x
+
+ p = new ScrollElasticsearchHttpTestProcessor();
+ p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param
+ runner = TestRunners.newTestRunner(p);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+
+ runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
+ runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
+ runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
+ runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
+ runAndVerifySuccess();
+ }
+
+ @Test
public void testScrollElasticsearchOnTrigger_withNoInput_EL() throws IOException {
runner = TestRunners.newTestRunner(new ScrollElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
@@ -133,10 +163,20 @@ public class TestScrollElasticsearchHttp {
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
- runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
- runner.assertNotValid();
+ runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
+ runner.assertNotValid();
+ runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
+ runner.assertValid(); // Valid because type is not required prior to 7.0
+ runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.assertValid();
+ runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+ runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+ runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc");
+ runner.assertValid();
+ runner.removeProperty(ScrollElasticsearchHttp.TYPE);
+ runner.assertNotValid();
+ runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
runner.setProperty(ScrollElasticsearchHttp.FIELDS, "id,, userinfo.location");
runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc,identifier:desc");
@@ -246,7 +286,9 @@ public class TestScrollElasticsearchHttp {
runner.enableControllerService(sslService);
runner.setProperty(ScrollElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+ runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
+ runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
runner.setIncomingConnection(false);
@@ -408,7 +450,9 @@ public class TestScrollElasticsearchHttp {
@Override
public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
Request realRequest = (Request) invocationOnMock.getArguments()[0];
- assertTrue((expectedParam == null) || (realRequest.url().toString().endsWith(expectedParam)));
+ if (realRequest.method().equals("GET")) {
+ assertTrue((expectedParam == null) || (realRequest.url().toString().contains(expectedParam)));
+ }
Response mockResponse = new Response.Builder()
.request(realRequest)
.protocol(Protocol.HTTP_1_1)