You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2020/11/26 06:09:44 UTC

[nifi] 01/02: NIFI-6403: Adding Elasticsearch7 support to HTTP processors

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit a3d845a38f93fed1f65ec85c851ca845ef186556
Author: Joe Gresock <jo...@lmco.com>
AuthorDate: Thu Mar 19 17:17:04 2020 +0000

    NIFI-6403: Adding Elasticsearch7 support to HTTP processors
    
    This closes #4153.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../AbstractElasticsearchHttpProcessor.java        |  84 +++++++++++++--
 .../elasticsearch/FetchElasticsearchHttp.java      |  19 ++--
 .../elasticsearch/PutElasticsearchHttp.java        |   6 +-
 .../elasticsearch/PutElasticsearchHttpRecord.java  |   6 +-
 .../elasticsearch/QueryElasticsearchHttp.java      |  20 ++--
 .../elasticsearch/ScrollElasticsearchHttp.java     |  23 ++--
 .../elasticsearch/ITQueryElasticsearchHttp.java    |   3 +
 .../elasticsearch/ITScrollElasticsearchHttp.java   |   2 +
 .../PutElasticsearchHttpRecordIT.java              |   7 ++
 .../elasticsearch/TestFetchElasticsearchHttp.java  |  16 ++-
 .../elasticsearch/TestPutElasticsearchHttp.java    |  39 ++++++-
 .../TestPutElasticsearchHttpRecord.java            | 117 ++++++++++++++++++++-
 .../elasticsearch/TestQueryElasticsearchHttp.java  |  64 ++++++++++-
 .../elasticsearch/TestScrollElasticsearchHttp.java |  50 ++++++++-
 14 files changed, 405 insertions(+), 51 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index ab211c4..291d99b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -36,9 +36,11 @@ import okhttp3.RequestBody;
 import okhttp3.Response;
 import okhttp3.Route;
 import org.apache.commons.text.StringEscapeUtils;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
@@ -55,8 +57,13 @@ import org.apache.nifi.util.StringUtils;
  * A base class for Elasticsearch processors that use the HTTP API
  */
 public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor {
+    enum ElasticsearchVersion { // major-version families offered by the ES_VERSION property
+        ES_7, // Elasticsearch 7.x
+        ES_LESS_THAN_7 // any Elasticsearch version before 7.0
+    }
 
     static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
+    static final String FIELD_INCLUDE_QUERY_PARAM_ES7 = "_source_includes";
     static final String QUERY_QUERY_PARAM = "q";
     static final String SORT_QUERY_PARAM = "sort";
     static final String SIZE_QUERY_PARAM = "size";
@@ -127,6 +134,18 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor ES_VERSION = new PropertyDescriptor.Builder() // Elasticsearch major-version selector
+            .name("elasticsearch-http-version")
+            .displayName("Elasticsearch Version")
+            .description("The major version of elasticsearch (this affects some HTTP query parameters and the way responses are parsed).")
+            .required(true)
+            .allowableValues( // values are ElasticsearchVersion constant names, so readers can parse them with valueOf(...)
+                    new AllowableValue(ElasticsearchVersion.ES_LESS_THAN_7.name(), "< 7.0", "Any version of Elasticsearch less than 7.0"),
+                    new AllowableValue(ElasticsearchVersion.ES_7.name(), "7.x", "Elasticsearch version 7.x"))
+            .defaultValue(ElasticsearchVersion.ES_LESS_THAN_7.name()) // defaults to the pre-7.0 family
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
 
     @Override
@@ -148,6 +167,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
     static {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(ES_URL);
+        properties.add(ES_VERSION);
         properties.add(PROP_SSL_CONTEXT_SERVICE);
         properties.add(CHARSET);
         properties.add(USERNAME);
@@ -287,9 +307,12 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             sb.append(indexOp.toLowerCase());
             sb.append("\": { \"_index\": \"");
             sb.append(StringEscapeUtils.escapeJson(index));
-            sb.append("\", \"_type\": \"");
-            sb.append(StringEscapeUtils.escapeJson(docType));
             sb.append("\"");
+            if (!(StringUtils.isEmpty(docType) | docType == null)){
+                sb.append(", \"_type\": \"");
+                sb.append(StringEscapeUtils.escapeJson(docType));
+                sb.append("\"");
+            }
             if (!StringUtils.isEmpty(id)) {
                 sb.append(", \"_id\": \"");
                 sb.append(StringEscapeUtils.escapeJson(id));
@@ -301,11 +324,15 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
         } else if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
             sb.append("{\"update\": { \"_index\": \"");
             sb.append(StringEscapeUtils.escapeJson(index));
-            sb.append("\", \"_type\": \"");
-            sb.append(StringEscapeUtils.escapeJson(docType));
-            sb.append("\", \"_id\": \"");
+            sb.append("\"");
+            if (!(StringUtils.isEmpty(docType) | docType == null)){
+                sb.append(", \"_type\": \"");
+                sb.append(StringEscapeUtils.escapeJson(docType));
+                sb.append("\"");
+            }
+            sb.append(", \"_id\": \"");
             sb.append(StringEscapeUtils.escapeJson(id));
-            sb.append("\" }\n");
+            sb.append("\" } }\n");
             sb.append("{\"doc\": ");
             sb.append(jsonString);
             sb.append(", \"doc_as_upsert\": ");
@@ -314,11 +341,48 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
         } else if (indexOp.equalsIgnoreCase("delete")) {
             sb.append("{\"delete\": { \"_index\": \"");
             sb.append(StringEscapeUtils.escapeJson(index));
-            sb.append("\", \"_type\": \"");
-            sb.append(StringEscapeUtils.escapeJson(docType));
-            sb.append("\", \"_id\": \"");
+            sb.append("\"");
+            if (!(StringUtils.isEmpty(docType) | docType == null)){
+                sb.append(", \"_type\": \"");
+                sb.append(StringEscapeUtils.escapeJson(docType));
+                sb.append("\"");
+            }
+            sb.append(", \"_id\": \"");
             sb.append(StringEscapeUtils.escapeJson(id));
-            sb.append("\" }\n");
+            sb.append("\" }}\n");
+        }
+    }
+
+    /** Returns the field-include query parameter name to use for the given Elasticsearch version. */
+    protected String getFieldIncludeParameter(ElasticsearchVersion esVersion) {
+        return esVersion.equals(ElasticsearchVersion.ES_LESS_THAN_7) ? FIELD_INCLUDE_QUERY_PARAM : FIELD_INCLUDE_QUERY_PARAM_ES7;
+    }
+
+    /** Validates a 'type' property value against the Elasticsearch version selected by ES_VERSION. */
+    static class ElasticsearchTypeValidator implements Validator {
+        private final boolean pre7TypeRequired;
+
+        /**
+         * Creates a validator for an ES type
+         * @param pre7TypeRequired If true, 'type' will be required for ES before version 7.0.
+         */
+        public ElasticsearchTypeValidator(boolean pre7TypeRequired) {
+            this.pre7TypeRequired = pre7TypeRequired;
+        }
+
+        @Override
+        public ValidationResult validate(String subject, String input, ValidationContext context) {
+            ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION).getValue());
+            // Attach subject and input so a failure message names the offending property and value.
+            final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject).input(input);
+            if (esVersion == ElasticsearchVersion.ES_7) {
+                return result.valid(org.apache.commons.lang3.StringUtils.isBlank(input) || "_doc".equals(input))
+                        .explanation("Elasticsearch no longer supports 'type' as of version 7.0.  Please use '_doc' or leave blank.")
+                        .build();
+            }
+            return result.valid(!pre7TypeRequired || org.apache.commons.lang3.StringUtils.isNotBlank(input))
+                    .explanation("Elasticsearch prior to version 7.0 requires a 'type' to be set.")
+                    .build();
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
index fcdc9e3..50cad68 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
@@ -40,6 +40,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -120,11 +121,11 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("fetch-es-type")
             .displayName("Type")
-            .description("The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty, "
-                    + "the first document matching the identifier across all types will be retrieved.")
-            .required(false)
+            .description("The type of this document (if empty, the first document matching the identifier across all types will be retrieved).  "
+                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
+            .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(new ElasticsearchTypeValidator(false))
             .build();
 
     public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -149,6 +150,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
+        descriptors.add(ES_VERSION);
         descriptors.add(DOC_ID);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
@@ -199,6 +201,8 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         final String fields = context.getProperty(FIELDS).isSet()
                 ? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue()
                 : null;
+        final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
+                .getValue());
 
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -214,7 +218,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
 
             // read the url property from the context
             final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
-            final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context);
+            final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context, esVersion);
             final long startNanos = System.nanoTime();
 
             getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
@@ -306,7 +310,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         }
     }
 
-    private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context) throws MalformedURLException {
+    private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
@@ -316,7 +320,8 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         builder.addPathSegment(docId);
         if (!StringUtils.isEmpty(fields)) {
             String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
-            builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+            final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
+            builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
         }
 
         // Find the user-added properties and set them as query parameters on the URL
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 4e60e0a..b427b8e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -111,10 +111,11 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("put-es-type")
             .displayName("Type")
-            .description("The type of this document (used by Elasticsearch for indexing and searching)")
+            .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching).  "
+                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .addValidator(new ElasticsearchTypeValidator(true))
             .build();
 
     public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -152,6 +153,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
+        descriptors.add(ES_VERSION);
         descriptors.add(ID_ATTRIBUTE);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 97ae8ca..130bde8 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -173,10 +173,11 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
     static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("put-es-record-type")
             .displayName("Type")
-            .description("The type of this document (used by Elasticsearch for indexing and searching)")
+            .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching).  "
+                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .addValidator(new ElasticsearchTypeValidator(true))
             .build();
 
     static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -260,6 +261,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
+        descriptors.add(ES_VERSION);
         descriptors.add(RECORD_READER);
         descriptors.add(RECORD_WRITER);
         descriptors.add(LOG_ALL_ERRORS);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index ea350ad..58bab07 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -155,12 +155,11 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("query-es-type")
             .displayName("Type")
-            .description(
-                    "The (optional) type of this query, used by Elasticsearch for indexing and searching. If the property is empty, "
-                            + "the the query will match across all types.")
-            .required(false)
+            .description("The type of this document (if empty, searches across all types).  "
+                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
+            .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(new ElasticsearchTypeValidator(false))
             .build();
 
     public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -236,6 +235,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     static {
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
         descriptors.add(QUERY);
+        descriptors.add(ES_VERSION);
         descriptors.add(PAGE_SIZE);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
@@ -316,7 +316,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 .evaluateAttributeExpressions(flowFile).getValue() : null;
         final boolean targetIsContent = context.getProperty(TARGET).getValue()
                 .equals(TARGET_FLOW_FILE_CONTENT);
-
+        final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
+                .getValue());
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
         final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
@@ -344,7 +345,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 }
 
                 final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
-                        mPageSize, fromIndex, context);
+                        mPageSize, fromIndex, context, esVersion);
 
                 final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
                         username, password, "GET", null);
@@ -505,7 +506,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     }
 
     private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
-            String sort, int pageSize, int fromIndex, ProcessContext context) throws MalformedURLException {
+            String sort, int pageSize, int fromIndex, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
@@ -520,7 +521,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex));
         if (!StringUtils.isEmpty(fields)) {
             String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
-            builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+            final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
+            builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
         }
         if (!StringUtils.isEmpty(sort)) {
             String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
index 528f88c..322f82f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -46,6 +46,8 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchTypeValidator;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -135,12 +137,11 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("scroll-es-type")
             .displayName("Type")
-            .description(
-                    "The (optional) type of this query, used by Elasticsearch for indexing and searching. If the property is empty, "
-                    + "the the query will match across all types.")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .required(false)
+            .description("The type of this document (if empty, searches across all types).  "
+                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
+            .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(new ElasticsearchTypeValidator(false))
             .build();
 
     public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -185,6 +186,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
+        descriptors.add(ES_VERSION);
         descriptors.add(QUERY);
         descriptors.add(SCROLL_DURATION);
         descriptors.add(PAGE_SIZE);
@@ -244,6 +246,8 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
                 .evaluateAttributeExpressions(flowFile).getValue() : null;
         final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
                 .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
+        final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
+                .getValue());
 
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
@@ -260,7 +264,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
                     .getValue());
             if (scrollId != null) {
                 final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
-                        scrollId, pageSize, scroll, context);
+                        scrollId, pageSize, scroll, context, esVersion);
                 final long startNanos = System.nanoTime();
 
                 final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", scroll,
@@ -278,7 +282,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
 
                 // read the url property from the context
                 final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
-                        scrollId, pageSize, scroll, context);
+                        scrollId, pageSize, scroll, context, esVersion);
                 final long startNanos = System.nanoTime();
 
                 final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
@@ -415,7 +419,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
     }
 
     private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
-            String sort, String scrollId, int pageSize, String scroll, ProcessContext context) throws MalformedURLException {
+            String sort, String scrollId, int pageSize, String scroll, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
@@ -433,7 +437,8 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
             builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
             if (!StringUtils.isEmpty(fields)) {
                 String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
-                builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+                final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
+                builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
             }
             if (!StringUtils.isEmpty(sort)) {
                 String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
index f60e2f9..e3ace24 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -42,6 +43,7 @@ public class ITQueryElasticsearchHttp {
         runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
                 "http://localhost.internal:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
         runner.assertNotValid();
@@ -68,6 +70,7 @@ public class ITQueryElasticsearchHttp {
         runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
                 "http://localhost.internal:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
         runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
index 57ee242..b25e3e2 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
 
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -40,6 +41,7 @@ public class ITScrollElasticsearchHttp {
         runner = TestRunners.newTestRunner(ScrollElasticsearchHttp.class); // all docs are found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
                 "http://ip-172-31-49-152.ec2.internal:9200");
+        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "prod-accounting");
         runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
index fbab1cc..c13bf40 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
@@ -18,6 +18,8 @@
 package org.apache.nifi.processors.elasticsearch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
@@ -61,6 +63,7 @@ public class PutElasticsearchHttpRecordIT {
         FETCH_RUNNER.setProperty(FetchElasticsearchHttp.INDEX, "people_test");
         FETCH_RUNNER.setProperty(FetchElasticsearchHttp.TYPE, "person");
         FETCH_RUNNER.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+        FETCH_RUNNER.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         FETCH_RUNNER.assertValid();
     }
 
@@ -74,6 +77,7 @@ public class PutElasticsearchHttpRecordIT {
         runner = TestRunners.newTestRunner(PutElasticsearchHttpRecord.class);
         runner.addControllerService("reader", recordReader);
         runner.enableControllerService(recordReader);
+        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader");
         runner.setProperty(PutElasticsearchHttpRecord.ES_URL, "http://localhost:9200");
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test");
@@ -209,6 +213,7 @@ public class PutElasticsearchHttpRecordIT {
         // Undo some stuff from setup()
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people\"test");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
+        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
             put("name", "John Doe");
             put("age", 48);
@@ -232,6 +237,7 @@ public class PutElasticsearchHttpRecordIT {
         // Undo some stuff from setup()
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people}test");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
+        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
             put("name", "John Doe");
             put("age", 48);
@@ -255,6 +261,7 @@ public class PutElasticsearchHttpRecordIT {
         // Undo some stuff from setup()
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test2");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "per\"son");
+        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
             put("name", "John Doe");
             put("age", 48);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
index 9309d02..fb8b164 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -73,10 +74,18 @@ public class TestFetchElasticsearchHttp {
 
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
-        runner.assertNotValid();
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+        runner.assertValid();
+        runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+        runner.assertValid(); // Valid because type is not required prior to 7.0
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
         runner.assertValid();
+        runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "_doc");
+        runner.assertValid(); // Valid because type may be _doc (or empty) for 7.0+
         runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
         runner.assertValid();
 
@@ -126,6 +135,7 @@ public class TestFetchElasticsearchHttp {
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, ES_URL);
 
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
         runner.assertNotValid();
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
         runner.assertValid();
@@ -174,6 +184,7 @@ public class TestFetchElasticsearchHttp {
         runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
 
         runner.setIncomingConnection(true);
@@ -273,6 +284,7 @@ public class TestFetchElasticsearchHttp {
         runner.setProperty(FetchElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
 
         // Allow time for the controller service to fully initialize
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index b5a072c..896b981 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -108,6 +109,28 @@ public class TestPutElasticsearchHttp {
     }
 
+    @Test
+    public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException { // ES 7 with an empty Type must still succeed
+        runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name()); // select the 7.0+ behaviour
+
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.TYPE, ""); // the case under test: no document type supplied
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); // document id is read from this flow file attribute
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); // exactly one flow file, routed to success
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140"); // attribute is unchanged from what was enqueued
+    }
+
+    @Test
     public void testPutElasticSearchOnTriggerUpdate() throws IOException {
         runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
@@ -201,8 +224,8 @@ public class TestPutElasticsearchHttp {
         runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
 
-        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
         runner.setProperty(PutElasticsearchHttp.TYPE, "status");
         runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
         runner.assertValid();
@@ -315,9 +338,21 @@ public class TestPutElasticsearchHttp {
         runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
 
-        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+        runner.setProperty(PutElasticsearchHttp.TYPE, "");
+        runner.assertNotValid(); // Not valid because type is required prior to 7.0
+        runner.setProperty(PutElasticsearchHttp.TYPE, " ");
+        runner.assertNotValid(); // Not valid because type is required prior to 7.0
+        runner.removeProperty(PutElasticsearchHttp.TYPE);
+        runner.assertNotValid(); // Not valid because type is required prior to 7.0
         runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.assertValid();
+        runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+        runner.setProperty(PutElasticsearchHttp.TYPE, "_doc");
+        runner.assertValid();
         runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
         runner.assertValid();
         runner.setProperty(PutElasticsearchHttp.INDEX_OP, "index");
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 5d52978..a651aef 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -28,6 +28,7 @@ import okio.Buffer;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
@@ -142,6 +143,7 @@ public class TestPutElasticsearchHttpRecord {
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
         runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "create");
+
         runner.enqueue(new byte[0], new HashMap<String, String>() {{
             put("doc_id", "28039652140");
         }});
@@ -151,6 +153,73 @@ public class TestPutElasticsearchHttpRecord {
         final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
         assertNotNull(out);
         out.assertAttributeEquals("doc_id", "28039652140");
+        out.assertAttributeEquals("record.count", "4");
+        List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
+        assertNotNull(provEvents);
+        assertEquals(1, provEvents.size());
+        assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException { // ES 7 with an empty Type: records must still be indexed
+        PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+        DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("h:m a"); // same pattern as TIME_FORMAT set below
+        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("d/M/yyyy h:m a"); // 'y' (year-of-era), not week-based 'Y'; same pattern as TIMESTAMP_FORMAT set below
+        processor.setRecordChecks(record -> { // one check lambda per expected record (ids 1 to 4)
+            assertEquals(1, record.get("id"));
+            assertEquals("reç1", record.get("name"));
+            assertEquals(101, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
+            assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
+        }, record -> {
+            assertEquals(2, record.get("id"));
+            assertEquals("reç2", record.get("name"));
+            assertEquals(102, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
+            assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
+        }, record -> {
+            assertEquals(3, record.get("id"));
+            assertEquals("reç3", record.get("name"));
+            assertEquals(103, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
+            assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
+        }, record -> {
+            assertEquals(4, record.get("id"));
+            assertEquals("reç4", record.get("name"));
+            assertEquals(104, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
+            assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
+        });
+        runner = TestRunners.newTestRunner(processor); // no failures
+        generateTestData();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name()); // select the 7.0+ behaviour
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, ""); // the case under test: no document type supplied
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy");
+        runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
+        runner.setProperty(PutElasticsearchHttpRecord.TIMESTAMP_FORMAT, "d/M/yyyy h:m a");
+
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1); // exactly one flow file, routed to success
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+        out.assertAttributeEquals("record.count", "4"); // all four records are counted on the outgoing flow file
+        List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
+        assertNotNull(provEvents);
+        assertEquals(1, provEvents.size()); // a single provenance event, of type SEND
+        assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
     }
 
     @Test
@@ -175,6 +244,28 @@ public class TestPutElasticsearchHttpRecord {
     }
 
     @Test
+    public void testPutElasticSearchOnTriggerUpdate_withoutType() throws IOException { // ES 7 with an empty Type: update operation must still succeed
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name()); // select the 7.0+ behaviour
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, ""); // the case under test: no document type supplied
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update"); // operation name given in mixed case
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1); // exactly one flow file, routed to success
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140"); // attribute is unchanged from what was enqueued
+    }
+
+    @Test
     public void testPutElasticSearchOnTriggerDelete() throws IOException {
         runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
         generateTestData();
@@ -196,6 +287,28 @@ public class TestPutElasticsearchHttpRecord {
     }
 
     @Test
+    public void testPutElasticSearchOnTriggerDelete_withoutType() throws IOException { // ES 7 with an empty Type: delete operation must still succeed
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name()); // select the 7.0+ behaviour
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, ""); // the case under test: no document type supplied
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE"); // operation name given in upper case
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1); // exactly one flow file, routed to success
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140"); // attribute is unchanged from what was enqueued
+    }
+
+    @Test
     public void testPutElasticSearchOnTriggerEL() throws IOException {
         runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
         generateTestData();
@@ -248,8 +361,8 @@ public class TestPutElasticsearchHttpRecord {
         generateTestData();
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
 
-        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
         runner.assertValid();
         runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "");
@@ -379,8 +492,8 @@ public class TestPutElasticsearchHttpRecord {
         generateTestData();
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
 
-        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
         runner.assertValid();
         runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
index a6e3220..0f6b6e4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
@@ -32,6 +32,7 @@ import java.util.List;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -64,6 +65,7 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTrigger_withInput() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -83,6 +85,7 @@ public class TestQueryElasticsearchHttp {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setValidateExpressionUsage(true);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -104,10 +107,22 @@ public class TestQueryElasticsearchHttp {
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
-        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
-        runner.assertNotValid();
         runner.setProperty(QueryElasticsearchHttp.QUERY,
                 "source:Twitter AND identifier:\"${identifier}\"");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+        runner.assertValid(); // Valid because type is not required prior to 7.0
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc");
+        runner.assertValid();
+        runner.removeProperty(QueryElasticsearchHttp.TYPE);
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
         runner.assertValid();
         runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
         runner.assertValid();
@@ -123,6 +138,7 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -149,6 +165,7 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTrigger_withNoInput() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -194,6 +211,7 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTriggerWithFields() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -213,6 +231,7 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTriggerWithLimit() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -235,6 +254,7 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(500, "Server error");
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -260,6 +280,7 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(100, "Should fail");
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -285,6 +306,7 @@ public class TestQueryElasticsearchHttp {
         processor.setExceptionToThrow(new IOException("Error reading from disk"));
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -310,6 +332,7 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(100, "Should fail", 2);
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -336,6 +359,7 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(100, "Should fail", 1);
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -358,7 +382,9 @@ public class TestQueryElasticsearchHttp {
         runner.enableControllerService(sslService);
         runner.setProperty(QueryElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
 
         // Allow time for the controller service to fully initialize
@@ -395,6 +421,7 @@ public class TestQueryElasticsearchHttp {
         runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost");
         runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3228");
         runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.enqueue("".getBytes(), new HashMap<String, String>() {{
             put("doc_id", "28039652140");
@@ -422,6 +449,7 @@ public class TestQueryElasticsearchHttp {
         runner.setProperty(QueryElasticsearchHttp.PROXY_USERNAME, "squid");
         runner.setProperty(QueryElasticsearchHttp.PROXY_PASSWORD, "changeme");
         runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.enqueue("".getBytes(), new HashMap<String, String>() {{
             put("doc_id", "28039652140");
@@ -437,6 +465,7 @@ public class TestQueryElasticsearchHttp {
         p.setExpectedParam("myparam=myvalue");
         runner = TestRunners.newTestRunner(p);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
@@ -446,6 +475,35 @@ public class TestQueryElasticsearchHttp {
         runAndVerifySuccess(true);
     }
 
+    @Test
+    public void testQueryElasticsearchOnTrigger_sourceIncludes() throws IOException { // checks the source-filter URL parameter name under each ES_VERSION setting
+        QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor();
+        p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param (singular "include"); the mocked client asserts the request URL contains it
+        runner = TestRunners.newTestRunner(p);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
+        runner.setProperty(QueryElasticsearchHttp.FIELDS, "test"); // same value that appears after '=' in the expected parameter
+        runAndVerifySuccess(true);
+
+        // Now test with ES 7.x: fresh processor and runner; only the expected parameter name, ES_VERSION and TYPE differ from the run above
+
+        p = new QueryElasticsearchHttpTestProcessor();
+        p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param (plural "includes")
+        runner = TestRunners.newTestRunner(p);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, ""); // type is left empty for the ES 7 run
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
+        runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
+        runAndVerifySuccess(true);
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -529,7 +587,7 @@ public class TestQueryElasticsearchHttp {
                 @Override
                 public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
                     Request realRequest = (Request) invocationOnMock.getArguments()[0];
-                    assertTrue((expectedParam == null) || (realRequest.url().toString().endsWith(expectedParam)));
+                    assertTrue((expectedParam == null) || (realRequest.url().toString().contains(expectedParam)));
                     Response mockResponse = new Response.Builder()
                             .request(realRequest)
                             .protocol(Protocol.HTTP_1_1)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
index eab33af..74ac031 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -80,6 +81,35 @@ public class TestScrollElasticsearchHttp {
     }
 
     @Test
+    public void testScrollElasticsearchOnTrigger_sourceIncludes() throws IOException { // checks the source-filter URL parameter name under each ES_VERSION setting
+        ScrollElasticsearchHttpTestProcessor p = new ScrollElasticsearchHttpTestProcessor();
+        p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param (singular "include"); the mocked client checks it only on GET requests
+        runner = TestRunners.newTestRunner(p);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+
+        runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
+        runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
+        runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test"); // same value that appears after '=' in the expected parameter
+        runAndVerifySuccess();
+
+        // Now test with ES 7.x: fresh processor and runner; only the expected parameter name, ES_VERSION and TYPE differ from the run above
+
+        p = new ScrollElasticsearchHttpTestProcessor();
+        p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param (plural "includes")
+        runner = TestRunners.newTestRunner(p);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+
+        runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, ""); // type is left empty for the ES 7 run
+        runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
+        runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
+        runAndVerifySuccess();
+    }
+
+    @Test
     public void testScrollElasticsearchOnTrigger_withNoInput_EL() throws IOException {
         runner = TestRunners.newTestRunner(new ScrollElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
@@ -133,10 +163,20 @@ public class TestScrollElasticsearchHttp {
 
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
-        runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
-        runner.assertNotValid();
+        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
+        runner.assertNotValid();
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
+        runner.assertValid(); // Valid because type is not required prior to 7.0
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
         runner.assertValid();
+        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc");
+        runner.assertValid();
+        runner.removeProperty(ScrollElasticsearchHttp.TYPE);
+        runner.assertNotValid();
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
         runner.setProperty(ScrollElasticsearchHttp.FIELDS, "id,, userinfo.location");
         runner.assertValid();
         runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc,identifier:desc");
@@ -246,7 +286,9 @@ public class TestScrollElasticsearchHttp {
         runner.enableControllerService(sslService);
         runner.setProperty(ScrollElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
         runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
         runner.setIncomingConnection(false);
 
@@ -408,7 +450,9 @@ public class TestScrollElasticsearchHttp {
                 @Override
                 public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
                     Request realRequest = (Request) invocationOnMock.getArguments()[0];
-                    assertTrue((expectedParam == null) || (realRequest.url().toString().endsWith(expectedParam)));
+                    if (realRequest.method().equals("GET")) {
+                        assertTrue((expectedParam == null) || (realRequest.url().toString().contains(expectedParam)));
+                    }
                     Response mockResponse = new Response.Builder()
                             .request(realRequest)
                             .protocol(Protocol.HTTP_1_1)