Posted to commits@nifi.apache.org by ma...@apache.org on 2019/11/19 18:54:58 UTC

[nifi] branch master updated: NIFI-5248 Added new Elasticsearch processors. NIFI-5248 Fixed a few stray 1.7.0 references. NIFI-5248 Removed build helper plugin. NIFI-5248 Made changes requested in a review. NIFI-5248 Updated dummy service. NIFI-5248 Made a few changes from a code review. NIFI-5248 Added logic for removing nulls so record paths can be removed when no longer needed. NIFI-5248 Switched from variable registry to flowfile level EL. NIFI-5248 Added JsonPath code to remove index, id and type path statements. [...]

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c79ff0  NIFI-5248 Added new Elasticsearch processors. NIFI-5248 Fixed a few stray 1.7.0 references. NIFI-5248 Removed build helper plugin. NIFI-5248 Made changes requested in a review. NIFI-5248 Updated dummy service. NIFI-5248 Made a few changes from a code review. NIFI-5248 Added logic for removing nulls so record paths can be removed when no longer needed. NIFI-5248 Switched from variable registry to flowfile level EL. NIFI-5248 Added JsonPath code to remove index, id and typ [...]
4c79ff0 is described below

commit 4c79ff057638d703cedad1f33855bc066c71f357
Author: Mike Thomsen <mi...@gmail.com>
AuthorDate: Sun Jul 8 14:51:12 2018 -0400

    NIFI-5248 Added new Elasticsearch processors.
    NIFI-5248 Fixed a few stray 1.7.0 references.
    NIFI-5248 Removed build helper plugin.
    NIFI-5248 Made changes requested in a review.
    NIFI-5248 Updated dummy service.
    NIFI-5248 Made a few changes from a code review.
    NIFI-5248 Added logic for removing nulls so record paths can be removed when no longer needed.
    NIFI-5248 Switched from variable registry to flowfile level EL.
    NIFI-5248 Added JsonPath code to remove index, id and type path statements.
    NIFI-5248 Updated validation.
    NIFI-5248 Set the field to null instead of empty string when nulling records.
    NIFI-5248 Fixed TestElasticSearchClientService.
    NIFI-5248 Removed high level client and switched over to low level client for everything.
    NIFI-5248 Added profiles for ES 6 and ES 7 integration testing.
    NIFI-5248 Updated integration tests to support 5 and 6.
    NIFI-5248 Fixed some style check breaks.
    NIFI-5248 Added create operation type.
    NIFI-5248 Updated documentation.
    NIFI-5248 Added error handling to PutElasticsearchRecord.
    NIFI-5248 Added error logging to PutElasticsearchJson.
    NIFI-5248 Added split failed records option to PutElasticsearchJson.
    NIFI-5248 Added documentation for PutElasticsearchRecord.
    NIFI-5248 Updated import to not use * import.
    NIFI-5248 Removed processor that is no longer relevant due to schema inference.
    NIFI-5248 Renamed ElasticSearch instances to Elasticsearch where we can within API guidelines.
    
    NIFI-5248 Added groovy-json test dependency.
    
    NIFI-5248 Updated PutElasticsearchRecord to only do index operations.
    
    NIFI-5248 Added batch size property and refactored the way relationships and properties are added.
    
    NIFI-5248 Added batch processing support.
    
    NIFI-5248 Updated error handling.
    
    NIFI-5248 Updated to 1.11.0-SNAPSHOT.
    
    NIFI-5248 Made changes requested in a code review.
    
    NIFI-5248 Made a few more changes from a code review.
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #2861
---
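Note on the updated error handling: in the diff below, the service methods stop declaring IOException and instead wrap failures in the unchecked ElasticsearchError, whose isElastic() flag is set when the wrapped exception's simple class name is one of four known transient errors. The following is a minimal sketch of that classification, not the committed code. The class RetryableErrorSketch, the nested stand-in exception, and the "retry"/"failure" route names are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class RetryableErrorSketch {
    // The same four names the diff lists in ElasticsearchError.ELASTIC_ERROR_NAMES.
    static final Set<String> RETRYABLE_NAMES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            "NoNodeAvailableException",
            "ElasticsearchTimeoutException",
            "ReceiveTimeoutTransportException",
            "NodeClosedException")));

    // Hypothetical stand-in for an Elasticsearch exception; only its simple class name matters here.
    static class NoNodeAvailableException extends RuntimeException { }

    // Mirrors the constructor check in the diff: match on the simple class name of the exception itself.
    static boolean isRetryable(Exception ex) {
        return RETRYABLE_NAMES.contains(ex.getClass().getSimpleName());
    }

    // Hypothetical routing decision; the route names are illustrative only.
    static String route(Exception ex) {
        return isRetryable(ex) ? "retry" : "failure";
    }

    public static void main(String[] args) {
        System.out.println(route(new NoNodeAvailableException()));
        System.out.println(route(new IllegalStateException("bad mapping")));
    }
}
```

As in the diff, the check inspects only the exception that was passed in, not its cause chain, so a transient error wrapped inside another exception would not be classified as retryable.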
 .../nifi-elasticsearch-client-service-api/pom.xml  |   6 +
 .../elasticsearch/ElasticSearchClientService.java  |  38 ++-
 ...erationRequest.java => ElasticsearchError.java} |  49 ++-
 .../nifi/elasticsearch/IndexOperationRequest.java  |  31 +-
 .../nifi/elasticsearch/IndexOperationResponse.java |  34 +-
 .../nifi-elasticsearch-client-service/pom.xml      |  43 ++-
 .../ElasticSearchClientServiceImpl.java            | 216 +++++++++----
 .../nifi/elasticsearch/SearchResponseTest.groovy   |  42 +++
 .../ElasticSearch5ClientService_IT.groovy          | 104 +++++-
 .../ElasticSearchLookupService_IT.groovy           |  10 +-
 .../TestElasticSearchClientService.groovy          |  19 +-
 .../src/test/resources/setup-5.script              |  45 +++
 .../src/test/resources/setup-6.script              |  46 +++
 .../src/test/resources/setup-7.script              |  45 +++
 .../src/test/resources/setup.script                |  41 ---
 .../nifi-elasticsearch-restapi-processors/pom.xml  |  45 +++
 .../elasticsearch/DeleteByQueryElasticsearch.java  |   6 +-
 ...cessor.java => ElasticsearchRestProcessor.java} |  37 ++-
 .../elasticsearch/JsonQueryElasticsearch.java      |   9 +-
 .../elasticsearch/PutElasticsearchRecord.java      | 355 +++++++++++++++++++++
 .../elasticsearch/api/BulkOperation.java           |  48 +++
 .../put/FlowFileJsonDescription.java}              |  35 +-
 .../elasticsearch/put/JsonProcessingError.java}    |  20 +-
 .../additionalDetails.html                         |  48 +++
 .../additionalDetails.html                         |  64 ++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../additionalDetails.html                         |   2 +-
 .../additionalDetails.html                         |   4 +-
 .../additionalDetails.html                         |  41 +++
 .../DeleteByQueryElasticsearchTest.groovy          | 143 +++++++++
 .../JsonQueryElasticsearchTest.groovy}             | 266 ++++++++-------
 .../PutElasticsearchRecordTest.groovy              | 261 +++++++++++++++
 .../TestElasticsearchClientService.groovy}         | 103 +++---
 .../mock/AbstractMockElasticsearchClient.groovy}   |  46 ++-
 .../mock/MockBulkLoadClientService.groovy          | 118 +++++++
 .../mock/MockElasticsearchError.groovy}            |  23 +-
 .../src/test/java/.gitignore                       |   1 +
 .../DeleteByQueryElasticsearchTest.java            | 134 --------
 38 files changed, 2029 insertions(+), 550 deletions(-)
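
Note on the bulk payload: with the high level client removed, bulk() in the diff below builds the newline-delimited body for POST /_bulk by hand: one action header line per operation, followed by a source line where the operation needs one, with Upsert sent as an "update" action carrying "doc_as_upsert". The following is a minimal, stdlib-only sketch of that layout, not the committed code. BulkPayloadSketch, Op, quote, header and payload are hypothetical names, the document is assumed to be already serialized JSON, and quote() escapes only backslashes and double quotes. One deliberate difference: the sketch also writes a source line for create, on the assumption that the bulk API expects one there, whereas buildRequest in the diff writes it only for index, update and upsert.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class BulkPayloadSketch {
    enum Operation { CREATE, DELETE, INDEX, UPDATE, UPSERT }

    // Hypothetical, simplified stand-in for IndexOperationRequest; docJson is pre-serialized JSON.
    static class Op {
        final Operation operation;
        final String index;
        final String type;
        final String id;
        final String docJson;

        Op(Operation operation, String index, String type, String id, String docJson) {
            this.operation = operation;
            this.index = index;
            this.type = type;
            this.id = id;
            this.docJson = docJson;
        }
    }

    // Minimal JSON string quoting (assumption: values contain no control characters).
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Action header line; Upsert is sent as an "update" action, as in buildBulkHeader.
    static String header(Op op) {
        String action = op.operation == Operation.UPSERT
                ? "update"
                : op.operation.name().toLowerCase(Locale.ROOT);
        return "{" + quote(action) + ":{\"_index\":" + quote(op.index)
                + ",\"_type\":" + quote(op.type)
                + ",\"_id\":" + quote(op.id) + "}}";
    }

    // Every line, including the last, ends with a newline.
    static String payload(List<Op> ops) {
        StringBuilder sb = new StringBuilder();
        for (Op op : ops) {
            sb.append(header(op)).append('\n');
            switch (op.operation) {
                case INDEX:
                case CREATE: // assumption: create carries a source line like index
                    sb.append(op.docJson).append('\n');
                    break;
                case UPDATE:
                    sb.append("{\"doc\":").append(op.docJson).append("}\n");
                    break;
                case UPSERT:
                    sb.append("{\"doc\":").append(op.docJson).append(",\"doc_as_upsert\":true}\n");
                    break;
                case DELETE:
                    break; // delete has no source line
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Op> ops = new ArrayList<>();
        ops.add(new Op(Operation.INDEX, "messages", "_doc", "1", "{\"msg\":\"hello\"}"));
        ops.add(new Op(Operation.UPSERT, "messages", "_doc", "2", "{\"msg\":\"hi\"}"));
        ops.add(new Op(Operation.DELETE, "messages", "_doc", "3", null));
        System.out.print(payload(ops));
    }
}
```

Serializing each document on a single line matters because the bulk format uses the newline as its record separator; the diff handles this with flatten() on the serialized header and update body.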

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
index a39c6cd..29b1590 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
@@ -43,5 +43,11 @@
             <version>1.11.0-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.9.8</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 57f359d..d35c784 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -30,12 +30,12 @@ import java.util.List;
 import java.util.Map;
 
 @Tags({"elasticsearch", "client"})
-@CapabilityDescription("A controller service for accessing an ElasticSearch client.")
+@CapabilityDescription("A controller service for accessing an Elasticsearch client.")
 public interface ElasticSearchClientService extends ControllerService {
     PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder()
             .name("el-cs-http-hosts")
             .displayName("HTTP Hosts")
-            .description("A comma-separated list of HTTP hosts that host ElasticSearch query nodes.")
+            .description("A comma-separated list of HTTP hosts that host Elasticsearch query nodes.")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -93,7 +93,7 @@ public interface ElasticSearchClientService extends ControllerService {
     PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
             .name("el-cs-charset")
             .displayName("Charset")
-            .description("The charset to use for interpreting the response from ElasticSearch.")
+            .description("The charset to use for interpreting the response from Elasticsearch.")
             .required(true)
             .defaultValue("UTF-8")
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
@@ -106,16 +106,26 @@ public interface ElasticSearchClientService extends ControllerService {
      * @return IndexOperationResponse if successful
      * @throws IOException thrown when there is an error.
      */
-    IndexOperationResponse add(IndexOperationRequest operation) throws IOException;
+    IndexOperationResponse add(IndexOperationRequest operation);
 
     /**
-     * Index multiple documents.
+     * Bulk process multiple documents.
      *
-     * @param operations A list of documents to index.
+     * @param operations A list of index operations.
      * @return IndexOperationResponse if successful.
      * @throws IOException thrown when there is an error.
      */
-    IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException;
+    IndexOperationResponse bulk(List<IndexOperationRequest> operations);
+
+    /**
+     * Count the documents that match the criteria.
+     *
+     * @param query A query in the JSON DSL syntax
+     * @param index The index to target.
+     * @param type The type to target.
+     * @return
+     */
+    Long count(String query, String index, String type);
 
     /**
      * Delete a document by its ID from an index.
@@ -125,7 +135,7 @@ public interface ElasticSearchClientService extends ControllerService {
      * @param id The document ID to remove from the selected index.
      * @return A DeleteOperationResponse object if successful.
      */
-    DeleteOperationResponse deleteById(String index, String type, String id) throws IOException;
+    DeleteOperationResponse deleteById(String index, String type, String id);
 
 
     /**
@@ -136,7 +146,7 @@ public interface ElasticSearchClientService extends ControllerService {
      * @return A DeleteOperationResponse object if successful.
      * @throws IOException thrown when there is an error.
      */
-    DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException;
+    DeleteOperationResponse deleteById(String index, String type, List<String> ids);
 
     /**
      * Delete documents by query.
@@ -146,7 +156,7 @@ public interface ElasticSearchClientService extends ControllerService {
      * @param type The type to target within the index. Optional.
      * @return A DeleteOperationResponse object if successful.
      */
-    DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException;
+    DeleteOperationResponse deleteByQuery(String query, String index, String type);
 
     /**
      * Get a document by ID.
@@ -157,23 +167,23 @@ public interface ElasticSearchClientService extends ControllerService {
      * @return Map if successful, null if not found.
      * @throws IOException thrown when there is an error.
      */
-    Map<String, Object> get(String index, String type, String id) throws IOException;
+    Map<String, Object> get(String index, String type, String id);
 
     /**
      * Perform a search using the JSON DSL.
      *
      * @param query A JSON string reprensenting the query.
      * @param index The index to target. Optional.
-     * @param type The type to target. Optional. Will not be used in future versions of ElasticSearch.
+     * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch.
      * @return A SearchResponse object if successful.
      */
-    SearchResponse search(String query, String index, String type) throws IOException;
+    SearchResponse search(String query, String index, String type);
 
     /**
      * Build a transit URL to use with the provenance reporter.
      * @param index Index targeted. Optional.
      * @param type Type targeted. Optional
-     * @return a URL describing the ElasticSearch cluster.
+     * @return a URL describing the Elasticsearch cluster.
      */
     String getTransitUrl(String index, String type);
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
similarity index 51%
copy from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
copy to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
index 9281adb..c383c8d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
@@ -17,34 +17,29 @@
 
 package org.apache.nifi.elasticsearch;
 
-import java.util.Map;
-
-public class IndexOperationRequest {
-    private String index;
-    private String type;
-    private String id;
-    private Map<String, Object> fields;
-
-    public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields) {
-        this.index = index;
-        this.type = type;
-        this.id = id;
-        this.fields = fields;
-    }
-
-    public String getIndex() {
-        return index;
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public String getId() {
-        return id;
+import java.util.HashSet;
+import java.util.Set;
+
+public class ElasticsearchError extends RuntimeException {
+    /**
+     * These are names of common Elasticsearch exceptions where it is safe to assume
+     * that it's OK to retry the operation instead of just sending it to an error relationship.
+     */
+    public static final Set<String> ELASTIC_ERROR_NAMES = new HashSet<String>(){{
+        add("NoNodeAvailableException");
+        add("ElasticsearchTimeoutException");
+        add("ReceiveTimeoutTransportException");
+        add("NodeClosedException");
+    }};
+
+    protected boolean isElastic;
+
+    public ElasticsearchError(Exception ex) {
+        super(ex);
+        isElastic = ELASTIC_ERROR_NAMES.contains(ex.getClass().getSimpleName());
     }
 
-    public Map<String, Object> getFields() {
-        return fields;
+    public boolean isElastic() {
+        return isElastic;
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
index 9281adb..df0679a 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
@@ -19,17 +19,23 @@ package org.apache.nifi.elasticsearch;
 
 import java.util.Map;
 
+/**
+ * A POJO that represents an "operation on an index." It should not be confused with just indexing documents, as it
+ * covers all CRUD-related operations that can be executed against an Elasticsearch index with documents.
+ */
 public class IndexOperationRequest {
     private String index;
     private String type;
     private String id;
     private Map<String, Object> fields;
+    private Operation operation;
 
-    public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields) {
+    public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields, Operation operation) {
         this.index = index;
         this.type = type;
         this.id = id;
         this.fields = fields;
+        this.operation = operation;
     }
 
     public String getIndex() {
@@ -47,4 +53,25 @@ public class IndexOperationRequest {
     public Map<String, Object> getFields() {
         return fields;
     }
-}
+
+    public Operation getOperation() {
+        return operation;
+    }
+
+    public enum Operation {
+        Create("create"),
+        Delete("delete"),
+        Index("index"),
+        Update("update"),
+        Upsert("upsert");
+        String value;
+
+        Operation(String value) {
+            this.value = value;
+        }
+
+        public String getValue() {
+            return value;
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
index a22b7aa..c001f69 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
@@ -17,20 +17,44 @@
 
 package org.apache.nifi.elasticsearch;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
 public class IndexOperationResponse {
     private long took;
-    private long ingestTook;
+    private boolean hasErrors;
+    private List<Map<String, Object>> items;
 
-    public IndexOperationResponse(long took, long ingestTook) {
+    public IndexOperationResponse(long took) {
         this.took = took;
-        this.ingestTook = ingestTook;
     }
 
     public long getTook() {
         return took;
     }
 
-    public long getIngestTook() {
-        return ingestTook;
+    public boolean hasErrors() {
+        return hasErrors;
+    }
+
+    public static IndexOperationResponse fromJsonResponse(String response) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, Object> parsedResponse = mapper.readValue(response, Map.class);
+        int took = (int) parsedResponse.get("took");
+        boolean hasErrors = (boolean) parsedResponse.get("errors");
+        List<Map<String, Object>> items = (List<Map<String, Object>>)parsedResponse.get("items");
+
+        IndexOperationResponse retVal = new IndexOperationResponse(took);
+        retVal.hasErrors = hasErrors;
+        retVal.items = items;
+
+        return retVal;
+    }
+
+    public List<Map<String, Object>> getItems() {
+        return items;
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index e63333e..2873972 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -25,6 +25,12 @@
     <artifactId>nifi-elasticsearch-client-service</artifactId>
     <packaging>jar</packaging>
 
+    <properties>
+        <es.int.version>5.6.15</es.int.version>
+        <script.name>setup-5.script</script.name>
+        <type.name>faketype</type.name>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -159,26 +165,57 @@
             <artifactId>groovy-json</artifactId>
             <version>${nifi.groovy.version}</version>
             <scope>test</scope>
+	</dependency>
+	<dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-client</artifactId>
+            <version>5.6.16</version>
         </dependency>
     </dependencies>
 
     <profiles>
         <profile>
+            <id>integration-6</id>
+            <properties>
+                <es.int.version>6.7.1</es.int.version>
+                <type.name>_doc</type.name>
+                <script.name>setup-6.script</script.name>
+            </properties>
+        </profile>
+        <profile>
+            <id>integration-7</id>
+            <properties>
+                <es.int.version>7.0.0</es.int.version>
+                <script.name>setup-7.script</script.name>
+                <type.name>_doc</type.name>
+            </properties>
+        </profile>
+        <profile>
             <id>integration-tests</id>
             <build>
                 <plugins>
                     <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-failsafe-plugin</artifactId>
+                        <version>3.0.0-M3</version>
+                        <configuration>
+                            <systemPropertyVariables>
+                                <type_name>${type.name}</type_name>
+                            </systemPropertyVariables>
+                        </configuration>
+                    </plugin>
+                    <plugin>
                         <groupId>com.github.alexcojocaru</groupId>
                         <artifactId>elasticsearch-maven-plugin</artifactId>
-                        <version>6.0</version>
+                        <version>6.13</version>
                         <configuration>
                             <clusterName>testCluster</clusterName>
                             <transportPort>9500</transportPort>
                             <httpPort>9400</httpPort>
-                            <version>5.6.2</version>
+                            <version>${es.int.version}</version>
                             <timeout>90</timeout>
                             <logLevel>ERROR</logLevel>
-                            <pathInitScript>${project.basedir}/src/test/resources/setup.script</pathInitScript>
+                            <pathInitScript>${project.basedir}/src/test/resources/${script.name}</pathInitScript>
                         </configuration>
                         <executions>
                             <execution>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index b37ba0c..f856c76 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -17,9 +17,9 @@
 
 package org.apache.nifi.elasticsearch;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
-import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
@@ -27,25 +27,21 @@ import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.message.BasicHeader;
 import org.apache.http.nio.entity.NStringEntity;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.ssl.SSLContextService;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.support.WriteRequest;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
 
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
@@ -59,6 +55,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
     private ObjectMapper mapper = new ObjectMapper();
@@ -66,7 +63,6 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     static final private List<PropertyDescriptor> properties;
 
     private RestClient client;
-    private RestHighLevelClient highLevelClient;
 
     private String url;
     private Charset charset;
@@ -158,10 +154,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
             .setMaxRetryTimeoutMillis(retryTimeout);
 
         this.client = builder.build();
-        this.highLevelClient = new RestHighLevelClient(client);
     }
 
-    private Response runQuery(String endpoint, String query, String index, String type) throws IOException {
+    private Response runQuery(String endpoint, String query, String index, String type) {
         StringBuilder sb = new StringBuilder()
             .append("/")
             .append(index);
@@ -174,68 +169,153 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
 
         HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
 
-        return client.performRequest("POST", sb.toString(), Collections.emptyMap(), queryEntity);
+        try {
+            return client.performRequest("POST", sb.toString(), Collections.emptyMap(), queryEntity);
+        } catch (Exception e) {
+            throw new ElasticsearchError(e);
+        }
     }
 
-    private Map<String, Object> parseResponse(Response response) throws IOException {
+    private Map<String, Object> parseResponse(Response response) {
         final int code = response.getStatusLine().getStatusCode();
 
-        if (code >= 200 & code < 300) {
-            InputStream inputStream = response.getEntity().getContent();
-            byte[] result = IOUtils.toByteArray(inputStream);
-            inputStream.close();
-            return mapper.readValue(new String(result, charset), Map.class);
-        } else {
-            String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s",
-                response.getStatusLine().getReasonPhrase());
-            throw new IOException(errorMessage);
+        try {
+            if (code >= 200 & code < 300) {
+                InputStream inputStream = response.getEntity().getContent();
+                byte[] result = IOUtils.toByteArray(inputStream);
+                inputStream.close();
+                return mapper.readValue(new String(result, charset), Map.class);
+            } else {
+                String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s",
+                        response.getStatusLine().getReasonPhrase());
+                throw new IOException(errorMessage);
+            }
+        } catch (Exception ex) {
+            throw new ElasticsearchError(ex);
         }
     }
 
     @Override
-    public IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
-        return add(Arrays.asList(operation));
+    public IndexOperationResponse add(IndexOperationRequest operation) {
+        return bulk(Arrays.asList(operation));
     }
 
-    @Override
-    public IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException {
-        BulkRequest bulkRequest = new BulkRequest();
-        for (int index = 0; index < operations.size(); index++) {
-            IndexOperationRequest or = operations.get(index);
-            IndexRequest indexRequest = new IndexRequest(or.getIndex(), or.getType(), or.getId())
-                .source(or.getFields());
-            bulkRequest.add(indexRequest);
+    private String flatten(String str) {
+        return str.replaceAll("[\\n\\r]", "\\\\n");
+    }
+
+    private String buildBulkHeader(IndexOperationRequest request) throws JsonProcessingException {
+        String operation = request.getOperation().equals(IndexOperationRequest.Operation.Upsert)
+                ? "update"
+                : request.getOperation().getValue();
+        return buildBulkHeader(operation, request.getIndex(), request.getType(), request.getId());
+    }
+
+    private String buildBulkHeader(String operation, String index, String type, String id) throws JsonProcessingException {
+        Map<String, Object> header = new HashMap<String, Object>() {{
+            put(operation, new HashMap<String, Object>() {{
+                put("_index", index);
+                put("_id", id);
+                put("_type", type);
+            }});
+        }};
+
+        return flatten(mapper.writeValueAsString(header));
+    }
+
+    protected void buildRequest(IndexOperationRequest request, StringBuilder builder) throws JsonProcessingException {
+        String header = buildBulkHeader(request);
+        builder.append(header).append("\n");
+        if (request.getOperation().equals(IndexOperationRequest.Operation.Index)) {
+            String indexDocument = mapper.writeValueAsString(request.getFields());
+            builder.append(indexDocument).append("\n");
+        } else if (request.getOperation().equals(IndexOperationRequest.Operation.Update)
+            || request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
+            Map<String, Object> doc = new HashMap<String, Object>() {{
+                put("doc", request.getFields());
+                if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
+                    put("doc_as_upsert", true);
+                }
+            }};
+            String update = flatten(mapper.writeValueAsString(doc)).trim();
+            builder.append(String.format("%s\n", update));
         }
+    }
 
-        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+    @Override
+    public IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
+        try {
+            StringBuilder payload = new StringBuilder();
+            for (int index = 0; index < operations.size(); index++) {
+                IndexOperationRequest or = operations.get(index);
+                buildRequest(or, payload);
+            }
+
+            if (getLogger().isDebugEnabled()) {
+                getLogger().debug(payload.toString());
+            }
+            HttpEntity entity = new NStringEntity(payload.toString(), ContentType.APPLICATION_JSON);
+            StopWatch watch = new StopWatch();
+            watch.start();
+            Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
+            watch.stop();
+
+            String rawResponse = IOUtils.toString(response.getEntity().getContent(), "UTF-8");
+
+            if (getLogger().isDebugEnabled()) {
+                getLogger().debug(String.format("Response was: %s", rawResponse));
+            }
+
+            IndexOperationResponse retVal = IndexOperationResponse.fromJsonResponse(rawResponse);
+
+            return retVal;
+        } catch (Exception ex) {
+            throw new ElasticsearchError(ex);
+        }
+    }
 
-        BulkResponse response = highLevelClient.bulk(bulkRequest);
-        IndexOperationResponse retVal = new IndexOperationResponse(response.getTookInMillis(), response.getIngestTookInMillis());
+    @Override
+    public Long count(String query, String index, String type) {
+        Response response = runQuery("_count", query, index, type);
+        Map<String, Object> parsed = parseResponse(response);
 
-        return retVal;
+        return ((Number) parsed.get("count")).longValue();
     }
 
     @Override
-    public DeleteOperationResponse deleteById(String index, String type, String id) throws IOException {
+    public DeleteOperationResponse deleteById(String index, String type, String id) {
         return deleteById(index, type, Arrays.asList(id));
     }
 
     @Override
-    public DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException {
-        BulkRequest bulk = new BulkRequest();
-        for (int idx = 0; idx < ids.size(); idx++) {
-            DeleteRequest request = new DeleteRequest(index, type, ids.get(idx));
-            bulk.add(request);
+    public DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
+        try {
+            StringBuilder sb = new StringBuilder();
+            for (int idx = 0; idx < ids.size(); idx++) {
+                String header = buildBulkHeader("delete", index, type, ids.get(idx));
+                sb.append(header).append("\n");
+            }
+            HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
+            StopWatch watch = new StopWatch();
+            watch.start();
+            Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
+            watch.stop();
+
+            if (getLogger().isDebugEnabled()) {
+                getLogger().debug(String.format("Response for bulk delete: %s",
+                        IOUtils.toString(response.getEntity().getContent(), "UTF-8")));
+            }
+
+            DeleteOperationResponse dor = new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
+
+            return dor;
+        } catch (Exception ex) {
+            throw new ElasticsearchError(ex);
         }
-        BulkResponse response = highLevelClient.bulk(bulk);
-
-        DeleteOperationResponse dor = new DeleteOperationResponse(response.getTookInMillis());
-
-        return dor;
     }
 
     @Override
-    public DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException {
+    public DeleteOperationResponse deleteByQuery(String query, String index, String type) {
         long start = System.currentTimeMillis();
         Response response = runQuery("_delete_by_query", query, index, type);
         long end   = System.currentTimeMillis();
@@ -245,14 +325,40 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     }
 
     @Override
-    public Map<String, Object> get(String index, String type, String id) throws IOException {
-        GetRequest get = new GetRequest(index, type, id);
-        GetResponse resp = highLevelClient.get(get, new Header[]{});
-        return resp.getSource();
+    public Map<String, Object> get(String index, String type, String id) {
+        try {
+            StringBuilder endpoint = new StringBuilder();
+            endpoint.append(index);
+            if (!StringUtils.isEmpty(type)) {
+                endpoint.append("/").append(type);
+            }
+            endpoint.append("/").append(id);
+            Response response = client.performRequest("GET", endpoint.toString(), new BasicHeader("Content-Type", "application/json"));
+
+            String body = IOUtils.toString(response.getEntity().getContent(), "UTF-8");
+
+            return (Map<String, Object>) mapper.readValue(body, Map.class).get("_source");
+        } catch (Exception ex) {
+            getLogger().error("Could not fetch document by id", ex);
+            return null;
+        }
+    }
+
+    /*
+     * Before Elasticsearch 7.x, hits.total is a plain number; from 7.x on, it is a map that holds the count under "value".
+     */
+    private int handleSearchCount(Object raw) {
+        if (raw instanceof Number) {
+            return ((Number) raw).intValue();
+        } else if (raw instanceof Map) {
+            return ((Number) ((Map) raw).get("value")).intValue();
+        } else {
+            throw new ProcessException("Unknown type for hit count.");
+        }
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) throws IOException {
+    public SearchResponse search(String query, String index, String type) {
         Response response = runQuery("_search", query, index, type);
         Map<String, Object> parsed = parseResponse(response);
 
@@ -261,7 +367,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         Map<String, Object> aggregations = parsed.get("aggregations") != null
                 ? (Map<String, Object>)parsed.get("aggregations") : new HashMap<>();
         Map<String, Object> hitsParent = (Map<String, Object>)parsed.get("hits");
-        int count = (Integer)hitsParent.get("total");
+        int count = handleSearchCount(hitsParent.get("total"));
         List<Map<String, Object>> hits = (List<Map<String, Object>>)hitsParent.get("hits");
 
         SearchResponse esr = new SearchResponse(hits, aggregations, count, took, timedOut);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy
new file mode 100644
index 0000000..78e641d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch
+
+import org.junit.Assert
+import org.junit.Test
+
+class SearchResponseTest {
+    @Test
+    void test() {
+        def results = []
+        def aggs    = [:]
+        def num     = 10
+        def took    = 100
+        def timeout = false
+        def response = new SearchResponse(results, aggs, num, took, timeout)
+        def str = response.toString()
+        Assert.assertEquals(results, response.hits)
+        Assert.assertEquals(aggs, response.aggregations)
+        Assert.assertEquals(num, response.numberOfHits)
+        Assert.assertEquals(took, response.took)
+        Assert.assertEquals(timeout, response.timedOut)
+        Assert.assertTrue(str.contains("aggregations"))
+        Assert.assertTrue(str.contains("hits"))
+        Assert.assertTrue(str.contains("numberOfHits"))
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy
index fd71d61..d2104aa 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy
@@ -20,6 +20,8 @@ package org.apache.nifi.elasticsearch.integration
 import org.apache.nifi.elasticsearch.DeleteOperationResponse
 import org.apache.nifi.elasticsearch.ElasticSearchClientService
 import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
 import org.apache.nifi.elasticsearch.SearchResponse
 import org.apache.nifi.ssl.StandardSSLContextService
 import org.apache.nifi.util.TestRunner
@@ -37,8 +39,8 @@ class ElasticSearch5ClientService_IT {
     private TestRunner runner
     private ElasticSearchClientServiceImpl service
 
-    static final String INDEX = "messages"
-    static final String TYPE  = "message"
+    static String INDEX = "messages"
+    static String TYPE  = System.getProperty("type_name")
 
     @Before
     void before() throws Exception {
@@ -80,7 +82,7 @@ class ElasticSearch5ClientService_IT {
         ]))
         
         
-        SearchResponse response = service.search(query, "messages", "message")
+        SearchResponse response = service.search(query, "messages", TYPE)
         Assert.assertNotNull("Response was null", response)
 
         Assert.assertEquals("Wrong count", 15, response.numberOfHits)
@@ -138,6 +140,7 @@ class ElasticSearch5ClientService_IT {
     @Test
     void testGet() throws IOException {
         Map old
+        System.out.println("TYPE: " + TYPE)
         1.upto(15) { index ->
             String id = String.valueOf(index)
             def doc = service.get(INDEX, TYPE, id)
@@ -171,4 +174,99 @@ class ElasticSearch5ClientService_IT {
 
         runner.assertValid()
     }
+
+    @Test
+    void testBulkAddTwoIndexes() throws Exception {
+        List<IndexOperationRequest> payload = new ArrayList<>()
+        for (int x = 0; x < 20; x++) {
+            String index = x % 2 == 0 ? "bulk_a": "bulk_b"
+            payload.add(new IndexOperationRequest(index, TYPE, String.valueOf(x), new HashMap<String, Object>(){{
+                put("msg", "test")
+            }}, IndexOperationRequest.Operation.Index))
+        }
+        for (int x = 0; x < 5; x++) {
+            payload.add(new IndexOperationRequest("bulk_c", TYPE, String.valueOf(x), new HashMap<String, Object>(){{
+                put("msg", "test")
+            }}, IndexOperationRequest.Operation.Index))
+        }
+        IndexOperationResponse response = service.bulk(payload)
+        Assert.assertNotNull(response)
+        Assert.assertTrue(response.getTook() > 0)
+        Thread.sleep(2000)
+
+        /*
+         * Now, check to ensure that all three indexes got populated appropriately.
+         */
+        String query = "{ \"query\": { \"match_all\": {}}}"
+        Long indexA = service.count(query, "bulk_a", TYPE)
+        Long indexB = service.count(query, "bulk_b", TYPE)
+        Long indexC = service.count(query, "bulk_c", TYPE)
+
+        Assert.assertNotNull(indexA)
+        Assert.assertNotNull(indexB)
+        Assert.assertNotNull(indexC)
+        Assert.assertEquals(indexA, indexB)
+        Assert.assertEquals(10, indexA.intValue())
+        Assert.assertEquals(10, indexB.intValue())
+        Assert.assertEquals(5, indexC.intValue())
+
+        Long total = service.count(query, "bulk_*", TYPE)
+        Assert.assertNotNull(total)
+        Assert.assertEquals(25, total.intValue())
+    }
+
+    @Test
+    void testUpdateAndUpsert() {
+        final String TEST_ID = "update-test"
+        Map<String, Object> doc = new HashMap<>()
+        doc.put("msg", "Buongiorno, mondo")
+        service.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, doc, IndexOperationRequest.Operation.Index))
+        Map<String, Object> result = service.get(INDEX, TYPE, TEST_ID)
+        Assert.assertEquals("Not the same", doc, result)
+
+        Map<String, Object> updates = new HashMap<>()
+        updates.put("from", "john.smith")
+        Map<String, Object> merged = new HashMap<>()
+        merged.putAll(updates)
+        merged.putAll(doc)
+        IndexOperationRequest request = new IndexOperationRequest(INDEX, TYPE, TEST_ID, updates, IndexOperationRequest.Operation.Update)
+        service.add(request)
+        result = service.get(INDEX, TYPE, TEST_ID)
+        Assert.assertTrue(result.containsKey("from"))
+        Assert.assertTrue(result.containsKey("msg"))
+        Assert.assertEquals("Not the same after update.", merged, result)
+
+        final String UPSERTED_ID = "upsert-ftw"
+        Map<String, Object> upsertItems = new HashMap<>()
+        upsertItems.put("upsert_1", "hello")
+        upsertItems.put("upsert_2", 1)
+        upsertItems.put("upsert_3", true)
+        request = new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, upsertItems, IndexOperationRequest.Operation.Upsert)
+        service.add(request)
+        result = service.get(INDEX, TYPE, UPSERTED_ID)
+        Assert.assertEquals(upsertItems, result)
+
+        List<IndexOperationRequest> deletes = new ArrayList<>()
+        deletes.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, null, IndexOperationRequest.Operation.Delete))
+        deletes.add(new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, null, IndexOperationRequest.Operation.Delete))
+        service.bulk(deletes)
+        Assert.assertNull(service.get(INDEX, TYPE, TEST_ID))
+        Assert.assertNull(service.get(INDEX, TYPE, UPSERTED_ID))
+    }
+
+    @Test
+    void testGetBulkResponsesWithErrors() {
+        def ops = [
+            new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "Hi", intField: 1], IndexOperationRequest.Operation.Index),
+            new IndexOperationRequest(INDEX, TYPE, "2", [ "msg": "Hi", intField: 1], IndexOperationRequest.Operation.Create),
+            new IndexOperationRequest(INDEX, TYPE, "2", [ "msg": "Hi", intField: 1], IndexOperationRequest.Operation.Create),
+            new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "Hi", intField: "notaninteger"], IndexOperationRequest.Operation.Index)
+        ]
+        def response = service.bulk(ops)
+        assert response.hasErrors()
+        assert response.items.findAll {
+            def key = it.keySet().stream().findFirst().get()
+            it[key].containsKey("error")
+        }.size() == 2
+    }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy
index 2ce9573..f8d4ef5 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy
@@ -39,6 +39,8 @@ class ElasticSearchLookupService_IT {
     private ElasticSearchClientService service
     private ElasticSearchLookupService lookupService
 
+    static String TYPE  = System.getProperty("type_name")
+
     @Before
     void before() throws Exception {
         runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class)
@@ -54,7 +56,7 @@ class ElasticSearchLookupService_IT {
         runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "Lookup Service")
         runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "Client Service")
         runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "user_details")
-        runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "details")
+        runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
 
         try {
             runner.enableControllerService(service)
@@ -141,7 +143,7 @@ class ElasticSearchLookupService_IT {
 
         runner.disableControllerService(lookupService)
         runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested")
-        runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "nested_complex")
+        runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
         runner.enableControllerService(lookupService)
 
         Optional<Record> response = lookupService.lookup(coordinates)
@@ -162,7 +164,7 @@ class ElasticSearchLookupService_IT {
     void testDetectedSchema() throws LookupFailureException {
         runner.disableControllerService(lookupService)
         runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "complex")
-        runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "complex")
+        runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
         runner.enableControllerService(lookupService)
         def coordinates = ["_id": "1" ]
 
@@ -196,7 +198,7 @@ class ElasticSearchLookupService_IT {
         runner.setProperty(lookupService, "\$.subField.longField", "/longField2")
         runner.setProperty(lookupService, '$.subField.dateField', '/dateField2')
         runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested")
-        runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "nested_complex")
+        runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
         runner.enableControllerService(lookupService)
 
         def coordinates = ["msg": "Hello, world"]
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
index 899cedb..6dfc5e4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
@@ -33,37 +33,42 @@ class TestElasticSearchClientService extends AbstractControllerService implement
     ]
 
     @Override
-    IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
+    IndexOperationResponse add(IndexOperationRequest operation) {
         return null
     }
 
     @Override
-    IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException {
+    IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
         return null
     }
 
     @Override
-    DeleteOperationResponse deleteById(String index, String type, String id) throws IOException {
+    Long count(String query, String index, String type) {
         return null
     }
 
     @Override
-    DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException {
+    DeleteOperationResponse deleteById(String index, String type, String id) {
         return null
     }
 
     @Override
-    DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException {
+    DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
         return null
     }
 
     @Override
-    Map<String, Object> get(String index, String type, String id) throws IOException {
+    DeleteOperationResponse deleteByQuery(String query, String index, String type) {
+        return null
+    }
+
+    @Override
+    Map<String, Object> get(String index, String type, String id) {
         return data
     }
 
     @Override
-    SearchResponse search(String query, String index, String type) throws IOException {
+    SearchResponse search(String query, String index, String type) {
         List hits = [[
             "_source": data
         ]]
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-5.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-5.script
new file mode 100644
index 0000000..bac18d4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-5.script
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#create mapping
+PUT:user_details/:{ "mappings":{"faketype":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}}
+PUT:messages/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:complex/:{"mappings":{"faketype":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"}}}}}}}
+PUT:nested/:{"mappings":{"faketype":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"},"deeper":{"type":"nested","properties":{"secretz":{"type":"keyword"},"deepest":{"type":"nested","properties":{"super_secret":{"type":"keyword"}}}}}}}}}}}
+PUT:bulk_a/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:bulk_b/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:bulk_c/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:error_handler:{ "mappings": { "faketype": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}}
+#add document
+PUT:messages/faketype/1:{ "msg":"one" }
+PUT:messages/faketype/2:{ "msg":"two" }
+PUT:messages/faketype/3:{ "msg":"two" }
+PUT:messages/faketype/4:{ "msg":"three" }
+PUT:messages/faketype/5:{ "msg":"three" }
+PUT:messages/faketype/6:{ "msg":"three" }
+PUT:messages/faketype/7:{ "msg":"four" }
+PUT:messages/faketype/8:{ "msg":"four" }
+PUT:messages/faketype/9:{ "msg":"four" }
+PUT:messages/faketype/10:{ "msg":"four" }
+PUT:messages/faketype/11:{ "msg":"five" }
+PUT:messages/faketype/12:{ "msg":"five" }
+PUT:messages/faketype/13:{ "msg":"five" }
+PUT:messages/faketype/14:{ "msg":"five" }
+PUT:messages/faketype/15:{ "msg":"five" }
+PUT:complex/faketype/1:{"msg":"Hello, world","subField":{"longField":100000,"dateField":"2018-04-10T12:18:05Z"}}
+PUT:user_details/faketype/1:{ "email": "john.smith@company.com", "phone": "123-456-7890", "accessKey": "ABCDE"}
+PUT:user_details/faketype/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
+PUT:nested/faketype/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
+PUT:nested/faketype/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
+PUT:nested/faketype/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script
new file mode 100644
index 0000000..703ec18
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#create mapping
+PUT:user_details/:{ "mappings":{"_doc":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}}
+PUT:messages/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:complex/:{"mappings":{"_doc":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"}}}}}}}
+PUT:nested/:{"mappings":{"_doc":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"},"deeper":{"type":"nested","properties":{"secretz":{"type":"keyword"},"deepest":{"type":"nested","properties":{"super_secret":{"type":"keyword"}}}}}}}}}}}
+PUT:bulk_a/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:bulk_b/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:bulk_c/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:error_handler:{ "mappings": { "_doc": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}}
+
+#add document
+PUT:messages/_doc/1:{ "msg":"one" }
+PUT:messages/_doc/2:{ "msg":"two" }
+PUT:messages/_doc/3:{ "msg":"two" }
+PUT:messages/_doc/4:{ "msg":"three" }
+PUT:messages/_doc/5:{ "msg":"three" }
+PUT:messages/_doc/6:{ "msg":"three" }
+PUT:messages/_doc/7:{ "msg":"four" }
+PUT:messages/_doc/8:{ "msg":"four" }
+PUT:messages/_doc/9:{ "msg":"four" }
+PUT:messages/_doc/10:{ "msg":"four" }
+PUT:messages/_doc/11:{ "msg":"five" }
+PUT:messages/_doc/12:{ "msg":"five" }
+PUT:messages/_doc/13:{ "msg":"five" }
+PUT:messages/_doc/14:{ "msg":"five" }
+PUT:messages/_doc/15:{ "msg":"five" }
+PUT:complex/_doc/1:{"msg":"Hello, world","subField":{"longField":100000,"dateField":"2018-04-10T12:18:05Z"}}
+PUT:user_details/_doc/1:{ "email": "john.smith@company.com", "phone": "123-456-7890", "accessKey": "ABCDE"}
+PUT:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
+PUT:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
+PUT:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
+PUT:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script
new file mode 100644
index 0000000..0b240eb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#create mapping
+PUT:user_details/:{ "mappings":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}
+PUT:messages/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
+PUT:complex/:{"mappings":{ "properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"}}}}}}
+PUT:nested/:{"mappings":{ "properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"},"deeper":{"type":"nested","properties":{"secretz":{"type":"keyword"},"deepest":{"type":"nested","properties":{"super_secret":{"type":"keyword"}}}}}}}}}}
+PUT:bulk_a/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
+PUT:bulk_b/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
+PUT:bulk_c/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
+PUT:error_handler:{ "mappings": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}
+#add document
+POST:messages/_doc/1:{ "msg":"one" }
+POST:messages/_doc/2:{ "msg":"two" }
+POST:messages/_doc/3:{ "msg":"two" }
+POST:messages/_doc/4:{ "msg":"three" }
+POST:messages/_doc/5:{ "msg":"three" }
+POST:messages/_doc/6:{ "msg":"three" }
+POST:messages/_doc/7:{ "msg":"four" }
+POST:messages/_doc/8:{ "msg":"four" }
+POST:messages/_doc/9:{ "msg":"four" }
+POST:messages/_doc/10:{ "msg":"four" }
+POST:messages/_doc/11:{ "msg":"five" }
+POST:messages/_doc/12:{ "msg":"five" }
+POST:messages/_doc/13:{ "msg":"five" }
+POST:messages/_doc/14:{ "msg":"five" }
+POST:messages/_doc/15:{ "msg":"five" }
+POST:complex/_doc/1:{"msg":"Hello, world","subField":{"longField":100000,"dateField":"2018-04-10T12:18:05Z"}}
+POST:user_details/_doc/1:{ "email": "john.smith@company.com", "phone": "123-456-7890", "accessKey": "ABCDE"}
+POST:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
+POST:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
+POST:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
+POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script
deleted file mode 100644
index a17a039..0000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#create mapping
-PUT:user_details/:{ "mappings":{"details":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}}
-PUT:messages/:{ "mappings":{"message":{ "properties":{ "msg":{"type":"keyword"}}}}}
-PUT:complex/:{"mappings":{"complex":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"}}}}}}}
-PUT:nested/:{"mappings":{"nested_complex":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"},"deeper":{"type":"nested","properties":{"secretz":{"type":"keyword"},"deepest":{"type":"nested","properties":{"super_secret":{"type":"keyword"}}}}}}}}}}}
-#add document
-PUT:messages/message/1:{ "msg":"one" }
-PUT:messages/message/2:{ "msg":"two" }
-PUT:messages/message/3:{ "msg":"two" }
-PUT:messages/message/4:{ "msg":"three" }
-PUT:messages/message/5:{ "msg":"three" }
-PUT:messages/message/6:{ "msg":"three" }
-PUT:messages/message/7:{ "msg":"four" }
-PUT:messages/message/8:{ "msg":"four" }
-PUT:messages/message/9:{ "msg":"four" }
-PUT:messages/message/10:{ "msg":"four" }
-PUT:messages/message/11:{ "msg":"five" }
-PUT:messages/message/12:{ "msg":"five" }
-PUT:messages/message/13:{ "msg":"five" }
-PUT:messages/message/14:{ "msg":"five" }
-PUT:messages/message/15:{ "msg":"five" }
-PUT:complex/complex/1:{"msg":"Hello, world","subField":{"longField":100000,"dateField":"2018-04-10T12:18:05Z"}}
-PUT:user_details/details/1:{ "email": "john.smith@company.com", "phone": "123-456-7890", "accessKey": "ABCDE"}
-PUT:user_details/details/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
-PUT:nested/nested_complex/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
-PUT:nested/nested_complex/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
-PUT:nested/nested_complex/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
index 8270d0b..5d7ef89 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
@@ -57,6 +57,11 @@ language governing permissions and limitations under the License. -->
             <version>1.11.0-SNAPSHOT</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
             <version>2.6</version>
@@ -94,6 +99,46 @@ language governing permissions and limitations under the License. -->
             <version>1.11.0-SNAPSHOT</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+            <version>2.4.0</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy-json</artifactId>
+            <version>${nifi.groovy.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
index 609d8ed..f191778 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
@@ -43,14 +43,14 @@ import java.util.Map;
 import java.util.Set;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
-@CapabilityDescription("Delete from an ElasticSearch index using a query. The query can be loaded from a flowfile body " +
+@CapabilityDescription("Delete from an Elasticsearch index using a query. The query can be loaded from a flowfile body " +
         "or from the Query parameter.")
 @Tags({ "elastic", "elasticsearch", "delete", "query"})
 @WritesAttributes({
     @WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."),
-    @WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by ElasticSearch if there is an error running the delete.")
+    @WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by Elasticsearch if there is an error running the delete.")
 })
-public class DeleteByQueryElasticsearch extends AbstractProcessor implements ElasticSearchRestProcessor {
+public class DeleteByQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
         .description("If the delete by query fails, and a flowfile was read, it will be sent to this relationship.").build();
 
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
similarity index 67%
rename from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java
rename to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
index ccf04ca..a52d2ca 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
@@ -25,12 +25,15 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
-public interface ElasticSearchRestProcessor {
+public interface ElasticsearchRestProcessor {
+    String ATTR_RECORD_COUNT = "record.count";
+
     PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
             .name("el-rest-fetch-index")
             .displayName("Index")
@@ -43,7 +46,7 @@ public interface ElasticSearchRestProcessor {
     PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
             .name("el-rest-type")
             .displayName("Type")
-            .description("The type of this document (used by Elasticsearch for indexing and searching)")
+            .description("The type of this document (used by Elasticsearch for indexing and searching).")
             .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -70,11 +73,39 @@ public interface ElasticSearchRestProcessor {
     PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
             .name("el-rest-client-service")
             .displayName("Client Service")
-            .description("An ElasticSearch client service to use for running queries.")
+            .description("An Elasticsearch client service to use for running queries.")
             .identifiesControllerService(ElasticSearchClientService.class)
             .required(true)
             .build();
 
+    PropertyDescriptor LOG_ERROR_RESPONSES = new PropertyDescriptor.Builder()
+            .name("put-es-record-log-error-responses")
+            .displayName("Log Error Responses")
+            .description("If this is enabled, errors will be logged to the NiFi logs at the error log level. Otherwise, they will " +
+                    "only be logged if debug logging is enabled on NiFi as a whole. The purpose of this option is to give the user " +
+                    "the ability to debug failed operations without having to turn on debug logging.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All flowfiles that fail for reasons unrelated to server availability go to this relationship.")
+            .build();
+    Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("All flowfiles that fail due to server/cluster availability go to this relationship.")
+            .build();
+    Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All flowfiles that succeed in being transferred into Elasticsearch go here.")
+            .build();
+    Relationship REL_FAILED_RECORDS = new Relationship.Builder()
+            .name("errors").description("If an output record writer is set, any record that failed to process the way it was " +
+                    "configured will be sent to this relationship as part of a failed record set.")
+            .autoTerminateDefault(true).build();
+
     default String getQuery(FlowFile input, ProcessContext context, ProcessSession session) throws IOException {
         String retVal = null;
         if (context.getProperty(QUERY).isSet()) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
index 40d6118..903a124 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
@@ -56,18 +56,15 @@ import java.util.Set;
 @EventDriven
 @Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"})
 @CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " +
-        "ElasticSearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " +
+        "Elasticsearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " +
         "processor, it will use the flowfile's content for the query. Care should be taken on the size of the query because the entire response " +
-        "from ElasticSearch will be loaded into memory all at once and converted into the resulting flowfiles.")
-public class JsonQueryElasticsearch extends AbstractProcessor implements ElasticSearchRestProcessor {
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class JsonQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
     public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
             .description("All original flowfiles that don't cause an error to occur go to this relationship. " +
                     "This applies even if you select the \"split up hits\" option to send individual hits to the " +
                     "\"hits\" relationship.").build();
 
-    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
-            .description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build();
-
     public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
             .description("Search hits are routed to this relationship.")
             .build();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
new file mode 100644
index 0000000..bbc86e8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchError;
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.elasticsearch.IndexOperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.api.BulkOperation;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", "index", "record"})
+@CapabilityDescription("A record-aware Elasticsearch put processor that uses the official Elastic REST client libraries.")
+public class PutElasticsearchRecord extends AbstractProcessor implements ElasticsearchRestProcessor {
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+        .name("put-es-record-reader")
+        .displayName("Record Reader")
+        .description("The record reader to use for reading incoming records from flowfiles.")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+        .name("put-es-record-batch-size")
+        .displayName("Batch Size")
+        .description("The number of records to send over in a single batch.")
+        .defaultValue("100")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
+        .name("put-es-record-id-path")
+        .displayName("ID Record Path")
+        .description("A record path expression to retrieve the ID field for use with Elasticsearch. If left blank " +
+                "the ID will be automatically generated by Elasticsearch.")
+        .addValidator(new RecordPathValidator())
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    static final PropertyDescriptor INDEX_RECORD_PATH = new PropertyDescriptor.Builder()
+        .name("put-es-record-index-record-path")
+        .displayName("Index Record Path")
+        .description("A record path expression to retrieve the index field for use with Elasticsearch. If left blank " +
+                "the index will be determined using the main index property.")
+        .addValidator(new RecordPathValidator())
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    static final PropertyDescriptor TYPE_RECORD_PATH = new PropertyDescriptor.Builder()
+        .name("put-es-record-type-record-path")
+        .displayName("Type Record Path")
+        .description("A record path expression to retrieve the type field for use with Elasticsearch. If left blank " +
+                "the type will be determined using the main type property.")
+        .addValidator(new RecordPathValidator())
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+
+    static final PropertyDescriptor ERROR_RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("put-es-record-error-writer")
+        .displayName("Error Record Writer")
+        .description("If this configuration property is set, the response from Elasticsearch will be examined for failed records " +
+                "and the failed records will be written to a record set with this record writer service and sent to the \"errors\" " +
+                "relationship.")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .addValidator(Validator.VALID)
+        .required(false)
+        .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+        INDEX, TYPE, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH,
+        LOG_ERROR_RESPONSES, ERROR_RECORD_WRITER
+    ));
+    static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+        REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
+    )));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    private RecordReaderFactory readerFactory;
+    private RecordPathCache recordPathCache;
+    private ElasticSearchClientService clientService;
+    private RecordSetWriterFactory writerFactory;
+    private boolean logErrors;
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        this.readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+        this.recordPathCache = new RecordPathCache(16);
+        this.writerFactory = context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
+    }
+
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type  = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+        final String idPath = context.getProperty(ID_RECORD_PATH).isSet()
+                ? context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
+                : null;
+        final String indexPath = context.getProperty(INDEX_RECORD_PATH).isSet()
+                ? context.getProperty(INDEX_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
+                : null;
+        final String typePath = context.getProperty(TYPE_RECORD_PATH).isSet()
+                ? context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
+                : null;
+
+        RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null;
+        RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null;
+        RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;
+
+        int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger();
+        List<FlowFile> badRecords = new ArrayList<>();
+
+        try (final InputStream inStream = session.read(input);
+             final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) {
+            Record record;
+            List<IndexOperationRequest> operationList = new ArrayList<>();
+            List<Record> originals = new ArrayList<>();
+
+            while ((record = reader.nextRecord()) != null) {
+                final String idx = getFromRecordPath(record, iPath, index);
+                final String t   = getFromRecordPath(record, tPath, type);
+                final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.Index;
+                final String id  = path != null ? getFromRecordPath(record, path, null) : null;
+
+                Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+
+                removeEmpty(contentMap);
+
+                operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
+                originals.add(record);
+
+                if (operationList.size() == batchSize) {
+                    BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
+                    FlowFile bad = indexDocuments(bundle, session, input);
+                    if (bad != null) {
+                        badRecords.add(bad);
+                    }
+
+                    operationList.clear();
+                    originals.clear();
+                }
+            }
+
+            if (operationList.size() > 0) {
+                BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
+                FlowFile bad = indexDocuments(bundle, session, input);
+                if (bad != null) {
+                    badRecords.add(bad);
+                }
+            }
+
+            session.transfer(input, REL_SUCCESS);
+        } catch (ElasticsearchError ese) {
+            String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
+                    ese.isElastic() ? "Moving to retry." : "Moving to failure.");
+            getLogger().error(msg, ese);
+            Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
+            session.penalize(input);
+            session.transfer(input, rel);
+            removeBadRecordFlowFiles(badRecords, session);
+        } catch (Exception ex) {
+            getLogger().error("Could not index documents.", ex);
+            session.transfer(input, REL_FAILURE);
+            removeBadRecordFlowFiles(badRecords, session);
+        }
+    }
+
+    private void removeBadRecordFlowFiles(List<FlowFile> bad, ProcessSession session) {
+        for (FlowFile badFlowFile : bad) {
+            session.remove(badFlowFile);
+        }
+
+        bad.clear();
+    }
+
+    private FlowFile indexDocuments(BulkOperation bundle, ProcessSession session, FlowFile input) throws Exception {
+        IndexOperationResponse response = clientService.bulk(bundle.getOperationList());
+        if (response.hasErrors()) {
+            if (logErrors || getLogger().isDebugEnabled()) {
+                List<Map<String, Object>> errors = response.getItems();
+                ObjectMapper mapper = new ObjectMapper();
+                mapper.enable(SerializationFeature.INDENT_OUTPUT);
+                String output = String.format("An error was encountered while processing bulk operations. Server response below:\n\n%s", mapper.writeValueAsString(errors));
+
+                if (logErrors) {
+                    getLogger().error(output);
+                } else {
+                    getLogger().debug(output);
+                }
+            }
+
+            if (writerFactory != null) {
+                FlowFile errorFF = session.create(input);
+                try (OutputStream os = session.write(errorFF);
+                     RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os)) {
+
+                    int added = 0;
+                    writer.beginRecordSet();
+                    for (int index = 0; index < response.getItems().size(); index++) {
+                        Map<String, Object> current = response.getItems().get(index);
+                        String key = current.keySet().stream().findFirst().get();
+                        Map<String, Object> inner = (Map<String, Object>) current.get(key);
+                        if (inner.containsKey("error")) {
+                            writer.write(bundle.getOriginalRecords().get(index));
+                            added++;
+                        }
+                    }
+                    writer.finishRecordSet();
+                    writer.close();
+                    os.close();
+
+                    errorFF = session.putAttribute(errorFF, ATTR_RECORD_COUNT, String.valueOf(added));
+
+                    session.transfer(errorFF, REL_FAILED_RECORDS);
+
+                    return errorFF;
+                } catch (Exception ex) {
+                    getLogger().error("Could not write the failed records to the error flowfile.", ex);
+                    session.remove(errorFF);
+                    throw ex;
+                }
+            }
+
+            return null;
+        } else {
+            return null;
+        }
+    }
+
+    private void removeEmpty(Map<String, Object> input) {
+        Map<String, Object> copy = new HashMap<>(input);
+
+        for (Map.Entry<String, Object> entry : input.entrySet()) {
+            if (entry.getValue() == null) {
+                copy.remove(entry.getKey());
+            } else {
+                if (StringUtils.isBlank(entry.getValue().toString())) {
+                    copy.remove(entry.getKey());
+                } else if (entry.getValue() instanceof Map) {
+                    removeEmpty((Map<String, Object>) entry.getValue());
+                } else if (entry.getValue() instanceof List) {
+                    for (Object value : (List)entry.getValue()) {
+                        if (value instanceof Map) {
+                            removeEmpty((Map<String, Object>) value);
+                        }
+                    }
+                }
+            }
+        }
+
+        input.clear();
+        input.putAll(copy);
+    }
+
+    private String getFromRecordPath(Record record, RecordPath path, final String fallback) {
+        if (path == null) {
+            return fallback;
+        }
+
+        RecordPathResult result = path.evaluate(record);
+        Optional<FieldValue> value = result.getSelectedFields().findFirst();
+        if (value.isPresent() && value.get().getValue() != null) {
+            FieldValue fieldValue = value.get();
+            if (!fieldValue.getField().getDataType().getFieldType().equals(RecordFieldType.STRING) ) {
+                throw new ProcessException(
+                    String.format("Field referenced by %s must be a string.", path.getPath())
+                );
+            }
+
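+            // Read the value before nulling the field so the result never depends on updateValue's side effects.
+            String retVal = fieldValue.getValue().toString();
+            fieldValue.updateValue(null);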
+
+            return retVal;
+        } else {
+            return fallback;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/BulkOperation.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/BulkOperation.java
new file mode 100644
index 0000000..f54aa60
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/BulkOperation.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.util.List;
+
+public class BulkOperation {
+    private List<IndexOperationRequest> operationList;
+    private List<Record> originalRecords;
+    private RecordSchema schema;
+
+    public BulkOperation(List<IndexOperationRequest> operationList, List<Record> originalRecords, RecordSchema schema) {
+        this.operationList = operationList;
+        this.originalRecords = originalRecords;
+        this.schema = schema;
+    }
+
+    public List<IndexOperationRequest> getOperationList() {
+        return operationList;
+    }
+
+    public List<Record> getOriginalRecords() {
+        return originalRecords;
+    }
+
+    public RecordSchema getSchema() {
+        return schema;
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/FlowFileJsonDescription.java
similarity index 57%
copy from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
copy to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/FlowFileJsonDescription.java
index 9281adb..0312e11 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/FlowFileJsonDescription.java
@@ -15,21 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.elasticsearch;
+package org.apache.nifi.processors.elasticsearch.put;
 
-import java.util.Map;
+import com.jayway.jsonpath.JsonPath;
 
-public class IndexOperationRequest {
+public class FlowFileJsonDescription {
     private String index;
     private String type;
     private String id;
-    private Map<String, Object> fields;
+    private String content;
 
-    public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields) {
+    private JsonPath idJsonPath;
+    private JsonPath typeJsonPath;
+    private JsonPath indexJsonPath;
+
+    public FlowFileJsonDescription(String index, String type, String id, String content, JsonPath idJsonPath, JsonPath typeJsonPath, JsonPath indexJsonPath) {
         this.index = index;
         this.type = type;
         this.id = id;
-        this.fields = fields;
+        this.content = content;
+        this.idJsonPath = idJsonPath;
+        this.typeJsonPath = typeJsonPath;
+        this.indexJsonPath = indexJsonPath;
     }
 
     public String getIndex() {
@@ -44,7 +51,19 @@ public class IndexOperationRequest {
         return id;
     }
 
-    public Map<String, Object> getFields() {
-        return fields;
+    public String getContent() {
+        return content;
+    }
+
+    public JsonPath getIdJsonPath() {
+        return idJsonPath;
+    }
+
+    public JsonPath getTypeJsonPath() {
+        return typeJsonPath;
+    }
+
+    public JsonPath getIndexJsonPath() {
+        return indexJsonPath;
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/JsonProcessingError.java
similarity index 68%
copy from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
copy to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/JsonProcessingError.java
index a22b7aa..2d7a508 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/JsonProcessingError.java
@@ -15,22 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.elasticsearch;
+package org.apache.nifi.processors.elasticsearch.put;
 
-public class IndexOperationResponse {
-    private long took;
-    private long ingestTook;
-
-    public IndexOperationResponse(long took, long ingestTook) {
-        this.took = took;
-        this.ingestTook = ingestTook;
-    }
-
-    public long getTook() {
-        return took;
-    }
-
-    public long getIngestTook() {
-        return ingestTook;
+public class JsonProcessingError extends Exception {
+    public JsonProcessingError(String message) {
+        super(message);
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
new file mode 100644
index 0000000..2034d64
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
@@ -0,0 +1,48 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutElasticsearchJson</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <p>This processor is designed to take any valid JSON and index it into Elasticsearch 5.x or newer. Unlike the record-aware
+    alternative, it performs only index operations, not updates, upserts or deletes.</p>
+    <p>The ID for each document can be set in one of three ways:</p>
+    <ul>
+        <li>FlowFile attribute - only possible if the flowfile contains only a single document. Arrays of documents
+        will cause an error to be raised.</li>
+        <li>JsonPath operation - searches within the document for a particular field.</li>
+        <li>Default - Elasticsearch will generate one. This will be used when neither the attribute nor JsonPath configuration
+        fields are used.</li>
+    </ul>
+    <p>Example document:</p>
+    <pre>
+        {
+            "index": "messages",
+            "type": "message",
+            "message": "Hello, world",
+            "from": "john.smith"
+        }
+    </pre>
+    <p>The following JsonPath operations will extract the index and type:</p>
+    <ul>
+        <li>$.index</li>
+        <li>$.type</li>
+    </ul>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
new file mode 100644
index 0000000..9206785
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
@@ -0,0 +1,64 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutElasticsearchRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <p>This is a record-aware processor built for Elasticsearch 5 and later. The index and type fields are configured
+    with default values that can be overridden using record path operations that find an index or type value in the
+    record set. The ID and operation type (index, update, upsert or delete) can also be extracted in a similar fashion from
+    the record set. The following are examples of documents exercising these features:</p>
+    <pre>
+        {
+            "metadata": {
+                "id": "12345",
+                "index": "test",
+                "type": "message",
+                "operation": "index"
+            },
+            "message": "Hello, world",
+            "from": "john.smith"
+        }
+    </pre>
+    <pre>
+        {
+            "metadata": {
+                "id": "12345",
+                "index": "test",
+                "type": "message",
+                "operation": "delete"
+            }
+        }
+    </pre>
+    <p>The record path operations below would extract the relevant data:</p>
+    <ul>
+        <li>/metadata/id</li>
+        <li>/metadata/index</li>
+        <li>/metadata/type</li>
+        <li>/metadata/operation</li>
+    </ul>
+    <p>Valid values for "operation" are:</p>
+    <ul>
+        <li>delete</li>
+        <li>index</li>
+        <li>update</li>
+        <li>upsert</li>
+    </ul>
+</body>
+</html>
\ No newline at end of file
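The lookup described in the PutElasticsearchRecord additionalDetails.html above (a path such as /metadata/operation selects a value inside each document, with a configured default used otherwise) can be illustrated with a small stand-alone sketch. This is not the processor's code and does not use NiFi's RecordPath API: it walks plain nested maps instead, and the names OperationLookupSketch, lookup and resolveOperation are hypothetical. It also simplifies one behavior: a non-string leaf falls back to the default here, whereas the processor rejects it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; plain maps stand in for NiFi records.
final class OperationLookupSketch {
    // The four operation values listed in the documentation above.
    static final Set<String> VALID_OPERATIONS =
            new HashSet<>(Arrays.asList("delete", "index", "update", "upsert"));

    // Walks a slash-separated path such as "/metadata/operation" through nested maps.
    // Returns the fallback when a segment is missing or the leaf is not a string.
    static String lookup(Map<String, Object> doc, String path, String fallback) {
        Object current = doc;
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) {
                continue; // the leading slash produces an empty first segment
            }
            if (!(current instanceof Map)) {
                return fallback;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current instanceof String ? (String) current : fallback;
    }

    // Resolves the operation and rejects anything outside the documented set.
    static String resolveOperation(Map<String, Object> doc, String path, String fallback) {
        String operation = lookup(doc, path, fallback);
        if (!VALID_OPERATIONS.contains(operation)) {
            throw new IllegalArgumentException("Unsupported operation: " + operation);
        }
        return operation;
    }

    public static void main(String[] args) {
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("id", "12345");
        metadata.put("index", "test");
        metadata.put("operation", "delete");
        Map<String, Object> doc = Collections.<String, Object>singletonMap("metadata", metadata);

        // The path is present, so the document's own value wins over the default.
        System.out.println(resolveOperation(doc, "/metadata/operation", "index"));
        // No "type" field in this document, so the default is used.
        System.out.println(lookup(doc, "/metadata/type", "default_type"));
    }
}
```

In this sketch the configured default is passed as the fallback argument, which mirrors the documented behavior that index and type have defaults that a per-record value can override.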
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index ac8c915..84e00b4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,3 +15,4 @@
 
 org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch
 org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch
+org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch/additionalDetails.html
index 3a5331f..05c19ad 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch/additionalDetails.html
@@ -21,7 +21,7 @@
 </head>
 <body>
     <p>This processor executes a delete operation against one or more indices using the _delete_by_query handler. The
-    query should be a valid ElasticSearch JSON DSL query (Lucene syntax is not supported). An example query:</p>
+    query should be a valid Elasticsearch JSON DSL query (Lucene syntax is not supported). An example query:</p>
     <pre>
         {
             "query": {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html
index 3430cde..52f5f3d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html
@@ -20,8 +20,8 @@
     <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
 </head>
 <body>
-    <p>This processor is intended for use with the ElasticSearch JSON DSL and ElasticSearch 5.X and newer. It is designed
-    to be able to take a query from Kibana and execute it as-is against an ElasticSearch cluster. Like all processors in the
+    <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
+    to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster. Like all processors in the
     "restapi" bundle, it uses the official Elastic client APIs, so it supports leader detection.</p>
     <p>The query to execute can be provided either in the Query configuration property or in an attribute on a flowfile. In
     the latter case, the name of the attribute (Expression Language is supported here) must be provided in the Query Attribute
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
new file mode 100644
index 0000000..0ac19b2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
@@ -0,0 +1,41 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutElasticsearchRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+<p>
+    This processor is for accessing the Elasticsearch Bulk API. It provides the ability to configure bulk operations on
+    a per-record basis, which is what separates it from PutElasticsearchHttpRecord. For example, it is possible to define
+    multiple commands to index documents, followed by delete, create and update operations against the same index or
+    other indices as desired.
+</p>
+<p>
+    As part of the Elasticsearch REST API bundle, it uses a controller service to manage connection information, and
+    that controller service is built on top of the official Elasticsearch client APIs. That provides features such as
+    automatic master detection against the cluster, which is missing in the other bundles.
+</p>
+<p>
+    This processor builds one Elasticsearch Bulk API body per record set. Care should be taken to split up record sets
+    into appropriately-sized chunks so that NiFi does not run out of memory and the requests sent to Elasticsearch are
+    not too large for it to handle. When failures do occur, this processor is capable of attempting to write the records
+    that failed to an output record writer so that only failed records can be processed downstream or replayed.
+</p>
+</body>
+</html>
\ No newline at end of file
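The failed-record handling mentioned in the documentation above (only the records that failed are written out for downstream processing) depends on matching each item in the bulk response to the record at the same position in the request. A minimal stand-alone sketch of that matching follows. It assumes the response items have already been parsed into maps keyed by operation name, as indexDocuments does; the class and method names (FailedRecordSketch, selectFailed) are hypothetical, and plain strings stand in for NiFi records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of the commit.
final class FailedRecordSketch {
    // Returns the originals whose bulk response item carries an "error" entry.
    // Relies on the Bulk API returning items in the same order as the request.
    static <T> List<T> selectFailed(List<Map<String, Object>> items, List<T> originals) {
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            // Each item is keyed by its operation name, e.g. {"index": {...}}.
            for (Object detail : items.get(i).values()) {
                if (detail instanceof Map && ((Map<?, ?>) detail).containsKey("error")) {
                    failed.add(originals.get(i));
                    break;
                }
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<String, Object> ok = Collections.<String, Object>singletonMap(
                "index", Collections.singletonMap("status", 201));

        Map<String, Object> errorDetail = new HashMap<>();
        errorDetail.put("status", 400);
        errorDetail.put("error", Collections.singletonMap("type", "mapper_parsing_exception"));
        Map<String, Object> bad = Collections.<String, Object>singletonMap("index", errorDetail);

        List<String> originals = Arrays.asList("record-0", "record-1", "record-2");
        // Only the second item reports an error, so only "record-1" is selected.
        System.out.println(selectFailed(Arrays.asList(ok, bad, ok), originals));
    }
}
```

Because the match is purely positional, the list of original records must be built in the same order as the operations sent in the bulk request.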
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy
new file mode 100644
index 0000000..8e913ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Test
+
+class DeleteByQueryElasticsearchTest {
+    private static final String INDEX = "test_idx"
+    private static final String TYPE  = "test_type"
+    private static final String QUERY_ATTR = "es.delete.query"
+    private static final String CLIENT_NAME = "clientService"
+
+    private TestElasticsearchClientService client
+
+    private void initClient(TestRunner runner) throws Exception {
+        client = new TestElasticsearchClientService(true)
+        runner.addControllerService(CLIENT_NAME, client)
+        runner.enableControllerService(client)
+        runner.setProperty(DeleteByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME)
+    }
+
+    private void postTest(TestRunner runner, String queryParam) {
+        runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 0)
+        runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 1)
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_SUCCESS)
+        String attr = flowFiles.get(0).getAttribute(DeleteByQueryElasticsearch.TOOK_ATTRIBUTE)
+        String query = flowFiles.get(0).getAttribute(QUERY_ATTR)
+        Assert.assertNotNull(attr)
+        Assert.assertEquals("100", attr)
+        Assert.assertNotNull(query)
+        Assert.assertEquals(queryParam, query)
+    }
+
+    @Test
+    void testWithFlowfileInput() throws Exception {
+        String query = "{ \"query\": { \"match_all\": {} }}"
+        TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
+        runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
+        runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
+        initClient(runner)
+        runner.assertValid()
+        runner.enqueue(query)
+        runner.run()
+
+        postTest(runner, query)
+    }
+
+    @Test
+    void testWithQuery() throws Exception {
+        String query = "{\n" +
+            "\t\"query\": {\n" +
+            "\t\t\"match\": {\n" +
+            "\t\t\t\"\${field.name}.keyword\": \"test\"\n" +
+            "\t\t}\n" +
+            "\t}\n" +
+            "}"
+        Map<String, String> attrs = new HashMap<String, String>(){{
+            put("field.name", "test_field")
+        }}
+        TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
+        runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
+        runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
+        initClient(runner)
+        runner.assertValid()
+        runner.enqueue("", attrs)
+        runner.run()
+
+        postTest(runner, query.replace('${field.name}', "test_field"))
+
+        runner.clearTransferState()
+
+        query = "{\n" +
+            "\t\"query\": {\n" +
+            "\t\t\"match\": {\n" +
+            "\t\t\t\"test_field.keyword\": \"test\"\n" +
+            "\t\t}\n" +
+            "\t}\n" +
+            "}"
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
+        runner.setIncomingConnection(false)
+        runner.assertValid()
+        runner.run()
+        postTest(runner, query)
+    }
+
+    @Test
+    void testErrorAttribute() throws Exception {
+        String query = "{ \"query\": { \"match_all\": {} }}"
+        TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
+        runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
+        runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
+        initClient(runner)
+        client.setThrowErrorInDelete(true)
+        runner.assertValid()
+        runner.enqueue("")
+        runner.run()
+
+        runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 0)
+        runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 1)
+
+        MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_FAILURE).get(0)
+        String attr = mockFlowFile.getAttribute(DeleteByQueryElasticsearch.ERROR_ATTRIBUTE)
+        Assert.assertNotNull(attr)
+    }
+
+    @Test
+    void testInputHandling() {
+        String query = "{ \"query\": { \"match_all\": {} }}"
+        TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
+        runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
+        runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
+        runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
+        initClient(runner)
+        runner.assertValid()
+        runner.run()
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy
similarity index 50%
rename from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java
rename to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy
index cd7ac19..33a26de 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy
@@ -3,7 +3,7 @@
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
@@ -15,54 +15,54 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.processors.elasticsearch;
+package org.apache.nifi.processors.elasticsearch
 
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Test
 
-import java.util.List;
+import java.util.List
 
-public class JsonQueryElasticsearchTest {
-    private static final String INDEX_NAME = "messages";
+class JsonQueryElasticsearchTest {
+    private static final String INDEX_NAME = "messages"
 
-    public void testCounts(TestRunner runner, int success, int hits, int failure, int aggregations) {
-        runner.assertTransferCount(JsonQueryElasticsearch.REL_ORIGINAL, success);
-        runner.assertTransferCount(JsonQueryElasticsearch.REL_HITS, hits);
-        runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, failure);
-        runner.assertTransferCount(JsonQueryElasticsearch.REL_AGGREGATIONS, aggregations);
+    void testCounts(TestRunner runner, int success, int hits, int failure, int aggregations) {
+        runner.assertTransferCount(JsonQueryElasticsearch.REL_ORIGINAL, success)
+        runner.assertTransferCount(JsonQueryElasticsearch.REL_HITS, hits)
+        runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, failure)
+        runner.assertTransferCount(JsonQueryElasticsearch.REL_AGGREGATIONS, aggregations)
     }
 
     @Test
-    public void testBasicQuery() throws Exception {
-
-        JsonQueryElasticsearch processor = new JsonQueryElasticsearch();
-        TestRunner runner = TestRunners.newTestRunner(processor);
-        TestElasticSearchClientService service = new TestElasticSearchClientService(false);
-        runner.addControllerService("esService", service);
-        runner.enableControllerService(service);
-        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService");
-        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME);
-        runner.setProperty(JsonQueryElasticsearch.TYPE, "message");
-        runner.setValidateExpressionUsage(true);
-        runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}");
-
-        runner.enqueue("test");
-        runner.run(1, true, true);
-        testCounts(runner, 1, 1, 0, 0);
-
-        runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_HITS, JsonQueryElasticsearch.SPLIT_UP_YES);
-        runner.clearProvenanceEvents();
-        runner.clearTransferState();
-        runner.enqueue("test");
-        runner.run(1, true, true);
-        testCounts(runner, 1, 10, 0, 0);
+    void testBasicQuery() throws Exception {
+
+        JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+        TestRunner runner = TestRunners.newTestRunner(processor)
+        TestElasticsearchClientService service = new TestElasticsearchClientService(false)
+        runner.addControllerService("esService", service)
+        runner.enableControllerService(service)
+        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+        runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+        runner.setValidateExpressionUsage(true)
+        runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}")
+
+        runner.enqueue("test")
+        runner.run(1, true, true)
+        testCounts(runner, 1, 1, 0, 0)
+
+        runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_HITS, JsonQueryElasticsearch.SPLIT_UP_YES)
+        runner.clearProvenanceEvents()
+        runner.clearTransferState()
+        runner.enqueue("test")
+        runner.run(1, true, true)
+        testCounts(runner, 1, 10, 0, 0)
     }
 
     @Test
-    public void testAggregations() throws Exception {
+    void testAggregations() throws Exception {
         String query = "{\n" +
                 "\t\"query\": {\n" +
                 "\t\t\"match_all\": {}\n" +
@@ -79,42 +79,42 @@ public class JsonQueryElasticsearchTest {
                 "\t\t\t}\n" +
                 "\t\t}\n" +
                 "\t}\n" +
-                "}";
+                "}"
 
 
-        JsonQueryElasticsearch processor = new JsonQueryElasticsearch();
-        TestRunner runner = TestRunners.newTestRunner(processor);
-        TestElasticSearchClientService service = new TestElasticSearchClientService(true);
-        runner.addControllerService("esService", service);
-        runner.enableControllerService(service);
-        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService");
-        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME);
-        runner.setProperty(JsonQueryElasticsearch.TYPE, "message");
-        runner.setValidateExpressionUsage(true);
-        runner.setProperty(JsonQueryElasticsearch.QUERY, query);
+        JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+        TestRunner runner = TestRunners.newTestRunner(processor)
+        TestElasticsearchClientService service = new TestElasticsearchClientService(true)
+        runner.addControllerService("esService", service)
+        runner.enableControllerService(service)
+        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+        runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+        runner.setValidateExpressionUsage(true)
+        runner.setProperty(JsonQueryElasticsearch.QUERY, query)
 
-        runner.enqueue("test");
-        runner.run(1, true, true);
-        testCounts(runner, 1, 1, 0, 1);
+        runner.enqueue("test")
+        runner.run(1, true, true)
+        testCounts(runner, 1, 1, 0, 1)
 
-        runner.clearTransferState();
+        runner.clearTransferState()
 
         //Test with the query parameter and no incoming connection
-        runner.setIncomingConnection(false);
-        runner.run(1, true, true);
-        testCounts(runner, 0, 1, 0, 1);
-        runner.setIncomingConnection(true);
+        runner.setIncomingConnection(false)
+        runner.run(1, true, true)
+        testCounts(runner, 0, 1, 0, 1)
+        runner.setIncomingConnection(true)
 
 
-        runner.clearTransferState();
-        runner.clearProvenanceEvents();
-        runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_AGGREGATIONS, JsonQueryElasticsearch.SPLIT_UP_YES);
-        runner.enqueue("test");
-        runner.run(1, true, true);
-        testCounts(runner, 1, 1, 0, 2);
+        runner.clearTransferState()
+        runner.clearProvenanceEvents()
+        runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_AGGREGATIONS, JsonQueryElasticsearch.SPLIT_UP_YES)
+        runner.enqueue("test")
+        runner.run(1, true, true)
+        testCounts(runner, 1, 1, 0, 2)
 
-        runner.clearProvenanceEvents();
-        runner.clearTransferState();
+        runner.clearProvenanceEvents()
+        runner.clearTransferState()
 
         query = "{\n" +
                 "\t\"query\": {\n" +
@@ -123,30 +123,30 @@ public class JsonQueryElasticsearchTest {
                 "\t\"aggs\": {\n" +
                 "\t\t\"test_agg\": {\n" +
                 "\t\t\t\"terms\": {\n" +
-                "\t\t\t\t\"field\": \"${fieldValue}\"\n" +
+                "\t\t\t\t\"field\": \"\${fieldValue}\"\n" +
                 "\t\t\t}\n" +
                 "\t\t},\n" +
                 "\t\t\"test_agg2\": {\n" +
                 "\t\t\t\"terms\": {\n" +
-                "\t\t\t\t\"field\": \"${fieldValue}\"\n" +
+                "\t\t\t\t\"field\": \"\${fieldValue}\"\n" +
                 "\t\t\t}\n" +
                 "\t\t}\n" +
                 "\t}\n" +
-                "}";
-        runner.setVariable("fieldValue", "msg");
-        runner.setVariable("es.index", INDEX_NAME);
-        runner.setVariable("es.type", "msg");
-        runner.setProperty(JsonQueryElasticsearch.QUERY, query);
-        runner.setProperty(JsonQueryElasticsearch.INDEX, "${es.index}");
-        runner.setProperty(JsonQueryElasticsearch.TYPE, "${es.type}");
-        runner.setValidateExpressionUsage(true);
-        runner.enqueue("test");
-        runner.run(1, true, true);
-        testCounts(runner, 1, 1, 0, 2);
+                "}"
+        runner.setVariable("fieldValue", "msg")
+        runner.setVariable("es.index", INDEX_NAME)
+        runner.setVariable("es.type", "msg")
+        runner.setProperty(JsonQueryElasticsearch.QUERY, query)
+        runner.setProperty(JsonQueryElasticsearch.INDEX, "\${es.index}")
+        runner.setProperty(JsonQueryElasticsearch.TYPE, "\${es.type}")
+        runner.setValidateExpressionUsage(true)
+        runner.enqueue("test")
+        runner.run(1, true, true)
+        testCounts(runner, 1, 1, 0, 2)
     }
 
     @Test
-    public void testErrorDuringSearch() throws Exception {
+    void testErrorDuringSearch() throws Exception {
         String query = "{\n" +
                 "\t\"query\": {\n" +
                 "\t\t\"match_all\": {}\n" +
@@ -163,28 +163,28 @@ public class JsonQueryElasticsearchTest {
                 "\t\t\t}\n" +
                 "\t\t}\n" +
                 "\t}\n" +
-                "}";
-
-
-        JsonQueryElasticsearch processor = new JsonQueryElasticsearch();
-        TestRunner runner = TestRunners.newTestRunner(processor);
-        TestElasticSearchClientService service = new TestElasticSearchClientService(true);
-        service.setThrowErrorInSearch(true);
-        runner.addControllerService("esService", service);
-        runner.enableControllerService(service);
-        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService");
-        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME);
-        runner.setProperty(JsonQueryElasticsearch.TYPE, "message");
-        runner.setValidateExpressionUsage(true);
-        runner.setProperty(JsonQueryElasticsearch.QUERY, query);
-
-        runner.enqueue("test");
-        runner.run(1, true, true);
-        testCounts(runner, 0, 0, 1, 0);
+                "}"
+
+
+        JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+        TestRunner runner = TestRunners.newTestRunner(processor)
+        TestElasticsearchClientService service = new TestElasticsearchClientService(true)
+        service.setThrowErrorInSearch(true)
+        runner.addControllerService("esService", service)
+        runner.enableControllerService(service)
+        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+        runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+        runner.setValidateExpressionUsage(true)
+        runner.setProperty(JsonQueryElasticsearch.QUERY, query)
+
+        runner.enqueue("test")
+        runner.run(1, true, true)
+        testCounts(runner, 0, 0, 1, 0)
     }
 
     @Test
-    public void testQueryAttribute() throws Exception {
+    void testQueryAttribute() throws Exception {
         final String query = "{\n" +
                 "\t\"query\": {\n" +
                 "\t\t\"match_all\": {}\n" +
@@ -201,32 +201,52 @@ public class JsonQueryElasticsearchTest {
                 "\t\t\t}\n" +
                 "\t\t}\n" +
                 "\t}\n" +
-                "}";
-        final String queryAttr = "es.query";
-
-
-        JsonQueryElasticsearch processor = new JsonQueryElasticsearch();
-        TestRunner runner = TestRunners.newTestRunner(processor);
-        TestElasticSearchClientService service = new TestElasticSearchClientService(true);
-        runner.addControllerService("esService", service);
-        runner.enableControllerService(service);
-        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService");
-        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME);
-        runner.setProperty(JsonQueryElasticsearch.TYPE, "message");
-        runner.setValidateExpressionUsage(true);
-        runner.setProperty(JsonQueryElasticsearch.QUERY, query);
-        runner.setProperty(JsonQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr);
-
-        runner.enqueue("test");
-        runner.run(1, true, true);
-        testCounts(runner, 1, 1, 0, 1);
-        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_AGGREGATIONS);
-        flowFiles.addAll(runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_HITS));
+                "}"
+        final String queryAttr = "es.query"
+
+
+        JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+        TestRunner runner = TestRunners.newTestRunner(processor)
+        TestElasticsearchClientService service = new TestElasticsearchClientService(true)
+        runner.addControllerService("esService", service)
+        runner.enableControllerService(service)
+        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+        runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+        runner.setValidateExpressionUsage(true)
+        runner.setProperty(JsonQueryElasticsearch.QUERY, query)
+        runner.setProperty(JsonQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr)
+
+        runner.enqueue("test")
+        runner.run(1, true, true)
+        testCounts(runner, 1, 1, 0, 1)
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_AGGREGATIONS)
+        flowFiles.addAll(runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_HITS))
 
         for (MockFlowFile mockFlowFile : flowFiles) {
-            String attr = mockFlowFile.getAttribute(queryAttr);
-            Assert.assertNotNull("Missing query attribute", attr);
-            Assert.assertEquals("Query had wrong value.", query, attr);
+            String attr = mockFlowFile.getAttribute(queryAttr)
+            Assert.assertNotNull("Missing query attribute", attr)
+            Assert.assertEquals("Query had wrong value.", query, attr)
         }
     }
+
+    @Test
+    void testInputHandling() {
+        JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+        TestRunner runner = TestRunners.newTestRunner(processor)
+        TestElasticsearchClientService service = new TestElasticsearchClientService(false)
+        runner.addControllerService("esService", service)
+        runner.enableControllerService(service)
+        runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+        runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+        runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+        runner.setValidateExpressionUsage(true)
+        runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}")
+
+        runner.run()
+
+        runner.assertTransferCount(JsonQueryElasticsearch.REL_SUCCESS, 0)
+        runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, 0)
+        runner.assertTransferCount(JsonQueryElasticsearch.REL_RETRY, 0)
+    }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
new file mode 100644
index 0000000..62f7c1b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.avro.Schema
+import org.apache.nifi.avro.AvroTypeUtil
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+import org.apache.nifi.json.JsonRecordSetWriter
+import org.apache.nifi.json.JsonTreeReader
+import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
+import org.apache.nifi.schema.access.SchemaAccessUtils
+import org.apache.nifi.serialization.record.MockSchemaRegistry
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+
+import static groovy.json.JsonOutput.prettyPrint
+import static groovy.json.JsonOutput.toJson
+
+class PutElasticsearchRecordTest {
+    MockBulkLoadClientService clientService
+    MockSchemaRegistry registry
+    JsonTreeReader reader
+    TestRunner runner
+
+    static final String SCHEMA = prettyPrint(toJson([
+        name: "TestSchema",
+        type: "record",
+        fields: [
+            [ name: "msg", type: "string" ],
+            [ name: "from", type: "string" ]
+        ]
+    ]))
+
+    static final String flowFileContents = prettyPrint(toJson([
+            [ msg: "Hello, world", from: "john.smith" ],
+            [ msg: "Hi, back at ya!", from: "jane.doe" ]
+    ]))
+
+    @Before
+    void setup() {
+        clientService = new MockBulkLoadClientService()
+        registry = new MockSchemaRegistry()
+        reader   = new JsonTreeReader()
+        runner   = TestRunners.newTestRunner(PutElasticsearchRecord.class)
+
+        registry.addSchema("simple", AvroTypeUtil.createSchema(new Schema.Parser().parse(SCHEMA)))
+
+        clientService.response = new IndexOperationResponse(1500)
+
+        runner.addControllerService("registry", registry)
+        runner.addControllerService("reader", reader)
+        runner.addControllerService("clientService", clientService)
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
+        runner.setProperty(PutElasticsearchRecord.RECORD_READER, "reader")
+        runner.setProperty(PutElasticsearchRecord.INDEX, "test_index")
+        runner.setProperty(PutElasticsearchRecord.TYPE, "test_type")
+        runner.setProperty(PutElasticsearchRecord.CLIENT_SERVICE, "clientService")
+        runner.enableControllerService(registry)
+        runner.enableControllerService(reader)
+        runner.enableControllerService(clientService)
+
+        runner.assertValid()
+    }
+
+    void basicTest(int failure, int retry, int success) {
+        runner.enqueue(flowFileContents, [ "schema.name": "simple" ])
+        runner.run()
+
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, failure)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, retry)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, success)
+    }
+
+    @Test
+    void simpleTest() {
+        basicTest(0, 0, 1)
+    }
+
+    @Test
+    void testFatalError() {
+        clientService.throwFatalError = true
+        basicTest(1, 0, 0)
+    }
+
+    @Test
+    void testRetriable() {
+        clientService.throwRetriableError = true
+        basicTest(0, 1, 0)
+    }
+
+    @Test
+    void testRecordPathFeatures() {
+        def newSchema = prettyPrint(toJson([
+            type: "record",
+            name: "RecordPathTestType",
+            fields: [
+                [ name: "id", type: "string" ],
+                [ name: "index", type: "string" ],
+                [ name: "type", type: "string" ],
+                [ name: "msg", type: "string"]
+            ]
+        ]))
+
+        def flowFileContents = prettyPrint(toJson([
+            [ id: "rec-1", index: "bulk_a", type: "message", msg: "Hello" ],
+            [ id: "rec-2", index: "bulk_b", type: "message", msg: "Hello" ],
+            [ id: "rec-3", index: "bulk_a", type: "message", msg: "Hello" ],
+            [ id: "rec-4", index: "bulk_b", type: "message", msg: "Hello" ],
+            [ id: "rec-5", index: "bulk_a", type: "message", msg: "Hello" ],
+            [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello" ]
+        ]))
+
+        def evalClosure = { List<IndexOperationRequest> items ->
+            def a = items.findAll { it.index == "bulk_a" }.size()
+            def b = items.findAll { it.index == "bulk_b" }.size()
+            items.each {
+                Assert.assertNotNull(it.id)
+                Assert.assertTrue(it.id.startsWith("rec-"))
+                Assert.assertEquals("message", it.type)
+            }
+            Assert.assertEquals(3, a)
+            Assert.assertEquals(3, b)
+        }
+
+        clientService.evalClosure = evalClosure
+
+        registry.addSchema("recordPathTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema)))
+
+        runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id")
+        runner.setProperty(PutElasticsearchRecord.INDEX_RECORD_PATH, "/index")
+        runner.setProperty(PutElasticsearchRecord.TYPE_RECORD_PATH, "/type")
+        runner.enqueue(flowFileContents, [
+            "schema.name": "recordPathTest"
+        ])
+
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+
+        runner.clearTransferState()
+
+        flowFileContents = prettyPrint(toJson([
+            [ id: "rec-1", index: null, type: null, msg: "Hello" ],
+            [ id: "rec-2", index: null, type: null, msg: "Hello" ],
+            [ id: "rec-3", index: null, type: null, msg: "Hello" ],
+            [ id: "rec-4", index: null, type: null, msg: "Hello" ],
+            [ id: "rec-5", index: null, type: null, msg: "Hello" ],
+            [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello" ]
+        ]))
+
+        evalClosure = { List<IndexOperationRequest> items ->
+            def testTypeCount = items.findAll { it.type == "test_type" }.size()
+            def messageTypeCount = items.findAll { it.type == "message" }.size()
+            def testIndexCount = items.findAll { it.index == "test_index" }.size()
+            def bulkIndexCount = items.findAll { it.index.startsWith("bulk_") }.size()
+            def indexOperationCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
+            Assert.assertEquals(5, testTypeCount)
+            Assert.assertEquals(1, messageTypeCount)
+            Assert.assertEquals(5, testIndexCount)
+            Assert.assertEquals(1, bulkIndexCount)
+            Assert.assertEquals(6, indexOperationCount)
+        }
+
+        clientService.evalClosure = evalClosure
+        runner.enqueue(flowFileContents, [
+            "schema.name": "recordPathTest"
+        ])
+        runner.run()
+
+        runner.clearTransferState()
+
+        flowFileContents = prettyPrint(toJson([
+            [ id: "rec-1", index: "bulk_a", type: "message", msg: "Hello" ],
+            [ id: "rec-2", index: "bulk_b", type: "message", msg: "Hello" ],
+            [ id: "rec-3", index: "bulk_a", type: "message", msg: "Hello" ],
+            [ id: "rec-4", index: "bulk_b", type: "message", msg: "Hello" ],
+            [ id: "rec-5", index: "bulk_a", type: "message", msg: "Hello" ],
+            [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello" ]
+        ]))
+
+        clientService.evalClosure = { List<IndexOperationRequest> items ->
+            int index = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
+            Assert.assertEquals(6, index)
+        }
+
+        runner.enqueue(flowFileContents, [
+            "schema.name": "recordPathTest"
+        ])
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+    }
+
+    @Test
+    void testInputRequired() {
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 0)
+    }
+
+    @Test
+    void testErrorRelationship() {
+        def writer = new JsonRecordSetWriter()
+        runner.addControllerService("writer", writer)
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY)
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
+        runner.enableControllerService(writer)
+        runner.setProperty(PutElasticsearchRecord.ERROR_RECORD_WRITER, "writer")
+
+        def newSchema = prettyPrint(toJson([
+            type: "record",
+            name: "RecordPathTestType",
+            fields: [
+                [ name: "id", type: "string" ],
+                [ name: "field1", type: ["null", "string"]],
+                [ name: "field2", type: "string"]
+            ]
+        ]))
+
+        def values = [
+            [ id: "1", field1: 'value1', field2: '20' ],
+            [ id: "2", field1: 'value1', field2: '20' ],
+            [ id: "2", field1: 'value1', field2: '20' ],
+            [ id: "3", field1: 'value1', field2: '20abcd' ]
+        ]
+
+        clientService.response = IndexOperationResponse.fromJsonResponse(MockBulkLoadClientService.SAMPLE_ERROR_RESPONSE)
+
+        registry.addSchema("errorTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema)))
+        runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 'errorTest' ])
+        runner.setProperty(PutElasticsearchRecord.LOG_ERROR_RESPONSES, "true")
+        runner.assertValid()
+        runner.run()
+
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1)
+
+        def errorFF = runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+        assert errorFF.getAttribute(PutElasticsearchRecord.ATTR_RECORD_COUNT) == "1"
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
similarity index 70%
rename from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java
rename to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
index b1d4220..b1d559b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
@@ -15,82 +15,79 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.processors.elasticsearch;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.elasticsearch.DeleteOperationResponse;
-import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.elasticsearch.IndexOperationRequest;
-import org.apache.nifi.elasticsearch.IndexOperationResponse;
-import org.apache.nifi.elasticsearch.SearchResponse;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService {
-    private boolean returnAggs;
-    private boolean throwErrorInSearch;
-    private boolean throwErrorInDelete;
-
-    public TestElasticSearchClientService(boolean returnAggs) {
-        this.returnAggs = returnAggs;
+package org.apache.nifi.processors.elasticsearch
+
+import groovy.json.JsonSlurper
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.elasticsearch.DeleteOperationResponse
+import org.apache.nifi.elasticsearch.ElasticSearchClientService
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+import org.apache.nifi.elasticsearch.SearchResponse
+
+class TestElasticsearchClientService extends AbstractControllerService implements ElasticSearchClientService {
+    private boolean returnAggs
+    private boolean throwErrorInSearch
+    private boolean throwErrorInDelete
+
+    TestElasticsearchClientService(boolean returnAggs) {
+        this.returnAggs = returnAggs
     }
 
     @Override
-    public IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
-        return add(Arrays.asList(operation));
+    IndexOperationResponse add(IndexOperationRequest operation) {
+        return bulk(Arrays.asList(operation))
     }
 
     @Override
-    public IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException {
-        return new IndexOperationResponse(100L, 100L);
+    IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
+        return new IndexOperationResponse(100L, 100L)
     }
 
     @Override
-    public DeleteOperationResponse deleteById(String index, String type, String id) throws IOException {
-        return deleteById(index, type, Arrays.asList(id));
+    Long count(String query, String index, String type) {
+        return null
     }
 
     @Override
-    public DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException {
+    DeleteOperationResponse deleteById(String index, String type, String id) {
+        return deleteById(index, type, Arrays.asList(id))
+    }
+
+    @Override
+    DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
         if (throwErrorInDelete) {
-            throw new IOException("Simulated IOException");
+            throw new IOException("Simulated IOException")
         }
-        return new DeleteOperationResponse(100L);
+        return new DeleteOperationResponse(100L)
     }
 
     @Override
-    public DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException {
-        return deleteById(index, type, Arrays.asList("1"));
+    DeleteOperationResponse deleteByQuery(String query, String index, String type) {
+        return deleteById(index, type, Arrays.asList("1"))
     }
 
     @Override
-    public Map<String, Object> get(String index, String type, String id) throws IOException {
-        return new HashMap<String, Object>(){{
-            put("msg", "one");
-        }};
+    Map<String, Object> get(String index, String type, String id) {
+        return [ "msg": "one" ]
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) throws IOException {
+    SearchResponse search(String query, String index, String type) {
         if (throwErrorInSearch) {
-            throw new IOException("Simulated IOException");
+            throw new IOException("Simulated IOException")
         }
 
-        ObjectMapper mapper = new ObjectMapper();
-        List<Map<String, Object>> hits = (List<Map<String, Object>>)mapper.readValue(HITS_RESULT, List.class);
-        Map<String, Object> aggs = returnAggs ? (Map<String, Object>)mapper.readValue(AGGS_RESULT, Map.class) :  null;
-        SearchResponse response = new SearchResponse(hits, aggs, 15, 5, false);
-        return response;
+        def mapper = new JsonSlurper()
+        def hits = mapper.parseText(HITS_RESULT)
+        def aggs = returnAggs ? mapper.parseText(AGGS_RESULT) : null
+        SearchResponse response = new SearchResponse(hits, aggs, 15, 5, false)
+        return response
     }
 
     @Override
-    public String getTransitUrl(String index, String type) {
-        return String.format("http://localhost:9400/%s/%s", index, type);
+    String getTransitUrl(String index, String type) {
+        "http://localhost:9400/${index}/${type}"
     }
 
     private static final String AGGS_RESULT = "{\n" +
@@ -146,7 +143,7 @@ public class TestElasticSearchClientService extends AbstractControllerService im
             "        }\n" +
             "      ]\n" +
             "    }\n" +
-            "  }";
+            "  }"
 
     private static final String HITS_RESULT = "[\n" +
             "      {\n" +
@@ -239,13 +236,13 @@ public class TestElasticSearchClientService extends AbstractControllerService im
             "          \"msg\": \"five\"\n" +
             "        }\n" +
             "      }\n" +
-            "    ]";
+            "    ]"
 
-    public void setThrowErrorInSearch(boolean throwErrorInSearch) {
-        this.throwErrorInSearch = throwErrorInSearch;
+    void setThrowErrorInSearch(boolean throwErrorInSearch) {
+        this.throwErrorInSearch = throwErrorInSearch
     }
 
-    public void setThrowErrorInDelete(boolean throwErrorInDelete) {
-        this.throwErrorInDelete = throwErrorInDelete;
+    void setThrowErrorInDelete(boolean throwErrorInDelete) {
+        this.throwErrorInDelete = throwErrorInDelete
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy
similarity index 59%
copy from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
copy to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy
index 899cedb..fdbdac1 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy
@@ -15,63 +15,57 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.elasticsearch.integration
+package org.apache.nifi.processors.elasticsearch.mock
 
 import org.apache.nifi.controller.AbstractControllerService
-import org.apache.nifi.elasticsearch.DeleteOperationResponse
-import org.apache.nifi.elasticsearch.ElasticSearchClientService
-import org.apache.nifi.elasticsearch.IndexOperationRequest
-import org.apache.nifi.elasticsearch.IndexOperationResponse
-import org.apache.nifi.elasticsearch.SearchResponse
+import org.apache.nifi.elasticsearch.*
 
-class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService {
-    Map data = [
-        "username": "john.smith",
-        "password": "testing1234",
-        "email": "john.smith@test.com",
-        "position": "Software Engineer"
-    ]
+class AbstractMockElasticsearchClient extends AbstractControllerService implements ElasticSearchClientService {
+    boolean throwRetriableError
+    boolean throwFatalError
 
     @Override
-    IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
+    IndexOperationResponse add(IndexOperationRequest operation) {
         return null
     }
 
     @Override
-    IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException {
+    IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
         return null
     }
 
     @Override
-    DeleteOperationResponse deleteById(String index, String type, String id) throws IOException {
+    Long count(String query, String index, String type) {
         return null
     }
 
     @Override
-    DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException {
+    DeleteOperationResponse deleteById(String index, String type, String id) {
         return null
     }
 
     @Override
-    DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException {
+    DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
         return null
     }
 
     @Override
-    Map<String, Object> get(String index, String type, String id) throws IOException {
-        return data
+    DeleteOperationResponse deleteByQuery(String query, String index, String type) {
+        return null
     }
 
     @Override
-    SearchResponse search(String query, String index, String type) throws IOException {
-        List hits = [[
-            "_source": data
-        ]]
-        return new SearchResponse(hits, null, 1, 100, false)
+    Map<String, Object> get(String index, String type, String id) {
+        return null
+    }
+
+    @Override
+    SearchResponse search(String query, String index, String type) {
+        return null
     }
 
     @Override
     String getTransitUrl(String index, String type) {
-        return ""
+        return null
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
new file mode 100644
index 0000000..3c4d902
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.mock
+
+
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+
+class MockBulkLoadClientService extends AbstractMockElasticsearchClient {
+    IndexOperationResponse response
+    Closure evalClosure
+
+    @Override
+    IndexOperationResponse bulk(List<IndexOperationRequest> items) {
+        if (throwRetriableError) {
+            throw new MockElasticsearchError(true)
+        } else if (throwFatalError) {
+            throw new MockElasticsearchError(false)
+        }
+
+        if (evalClosure) {
+            evalClosure.call(items)
+        }
+
+        response
+    }
+
+    static final SAMPLE_ERROR_RESPONSE = """
+{
+  "took" : 18,
+  "errors" : true,
+  "items" : [
+    {
+      "index" : {
+        "_index" : "test",
+        "_type" : "_doc",
+        "_id" : "1",
+        "_version" : 4,
+        "result" : "updated",
+        "_shards" : {
+          "total" : 2,
+          "successful" : 1,
+          "failed" : 0
+        },
+        "_seq_no" : 4,
+        "_primary_term" : 1,
+        "status" : 200
+      }
+    },
+    {
+      "create" : {
+        "_index" : "test",
+        "_type" : "_doc",
+        "_id" : "2",
+        "_version" : 1,
+        "result" : "created",
+        "_shards" : {
+          "total" : 2,
+          "successful" : 1,
+          "failed" : 0
+        },
+        "_seq_no" : 1,
+        "_primary_term" : 1,
+        "status" : 201
+      }
+    },
+    {
+      "create" : {
+        "_index" : "test",
+        "_type" : "_doc",
+        "_id" : "3",
+        "_version" : 1,
+        "result" : "created",
+        "_shards" : {
+          "total" : 2,
+          "successful" : 1,
+          "failed" : 0
+        },
+        "_seq_no" : 3,
+        "_primary_term" : 1,
+        "status" : 201
+      }
+    },
+    {
+      "index" : {
+        "_index" : "test",
+        "_type" : "_doc",
+        "_id" : "4",
+        "status" : 400,
+        "error" : {
+          "type" : "mapper_parsing_exception",
+          "reason" : "failed to parse field [field2] of type [integer] in document with id '4'",
+          "caused_by" : {
+            "type" : "number_format_exception",
+            "reason" : "For input string: 20abc"
+          }
+        }
+      }
+    }
+  ]
+}
+        """
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
similarity index 62%
copy from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
copy to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
index a22b7aa..7b1073b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
@@ -3,7 +3,7 @@
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License") you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
@@ -15,22 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.elasticsearch;
+package org.apache.nifi.processors.elasticsearch.mock
 
-public class IndexOperationResponse {
-    private long took;
-    private long ingestTook;
+import org.apache.nifi.elasticsearch.ElasticsearchError
 
-    public IndexOperationResponse(long took, long ingestTook) {
-        this.took = took;
-        this.ingestTook = ingestTook;
+class MockElasticsearchError extends ElasticsearchError {
+    MockElasticsearchError(boolean isElastic) {
+        this(new Exception())
+        this.isElastic = isElastic
     }
 
-    public long getTook() {
-        return took;
-    }
-
-    public long getIngestTook() {
-        return ingestTook;
+    MockElasticsearchError(Exception ex) {
+        super(ex)
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/.gitignore b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/.gitignore
new file mode 100644
index 0000000..74f8eb7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/.gitignore
@@ -0,0 +1 @@
+# Placeholder to force compilation
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java
deleted file mode 100644
index ade1102..0000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.elasticsearch;
-
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class DeleteByQueryElasticsearchTest {
-    private static final String INDEX = "test_idx";
-    private static final String TYPE  = "test_type";
-    private static final String QUERY_ATTR = "es.delete.query";
-    private static final String CLIENT_NAME = "clientService";
-
-    private TestElasticSearchClientService client;
-
-    private void initClient(TestRunner runner) throws Exception {
-        client = new TestElasticSearchClientService(true);
-        runner.addControllerService(CLIENT_NAME, client);
-        runner.enableControllerService(client);
-        runner.setProperty(DeleteByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME);
-    }
-
-    private void postTest(TestRunner runner, String queryParam) {
-        runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 0);
-        runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 1);
-
-        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_SUCCESS);
-        String attr = flowFiles.get(0).getAttribute(DeleteByQueryElasticsearch.TOOK_ATTRIBUTE);
-        String query = flowFiles.get(0).getAttribute(QUERY_ATTR);
-        Assert.assertNotNull(attr);
-        Assert.assertEquals(attr, "100");
-        Assert.assertNotNull(query);
-        Assert.assertEquals(queryParam, query);
-    }
-
-    @Test
-    public void testWithFlowfileInput() throws Exception {
-        String query = "{ \"query\": { \"match_all\": {} }}";
-        TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
-        runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
-        runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
-        runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
-        initClient(runner);
-        runner.assertValid();
-        runner.enqueue(query);
-        runner.run();
-
-        postTest(runner, query);
-    }
-
-    @Test
-    public void testWithQuery() throws Exception {
-        String query = "{\n" +
-            "\t\"query\": {\n" +
-            "\t\t\"match\": {\n" +
-            "\t\t\t\"${field.name}.keyword\": \"test\"\n" +
-            "\t\t}\n" +
-            "\t}\n" +
-            "}";
-        Map<String, String> attrs = new HashMap<String, String>(){{
-            put("field.name", "test_field");
-        }};
-        TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
-        runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
-        runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
-        runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
-        runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
-        initClient(runner);
-        runner.assertValid();
-        runner.enqueue("", attrs);
-        runner.run();
-
-        postTest(runner, query.replace("${field.name}", "test_field"));
-
-        runner.clearTransferState();
-
-        query = "{\n" +
-            "\t\"query\": {\n" +
-            "\t\t\"match\": {\n" +
-            "\t\t\t\"test_field.keyword\": \"test\"\n" +
-            "\t\t}\n" +
-            "\t}\n" +
-            "}";
-        runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
-        runner.setIncomingConnection(false);
-        runner.assertValid();
-        runner.run();
-        postTest(runner, query);
-    }
-
-    @Test
-    public void testErrorAttribute() throws Exception {
-        String query = "{ \"query\": { \"match_all\": {} }}";
-        TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
-        runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
-        runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
-        runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
-        runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
-        initClient(runner);
-        client.setThrowErrorInDelete(true);
-        runner.assertValid();
-        runner.enqueue("");
-        runner.run();
-
-        runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 0);
-        runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 1);
-
-        MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_FAILURE).get(0);
-        String attr = mockFlowFile.getAttribute(DeleteByQueryElasticsearch.ERROR_ATTRIBUTE);
-        Assert.assertNotNull(attr);
-    }
-}