You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2020/11/26 06:09:43 UTC

[nifi] branch main updated (857eeca -> 124cdbd)

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from 857eeca  NIFI-8032: fix record-path-guide.adoc
     new a3d845a  NIFI-6403: Adding Elasticsearch7 support to HTTP processors
     new 124cdbd  NIFI-6403 and NIFI-6404: Elasticsearch 7 support

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../nifi-elasticsearch-client-service/pom.xml      |  16 +--
 .../ElasticSearchClientServiceImpl.java            | 124 ++++++++++-----------
 .../AbstractElasticsearchHttpProcessor.java        |  36 +++---
 .../elasticsearch/FetchElasticsearchHttp.java      |  11 +-
 .../elasticsearch/PutElasticsearchHttp.java        |   5 +-
 .../elasticsearch/PutElasticsearchHttpRecord.java  |   5 +-
 .../elasticsearch/QueryElasticsearchHttp.java      |  17 ++-
 .../elasticsearch/ScrollElasticsearchHttp.java     |  13 +--
 .../PutElasticsearchHttpRecordIT.java              |   1 +
 .../elasticsearch/TestFetchElasticsearchHttp.java  |  83 +++++++++-----
 .../elasticsearch/TestPutElasticsearchHttp.java    |  34 +++++-
 .../TestPutElasticsearchHttpRecord.java            | 113 ++++++++++++++++++-
 .../elasticsearch/TestQueryElasticsearchHttp.java  |  28 ++++-
 .../elasticsearch/TestScrollElasticsearchHttp.java |  29 ++++-
 .../PutElasticsearchRecordTest.groovy              |  37 ++++++
 15 files changed, 405 insertions(+), 147 deletions(-)


[nifi] 01/02: NIFI-6403: Adding Elasticsearch7 support to HTTP processors

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit a3d845a38f93fed1f65ec85c851ca845ef186556
Author: Joe Gresock <jo...@lmco.com>
AuthorDate: Thu Mar 19 17:17:04 2020 +0000

    NIFI-6403: Adding Elasticsearch7 support to HTTP processors
    
    This closes #4153.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../AbstractElasticsearchHttpProcessor.java        |  84 +++++++++++++--
 .../elasticsearch/FetchElasticsearchHttp.java      |  19 ++--
 .../elasticsearch/PutElasticsearchHttp.java        |   6 +-
 .../elasticsearch/PutElasticsearchHttpRecord.java  |   6 +-
 .../elasticsearch/QueryElasticsearchHttp.java      |  20 ++--
 .../elasticsearch/ScrollElasticsearchHttp.java     |  23 ++--
 .../elasticsearch/ITQueryElasticsearchHttp.java    |   3 +
 .../elasticsearch/ITScrollElasticsearchHttp.java   |   2 +
 .../PutElasticsearchHttpRecordIT.java              |   7 ++
 .../elasticsearch/TestFetchElasticsearchHttp.java  |  16 ++-
 .../elasticsearch/TestPutElasticsearchHttp.java    |  39 ++++++-
 .../TestPutElasticsearchHttpRecord.java            | 117 ++++++++++++++++++++-
 .../elasticsearch/TestQueryElasticsearchHttp.java  |  64 ++++++++++-
 .../elasticsearch/TestScrollElasticsearchHttp.java |  50 ++++++++-
 14 files changed, 405 insertions(+), 51 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index ab211c4..291d99b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -36,9 +36,11 @@ import okhttp3.RequestBody;
 import okhttp3.Response;
 import okhttp3.Route;
 import org.apache.commons.text.StringEscapeUtils;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
@@ -55,8 +57,13 @@ import org.apache.nifi.util.StringUtils;
  * A base class for Elasticsearch processors that use the HTTP API
  */
 public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor {
+    enum ElasticsearchVersion { // Elasticsearch major-version families selectable through the ES_VERSION property
+        ES_7, // Elasticsearch 7.x
+        ES_LESS_THAN_7 // any Elasticsearch version before 7.0
+    }
 
     static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
+    static final String FIELD_INCLUDE_QUERY_PARAM_ES7 = "_source_includes";
     static final String QUERY_QUERY_PARAM = "q";
     static final String SORT_QUERY_PARAM = "sort";
     static final String SIZE_QUERY_PARAM = "size";
@@ -127,6 +134,18 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor ES_VERSION = new PropertyDescriptor.Builder() // selects the target Elasticsearch major-version family
+            .name("elasticsearch-http-version")
+            .displayName("Elasticsearch Version")
+            .description("The major version of elasticsearch (this affects some HTTP query parameters and the way responses are parsed).")
+            .required(true)
+            .allowableValues( // each stored value is an ElasticsearchVersion constant name, so ElasticsearchVersion.valueOf(...) can parse it back
+                    new AllowableValue(ElasticsearchVersion.ES_LESS_THAN_7.name(), "< 7.0", "Any version of Elasticsearch less than 7.0"),
+                    new AllowableValue(ElasticsearchVersion.ES_7.name(), "7.x", "Elasticsearch version 7.x"))
+            .defaultValue(ElasticsearchVersion.ES_LESS_THAN_7.name()) // defaults to the pre-7.0 setting
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
 
     @Override
@@ -148,6 +167,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
     static {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(ES_URL);
+        properties.add(ES_VERSION);
         properties.add(PROP_SSL_CONTEXT_SERVICE);
         properties.add(CHARSET);
         properties.add(USERNAME);
@@ -287,9 +307,12 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             sb.append(indexOp.toLowerCase());
             sb.append("\": { \"_index\": \"");
             sb.append(StringEscapeUtils.escapeJson(index));
-            sb.append("\", \"_type\": \"");
-            sb.append(StringEscapeUtils.escapeJson(docType));
             sb.append("\"");
+            if (!(StringUtils.isEmpty(docType) | docType == null)){
+                sb.append(", \"_type\": \"");
+                sb.append(StringEscapeUtils.escapeJson(docType));
+                sb.append("\"");
+            }
             if (!StringUtils.isEmpty(id)) {
                 sb.append(", \"_id\": \"");
                 sb.append(StringEscapeUtils.escapeJson(id));
@@ -301,11 +324,15 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
         } else if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
             sb.append("{\"update\": { \"_index\": \"");
             sb.append(StringEscapeUtils.escapeJson(index));
-            sb.append("\", \"_type\": \"");
-            sb.append(StringEscapeUtils.escapeJson(docType));
-            sb.append("\", \"_id\": \"");
+            sb.append("\"");
+            if (!(StringUtils.isEmpty(docType) | docType == null)){
+                sb.append(", \"_type\": \"");
+                sb.append(StringEscapeUtils.escapeJson(docType));
+                sb.append("\"");
+            }
+            sb.append(", \"_id\": \"");
             sb.append(StringEscapeUtils.escapeJson(id));
-            sb.append("\" }\n");
+            sb.append("\" } }\n");
             sb.append("{\"doc\": ");
             sb.append(jsonString);
             sb.append(", \"doc_as_upsert\": ");
@@ -314,11 +341,48 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
         } else if (indexOp.equalsIgnoreCase("delete")) {
             sb.append("{\"delete\": { \"_index\": \"");
             sb.append(StringEscapeUtils.escapeJson(index));
-            sb.append("\", \"_type\": \"");
-            sb.append(StringEscapeUtils.escapeJson(docType));
-            sb.append("\", \"_id\": \"");
+            sb.append("\"");
+            if (!(StringUtils.isEmpty(docType) | docType == null)){
+                sb.append(", \"_type\": \"");
+                sb.append(StringEscapeUtils.escapeJson(docType));
+                sb.append("\"");
+            }
+            sb.append(", \"_id\": \"");
             sb.append(StringEscapeUtils.escapeJson(id));
-            sb.append("\" }\n");
+            sb.append("\" }}\n");
+        }
+    }
+
+    /** Picks the field-include query parameter name: FIELD_INCLUDE_QUERY_PARAM for ES_LESS_THAN_7, otherwise FIELD_INCLUDE_QUERY_PARAM_ES7. */
+    protected String getFieldIncludeParameter(ElasticsearchVersion esVersion) {
+        return !esVersion.equals(ElasticsearchVersion.ES_LESS_THAN_7) ? FIELD_INCLUDE_QUERY_PARAM_ES7 : FIELD_INCLUDE_QUERY_PARAM;
+    }
+
+    static class ElasticsearchTypeValidator implements Validator { // checks a 'type' property value against the selected ES_VERSION
+        private final boolean pre7TypeRequired;
+
+        /**
+         * Creates a validator for an ES type
+         * @param pre7TypeRequired If true, 'type' will be required for ES before version 7.0.
+         */
+        public ElasticsearchTypeValidator(boolean pre7TypeRequired) {
+            this.pre7TypeRequired = pre7TypeRequired;
+        }
+
+        @Override
+        public ValidationResult validate(String subject, String input, ValidationContext context) {
+            // An Expression Language value cannot be judged before it is evaluated, so accept it here instead of comparing the raw text.
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(true).explanation("Expression Language Present").build();
+            }
+            final boolean es7 = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION).getValue()) == ElasticsearchVersion.ES_7;
+            final boolean valid = es7
+                    ? org.apache.commons.lang3.StringUtils.isBlank(input) || "_doc".equals(input)
+                    : !pre7TypeRequired || org.apache.commons.lang3.StringUtils.isNotBlank(input);
+            final String explanation = es7
+                    ? "Elasticsearch no longer supports 'type' as of version 7.0.  Please use '_doc' or leave blank."
+                    : "Elasticsearch prior to version 7.0 requires a 'type' to be set.";
+            return new ValidationResult.Builder().subject(subject).input(input).valid(valid).explanation(explanation).build();
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
index fcdc9e3..50cad68 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
@@ -40,6 +40,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -120,11 +121,11 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("fetch-es-type")
             .displayName("Type")
-            .description("The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty, "
-                    + "the first document matching the identifier across all types will be retrieved.")
-            .required(false)
+            .description("The type of this document (if empty, the first document matching the identifier across all types will be retrieved).  "
+                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
+            .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(new ElasticsearchTypeValidator(false))
             .build();
 
     public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -149,6 +150,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
+        descriptors.add(ES_VERSION);
         descriptors.add(DOC_ID);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
@@ -199,6 +201,8 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         final String fields = context.getProperty(FIELDS).isSet()
                 ? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue()
                 : null;
+        final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
+                .getValue());
 
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -214,7 +218,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
 
             // read the url property from the context
             final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
-            final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context);
+            final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context, esVersion);
             final long startNanos = System.nanoTime();
 
             getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
@@ -306,7 +310,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         }
     }
 
-    private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context) throws MalformedURLException {
+    private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
@@ -316,7 +320,8 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         builder.addPathSegment(docId);
         if (!StringUtils.isEmpty(fields)) {
             String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
-            builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+            final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
+            builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
         }
 
         // Find the user-added properties and set them as query parameters on the URL
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 4e60e0a..b427b8e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -111,10 +111,11 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("put-es-type")
             .displayName("Type")
-            .description("The type of this document (used by Elasticsearch for indexing and searching)")
+            .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching).  "
+                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .addValidator(new ElasticsearchTypeValidator(true))
             .build();
 
     public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -152,6 +153,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
+        descriptors.add(ES_VERSION);
         descriptors.add(ID_ATTRIBUTE);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 97ae8ca..130bde8 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -173,10 +173,11 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
     static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("put-es-record-type")
             .displayName("Type")
-            .description("The type of this document (used by Elasticsearch for indexing and searching)")
+            .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching).  "
+                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .addValidator(new ElasticsearchTypeValidator(true))
             .build();
 
     static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -260,6 +261,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
+        descriptors.add(ES_VERSION);
         descriptors.add(RECORD_READER);
         descriptors.add(RECORD_WRITER);
         descriptors.add(LOG_ALL_ERRORS);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index ea350ad..58bab07 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -155,12 +155,11 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("query-es-type")
             .displayName("Type")
-            .description(
-                    "The (optional) type of this query, used by Elasticsearch for indexing and searching. If the property is empty, "
-                            + "the the query will match across all types.")
-            .required(false)
+            .description("The type of this document (if empty, searches across all types).  "
+                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
+            .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(new ElasticsearchTypeValidator(false))
             .build();
 
     public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -236,6 +235,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     static {
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
         descriptors.add(QUERY);
+        descriptors.add(ES_VERSION);
         descriptors.add(PAGE_SIZE);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
@@ -316,7 +316,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 .evaluateAttributeExpressions(flowFile).getValue() : null;
         final boolean targetIsContent = context.getProperty(TARGET).getValue()
                 .equals(TARGET_FLOW_FILE_CONTENT);
-
+        final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
+                .getValue());
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
         final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
@@ -344,7 +345,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 }
 
                 final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
-                        mPageSize, fromIndex, context);
+                        mPageSize, fromIndex, context, esVersion);
 
                 final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
                         username, password, "GET", null);
@@ -505,7 +506,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     }
 
     private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
-            String sort, int pageSize, int fromIndex, ProcessContext context) throws MalformedURLException {
+            String sort, int pageSize, int fromIndex, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
@@ -520,7 +521,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex));
         if (!StringUtils.isEmpty(fields)) {
             String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
-            builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+            final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
+            builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
         }
         if (!StringUtils.isEmpty(sort)) {
             String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
index 528f88c..322f82f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -46,6 +46,8 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchTypeValidator;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -135,12 +137,11 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("scroll-es-type")
             .displayName("Type")
-            .description(
-                    "The (optional) type of this query, used by Elasticsearch for indexing and searching. If the property is empty, "
-                    + "the the query will match across all types.")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .required(false)
+            .description("The type of this document (if empty, searches across all types).  "
+                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
+            .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(new ElasticsearchTypeValidator(false))
             .build();
 
     public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -185,6 +186,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
+        descriptors.add(ES_VERSION);
         descriptors.add(QUERY);
         descriptors.add(SCROLL_DURATION);
         descriptors.add(PAGE_SIZE);
@@ -244,6 +246,8 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
                 .evaluateAttributeExpressions(flowFile).getValue() : null;
         final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
                 .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
+        final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
+                .getValue());
 
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
@@ -260,7 +264,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
                     .getValue());
             if (scrollId != null) {
                 final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
-                        scrollId, pageSize, scroll, context);
+                        scrollId, pageSize, scroll, context, esVersion);
                 final long startNanos = System.nanoTime();
 
                 final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", scroll,
@@ -278,7 +282,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
 
                 // read the url property from the context
                 final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
-                        scrollId, pageSize, scroll, context);
+                        scrollId, pageSize, scroll, context, esVersion);
                 final long startNanos = System.nanoTime();
 
                 final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
@@ -415,7 +419,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
     }
 
     private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
-            String sort, String scrollId, int pageSize, String scroll, ProcessContext context) throws MalformedURLException {
+            String sort, String scrollId, int pageSize, String scroll, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
@@ -433,7 +437,8 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
             builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
             if (!StringUtils.isEmpty(fields)) {
                 String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
-                builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+                final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
+                builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
             }
             if (!StringUtils.isEmpty(sort)) {
                 String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
index f60e2f9..e3ace24 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -42,6 +43,7 @@ public class ITQueryElasticsearchHttp {
         runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
                 "http://localhost.internal:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
         runner.assertNotValid();
@@ -68,6 +70,7 @@ public class ITQueryElasticsearchHttp {
         runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
                 "http://localhost.internal:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
         runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
index 57ee242..b25e3e2 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
 
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -40,6 +41,7 @@ public class ITScrollElasticsearchHttp {
         runner = TestRunners.newTestRunner(ScrollElasticsearchHttp.class); // all docs are found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
                 "http://ip-172-31-49-152.ec2.internal:9200");
+        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "prod-accounting");
         runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
index fbab1cc..c13bf40 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
@@ -18,6 +18,8 @@
 package org.apache.nifi.processors.elasticsearch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
@@ -61,6 +63,7 @@ public class PutElasticsearchHttpRecordIT {
         FETCH_RUNNER.setProperty(FetchElasticsearchHttp.INDEX, "people_test");
         FETCH_RUNNER.setProperty(FetchElasticsearchHttp.TYPE, "person");
         FETCH_RUNNER.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+        FETCH_RUNNER.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         FETCH_RUNNER.assertValid();
     }
 
@@ -74,6 +77,7 @@ public class PutElasticsearchHttpRecordIT {
         runner = TestRunners.newTestRunner(PutElasticsearchHttpRecord.class);
         runner.addControllerService("reader", recordReader);
         runner.enableControllerService(recordReader);
+        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader");
         runner.setProperty(PutElasticsearchHttpRecord.ES_URL, "http://localhost:9200");
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test");
@@ -209,6 +213,7 @@ public class PutElasticsearchHttpRecordIT {
         // Undo some stuff from setup()
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people\"test");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
+        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
             put("name", "John Doe");
             put("age", 48);
@@ -232,6 +237,7 @@ public class PutElasticsearchHttpRecordIT {
         // Undo some stuff from setup()
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people}test");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
+        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
             put("name", "John Doe");
             put("age", 48);
@@ -255,6 +261,7 @@ public class PutElasticsearchHttpRecordIT {
         // Undo some stuff from setup()
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test2");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "per\"son");
+        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
             put("name", "John Doe");
             put("age", 48);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
index 9309d02..fb8b164 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -73,10 +74,18 @@ public class TestFetchElasticsearchHttp {
 
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
-        runner.assertNotValid();
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+        runner.assertValid();
+        runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+        runner.assertValid(); // Valid because type is not required prior to 7.0
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
         runner.assertValid();
+        runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "_doc");
+        runner.assertValid(); // Valid because _doc is an accepted type for 7.0+
         runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
         runner.assertValid();
 
@@ -126,6 +135,7 @@ public class TestFetchElasticsearchHttp {
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, ES_URL);
 
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
         runner.assertNotValid();
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
         runner.assertValid();
@@ -174,6 +184,7 @@ public class TestFetchElasticsearchHttp {
         runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
 
         runner.setIncomingConnection(true);
@@ -273,6 +284,7 @@ public class TestFetchElasticsearchHttp {
         runner.setProperty(FetchElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
 
         // Allow time for the controller service to fully initialize
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index b5a072c..896b981 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -108,6 +109,28 @@ public class TestPutElasticsearchHttp {
     }
 
     @Test
+    public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name()); // select the ES 7 version option
+
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.TYPE, ""); // empty type: the "without type" case this test covers
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); // names the flow file attribute set at enqueue below
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); // the single enqueued flow file must reach success
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140"); // attribute supplied at enqueue time is still on the output
+    }
+
+    @Test
     public void testPutElasticSearchOnTriggerUpdate() throws IOException {
         runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
@@ -201,8 +224,8 @@ public class TestPutElasticsearchHttp {
         runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
 
-        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
         runner.setProperty(PutElasticsearchHttp.TYPE, "status");
         runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
         runner.assertValid();
@@ -315,9 +338,21 @@ public class TestPutElasticsearchHttp {
         runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
 
-        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+        runner.setProperty(PutElasticsearchHttp.TYPE, "");
+        runner.assertNotValid(); // Not valid because type is required prior to 7.0
+        runner.setProperty(PutElasticsearchHttp.TYPE, " ");
+        runner.assertNotValid(); // Not valid because type is required prior to 7.0
+        runner.removeProperty(PutElasticsearchHttp.TYPE);
+        runner.assertNotValid(); // Not valid because type is required prior to 7.0
         runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.assertValid();
+        runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+        runner.setProperty(PutElasticsearchHttp.TYPE, "_doc");
+        runner.assertValid();
         runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
         runner.assertValid();
         runner.setProperty(PutElasticsearchHttp.INDEX_OP, "index");
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 5d52978..a651aef 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -28,6 +28,7 @@ import okio.Buffer;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
@@ -142,6 +143,7 @@ public class TestPutElasticsearchHttpRecord {
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
         runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "create");
+
         runner.enqueue(new byte[0], new HashMap<String, String>() {{
             put("doc_id", "28039652140");
         }});
@@ -151,6 +153,73 @@ public class TestPutElasticsearchHttpRecord {
         final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
         assertNotNull(out);
         out.assertAttributeEquals("doc_id", "28039652140");
+        out.assertAttributeEquals("record.count", "4");
+        List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
+        assertNotNull(provEvents);
+        assertEquals(1, provEvents.size());
+        assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException { // index op with ES 7 selected and an empty type
+        PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+        DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("h:m a"); // same pattern as TIME_FORMAT set below
+        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("d/M/yyyy h:m a"); // 'yyyy' matches TIMESTAMP_FORMAT below; 'YYYY' is week-based-year
+        processor.setRecordChecks(record -> { // one check lambda per expected record (ids 1 to 4)
+            assertEquals(1, record.get("id"));
+            assertEquals("reç1", record.get("name"));
+            assertEquals(101, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
+            assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
+        }, record -> {
+            assertEquals(2, record.get("id"));
+            assertEquals("reç2", record.get("name"));
+            assertEquals(102, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
+            assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
+        }, record -> {
+            assertEquals(3, record.get("id"));
+            assertEquals("reç3", record.get("name"));
+            assertEquals(103, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
+            assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
+        }, record -> {
+            assertEquals(4, record.get("id"));
+            assertEquals("reç4", record.get("name"));
+            assertEquals(104, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
+            assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
+        });
+        runner = TestRunners.newTestRunner(processor); // no failures
+        generateTestData(); // helper defined elsewhere in this class; presumably registers the record reader - confirm
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name()); // select the ES 7 version option
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, ""); // empty type: the "without type" case this test covers
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy");
+        runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
+        runner.setProperty(PutElasticsearchHttpRecord.TIMESTAMP_FORMAT, "d/M/yyyy h:m a");
+
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1); // the single enqueued flow file must reach success
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+        out.assertAttributeEquals("record.count", "4"); // four records, matching the four checks registered above
+        List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
+        assertNotNull(provEvents);
+        assertEquals(1, provEvents.size()); // exactly one provenance event, of type SEND
+        assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
     }
 
     @Test
@@ -175,6 +244,28 @@ public class TestPutElasticsearchHttpRecord {
     }
 
     @Test
+    public void testPutElasticSearchOnTriggerUpdate_withoutType() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData(); // helper defined elsewhere in this class; presumably registers the record reader - confirm
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name()); // select the ES 7 version option
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, ""); // empty type: the "without type" case this test covers
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update"); // mixed case on purpose? presumably matched case-insensitively - confirm
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1); // the single enqueued flow file must reach success
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140"); // attribute supplied at enqueue time is still on the output
+    }
+
+    @Test
     public void testPutElasticSearchOnTriggerDelete() throws IOException {
         runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
         generateTestData();
@@ -196,6 +287,28 @@ public class TestPutElasticsearchHttpRecord {
     }
 
     @Test
+    public void testPutElasticSearchOnTriggerDelete_withoutType() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData(); // helper defined elsewhere in this class; presumably registers the record reader - confirm
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name()); // select the ES 7 version option
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, ""); // empty type: the "without type" case this test covers
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE"); // upper case on purpose? presumably matched case-insensitively - confirm
+        runner.enqueue(new byte[0], new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1); // the single enqueued flow file must reach success
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140"); // attribute supplied at enqueue time is still on the output
+    }
+
+    @Test
     public void testPutElasticSearchOnTriggerEL() throws IOException {
         runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
         generateTestData();
@@ -248,8 +361,8 @@ public class TestPutElasticsearchHttpRecord {
         generateTestData();
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
 
-        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
         runner.assertValid();
         runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "");
@@ -379,8 +492,8 @@ public class TestPutElasticsearchHttpRecord {
         generateTestData();
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
 
-        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
         runner.assertValid();
         runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
index a6e3220..0f6b6e4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
@@ -32,6 +32,7 @@ import java.util.List;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -64,6 +65,7 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTrigger_withInput() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -83,6 +85,7 @@ public class TestQueryElasticsearchHttp {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setValidateExpressionUsage(true);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -104,10 +107,22 @@ public class TestQueryElasticsearchHttp {
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
-        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
-        runner.assertNotValid();
         runner.setProperty(QueryElasticsearchHttp.QUERY,
                 "source:Twitter AND identifier:\"${identifier}\"");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+        runner.assertValid(); // Valid because type is not required prior to 7.0
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc");
+        runner.assertValid();
+        runner.removeProperty(QueryElasticsearchHttp.TYPE);
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
         runner.assertValid();
         runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
         runner.assertValid();
@@ -123,6 +138,7 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -149,6 +165,7 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTrigger_withNoInput() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -194,6 +211,7 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTriggerWithFields() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -213,6 +231,7 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTriggerWithLimit() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -235,6 +254,7 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(500, "Server error");
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -260,6 +280,7 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(100, "Should fail");
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -285,6 +306,7 @@ public class TestQueryElasticsearchHttp {
         processor.setExceptionToThrow(new IOException("Error reading from disk"));
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -310,6 +332,7 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(100, "Should fail", 2);
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -336,6 +359,7 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(100, "Should fail", 1);
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -358,7 +382,9 @@ public class TestQueryElasticsearchHttp {
         runner.enableControllerService(sslService);
         runner.setProperty(QueryElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
 
         // Allow time for the controller service to fully initialize
@@ -395,6 +421,7 @@ public class TestQueryElasticsearchHttp {
         runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost");
         runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3228");
         runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.enqueue("".getBytes(), new HashMap<String, String>() {{
             put("doc_id", "28039652140");
@@ -422,6 +449,7 @@ public class TestQueryElasticsearchHttp {
         runner.setProperty(QueryElasticsearchHttp.PROXY_USERNAME, "squid");
         runner.setProperty(QueryElasticsearchHttp.PROXY_PASSWORD, "changeme");
         runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.enqueue("".getBytes(), new HashMap<String, String>() {{
             put("doc_id", "28039652140");
@@ -437,6 +465,7 @@ public class TestQueryElasticsearchHttp {
         p.setExpectedParam("myparam=myvalue");
         runner = TestRunners.newTestRunner(p);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
@@ -446,6 +475,35 @@ public class TestQueryElasticsearchHttp {
         runAndVerifySuccess(true);
     }
 
+    @Test
+    public void testQueryElasticsearchOnTrigger_sourceIncludes() throws IOException {
+        QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor();
+        p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param
+        runner = TestRunners.newTestRunner(p);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
+        runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
+        runAndVerifySuccess(true);
+
+        // Now test with ES 7.x
+
+        p = new QueryElasticsearchHttpTestProcessor();
+        p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param
+        runner = TestRunners.newTestRunner(p);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
+        runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
+        runAndVerifySuccess(true);
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -529,7 +587,7 @@ public class TestQueryElasticsearchHttp {
                 @Override
                 public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
                     Request realRequest = (Request) invocationOnMock.getArguments()[0];
-                    assertTrue((expectedParam == null) || (realRequest.url().toString().endsWith(expectedParam)));
+                    assertTrue((expectedParam == null) || (realRequest.url().toString().contains(expectedParam)));
                     Response mockResponse = new Response.Builder()
                             .request(realRequest)
                             .protocol(Protocol.HTTP_1_1)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
index eab33af..74ac031 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -80,6 +81,35 @@ public class TestScrollElasticsearchHttp {
     }
 
     @Test
+    public void testScrollElasticsearchOnTrigger_sourceIncludes() throws IOException {
+        ScrollElasticsearchHttpTestProcessor p = new ScrollElasticsearchHttpTestProcessor();
+        p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param
+        runner = TestRunners.newTestRunner(p);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
+
+        runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
+        runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
+        runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
+        runAndVerifySuccess();
+
+        // Now test with ES 7.x
+
+        p = new ScrollElasticsearchHttpTestProcessor();
+        p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param
+        runner = TestRunners.newTestRunner(p);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+
+        runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
+        runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
+        runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
+        runAndVerifySuccess();
+    }
+
+    @Test
     public void testScrollElasticsearchOnTrigger_withNoInput_EL() throws IOException {
         runner = TestRunners.newTestRunner(new ScrollElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
@@ -133,10 +163,20 @@ public class TestScrollElasticsearchHttp {
 
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
-        runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
-        runner.assertNotValid();
+        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
+        runner.assertNotValid();
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
+        runner.assertValid(); // Valid because type is not required prior to 7.0
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
         runner.assertValid();
+        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
+        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc");
+        runner.assertValid();
+        runner.removeProperty(ScrollElasticsearchHttp.TYPE);
+        runner.assertNotValid();
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
         runner.setProperty(ScrollElasticsearchHttp.FIELDS, "id,, userinfo.location");
         runner.assertValid();
         runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc,identifier:desc");
@@ -246,7 +286,9 @@ public class TestScrollElasticsearchHttp {
         runner.enableControllerService(sslService);
         runner.setProperty(ScrollElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
         runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
         runner.setIncomingConnection(false);
 
@@ -408,7 +450,9 @@ public class TestScrollElasticsearchHttp {
                 @Override
                 public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
                     Request realRequest = (Request) invocationOnMock.getArguments()[0];
-                    assertTrue((expectedParam == null) || (realRequest.url().toString().endsWith(expectedParam)));
+                    if (realRequest.method().equals("GET")) {
+                        assertTrue((expectedParam == null) || (realRequest.url().toString().contains(expectedParam)));
+                    }
                     Response mockResponse = new Response.Builder()
                             .request(realRequest)
                             .protocol(Protocol.HTTP_1_1)


[nifi] 02/02: NIFI-6403 and NIFI-6404: Elasticsearch 7 support

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 124cdbd3fe5446bff08346e5a28f2883f13e2848
Author: Chris Sampson <ch...@digital.homeoffice.gov.uk>
AuthorDate: Mon Nov 16 13:03:50 2020 +0000

    NIFI-6403 and NIFI-6404: Elasticsearch 7 support
    
    Addressed PR#4153 comments; removed ES Version property and made Type optional in all ES HTTP/Record processors, applying sensible default values where required; use _source query parameter instead of _source_include/s as it's compatible between ES versions
    
    Fix unit test compilation to use JDK8-compatible library/method
    
    Better optional type and id handling for PutElasticsearchRecord; update nifi-elasticsearch-client-service build dependencies to use latest versions of Elasticsearch in each supported major version (5/6/7); addressed several warnings in ElasticSearchClientServiceImpl
    
    This closes #4667.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../nifi-elasticsearch-client-service/pom.xml      |  16 +--
 .../ElasticSearchClientServiceImpl.java            | 124 ++++++++++-----------
 .../AbstractElasticsearchHttpProcessor.java        |  66 +----------
 .../elasticsearch/FetchElasticsearchHttp.java      |  22 ++--
 .../elasticsearch/PutElasticsearchHttp.java        |   9 +-
 .../elasticsearch/PutElasticsearchHttpRecord.java  |   9 +-
 .../elasticsearch/QueryElasticsearchHttp.java      |  27 ++---
 .../elasticsearch/ScrollElasticsearchHttp.java     |  26 ++---
 .../elasticsearch/ITQueryElasticsearchHttp.java    |   3 -
 .../elasticsearch/ITScrollElasticsearchHttp.java   |   2 -
 .../PutElasticsearchHttpRecordIT.java              |   6 -
 .../elasticsearch/TestFetchElasticsearchHttp.java  |  87 ++++++++-------
 .../elasticsearch/TestPutElasticsearchHttp.java    |  15 +--
 .../TestPutElasticsearchHttpRecord.java            |  10 +-
 .../elasticsearch/TestQueryElasticsearchHttp.java  |  50 ++-------
 .../elasticsearch/TestScrollElasticsearchHttp.java |  37 ++----
 .../PutElasticsearchRecordTest.groovy              |  37 ++++++
 17 files changed, 225 insertions(+), 321 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index 7d05609..baf8f7a 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -26,7 +26,7 @@
     <packaging>jar</packaging>
 
     <properties>
-        <es.int.version>5.6.15</es.int.version>
+        <es.int.version>5.6.16</es.int.version>
         <script.name>setup-5.script</script.name>
         <type.name>faketype</type.name>
     </properties>
@@ -71,7 +71,7 @@
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
-            <version>2.6</version>
+            <version>2.8.0</version>
         </dependency>
 
         <dependency>
@@ -82,7 +82,7 @@
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
-            <version>3.9</version>
+            <version>3.11</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
@@ -146,7 +146,7 @@
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>4.5.10</version>
+            <version>4.5.13</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -176,7 +176,7 @@
         <profile>
             <id>integration-6</id>
             <properties>
-                <es.int.version>6.7.1</es.int.version>
+                <es.int.version>6.8.13</es.int.version>
                 <type.name>_doc</type.name>
                 <script.name>setup-6.script</script.name>
             </properties>
@@ -184,7 +184,7 @@
         <profile>
             <id>integration-7</id>
             <properties>
-                <es.int.version>7.0.0</es.int.version>
+                <es.int.version>7.10.0</es.int.version>
                 <script.name>setup-7.script</script.name>
                 <type.name>_doc</type.name>
             </properties>
@@ -196,7 +196,7 @@
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-failsafe-plugin</artifactId>
-                        <version>3.0.0-M3</version>
+                        <version>3.0.0-M5</version>
                         <configuration>
                             <systemPropertyVariables>
                                 <type_name>${type.name}</type_name>
@@ -206,7 +206,7 @@
                     <plugin>
                         <groupId>com.github.alexcojocaru</groupId>
                         <artifactId>elasticsearch-maven-plugin</artifactId>
-                        <version>6.13</version>
+                        <version>6.19</version>
                         <configuration>
                             <clusterName>testCluster</clusterName>
                             <transportPort>9500</transportPort>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 3a686af..bad70ba 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -26,7 +26,6 @@ import java.net.URL;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -61,25 +60,25 @@ import org.elasticsearch.client.RestClientBuilder;
 public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
     private final ObjectMapper mapper = new ObjectMapper();
 
-    static final private List<PropertyDescriptor> properties;
+    private static final List<PropertyDescriptor> properties;
 
     private RestClient client;
 
     private String url;
-    private Charset charset;
+    private Charset responseCharset;
 
     static {
-        List<PropertyDescriptor> _props = new ArrayList();
-        _props.add(ElasticSearchClientService.HTTP_HOSTS);
-        _props.add(ElasticSearchClientService.USERNAME);
-        _props.add(ElasticSearchClientService.PASSWORD);
-        _props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
-        _props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
-        _props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
-        _props.add(ElasticSearchClientService.RETRY_TIMEOUT);
-        _props.add(ElasticSearchClientService.CHARSET);
-
-        properties = Collections.unmodifiableList(_props);
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(ElasticSearchClientService.HTTP_HOSTS);
+        props.add(ElasticSearchClientService.USERNAME);
+        props.add(ElasticSearchClientService.PASSWORD);
+        props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
+        props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
+        props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
+        props.add(ElasticSearchClientService.RETRY_TIMEOUT);
+        props.add(ElasticSearchClientService.CHARSET);
+
+        properties = Collections.unmodifiableList(props);
     }
 
     @Override
@@ -91,7 +90,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     public void onEnabled(final ConfigurationContext context) throws InitializationException {
         try {
             setupClient(context);
-            charset = Charset.forName(context.getProperty(CHARSET).getValue());
+            responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
         } catch (Exception ex) {
             getLogger().error("Could not initialize ElasticSearch client.", ex);
             throw new InitializationException(ex);
@@ -126,44 +125,42 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         final SSLContext sslContext;
         try {
             sslContext = (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
-                ? sslService.createSSLContext(ClientAuth.NONE) : null;
+                    ? sslService.createSSLContext(ClientAuth.NONE) : null;
         } catch (Exception e) {
             getLogger().error("Error building up SSL Context from the supplied configuration.", e);
             throw new InitializationException(e);
         }
 
         RestClientBuilder builder = RestClient.builder(hh)
-            .setHttpClientConfigCallback(httpClientBuilder -> {
-                if (sslContext != null) {
-                    httpClientBuilder = httpClientBuilder.setSSLContext(sslContext);
-                }
+                .setHttpClientConfigCallback(httpClientBuilder -> {
+                    if (sslContext != null) {
+                        httpClientBuilder.setSSLContext(sslContext);
+                    }
 
-                if (username != null && password != null) {
-                    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-                    credentialsProvider.setCredentials(AuthScope.ANY,
-                            new UsernamePasswordCredentials(username, password));
-                    httpClientBuilder = httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-                }
+                    if (username != null && password != null) {
+                        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+                        credentialsProvider.setCredentials(AuthScope.ANY,
+                                new UsernamePasswordCredentials(username, password));
+                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                    }
 
-                return httpClientBuilder;
-            })
-            .setRequestConfigCallback(requestConfigBuilder -> {
-                requestConfigBuilder.setConnectTimeout(connectTimeout);
-                requestConfigBuilder.setSocketTimeout(readTimeout);
-                return requestConfigBuilder;
-            })
-            .setMaxRetryTimeoutMillis(retryTimeout);
+                    return httpClientBuilder;
+                })
+                .setRequestConfigCallback(requestConfigBuilder -> {
+                    requestConfigBuilder.setConnectTimeout(connectTimeout);
+                    requestConfigBuilder.setSocketTimeout(readTimeout);
+                    return requestConfigBuilder;
+                })
+                .setMaxRetryTimeoutMillis(retryTimeout);
 
         this.client = builder.build();
     }
 
     private Response runQuery(String endpoint, String query, String index, String type) {
         StringBuilder sb = new StringBuilder()
-            .append("/")
-            .append(index);
+            .append("/").append(index);
         if (type != null && !type.equals("")) {
-            sb.append("/")
-            .append(type);
+            sb.append("/").append(type);
         }
 
         sb.append(String.format("/%s", endpoint));
@@ -181,11 +178,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         final int code = response.getStatusLine().getStatusCode();
 
         try {
-            if (code >= 200 & code < 300) {
+            if (code >= 200 && code < 300) {
                 InputStream inputStream = response.getEntity().getContent();
                 byte[] result = IOUtils.toByteArray(inputStream);
                 inputStream.close();
-                return mapper.readValue(new String(result, charset), Map.class);
+                return (Map<String, Object>) mapper.readValue(new String(result, responseCharset), Map.class);
             } else {
                 String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s",
                         response.getStatusLine().getReasonPhrase());
@@ -198,7 +195,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
 
     @Override
     public IndexOperationResponse add(IndexOperationRequest operation) {
-        return bulk(Arrays.asList(operation));
+        return bulk(Collections.singletonList(operation));
     }
 
     private String flatten(String str) {
@@ -216,8 +213,12 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         Map<String, Object> header = new HashMap<String, Object>() {{
             put(operation, new HashMap<String, Object>() {{
                 put("_index", index);
-                put("_id", id);
-                put("_type", type);
+                if (StringUtils.isNotBlank(id)) {
+                    put("_id", id);
+                }
+                if (StringUtils.isNotBlank(type)) {
+                    put("_type", type);
+                }
             }});
         }};
 
@@ -256,8 +257,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     public IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
         try {
             StringBuilder payload = new StringBuilder();
-            for (int index = 0; index < operations.size(); index++) {
-                IndexOperationRequest or = operations.get(index);
+            for (final IndexOperationRequest or : operations) {
                 buildRequest(or, payload);
             }
 
@@ -276,9 +276,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
                 getLogger().debug(String.format("Response was: %s", rawResponse));
             }
 
-            IndexOperationResponse retVal = IndexOperationResponse.fromJsonResponse(rawResponse);
-
-            return retVal;
+            return IndexOperationResponse.fromJsonResponse(rawResponse);
         } catch (Exception ex) {
             throw new ElasticsearchError(ex);
         }
@@ -294,15 +292,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
 
     @Override
     public DeleteOperationResponse deleteById(String index, String type, String id) {
-        return deleteById(index, type, Arrays.asList(id));
+        return deleteById(index, type, Collections.singletonList(id));
     }
 
     @Override
     public DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
         try {
             StringBuilder sb = new StringBuilder();
-            for (int idx = 0; idx < ids.size(); idx++) {
-                String header = buildBulkHeader("delete", index, type, ids.get(idx));
+            for (final String id : ids) {
+                String header = buildBulkHeader("delete", index, type, id);
                 sb.append(header).append("\n");
             }
             HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
@@ -316,9 +314,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
                         IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8)));
             }
 
-            DeleteOperationResponse dor = new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
-
-            return dor;
+            return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
         } catch (Exception ex) {
             throw new RuntimeException(ex);
         }
@@ -329,7 +325,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         long start = System.currentTimeMillis();
         Response response = runQuery("_delete_by_query", query, index, type);
         long end   = System.currentTimeMillis();
-        Map<String, Object> parsed = parseResponse(response);
+
+        // check for errors in response
+        parseResponse(response);
 
         return new DeleteOperationResponse(end - start);
     }
@@ -359,9 +357,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
      */
     private int handleSearchCount(Object raw) {
         if (raw instanceof Number) {
-            return Integer.valueOf(raw.toString());
+            return Integer.parseInt(raw.toString());
         } else if (raw instanceof Map) {
-            return (Integer)((Map)raw).get("value");
+            return (Integer)((Map<String, Object>)raw).get("value");
         } else {
             throw new ProcessException("Unknown type for hit count.");
         }
@@ -401,11 +399,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     @Override
     public String getTransitUrl(String index, String type) {
         return new StringBuilder()
-            .append(this.url)
-            .append(index != null && !index.equals("") ? "/" : "")
-            .append(index != null ? index : "")
-            .append(type != null && !type.equals("") ? "/" : "")
-            .append(type != null ? type : "")
-            .toString();
+                .append(this.url)
+                .append(StringUtils.isNotBlank(index) ? "/" : "")
+                .append(StringUtils.isNotBlank(index) ? index : "")
+                .append(StringUtils.isNotBlank(type) ? "/" : "")
+                .append(StringUtils.isNotBlank(type) ? type : "")
+                .toString();
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index 291d99b..0374eb7 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -36,11 +36,9 @@ import okhttp3.RequestBody;
 import okhttp3.Response;
 import okhttp3.Route;
 import org.apache.commons.text.StringEscapeUtils;
-import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
@@ -57,13 +55,7 @@ import org.apache.nifi.util.StringUtils;
  * A base class for Elasticsearch processors that use the HTTP API
  */
 public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor {
-    enum ElasticsearchVersion {
-        ES_7,
-        ES_LESS_THAN_7
-    }
-
-    static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
-    static final String FIELD_INCLUDE_QUERY_PARAM_ES7 = "_source_includes";
+    static final String SOURCE_QUERY_PARAM = "_source";
     static final String QUERY_QUERY_PARAM = "q";
     static final String SORT_QUERY_PARAM = "sort";
     static final String SIZE_QUERY_PARAM = "size";
@@ -134,18 +126,6 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
-    public static final PropertyDescriptor ES_VERSION = new PropertyDescriptor.Builder()
-            .name("elasticsearch-http-version")
-            .displayName("Elasticsearch Version")
-            .description("The major version of elasticsearch (this affects some HTTP query parameters and the way responses are parsed).")
-            .required(true)
-            .allowableValues(
-                    new AllowableValue(ElasticsearchVersion.ES_LESS_THAN_7.name(), "< 7.0", "Any version of Elasticsearch less than 7.0"),
-                    new AllowableValue(ElasticsearchVersion.ES_7.name(), "7.x", "Elasticsearch version 7.x"))
-            .defaultValue(ElasticsearchVersion.ES_LESS_THAN_7.name())
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
     private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
 
     @Override
@@ -167,7 +147,6 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
     static {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(ES_URL);
-        properties.add(ES_VERSION);
         properties.add(PROP_SSL_CONTEXT_SERVICE);
         properties.add(CHARSET);
         properties.add(USERNAME);
@@ -308,12 +287,12 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             sb.append("\": { \"_index\": \"");
             sb.append(StringEscapeUtils.escapeJson(index));
             sb.append("\"");
-            if (!(StringUtils.isEmpty(docType) | docType == null)){
+            if (StringUtils.isNotBlank(docType)) {
                 sb.append(", \"_type\": \"");
                 sb.append(StringEscapeUtils.escapeJson(docType));
                 sb.append("\"");
             }
-            if (!StringUtils.isEmpty(id)) {
+            if (StringUtils.isNotBlank(id)) {
                 sb.append(", \"_id\": \"");
                 sb.append(StringEscapeUtils.escapeJson(id));
                 sb.append("\"");
@@ -325,7 +304,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             sb.append("{\"update\": { \"_index\": \"");
             sb.append(StringEscapeUtils.escapeJson(index));
             sb.append("\"");
-            if (!(StringUtils.isEmpty(docType) | docType == null)){
+            if (StringUtils.isNotBlank(docType)) {
                 sb.append(", \"_type\": \"");
                 sb.append(StringEscapeUtils.escapeJson(docType));
                 sb.append("\"");
@@ -342,47 +321,14 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             sb.append("{\"delete\": { \"_index\": \"");
             sb.append(StringEscapeUtils.escapeJson(index));
             sb.append("\"");
-            if (!(StringUtils.isEmpty(docType) | docType == null)){
+            if (StringUtils.isNotBlank(docType)) {
                 sb.append(", \"_type\": \"");
                 sb.append(StringEscapeUtils.escapeJson(docType));
                 sb.append("\"");
             }
             sb.append(", \"_id\": \"");
             sb.append(StringEscapeUtils.escapeJson(id));
-            sb.append("\" }}\n");
-        }
-    }
-
-    protected String getFieldIncludeParameter(ElasticsearchVersion esVersion) {
-        return esVersion.equals(ElasticsearchVersion.ES_LESS_THAN_7)
-                ? FIELD_INCLUDE_QUERY_PARAM : FIELD_INCLUDE_QUERY_PARAM_ES7;
-    }
-
-    static class ElasticsearchTypeValidator implements Validator {
-        private final boolean pre7TypeRequired;
-
-        /**
-         * Creates a validator for an ES type
-         * @param pre7TypeRequired If true, 'type' will be required for ES
-         * before version 7.0.
-         */
-        public ElasticsearchTypeValidator(boolean pre7TypeRequired) {
-            this.pre7TypeRequired = pre7TypeRequired;
-        }
-
-        @Override
-        public ValidationResult validate(String subject, String input, ValidationContext context) {
-            ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context
-                    .getProperty(ES_VERSION).getValue());
-            if (esVersion == ElasticsearchVersion.ES_7) {
-                return new ValidationResult.Builder().valid(org.apache.commons.lang3.StringUtils.isBlank(input) || "_doc".equals(input))
-                        .explanation("Elasticsearch no longer supports 'type' as of version 7.0.  Please use '_doc' or leave blank.")
-                        .build();
-            } else {
-                return new ValidationResult.Builder().valid(!pre7TypeRequired || org.apache.commons.lang3.StringUtils.isNotBlank(input))
-                        .explanation("Elasticsearch prior to version 7.0 requires a 'type' to be set.")
-                        .build();
-            }
+            sb.append("\" } }\n");
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
index 50cad68..dea039c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
@@ -40,7 +40,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -121,11 +120,12 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("fetch-es-type")
             .displayName("Type")
-            .description("The type of this document (if empty, the first document matching the identifier across all types will be retrieved).  "
-                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
-            .required(true)
+            .description("The type of document/fetch (if unset, the first document matching the "
+                    + "identifier across _all types will be retrieved). "
+                    + "This should be unset, '_doc' or '_source' for Elasticsearch 7.0+.")
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(new ElasticsearchTypeValidator(false))
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -150,7 +150,6 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
-        descriptors.add(ES_VERSION);
         descriptors.add(DOC_ID);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
@@ -201,8 +200,6 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         final String fields = context.getProperty(FIELDS).isSet()
                 ? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue()
                 : null;
-        final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
-                .getValue());
 
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -218,7 +215,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
 
             // read the url property from the context
             final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
-            final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context, esVersion);
+            final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context);
             final long startNanos = System.nanoTime();
 
             getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
@@ -310,18 +307,17 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         }
     }
 
-    private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
+    private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
         HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
         builder.addPathSegment(index);
-        builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
+        builder.addPathSegment(StringUtils.isBlank(type) ? "_all" : type);
         builder.addPathSegment(docId);
         if (!StringUtils.isEmpty(fields)) {
             String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
-            final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
-            builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
+            builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields);
         }
 
         // Find the user-added properties and set them as query parameters on the URL
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index b427b8e..8c501c3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -111,11 +111,11 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("put-es-type")
             .displayName("Type")
-            .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching).  "
-                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
-            .required(true)
+            .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+                    + "This must be unset or '_doc' for Elasticsearch 7.0+.")
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(new ElasticsearchTypeValidator(true))
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -153,7 +153,6 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
-        descriptors.add(ES_VERSION);
         descriptors.add(ID_ATTRIBUTE);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 130bde8..fd28b49 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -173,11 +173,11 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
     static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("put-es-record-type")
             .displayName("Type")
-            .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching).  "
-                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
-            .required(true)
+            .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+                    + "This must be unset or '_doc' for Elasticsearch 7.0+.")
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(new ElasticsearchTypeValidator(true))
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -261,7 +261,6 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
-        descriptors.add(ES_VERSION);
         descriptors.add(RECORD_READER);
         descriptors.add(RECORD_WRITER);
         descriptors.add(LOG_ALL_ERRORS);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index 58bab07..0a769d6 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -145,7 +145,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
             .name("query-es-index")
             .displayName("Index")
-            .description("The name of the index to read from. If the property is set "
+            .description("The name of the index to read from. If the property is unset or set "
                             + "to _all, the query will match across all indexes.")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -155,11 +155,11 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("query-es-type")
             .displayName("Type")
-            .description("The type of this document (if empty, searches across all types).  "
-                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
-            .required(true)
+            .description("The type of document (if unset, the query will be against all types in the _index). "
+                    + "This should be unset or '_doc' for Elasticsearch 7.0+.")
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(new ElasticsearchTypeValidator(false))
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -235,7 +235,6 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     static {
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
         descriptors.add(QUERY);
-        descriptors.add(ES_VERSION);
         descriptors.add(PAGE_SIZE);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
@@ -307,17 +306,16 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
                 .getValue();
         final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
-                .asInteger().intValue();
+                .asInteger();
         final Integer limit = context.getProperty(LIMIT).isSet() ? context.getProperty(LIMIT)
-                .evaluateAttributeExpressions(flowFile).asInteger().intValue() : null;
+                .evaluateAttributeExpressions(flowFile).asInteger() : null;
         final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
                 .evaluateAttributeExpressions(flowFile).getValue() : null;
         final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
                 .evaluateAttributeExpressions(flowFile).getValue() : null;
         final boolean targetIsContent = context.getProperty(TARGET).getValue()
                 .equals(TARGET_FLOW_FILE_CONTENT);
-        final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
-                .getValue());
+
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
         final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
@@ -345,7 +343,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 }
 
                 final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
-                        mPageSize, fromIndex, context, esVersion);
+                        mPageSize, fromIndex, context);
 
                 final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
                         username, password, "GET", null);
@@ -506,13 +504,13 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     }
 
     private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
-            String sort, int pageSize, int fromIndex, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
+            String sort, int pageSize, int fromIndex, ProcessContext context) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
         HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
         builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index);
-        if (!StringUtils.isEmpty(type)) {
+        if (StringUtils.isNotBlank(type)) {
             builder.addPathSegment(type);
         }
         builder.addPathSegment("_search");
@@ -521,8 +519,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex));
         if (!StringUtils.isEmpty(fields)) {
             String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
-            final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
-            builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
+            builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields);
         }
         if (!StringUtils.isEmpty(sort)) {
             String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
index 322f82f..33c75d7 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -46,8 +46,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchTypeValidator;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -137,11 +135,11 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("scroll-es-type")
             .displayName("Type")
-            .description("The type of this document (if empty, searches across all types).  "
-                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
-            .required(true)
+            .description("The type of document (if unset, the query will be against all types in the _index). "
+                    + "This should be unset or '_doc' for Elasticsearch 7.0+.")
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(new ElasticsearchTypeValidator(false))
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -186,7 +184,6 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
-        descriptors.add(ES_VERSION);
         descriptors.add(QUERY);
         descriptors.add(SCROLL_DURATION);
         descriptors.add(PAGE_SIZE);
@@ -239,15 +236,13 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
         final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
                 .getValue();
         final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
-                .asInteger().intValue();
+                .asInteger();
         final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
                 .evaluateAttributeExpressions(flowFile).getValue() : null;
         final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
                 .evaluateAttributeExpressions(flowFile).getValue() : null;
         final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
                 .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
-        final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
-                .getValue());
 
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
@@ -264,7 +259,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
                     .getValue());
             if (scrollId != null) {
                 final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
-                        scrollId, pageSize, scroll, context, esVersion);
+                        scrollId, pageSize, scroll, context);
                 final long startNanos = System.nanoTime();
 
                 final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", scroll,
@@ -282,7 +277,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
 
                 // read the url property from the context
                 final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
-                        scrollId, pageSize, scroll, context, esVersion);
+                        scrollId, pageSize, scroll, context);
                 final long startNanos = System.nanoTime();
 
                 final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
@@ -419,7 +414,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
     }
 
     private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
-            String sort, String scrollId, int pageSize, String scroll, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
+            String sort, String scrollId, int pageSize, String scroll, ProcessContext context) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
@@ -429,7 +424,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
             builder.addPathSegment("scroll");
         } else {
             builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index);
-            if (!StringUtils.isEmpty(type)) {
+            if (StringUtils.isNotBlank(type)) {
                 builder.addPathSegment(type);
             }
             builder.addPathSegment("_search");
@@ -437,8 +432,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
             builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
             if (!StringUtils.isEmpty(fields)) {
                 String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
-                final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
-                builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
+                builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields);
             }
             if (!StringUtils.isEmpty(sort)) {
                 String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
index e3ace24..f60e2f9 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -43,7 +42,6 @@ public class ITQueryElasticsearchHttp {
         runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
                 "http://localhost.internal:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
         runner.assertNotValid();
@@ -70,7 +68,6 @@ public class ITQueryElasticsearchHttp {
         runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
                 "http://localhost.internal:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
         runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
index b25e3e2..57ee242 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
@@ -20,7 +20,6 @@ import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
 
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -41,7 +40,6 @@ public class ITScrollElasticsearchHttp {
         runner = TestRunners.newTestRunner(ScrollElasticsearchHttp.class); // all docs are found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
                 "http://ip-172-31-49-152.ec2.internal:9200");
-        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "prod-accounting");
         runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
index c13bf40..9a4a63f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
@@ -19,7 +19,6 @@ package org.apache.nifi.processors.elasticsearch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
@@ -63,7 +62,6 @@ public class PutElasticsearchHttpRecordIT {
         FETCH_RUNNER.setProperty(FetchElasticsearchHttp.INDEX, "people_test");
         FETCH_RUNNER.setProperty(FetchElasticsearchHttp.TYPE, "person");
         FETCH_RUNNER.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
-        FETCH_RUNNER.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         FETCH_RUNNER.assertValid();
     }
 
@@ -77,7 +75,6 @@ public class PutElasticsearchHttpRecordIT {
         runner = TestRunners.newTestRunner(PutElasticsearchHttpRecord.class);
         runner.addControllerService("reader", recordReader);
         runner.enableControllerService(recordReader);
-        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader");
         runner.setProperty(PutElasticsearchHttpRecord.ES_URL, "http://localhost:9200");
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test");
@@ -213,7 +210,6 @@ public class PutElasticsearchHttpRecordIT {
         // Undo some stuff from setup()
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people\"test");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
-        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
             put("name", "John Doe");
             put("age", 48);
@@ -237,7 +233,6 @@ public class PutElasticsearchHttpRecordIT {
         // Undo some stuff from setup()
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people}test");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
-        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
             put("name", "John Doe");
             put("age", 48);
@@ -261,7 +256,6 @@ public class PutElasticsearchHttpRecordIT {
         // Undo some stuff from setup()
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test2");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "per\"son");
-        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
             put("name", "John Doe");
             put("age", 48);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
index fb8b164..babdb3b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
@@ -26,11 +26,12 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -75,17 +76,16 @@ public class TestFetchElasticsearchHttp {
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+        runner.removeProperty(FetchElasticsearchHttp.TYPE);
         runner.assertValid();
-        runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(FetchElasticsearchHttp.TYPE, "");
-        runner.assertValid(); // Valid because type is not required prior to 7.0
+        runner.assertNotValid();
         runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
         runner.assertValid();
-        runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "${type}");
+        runner.assertValid();
         runner.setProperty(FetchElasticsearchHttp.TYPE, "_doc");
-        runner.assertValid(); // Valid because type is not required prior to 7.0
+        runner.assertValid(); // Valid because type can be _doc for 7.0+
         runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
         runner.assertValid();
 
@@ -135,7 +135,7 @@ public class TestFetchElasticsearchHttp {
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, ES_URL);
 
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+        runner.removeProperty(FetchElasticsearchHttp.TYPE);
         runner.assertNotValid();
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
         runner.assertValid();
@@ -184,7 +184,7 @@ public class TestFetchElasticsearchHttp {
         runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+        runner.removeProperty(FetchElasticsearchHttp.TYPE);
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
 
         runner.setIncomingConnection(true);
@@ -284,7 +284,7 @@ public class TestFetchElasticsearchHttp {
         runner.setProperty(FetchElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+        runner.removeProperty(FetchElasticsearchHttp.TYPE);
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
 
         // Allow time for the controller service to fully initialize
@@ -300,7 +300,7 @@ public class TestFetchElasticsearchHttp {
     @Test
     public void testFetchElasticsearchOnTriggerQueryParameter() throws IOException {
         FetchElasticsearchHttpTestProcessor p = new FetchElasticsearchHttpTestProcessor(true); // all docs are found
-        p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source_include=id&myparam=myvalue");
+        p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source=id&myparam=myvalue");
         runner = TestRunners.newTestRunner(p);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
 
@@ -323,6 +323,32 @@ public class TestFetchElasticsearchHttp {
         out.assertAttributeEquals("doc_id", "28039652140");
     }
 
+    @Test
+    public void testFetchElasticsearchOnTriggerQueryParameterNoType() throws IOException {
+        FetchElasticsearchHttpTestProcessor p = new FetchElasticsearchHttpTestProcessor(true); // all docs are found
+        p.setExpectedUrl("http://127.0.0.1:9200/doc/_all/28039652140?_source=id&myparam=myvalue");
+        runner = TestRunners.newTestRunner(p);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.removeProperty(FetchElasticsearchHttp.TYPE);
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+        runner.setProperty(FetchElasticsearchHttp.FIELDS, "id");
+
+        // Set dynamic property, to be added to the URL as a query parameter
+        runner.setProperty("myparam", "myvalue");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -412,15 +438,25 @@ public class TestFetchElasticsearchHttp {
      */
     @Test
     @Ignore("Comment this out if you want to run against local or test ES")
-    public void testFetchElasticsearchBasic() {
+    public void testFetchElasticsearchBasic() throws IOException {
         System.out.println("Starting test " + new Object() {
         }.getClass().getEnclosingMethod().getName());
         final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
 
+        // add data to ES instance
+        new OkHttpClient.Builder().build().newCall(
+                new Request.Builder().url("http://127.0.0.1:9200/doc/_doc/28039652140")
+                        .addHeader("Content-Type", "application/json")
+                        .put(
+                                RequestBody.create(MediaType.get("application/json"),
+                                        IOUtils.toString(docExample, StandardCharsets.UTF_8))
+                        ).build()
+        ).execute();
+
         //Local Cluster - Mac pulled from brew
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+        runner.removeProperty(FetchElasticsearchHttp.TYPE);
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
         runner.assertValid();
 
@@ -434,31 +470,6 @@ public class TestFetchElasticsearchHttp {
     }
 
     @Test
-    @Ignore("Comment this out if you want to run against local or test ES")
-    public void testFetchElasticsearchBatch() throws IOException {
-        System.out.println("Starting test " + new Object() {
-        }.getClass().getEnclosingMethod().getName());
-        final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
-
-        //Local Cluster - Mac pulled from brew
-        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
-        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
-        runner.assertValid();
-
-        for (int i = 0; i < 100; i++) {
-            long newId = 28039652140L + i;
-            final String newStrId = Long.toString(newId);
-            runner.enqueue(docExample, new HashMap<String, String>() {{
-                put("doc_id", newStrId);
-            }});
-        }
-        runner.run(100);
-        runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 100);
-    }
-
-    @Test
     @Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.")
     public void testFetchElasticsearchBasicBehindProxy() {
         final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index 896b981..a9371d1 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -30,7 +30,6 @@ import java.util.HashMap;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -112,10 +111,9 @@ public class TestPutElasticsearchHttp {
     public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException {
         runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
 
         runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(PutElasticsearchHttp.TYPE, "");
+        runner.removeProperty(PutElasticsearchHttp.TYPE);
         runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
         runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
 
@@ -340,17 +338,16 @@ public class TestPutElasticsearchHttp {
 
         runner.assertNotValid();
         runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(PutElasticsearchHttp.TYPE, "");
-        runner.assertNotValid(); // Not valid because type is required prior to 7.0
+        runner.assertNotValid();
         runner.setProperty(PutElasticsearchHttp.TYPE, " ");
-        runner.assertNotValid(); // Not valid because type is required prior to 7.0
+        runner.assertValid();
         runner.removeProperty(PutElasticsearchHttp.TYPE);
-        runner.assertNotValid(); // Not valid because type is required prior to 7.0
+        runner.assertValid();
         runner.setProperty(PutElasticsearchHttp.TYPE, "status");
         runner.assertValid();
-        runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+        runner.setProperty(PutElasticsearchHttp.TYPE, "${type}");
+        runner.assertValid();
         runner.setProperty(PutElasticsearchHttp.TYPE, "_doc");
         runner.assertValid();
         runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index a651aef..bea9995 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -28,7 +28,6 @@ import okio.Buffer;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
@@ -197,10 +196,9 @@ public class TestPutElasticsearchHttpRecord {
         runner = TestRunners.newTestRunner(processor); // no failures
         generateTestData();
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
 
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
-        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
+        runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
         runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
         runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy");
         runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
@@ -248,10 +246,9 @@ public class TestPutElasticsearchHttpRecord {
         runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
         generateTestData();
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
 
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
-        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
+        runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
         runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
         runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update");
         runner.enqueue(new byte[0], new HashMap<String, String>() {{
@@ -291,10 +288,9 @@ public class TestPutElasticsearchHttpRecord {
         runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
         generateTestData();
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
 
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
-        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
+        runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
         runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
         runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE");
         runner.enqueue(new byte[0], new HashMap<String, String>() {{
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
index 0f6b6e4..4cdb035 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
@@ -32,7 +32,6 @@ import java.util.List;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -65,7 +64,6 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTrigger_withInput() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -85,7 +83,6 @@ public class TestQueryElasticsearchHttp {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setValidateExpressionUsage(true);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -109,20 +106,15 @@ public class TestQueryElasticsearchHttp {
         runner.assertNotValid();
         runner.setProperty(QueryElasticsearchHttp.QUERY,
                 "source:Twitter AND identifier:\"${identifier}\"");
-        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+        runner.removeProperty(QueryElasticsearchHttp.TYPE);
         runner.assertValid();
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
-        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
-        runner.assertValid(); // Valid because type is not required prior to 7.0
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.assertValid();
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
-        runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc");
-        runner.assertValid();
-        runner.removeProperty(QueryElasticsearchHttp.TYPE);
-        runner.assertNotValid();
         runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "${type}");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc");
         runner.assertValid();
         runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
         runner.assertValid();
@@ -138,7 +130,6 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -165,7 +156,6 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTrigger_withNoInput() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -211,7 +201,6 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTriggerWithFields() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -231,7 +220,6 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTriggerWithLimit() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -254,7 +242,6 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(500, "Server error");
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -280,7 +267,6 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(100, "Should fail");
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -306,7 +292,6 @@ public class TestQueryElasticsearchHttp {
         processor.setExceptionToThrow(new IOException("Error reading from disk"));
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -332,7 +317,6 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(100, "Should fail", 2);
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -359,7 +343,6 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(100, "Should fail", 1);
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -382,9 +365,8 @@ public class TestQueryElasticsearchHttp {
         runner.enableControllerService(sslService);
         runner.setProperty(QueryElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+        runner.removeProperty(QueryElasticsearchHttp.TYPE);
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
 
         // Allow time for the controller service to fully initialize
@@ -421,7 +403,6 @@ public class TestQueryElasticsearchHttp {
         runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost");
         runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3228");
         runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.enqueue("".getBytes(), new HashMap<String, String>() {{
             put("doc_id", "28039652140");
@@ -449,7 +430,6 @@ public class TestQueryElasticsearchHttp {
         runner.setProperty(QueryElasticsearchHttp.PROXY_USERNAME, "squid");
         runner.setProperty(QueryElasticsearchHttp.PROXY_PASSWORD, "changeme");
         runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.enqueue("".getBytes(), new HashMap<String, String>() {{
             put("doc_id", "28039652140");
@@ -465,7 +445,6 @@ public class TestQueryElasticsearchHttp {
         p.setExpectedParam("myparam=myvalue");
         runner = TestRunners.newTestRunner(p);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
@@ -478,30 +457,15 @@ public class TestQueryElasticsearchHttp {
     @Test
     public void testQueryElasticsearchOnTrigger_sourceIncludes() throws IOException {
         QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor();
-        p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param
+        p.setExpectedParam("_source=test");
         runner = TestRunners.newTestRunner(p);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
         runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
         runAndVerifySuccess(true);
-
-        // Now test with ES 7.x
-
-        p = new QueryElasticsearchHttpTestProcessor();
-        p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param
-        runner = TestRunners.newTestRunner(p);
-        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-
-        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
-        runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
-        runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
-        runAndVerifySuccess(true);
     }
 
     /**
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
index 74ac031..f790256 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
@@ -32,7 +32,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -83,30 +82,15 @@ public class TestScrollElasticsearchHttp {
     @Test
     public void testScrollElasticsearchOnTrigger_sourceIncludes() throws IOException {
         ScrollElasticsearchHttpTestProcessor p = new ScrollElasticsearchHttpTestProcessor();
-        p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param
+        p.setExpectedParam("_source=test");
         runner = TestRunners.newTestRunner(p);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
         runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
         runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
         runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
         runAndVerifySuccess();
-
-        // Now test with ES 7.x
-
-        p = new ScrollElasticsearchHttpTestProcessor();
-        p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param
-        runner = TestRunners.newTestRunner(p);
-        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-
-        runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
-        runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
-        runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
-        runAndVerifySuccess();
     }
 
     @Test
@@ -163,20 +147,18 @@ public class TestScrollElasticsearchHttp {
 
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
-        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
-        runner.assertNotValid();
-        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
-        runner.assertValid(); // Valid because type is not required prior to 7.0
+        runner.assertValid();
+        runner.removeProperty(ScrollElasticsearchHttp.TYPE);
+        runner.assertValid();
         runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
         runner.assertValid();
-        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
-        runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc");
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "${type}");
         runner.assertValid();
-        runner.removeProperty(ScrollElasticsearchHttp.TYPE);
-        runner.assertNotValid();
         runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
+        runner.assertNotValid();
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc");
+        runner.assertValid();
         runner.setProperty(ScrollElasticsearchHttp.FIELDS, "id,, userinfo.location");
         runner.assertValid();
         runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc,identifier:desc");
@@ -286,9 +268,8 @@ public class TestScrollElasticsearchHttp {
         runner.enableControllerService(sslService);
         runner.setProperty(ScrollElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
+        runner.removeProperty(ScrollElasticsearchHttp.TYPE);
         runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
         runner.setIncomingConnection(false);
 
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 214f15e..1eb2893 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -28,6 +28,7 @@ import org.apache.nifi.schema.access.SchemaAccessUtils
 import org.apache.nifi.serialization.RecordReaderFactory
 import org.apache.nifi.serialization.record.MockRecordParser
 import org.apache.nifi.serialization.record.MockSchemaRegistry
+import org.apache.nifi.util.StringUtils
 import org.apache.nifi.util.TestRunner
 import org.apache.nifi.util.TestRunners
 import org.junit.Assert
@@ -206,6 +207,42 @@ class PutElasticsearchRecordTest {
             "schema.name": "recordPathTest"
         ])
         runner.run()
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+
+        runner.clearTransferState()
+
+        flowFileContents = prettyPrint(toJson([
+                [ msg: "Hello" ],
+                [ id: null, type: null, msg: "Hello" ],
+                [ id: "rec-3", msg: "Hello" ],
+                [ id: "rec-4", msg: "Hello" ],
+                [ id: "rec-5", msg: "Hello" ],
+                [ id: "rec-6", type: "message", msg: "Hello" ]
+        ]))
+
+        evalClosure = { List<IndexOperationRequest> items ->
+            def nullTypeCount = items.findAll { it.type == null }.size()
+            def messageTypeCount = items.findAll { it.type == "message" }.size()
+            def nullIdCount = items.findAll { it.id == null }.size()
+            def recIdCount = items.findAll { StringUtils.startsWith(it.id, "rec-") }.size()
+            Assert.assertEquals("null type", 5, nullTypeCount)
+            Assert.assertEquals("message type", 1, messageTypeCount)
+            Assert.assertEquals("null id", 2, nullIdCount)
+            Assert.assertEquals("rec- id", 4, recIdCount)
+        }
+
+        clientService.evalClosure = evalClosure
+
+        runner.removeProperty(PutElasticsearchRecord.TYPE)
+        runner.enqueue(flowFileContents, [
+                "schema.name": "recordPathTest"
+        ])
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
 
         runner.clearTransferState()