You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2020/11/26 06:09:45 UTC

[nifi] 02/02: NIFI-6403 and NIFI-6404: Elasticsearch 7 support

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 124cdbd3fe5446bff08346e5a28f2883f13e2848
Author: Chris Sampson <ch...@digital.homeoffice.gov.uk>
AuthorDate: Mon Nov 16 13:03:50 2020 +0000

    NIFI-6403 and NIFI-6404: Elasticsearch 7 support
    
    Addressed PR#4153 comments; removed ES Version property and made Type optional in all ES HTTP/Record processors, applying sensible default values where required; use _source query parameter instead of _source_include/s as it's compatible between ES versions
    
    Fix unit test compilation to use JDK8-compatible library/method
    
    Better optional type and id handling for PutElasticsearchRecord; update nifi-elasticsearch-client-service build dependencies to use latest versions of Elasticsearch in each supported major version (5/6/7); addressed several warnings in ElasticSearchClientServiceImpl
    
    This closes #4667.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../nifi-elasticsearch-client-service/pom.xml      |  16 +--
 .../ElasticSearchClientServiceImpl.java            | 124 ++++++++++-----------
 .../AbstractElasticsearchHttpProcessor.java        |  66 +----------
 .../elasticsearch/FetchElasticsearchHttp.java      |  22 ++--
 .../elasticsearch/PutElasticsearchHttp.java        |   9 +-
 .../elasticsearch/PutElasticsearchHttpRecord.java  |   9 +-
 .../elasticsearch/QueryElasticsearchHttp.java      |  27 ++---
 .../elasticsearch/ScrollElasticsearchHttp.java     |  26 ++---
 .../elasticsearch/ITQueryElasticsearchHttp.java    |   3 -
 .../elasticsearch/ITScrollElasticsearchHttp.java   |   2 -
 .../PutElasticsearchHttpRecordIT.java              |   6 -
 .../elasticsearch/TestFetchElasticsearchHttp.java  |  87 ++++++++-------
 .../elasticsearch/TestPutElasticsearchHttp.java    |  15 +--
 .../TestPutElasticsearchHttpRecord.java            |  10 +-
 .../elasticsearch/TestQueryElasticsearchHttp.java  |  50 ++-------
 .../elasticsearch/TestScrollElasticsearchHttp.java |  37 ++----
 .../PutElasticsearchRecordTest.groovy              |  37 ++++++
 17 files changed, 225 insertions(+), 321 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index 7d05609..baf8f7a 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -26,7 +26,7 @@
     <packaging>jar</packaging>
 
     <properties>
-        <es.int.version>5.6.15</es.int.version>
+        <es.int.version>5.6.16</es.int.version>
         <script.name>setup-5.script</script.name>
         <type.name>faketype</type.name>
     </properties>
@@ -71,7 +71,7 @@
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
-            <version>2.6</version>
+            <version>2.8.0</version>
         </dependency>
 
         <dependency>
@@ -82,7 +82,7 @@
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
-            <version>3.9</version>
+            <version>3.11</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
@@ -146,7 +146,7 @@
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>4.5.10</version>
+            <version>4.5.13</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -176,7 +176,7 @@
         <profile>
             <id>integration-6</id>
             <properties>
-                <es.int.version>6.7.1</es.int.version>
+                <es.int.version>6.8.13</es.int.version>
                 <type.name>_doc</type.name>
                 <script.name>setup-6.script</script.name>
             </properties>
@@ -184,7 +184,7 @@
         <profile>
             <id>integration-7</id>
             <properties>
-                <es.int.version>7.0.0</es.int.version>
+                <es.int.version>7.10.0</es.int.version>
                 <script.name>setup-7.script</script.name>
                 <type.name>_doc</type.name>
             </properties>
@@ -196,7 +196,7 @@
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-failsafe-plugin</artifactId>
-                        <version>3.0.0-M3</version>
+                        <version>3.0.0-M5</version>
                         <configuration>
                             <systemPropertyVariables>
                                 <type_name>${type.name}</type_name>
@@ -206,7 +206,7 @@
                     <plugin>
                         <groupId>com.github.alexcojocaru</groupId>
                         <artifactId>elasticsearch-maven-plugin</artifactId>
-                        <version>6.13</version>
+                        <version>6.19</version>
                         <configuration>
                             <clusterName>testCluster</clusterName>
                             <transportPort>9500</transportPort>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 3a686af..bad70ba 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -26,7 +26,6 @@ import java.net.URL;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -61,25 +60,25 @@ import org.elasticsearch.client.RestClientBuilder;
 public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
     private final ObjectMapper mapper = new ObjectMapper();
 
-    static final private List<PropertyDescriptor> properties;
+    private static final List<PropertyDescriptor> properties;
 
     private RestClient client;
 
     private String url;
-    private Charset charset;
+    private Charset responseCharset;
 
     static {
-        List<PropertyDescriptor> _props = new ArrayList();
-        _props.add(ElasticSearchClientService.HTTP_HOSTS);
-        _props.add(ElasticSearchClientService.USERNAME);
-        _props.add(ElasticSearchClientService.PASSWORD);
-        _props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
-        _props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
-        _props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
-        _props.add(ElasticSearchClientService.RETRY_TIMEOUT);
-        _props.add(ElasticSearchClientService.CHARSET);
-
-        properties = Collections.unmodifiableList(_props);
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(ElasticSearchClientService.HTTP_HOSTS);
+        props.add(ElasticSearchClientService.USERNAME);
+        props.add(ElasticSearchClientService.PASSWORD);
+        props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
+        props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
+        props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
+        props.add(ElasticSearchClientService.RETRY_TIMEOUT);
+        props.add(ElasticSearchClientService.CHARSET);
+
+        properties = Collections.unmodifiableList(props);
     }
 
     @Override
@@ -91,7 +90,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     public void onEnabled(final ConfigurationContext context) throws InitializationException {
         try {
             setupClient(context);
-            charset = Charset.forName(context.getProperty(CHARSET).getValue());
+            responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
         } catch (Exception ex) {
             getLogger().error("Could not initialize ElasticSearch client.", ex);
             throw new InitializationException(ex);
@@ -126,44 +125,42 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         final SSLContext sslContext;
         try {
             sslContext = (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
-                ? sslService.createSSLContext(ClientAuth.NONE) : null;
+                    ? sslService.createSSLContext(ClientAuth.NONE) : null;
         } catch (Exception e) {
             getLogger().error("Error building up SSL Context from the supplied configuration.", e);
             throw new InitializationException(e);
         }
 
         RestClientBuilder builder = RestClient.builder(hh)
-            .setHttpClientConfigCallback(httpClientBuilder -> {
-                if (sslContext != null) {
-                    httpClientBuilder = httpClientBuilder.setSSLContext(sslContext);
-                }
+                .setHttpClientConfigCallback(httpClientBuilder -> {
+                    if (sslContext != null) {
+                        httpClientBuilder.setSSLContext(sslContext);
+                    }
 
-                if (username != null && password != null) {
-                    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-                    credentialsProvider.setCredentials(AuthScope.ANY,
-                            new UsernamePasswordCredentials(username, password));
-                    httpClientBuilder = httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-                }
+                    if (username != null && password != null) {
+                        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+                        credentialsProvider.setCredentials(AuthScope.ANY,
+                                new UsernamePasswordCredentials(username, password));
+                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                    }
 
-                return httpClientBuilder;
-            })
-            .setRequestConfigCallback(requestConfigBuilder -> {
-                requestConfigBuilder.setConnectTimeout(connectTimeout);
-                requestConfigBuilder.setSocketTimeout(readTimeout);
-                return requestConfigBuilder;
-            })
-            .setMaxRetryTimeoutMillis(retryTimeout);
+                    return httpClientBuilder;
+                })
+                .setRequestConfigCallback(requestConfigBuilder -> {
+                    requestConfigBuilder.setConnectTimeout(connectTimeout);
+                    requestConfigBuilder.setSocketTimeout(readTimeout);
+                    return requestConfigBuilder;
+                })
+                .setMaxRetryTimeoutMillis(retryTimeout);
 
         this.client = builder.build();
     }
 
     private Response runQuery(String endpoint, String query, String index, String type) {
         StringBuilder sb = new StringBuilder()
-            .append("/")
-            .append(index);
+            .append("/").append(index);
         if (type != null && !type.equals("")) {
-            sb.append("/")
-            .append(type);
+            sb.append("/").append(type);
         }
 
         sb.append(String.format("/%s", endpoint));
@@ -181,11 +178,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         final int code = response.getStatusLine().getStatusCode();
 
         try {
-            if (code >= 200 & code < 300) {
+            if (code >= 200 && code < 300) {
                 InputStream inputStream = response.getEntity().getContent();
                 byte[] result = IOUtils.toByteArray(inputStream);
                 inputStream.close();
-                return mapper.readValue(new String(result, charset), Map.class);
+                return (Map<String, Object>) mapper.readValue(new String(result, responseCharset), Map.class);
             } else {
                 String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s",
                         response.getStatusLine().getReasonPhrase());
@@ -198,7 +195,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
 
     @Override
     public IndexOperationResponse add(IndexOperationRequest operation) {
-        return bulk(Arrays.asList(operation));
+        return bulk(Collections.singletonList(operation));
     }
 
     private String flatten(String str) {
@@ -216,8 +213,12 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         Map<String, Object> header = new HashMap<String, Object>() {{
             put(operation, new HashMap<String, Object>() {{
                 put("_index", index);
-                put("_id", id);
-                put("_type", type);
+                if (StringUtils.isNotBlank(id)) {
+                    put("_id", id);
+                }
+                if (StringUtils.isNotBlank(type)) {
+                    put("_type", type);
+                }
             }});
         }};
 
@@ -256,8 +257,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     public IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
         try {
             StringBuilder payload = new StringBuilder();
-            for (int index = 0; index < operations.size(); index++) {
-                IndexOperationRequest or = operations.get(index);
+            for (final IndexOperationRequest or : operations) {
                 buildRequest(or, payload);
             }
 
@@ -276,9 +276,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
                 getLogger().debug(String.format("Response was: %s", rawResponse));
             }
 
-            IndexOperationResponse retVal = IndexOperationResponse.fromJsonResponse(rawResponse);
-
-            return retVal;
+            return IndexOperationResponse.fromJsonResponse(rawResponse);
         } catch (Exception ex) {
             throw new ElasticsearchError(ex);
         }
@@ -294,15 +292,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
 
     @Override
     public DeleteOperationResponse deleteById(String index, String type, String id) {
-        return deleteById(index, type, Arrays.asList(id));
+        return deleteById(index, type, Collections.singletonList(id));
     }
 
     @Override
     public DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
         try {
             StringBuilder sb = new StringBuilder();
-            for (int idx = 0; idx < ids.size(); idx++) {
-                String header = buildBulkHeader("delete", index, type, ids.get(idx));
+            for (final String id : ids) {
+                String header = buildBulkHeader("delete", index, type, id);
                 sb.append(header).append("\n");
             }
             HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
@@ -316,9 +314,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
                         IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8)));
             }
 
-            DeleteOperationResponse dor = new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
-
-            return dor;
+            return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
         } catch (Exception ex) {
             throw new RuntimeException(ex);
         }
@@ -329,7 +325,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         long start = System.currentTimeMillis();
         Response response = runQuery("_delete_by_query", query, index, type);
         long end   = System.currentTimeMillis();
-        Map<String, Object> parsed = parseResponse(response);
+
+        // check for errors in response
+        parseResponse(response);
 
         return new DeleteOperationResponse(end - start);
     }
@@ -359,9 +357,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
      */
     private int handleSearchCount(Object raw) {
         if (raw instanceof Number) {
-            return Integer.valueOf(raw.toString());
+            return Integer.parseInt(raw.toString());
         } else if (raw instanceof Map) {
-            return (Integer)((Map)raw).get("value");
+            return (Integer)((Map<String, Object>)raw).get("value");
         } else {
             throw new ProcessException("Unknown type for hit count.");
         }
@@ -401,11 +399,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     @Override
     public String getTransitUrl(String index, String type) {
         return new StringBuilder()
-            .append(this.url)
-            .append(index != null && !index.equals("") ? "/" : "")
-            .append(index != null ? index : "")
-            .append(type != null && !type.equals("") ? "/" : "")
-            .append(type != null ? type : "")
-            .toString();
+                .append(this.url)
+                .append(StringUtils.isNotBlank(index) ? "/" : "")
+                .append(StringUtils.isNotBlank(index) ? index : "")
+                .append(StringUtils.isNotBlank(type) ? "/" : "")
+                .append(StringUtils.isNotBlank(type) ? type : "")
+                .toString();
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index 291d99b..0374eb7 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -36,11 +36,9 @@ import okhttp3.RequestBody;
 import okhttp3.Response;
 import okhttp3.Route;
 import org.apache.commons.text.StringEscapeUtils;
-import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
@@ -57,13 +55,7 @@ import org.apache.nifi.util.StringUtils;
  * A base class for Elasticsearch processors that use the HTTP API
  */
 public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor {
-    enum ElasticsearchVersion {
-        ES_7,
-        ES_LESS_THAN_7
-    }
-
-    static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
-    static final String FIELD_INCLUDE_QUERY_PARAM_ES7 = "_source_includes";
+    static final String SOURCE_QUERY_PARAM = "_source";
     static final String QUERY_QUERY_PARAM = "q";
     static final String SORT_QUERY_PARAM = "sort";
     static final String SIZE_QUERY_PARAM = "size";
@@ -134,18 +126,6 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
-    public static final PropertyDescriptor ES_VERSION = new PropertyDescriptor.Builder()
-            .name("elasticsearch-http-version")
-            .displayName("Elasticsearch Version")
-            .description("The major version of elasticsearch (this affects some HTTP query parameters and the way responses are parsed).")
-            .required(true)
-            .allowableValues(
-                    new AllowableValue(ElasticsearchVersion.ES_LESS_THAN_7.name(), "< 7.0", "Any version of Elasticsearch less than 7.0"),
-                    new AllowableValue(ElasticsearchVersion.ES_7.name(), "7.x", "Elasticsearch version 7.x"))
-            .defaultValue(ElasticsearchVersion.ES_LESS_THAN_7.name())
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
     private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
 
     @Override
@@ -167,7 +147,6 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
     static {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(ES_URL);
-        properties.add(ES_VERSION);
         properties.add(PROP_SSL_CONTEXT_SERVICE);
         properties.add(CHARSET);
         properties.add(USERNAME);
@@ -308,12 +287,12 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             sb.append("\": { \"_index\": \"");
             sb.append(StringEscapeUtils.escapeJson(index));
             sb.append("\"");
-            if (!(StringUtils.isEmpty(docType) | docType == null)){
+            if (StringUtils.isNotBlank(docType)) {
                 sb.append(", \"_type\": \"");
                 sb.append(StringEscapeUtils.escapeJson(docType));
                 sb.append("\"");
             }
-            if (!StringUtils.isEmpty(id)) {
+            if (StringUtils.isNotBlank(id)) {
                 sb.append(", \"_id\": \"");
                 sb.append(StringEscapeUtils.escapeJson(id));
                 sb.append("\"");
@@ -325,7 +304,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             sb.append("{\"update\": { \"_index\": \"");
             sb.append(StringEscapeUtils.escapeJson(index));
             sb.append("\"");
-            if (!(StringUtils.isEmpty(docType) | docType == null)){
+            if (StringUtils.isNotBlank(docType)) {
                 sb.append(", \"_type\": \"");
                 sb.append(StringEscapeUtils.escapeJson(docType));
                 sb.append("\"");
@@ -342,47 +321,14 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             sb.append("{\"delete\": { \"_index\": \"");
             sb.append(StringEscapeUtils.escapeJson(index));
             sb.append("\"");
-            if (!(StringUtils.isEmpty(docType) | docType == null)){
+            if (StringUtils.isNotBlank(docType)) {
                 sb.append(", \"_type\": \"");
                 sb.append(StringEscapeUtils.escapeJson(docType));
                 sb.append("\"");
             }
             sb.append(", \"_id\": \"");
             sb.append(StringEscapeUtils.escapeJson(id));
-            sb.append("\" }}\n");
-        }
-    }
-
-    protected String getFieldIncludeParameter(ElasticsearchVersion esVersion) {
-        return esVersion.equals(ElasticsearchVersion.ES_LESS_THAN_7)
-                ? FIELD_INCLUDE_QUERY_PARAM : FIELD_INCLUDE_QUERY_PARAM_ES7;
-    }
-
-    static class ElasticsearchTypeValidator implements Validator {
-        private final boolean pre7TypeRequired;
-
-        /**
-         * Creates a validator for an ES type
-         * @param pre7TypeRequired If true, 'type' will be required for ES
-         * before version 7.0.
-         */
-        public ElasticsearchTypeValidator(boolean pre7TypeRequired) {
-            this.pre7TypeRequired = pre7TypeRequired;
-        }
-
-        @Override
-        public ValidationResult validate(String subject, String input, ValidationContext context) {
-            ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context
-                    .getProperty(ES_VERSION).getValue());
-            if (esVersion == ElasticsearchVersion.ES_7) {
-                return new ValidationResult.Builder().valid(org.apache.commons.lang3.StringUtils.isBlank(input) || "_doc".equals(input))
-                        .explanation("Elasticsearch no longer supports 'type' as of version 7.0.  Please use '_doc' or leave blank.")
-                        .build();
-            } else {
-                return new ValidationResult.Builder().valid(!pre7TypeRequired || org.apache.commons.lang3.StringUtils.isNotBlank(input))
-                        .explanation("Elasticsearch prior to version 7.0 requires a 'type' to be set.")
-                        .build();
-            }
+            sb.append("\" } }\n");
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
index 50cad68..dea039c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
@@ -40,7 +40,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -121,11 +120,12 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("fetch-es-type")
             .displayName("Type")
-            .description("The type of this document (if empty, the first document matching the identifier across all types will be retrieved).  "
-                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
-            .required(true)
+            .description("The type of document/fetch (if unset, the first document matching the "
+                    + "identifier across _all types will be retrieved). "
+                    + "This should be unset, '_doc' or '_source' for Elasticsearch 7.0+.")
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(new ElasticsearchTypeValidator(false))
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -150,7 +150,6 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
-        descriptors.add(ES_VERSION);
         descriptors.add(DOC_ID);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
@@ -201,8 +200,6 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         final String fields = context.getProperty(FIELDS).isSet()
                 ? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue()
                 : null;
-        final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
-                .getValue());
 
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -218,7 +215,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
 
             // read the url property from the context
             final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
-            final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context, esVersion);
+            final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context);
             final long startNanos = System.nanoTime();
 
             getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
@@ -310,18 +307,17 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         }
     }
 
-    private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
+    private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
         HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
         builder.addPathSegment(index);
-        builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
+        builder.addPathSegment(StringUtils.isBlank(type) ? "_all" : type);
         builder.addPathSegment(docId);
         if (!StringUtils.isEmpty(fields)) {
             String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
-            final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
-            builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
+            builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields);
         }
 
         // Find the user-added properties and set them as query parameters on the URL
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index b427b8e..8c501c3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -111,11 +111,11 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("put-es-type")
             .displayName("Type")
-            .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching).  "
-                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
-            .required(true)
+            .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+                    + "This must be unset or '_doc' for Elasticsearch 7.0+.")
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(new ElasticsearchTypeValidator(true))
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -153,7 +153,6 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
-        descriptors.add(ES_VERSION);
         descriptors.add(ID_ATTRIBUTE);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 130bde8..fd28b49 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -173,11 +173,11 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
     static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("put-es-record-type")
             .displayName("Type")
-            .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching).  "
-                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
-            .required(true)
+            .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+                    + "This must be unset or '_doc' for Elasticsearch 7.0+.")
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(new ElasticsearchTypeValidator(true))
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -261,7 +261,6 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
-        descriptors.add(ES_VERSION);
         descriptors.add(RECORD_READER);
         descriptors.add(RECORD_WRITER);
         descriptors.add(LOG_ALL_ERRORS);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index 58bab07..0a769d6 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -145,7 +145,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
             .name("query-es-index")
             .displayName("Index")
-            .description("The name of the index to read from. If the property is set "
+            .description("The name of the index to read from. If the property is unset or set "
                             + "to _all, the query will match across all indexes.")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -155,11 +155,11 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("query-es-type")
             .displayName("Type")
-            .description("The type of this document (if empty, searches across all types).  "
-                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
-            .required(true)
+            .description("The type of document (if unset, the query will be against all types in the _index). "
+                    + "This should be unset or '_doc' for Elasticsearch 7.0+.")
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(new ElasticsearchTypeValidator(false))
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -235,7 +235,6 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     static {
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
         descriptors.add(QUERY);
-        descriptors.add(ES_VERSION);
         descriptors.add(PAGE_SIZE);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
@@ -307,17 +306,16 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
                 .getValue();
         final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
-                .asInteger().intValue();
+                .asInteger();
         final Integer limit = context.getProperty(LIMIT).isSet() ? context.getProperty(LIMIT)
-                .evaluateAttributeExpressions(flowFile).asInteger().intValue() : null;
+                .evaluateAttributeExpressions(flowFile).asInteger() : null;
         final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
                 .evaluateAttributeExpressions(flowFile).getValue() : null;
         final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
                 .evaluateAttributeExpressions(flowFile).getValue() : null;
         final boolean targetIsContent = context.getProperty(TARGET).getValue()
                 .equals(TARGET_FLOW_FILE_CONTENT);
-        final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
-                .getValue());
+
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
         final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
@@ -345,7 +343,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 }
 
                 final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
-                        mPageSize, fromIndex, context, esVersion);
+                        mPageSize, fromIndex, context);
 
                 final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
                         username, password, "GET", null);
@@ -506,13 +504,13 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     }
 
     private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
-            String sort, int pageSize, int fromIndex, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
+            String sort, int pageSize, int fromIndex, ProcessContext context) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
         HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
         builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index);
-        if (!StringUtils.isEmpty(type)) {
+        if (StringUtils.isNotBlank(type)) {
             builder.addPathSegment(type);
         }
         builder.addPathSegment("_search");
@@ -521,8 +519,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex));
         if (!StringUtils.isEmpty(fields)) {
             String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
-            final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
-            builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
+            builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields);
         }
         if (!StringUtils.isEmpty(sort)) {
             String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
index 322f82f..33c75d7 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -46,8 +46,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchTypeValidator;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -137,11 +135,11 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
     public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("scroll-es-type")
             .displayName("Type")
-            .description("The type of this document (if empty, searches across all types).  "
-                    + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
-            .required(true)
+            .description("The type of document (if unset, the query will be against all types in the _index). "
+                    + "This should be unset or '_doc' for Elasticsearch 7.0+.")
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(new ElasticsearchTypeValidator(false))
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -186,7 +184,6 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
         relationships = Collections.unmodifiableSet(_rels);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
-        descriptors.add(ES_VERSION);
         descriptors.add(QUERY);
         descriptors.add(SCROLL_DURATION);
         descriptors.add(PAGE_SIZE);
@@ -239,15 +236,13 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
         final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
                 .getValue();
         final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
-                .asInteger().intValue();
+                .asInteger();
         final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
                 .evaluateAttributeExpressions(flowFile).getValue() : null;
         final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
                 .evaluateAttributeExpressions(flowFile).getValue() : null;
         final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
                 .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
-        final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
-                .getValue());
 
         // Authentication
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
@@ -264,7 +259,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
                     .getValue());
             if (scrollId != null) {
                 final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
-                        scrollId, pageSize, scroll, context, esVersion);
+                        scrollId, pageSize, scroll, context);
                 final long startNanos = System.nanoTime();
 
                 final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", scroll,
@@ -282,7 +277,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
 
                 // read the url property from the context
                 final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
-                        scrollId, pageSize, scroll, context, esVersion);
+                        scrollId, pageSize, scroll, context);
                 final long startNanos = System.nanoTime();
 
                 final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
@@ -419,7 +414,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
     }
 
     private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
-            String sort, String scrollId, int pageSize, String scroll, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
+            String sort, String scrollId, int pageSize, String scroll, ProcessContext context) throws MalformedURLException {
         if (StringUtils.isEmpty(baseUrl)) {
             throw new MalformedURLException("Base URL cannot be null");
         }
@@ -429,7 +424,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
             builder.addPathSegment("scroll");
         } else {
             builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index);
-            if (!StringUtils.isEmpty(type)) {
+            if (StringUtils.isNotBlank(type)) {
                 builder.addPathSegment(type);
             }
             builder.addPathSegment("_search");
@@ -437,8 +432,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
             builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
             if (!StringUtils.isEmpty(fields)) {
                 String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
-                final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
-                builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
+                builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields);
             }
             if (!StringUtils.isEmpty(sort)) {
                 String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
index e3ace24..f60e2f9 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -43,7 +42,6 @@ public class ITQueryElasticsearchHttp {
         runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
                 "http://localhost.internal:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
         runner.assertNotValid();
@@ -70,7 +68,6 @@ public class ITQueryElasticsearchHttp {
         runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
                 "http://localhost.internal:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
         runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
index b25e3e2..57ee242 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
@@ -20,7 +20,6 @@ import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
 
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -41,7 +40,6 @@ public class ITScrollElasticsearchHttp {
         runner = TestRunners.newTestRunner(ScrollElasticsearchHttp.class); // all docs are found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
                 "http://ip-172-31-49-152.ec2.internal:9200");
-        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "prod-accounting");
         runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
index c13bf40..9a4a63f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
@@ -19,7 +19,6 @@ package org.apache.nifi.processors.elasticsearch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
@@ -63,7 +62,6 @@ public class PutElasticsearchHttpRecordIT {
         FETCH_RUNNER.setProperty(FetchElasticsearchHttp.INDEX, "people_test");
         FETCH_RUNNER.setProperty(FetchElasticsearchHttp.TYPE, "person");
         FETCH_RUNNER.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
-        FETCH_RUNNER.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         FETCH_RUNNER.assertValid();
     }
 
@@ -77,7 +75,6 @@ public class PutElasticsearchHttpRecordIT {
         runner = TestRunners.newTestRunner(PutElasticsearchHttpRecord.class);
         runner.addControllerService("reader", recordReader);
         runner.enableControllerService(recordReader);
-        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader");
         runner.setProperty(PutElasticsearchHttpRecord.ES_URL, "http://localhost:9200");
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test");
@@ -213,7 +210,6 @@ public class PutElasticsearchHttpRecordIT {
         // Undo some stuff from setup()
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people\"test");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
-        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
             put("name", "John Doe");
             put("age", 48);
@@ -237,7 +233,6 @@ public class PutElasticsearchHttpRecordIT {
         // Undo some stuff from setup()
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people}test");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
-        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
             put("name", "John Doe");
             put("age", 48);
@@ -261,7 +256,6 @@ public class PutElasticsearchHttpRecordIT {
         // Undo some stuff from setup()
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test2");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "per\"son");
-        runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
             put("name", "John Doe");
             put("age", 48);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
index fb8b164..babdb3b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
@@ -26,11 +26,12 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -75,17 +76,16 @@ public class TestFetchElasticsearchHttp {
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+        runner.removeProperty(FetchElasticsearchHttp.TYPE);
         runner.assertValid();
-        runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(FetchElasticsearchHttp.TYPE, "");
-        runner.assertValid(); // Valid because type is not required prior to 7.0
+        runner.assertNotValid();
         runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
         runner.assertValid();
-        runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "${type}");
+        runner.assertValid();
         runner.setProperty(FetchElasticsearchHttp.TYPE, "_doc");
-        runner.assertValid(); // Valid because type is not required prior to 7.0
+        runner.assertValid(); // Valid because type can be _doc for 7.0+
         runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
         runner.assertValid();
 
@@ -135,7 +135,7 @@ public class TestFetchElasticsearchHttp {
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, ES_URL);
 
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+        runner.removeProperty(FetchElasticsearchHttp.TYPE);
         runner.assertNotValid();
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
         runner.assertValid();
@@ -184,7 +184,7 @@ public class TestFetchElasticsearchHttp {
         runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+        runner.removeProperty(FetchElasticsearchHttp.TYPE);
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
 
         runner.setIncomingConnection(true);
@@ -284,7 +284,7 @@ public class TestFetchElasticsearchHttp {
         runner.setProperty(FetchElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+        runner.removeProperty(FetchElasticsearchHttp.TYPE);
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
 
         // Allow time for the controller service to fully initialize
@@ -300,7 +300,7 @@ public class TestFetchElasticsearchHttp {
     @Test
     public void testFetchElasticsearchOnTriggerQueryParameter() throws IOException {
         FetchElasticsearchHttpTestProcessor p = new FetchElasticsearchHttpTestProcessor(true); // all docs are found
-        p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source_include=id&myparam=myvalue");
+        p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source=id&myparam=myvalue");
         runner = TestRunners.newTestRunner(p);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
 
@@ -323,6 +323,32 @@ public class TestFetchElasticsearchHttp {
         out.assertAttributeEquals("doc_id", "28039652140");
     }
 
+    @Test
+    public void testFetchElasticsearchOnTriggerQueryParameterNoType() throws IOException {
+        FetchElasticsearchHttpTestProcessor p = new FetchElasticsearchHttpTestProcessor(true); // all docs are found
+        p.setExpectedUrl("http://127.0.0.1:9200/doc/_all/28039652140?_source=id&myparam=myvalue");
+        runner = TestRunners.newTestRunner(p);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.removeProperty(FetchElasticsearchHttp.TYPE);
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+        runner.setProperty(FetchElasticsearchHttp.FIELDS, "id");
+
+        // Set dynamic property, to be added to the URL as a query parameter
+        runner.setProperty("myparam", "myvalue");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -412,15 +438,25 @@ public class TestFetchElasticsearchHttp {
      */
     @Test
     @Ignore("Comment this out if you want to run against local or test ES")
-    public void testFetchElasticsearchBasic() {
+    public void testFetchElasticsearchBasic() throws IOException {
         System.out.println("Starting test " + new Object() {
         }.getClass().getEnclosingMethod().getName());
         final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
 
+        // add data to ES instance
+        new OkHttpClient.Builder().build().newCall(
+                new Request.Builder().url("http://127.0.0.1:9200/doc/_doc/28039652140")
+                        .addHeader("Content-Type", "application/json")
+                        .put(
+                                RequestBody.create(MediaType.get("application/json"),
+                                        IOUtils.toString(docExample, StandardCharsets.UTF_8))
+                        ).build()
+        ).execute();
+
         //Local Cluster - Mac pulled from brew
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
         runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+        runner.removeProperty(FetchElasticsearchHttp.TYPE);
         runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
         runner.assertValid();
 
@@ -434,31 +470,6 @@ public class TestFetchElasticsearchHttp {
     }
 
     @Test
-    @Ignore("Comment this out if you want to run against local or test ES")
-    public void testFetchElasticsearchBatch() throws IOException {
-        System.out.println("Starting test " + new Object() {
-        }.getClass().getEnclosingMethod().getName());
-        final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
-
-        //Local Cluster - Mac pulled from brew
-        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
-        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
-        runner.assertValid();
-
-        for (int i = 0; i < 100; i++) {
-            long newId = 28039652140L + i;
-            final String newStrId = Long.toString(newId);
-            runner.enqueue(docExample, new HashMap<String, String>() {{
-                put("doc_id", newStrId);
-            }});
-        }
-        runner.run(100);
-        runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 100);
-    }
-
-    @Test
     @Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.")
     public void testFetchElasticsearchBasicBehindProxy() {
         final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index 896b981..a9371d1 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -30,7 +30,6 @@ import java.util.HashMap;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -112,10 +111,9 @@ public class TestPutElasticsearchHttp {
     public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException {
         runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
 
         runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(PutElasticsearchHttp.TYPE, "");
+        runner.removeProperty(PutElasticsearchHttp.TYPE);
         runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
         runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
 
@@ -340,17 +338,16 @@ public class TestPutElasticsearchHttp {
 
         runner.assertNotValid();
         runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(PutElasticsearchHttp.TYPE, "");
-        runner.assertNotValid(); // Not valid because type is required prior to 7.0
+        runner.assertNotValid();
         runner.setProperty(PutElasticsearchHttp.TYPE, " ");
-        runner.assertNotValid(); // Not valid because type is required prior to 7.0
+        runner.assertValid();
         runner.removeProperty(PutElasticsearchHttp.TYPE);
-        runner.assertNotValid(); // Not valid because type is required prior to 7.0
+        runner.assertValid();
         runner.setProperty(PutElasticsearchHttp.TYPE, "status");
         runner.assertValid();
-        runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+        runner.setProperty(PutElasticsearchHttp.TYPE, "${type}");
+        runner.assertValid();
         runner.setProperty(PutElasticsearchHttp.TYPE, "_doc");
         runner.assertValid();
         runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index a651aef..bea9995 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -28,7 +28,6 @@ import okio.Buffer;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
@@ -197,10 +196,9 @@ public class TestPutElasticsearchHttpRecord {
         runner = TestRunners.newTestRunner(processor); // no failures
         generateTestData();
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
 
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
-        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
+        runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
         runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
         runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy");
         runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
@@ -248,10 +246,9 @@ public class TestPutElasticsearchHttpRecord {
         runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
         generateTestData();
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
 
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
-        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
+        runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
         runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
         runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update");
         runner.enqueue(new byte[0], new HashMap<String, String>() {{
@@ -291,10 +288,9 @@ public class TestPutElasticsearchHttpRecord {
         runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
         generateTestData();
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
 
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
-        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
+        runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
         runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
         runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE");
         runner.enqueue(new byte[0], new HashMap<String, String>() {{
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
index 0f6b6e4..4cdb035 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
@@ -32,7 +32,6 @@ import java.util.List;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -65,7 +64,6 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTrigger_withInput() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -85,7 +83,6 @@ public class TestQueryElasticsearchHttp {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setValidateExpressionUsage(true);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -109,20 +106,15 @@ public class TestQueryElasticsearchHttp {
         runner.assertNotValid();
         runner.setProperty(QueryElasticsearchHttp.QUERY,
                 "source:Twitter AND identifier:\"${identifier}\"");
-        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+        runner.removeProperty(QueryElasticsearchHttp.TYPE);
         runner.assertValid();
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
-        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
-        runner.assertValid(); // Valid because type is not required prior to 7.0
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.assertValid();
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
-        runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc");
-        runner.assertValid();
-        runner.removeProperty(QueryElasticsearchHttp.TYPE);
-        runner.assertNotValid();
         runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "${type}");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc");
         runner.assertValid();
         runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
         runner.assertValid();
@@ -138,7 +130,6 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -165,7 +156,6 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTrigger_withNoInput() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -211,7 +201,6 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTriggerWithFields() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -231,7 +220,6 @@ public class TestQueryElasticsearchHttp {
     public void testQueryElasticsearchOnTriggerWithLimit() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
@@ -254,7 +242,6 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(500, "Server error");
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -280,7 +267,6 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(100, "Should fail");
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -306,7 +292,6 @@ public class TestQueryElasticsearchHttp {
         processor.setExceptionToThrow(new IOException("Error reading from disk"));
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -332,7 +317,6 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(100, "Should fail", 2);
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -359,7 +343,6 @@ public class TestQueryElasticsearchHttp {
         processor.setStatus(100, "Should fail", 1);
         runner = TestRunners.newTestRunner(processor); // simulate doc not found
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -382,9 +365,8 @@ public class TestQueryElasticsearchHttp {
         runner.enableControllerService(sslService);
         runner.setProperty(QueryElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+        runner.removeProperty(QueryElasticsearchHttp.TYPE);
         runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
 
         // Allow time for the controller service to fully initialize
@@ -421,7 +403,6 @@ public class TestQueryElasticsearchHttp {
         runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost");
         runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3228");
         runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.enqueue("".getBytes(), new HashMap<String, String>() {{
             put("doc_id", "28039652140");
@@ -449,7 +430,6 @@ public class TestQueryElasticsearchHttp {
         runner.setProperty(QueryElasticsearchHttp.PROXY_USERNAME, "squid");
         runner.setProperty(QueryElasticsearchHttp.PROXY_PASSWORD, "changeme");
         runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.enqueue("".getBytes(), new HashMap<String, String>() {{
             put("doc_id", "28039652140");
@@ -465,7 +445,6 @@ public class TestQueryElasticsearchHttp {
         p.setExpectedParam("myparam=myvalue");
         runner = TestRunners.newTestRunner(p);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
@@ -478,30 +457,15 @@ public class TestQueryElasticsearchHttp {
     @Test
     public void testQueryElasticsearchOnTrigger_sourceIncludes() throws IOException {
         QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor();
-        p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param
+        p.setExpectedParam("_source=test");
         runner = TestRunners.newTestRunner(p);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
         runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
         runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
         runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
         runAndVerifySuccess(true);
-
-        // Now test with ES 7.x
-
-        p = new QueryElasticsearchHttpTestProcessor();
-        p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param
-        runner = TestRunners.newTestRunner(p);
-        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-
-        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(QueryElasticsearchHttp.TYPE, "");
-        runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
-        runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
-        runAndVerifySuccess(true);
     }
 
     /**
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
index 74ac031..f790256 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
@@ -32,7 +32,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -83,30 +82,15 @@ public class TestScrollElasticsearchHttp {
     @Test
     public void testScrollElasticsearchOnTrigger_sourceIncludes() throws IOException {
         ScrollElasticsearchHttpTestProcessor p = new ScrollElasticsearchHttpTestProcessor();
-        p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param
+        p.setExpectedParam("_source=test");
         runner = TestRunners.newTestRunner(p);
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
 
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
         runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
         runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
         runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
         runAndVerifySuccess();
-
-        // Now test with ES 7.x
-
-        p = new ScrollElasticsearchHttpTestProcessor();
-        p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param
-        runner = TestRunners.newTestRunner(p);
-        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-
-        runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
-        runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
-        runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
-        runAndVerifySuccess();
     }
 
     @Test
@@ -163,20 +147,18 @@ public class TestScrollElasticsearchHttp {
 
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
         runner.assertNotValid();
-        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
         runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
-        runner.assertNotValid();
-        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
-        runner.assertValid(); // Valid because type is not required prior to 7.0
+        runner.assertValid();
+        runner.removeProperty(ScrollElasticsearchHttp.TYPE);
+        runner.assertValid();
         runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
         runner.assertValid();
-        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-        runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
-        runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc");
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "${type}");
         runner.assertValid();
-        runner.removeProperty(ScrollElasticsearchHttp.TYPE);
-        runner.assertNotValid();
         runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
+        runner.assertNotValid();
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc");
+        runner.assertValid();
         runner.setProperty(ScrollElasticsearchHttp.FIELDS, "id,, userinfo.location");
         runner.assertValid();
         runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc,identifier:desc");
@@ -286,9 +268,8 @@ public class TestScrollElasticsearchHttp {
         runner.enableControllerService(sslService);
         runner.setProperty(ScrollElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
-        runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
         runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
-        runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
+        runner.removeProperty(ScrollElasticsearchHttp.TYPE);
         runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
         runner.setIncomingConnection(false);
 
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 214f15e..1eb2893 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -28,6 +28,7 @@ import org.apache.nifi.schema.access.SchemaAccessUtils
 import org.apache.nifi.serialization.RecordReaderFactory
 import org.apache.nifi.serialization.record.MockRecordParser
 import org.apache.nifi.serialization.record.MockSchemaRegistry
+import org.apache.nifi.util.StringUtils
 import org.apache.nifi.util.TestRunner
 import org.apache.nifi.util.TestRunners
 import org.junit.Assert
@@ -206,6 +207,42 @@ class PutElasticsearchRecordTest {
             "schema.name": "recordPathTest"
         ])
         runner.run()
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+
+        runner.clearTransferState()
+
+        flowFileContents = prettyPrint(toJson([
+                [ msg: "Hello" ],
+                [ id: null, type: null, msg: "Hello" ],
+                [ id: "rec-3", msg: "Hello" ],
+                [ id: "rec-4", msg: "Hello" ],
+                [ id: "rec-5", msg: "Hello" ],
+                [ id: "rec-6", type: "message", msg: "Hello" ]
+        ]))
+
+        evalClosure = { List<IndexOperationRequest> items ->
+            def nullTypeCount = items.findAll { it.type == null }.size()
+            def messageTypeCount = items.findAll { it.type == "message" }.size()
+            def nullIdCount = items.findAll { it.id == null }.size()
+            def recIdCount = items.findAll { StringUtils.startsWith(it.id, "rec-") }.size()
+            Assert.assertEquals("null type", 5, nullTypeCount)
+            Assert.assertEquals("message type", 1, messageTypeCount)
+            Assert.assertEquals("null id", 2, nullIdCount)
+            Assert.assertEquals("rec- id", 4, recIdCount)
+        }
+
+        clientService.evalClosure = evalClosure
+
+        runner.removeProperty(PutElasticsearchRecord.TYPE)
+        runner.enqueue(flowFileContents, [
+                "schema.name": "recordPathTest"
+        ])
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
 
         runner.clearTransferState()