You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2021/12/05 20:39:55 UTC

[nifi] branch main updated: NIFI-9439: - Add PutElasticsearchJson processor to Elasticsearch REST bundle - Deprecate PutElasticsearchHttp/PutElasticsearchHttpRecord in favour of Elasticsearch REST processors

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 53809dd  NIFI-9439: - Add PutElasticsearchJson processor to Elasticsearch REST bundle - Deprecate PutElasticsearchHttp/PutElasticsearchHttpRecord in favour of Elasticsearch REST processors
53809dd is described below

commit 53809dd83f1b2e150aaf70a607f6885fa292cbdd
Author: Chris Sampson <ch...@gmail.com>
AuthorDate: Fri Dec 3 14:44:44 2021 +0000

    NIFI-9439:
    - Add PutElasticsearchJson processor to Elasticsearch REST bundle
    - Deprecate PutElasticsearchHttp/PutElasticsearchHttpRecord in favour of Elasticsearch REST processors
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5566.
---
 .../AbstractElasticsearchHttpProcessor.java        |   1 +
 .../elasticsearch/IdentifierNotFoundException.java |   1 +
 .../elasticsearch/PutElasticsearchHttp.java        |   5 +-
 .../elasticsearch/PutElasticsearchHttpRecord.java  |   5 +-
 .../elasticsearch/RetryableException.java          |   1 +
 .../elasticsearch/UnretryableException.java        |   2 +-
 .../elasticsearch/AbstractPutElasticsearch.java    | 165 +++++++++++++
 .../elasticsearch/ElasticsearchRestProcessor.java  |   4 -
 .../processors/elasticsearch/GetElasticsearch.java |   3 +-
 .../elasticsearch/PutElasticsearchJson.java        | 220 +++++++++++++++++
 .../elasticsearch/PutElasticsearchRecord.java      | 138 +++--------
 .../services/org.apache.nifi.processor.Processor   |   2 +
 .../additionalDetails.html                         |  44 ++++
 .../additionalDetails.html                         |  11 +-
 .../elasticsearch/PutElasticsearchJsonTest.groovy  | 272 +++++++++++++++++++++
 .../PutElasticsearchRecordTest.groovy              |  14 ++
 16 files changed, 770 insertions(+), 118 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index 8609b0f..fe56e12 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -55,6 +55,7 @@ import javax.net.ssl.X509TrustManager;
 /**
  * A base class for Elasticsearch processors that use the HTTP API
  */
+@Deprecated
 public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor {
     static final String SOURCE_QUERY_PARAM = "_source";
     static final String QUERY_QUERY_PARAM = "q";
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java
index 35402d7..eb61d3c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.elasticsearch;
 /**
  * A domain-specific exception for when a valid Elasticsearch document identifier is expected but not found
  */
+@Deprecated
 public class IdentifierNotFoundException extends Exception {
 
     public IdentifierNotFoundException() {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 5e9bdf3..ab54aa6 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -32,6 +32,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.SystemResource;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.DeprecationNotice;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -61,7 +62,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-
+@Deprecated
+@DeprecationNotice(classNames = {"org.apache.nifi.processors.elasticsearch.PutElasticsearchJson"},
+        reason = "This processor is deprecated and may be removed in future releases.")
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @EventDriven
 @SupportsBatching
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 6333be3..97e55ff 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -32,6 +32,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.DeprecationNotice;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
@@ -87,7 +88,9 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
-
+@Deprecated
+@DeprecationNotice(classNames = {"org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord"},
+        reason = "This processor is deprecated and may be removed in future releases.")
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @EventDriven
 @Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http", "record"})
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java
index 8e94145..0ecec5d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.elasticsearch;
 /**
  * Represents a retryable exception from ElasticSearch.
  */
+@Deprecated
 public class RetryableException extends RuntimeException {
 
     private static final long serialVersionUID = -2755015600102381620L;
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java
index bae83cf..d6071f7 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java
@@ -19,8 +19,8 @@ package org.apache.nifi.processors.elasticsearch;
 /**
  * Represents an unrecoverable error from ElasticSearch.
  * @author jgresock
- *
  */
+@Deprecated
 public class UnretryableException extends RuntimeException {
     private static final long serialVersionUID = -4528006567211380914L;
 
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
new file mode 100644
index 0000000..2ede632
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.elasticsearch.IndexOperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractPutElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("put-es-record-batch-size")
+            .displayName("Batch Size")
+            .description("The preferred number of FlowFiles to send over in a single batch.")
+            .defaultValue("100")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
+        .name("put-es-record-index-op")
+        .displayName("Index Operation")
+        .description("The type of the operation used to index (create, delete, index, update, upsert)")
+        .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .defaultValue(IndexOperationRequest.Operation.Index.getValue())
+        .required(true)
+        .build();
+
+    static final List<String> ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList(
+            IndexOperationRequest.Operation.Create.getValue().toLowerCase(),
+            IndexOperationRequest.Operation.Delete.getValue().toLowerCase(),
+            IndexOperationRequest.Operation.Index.getValue().toLowerCase(),
+            IndexOperationRequest.Operation.Update.getValue().toLowerCase(),
+            IndexOperationRequest.Operation.Upsert.getValue().toLowerCase()
+    ));
+
+    boolean logErrors;
+    ObjectMapper errorMapper;
+
+    volatile ElasticSearchClientService clientService;
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+        this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
+
+        if (errorMapper == null && (logErrors || getLogger().isDebugEnabled())) {
+            errorMapper = new ObjectMapper();
+            errorMapper.enable(SerializationFeature.INDENT_OUTPUT);
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> validationResults = new ArrayList<>();
+
+        final PropertyValue indexOp = validationContext.getProperty(INDEX_OP);
+        final ValidationResult.Builder indexOpValidationResult = new ValidationResult.Builder().subject(INDEX_OP.getName());
+        if (!indexOp.isExpressionLanguagePresent()) {
+            final String indexOpValue = indexOp.evaluateAttributeExpressions().getValue();
+            indexOpValidationResult.input(indexOpValue);
+            if (!ALLOWED_INDEX_OPERATIONS.contains(indexOpValue.toLowerCase())) {
+                indexOpValidationResult.valid(false)
+                        .explanation(String.format("%s must be Expression Language or one of %s",
+                                INDEX_OP.getDisplayName(), ALLOWED_INDEX_OPERATIONS)
+                        );
+            } else {
+                indexOpValidationResult.valid(true);
+            }
+        } else {
+            indexOpValidationResult.valid(true).input(indexOp.getValue()).explanation("Expression Language present");
+        }
+        validationResults.add(indexOpValidationResult.build());
+
+        return validationResults;
+    }
+
+    void transferFlowFilesOnException(final Exception ex, final Relationship rel, final ProcessSession session,
+                                      final boolean penalize, final FlowFile... flowFiles) {
+        for (FlowFile flowFile : flowFiles) {
+            flowFile = session.putAttribute(flowFile, "elasticsearch.put.error", ex.getMessage() == null ? "null" : ex.getMessage());
+            if (penalize) {
+                session.penalize(flowFile);
+            }
+            session.transfer(flowFile, rel);
+        }
+    }
+
+    void logElasticsearchDocumentErrors(final IndexOperationResponse response) throws JsonProcessingException {
+        if (logErrors || getLogger().isDebugEnabled()) {
+            final List<Map<String, Object>> errors = response.getItems();
+            final String output = String.format("An error was encountered while processing bulk operations. Server response below:%n%n%s", errorMapper.writeValueAsString(errors));
+
+            if (logErrors) {
+                getLogger().error(output);
+            } else {
+                getLogger().debug(output);
+            }
+        }
+    }
+
+    List<Integer> findElasticsearchErrorIndices(final IndexOperationResponse response) {
+        final List<Integer> indices = new ArrayList<>(response.getItems().size());
+        for (int index = 0; index < response.getItems().size(); index++) {
+            final Map<String, Object> current = response.getItems().get(index);
+            if (!current.isEmpty()) {
+                final String key = current.keySet().stream().findFirst().orElse(null);
+                @SuppressWarnings("unchecked") final Map<String, Object> inner = (Map<String, Object>) current.get(key);
+                if (inner != null && inner.containsKey("error")) {
+                    indices.add(index);
+                }
+            }
+        }
+        return indices;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
index 886279e..58c447d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
@@ -104,10 +104,6 @@ public interface ElasticsearchRestProcessor {
             .name("success")
             .description("All flowfiles that succeed in being transferred into Elasticsearch go here.")
             .build();
-    Relationship REL_FAILED_RECORDS = new Relationship.Builder()
-            .name("errors").description("If an output record write is set, any record that failed to process the way it was " +
-                    "configured will be sent to this relationship as part of a failed record record set.")
-            .autoTerminateDefault(true).build();
 
     default String getQuery(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException {
         String retVal = null;
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
index e416281..ae75d08 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
@@ -49,7 +49,8 @@ import java.util.concurrent.TimeUnit;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
 @Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
-@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
+@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries " +
+        "to fetch a single document from Elasticsearch by _id. " +
         "Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
 @WritesAttributes({
         @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
new file mode 100644
index 0000000..4614c05
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.elasticsearch.IndexOperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index"})
+@CapabilityDescription("An Elasticsearch put processor that uses the official Elastic REST client libraries.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "elasticsearch.put.error", description = "The error message provided by Elasticsearch if there is an error indexing the document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " +
+                "These parameters will override any matching parameters in the _bulk request body. " +
+                "If FlowFiles are batched, only the first FlowFile in the batch is used to evaluate property values.")
+@SystemResourceConsideration(
+        resource = SystemResource.MEMORY,
+        description = "The Batch of FlowFiles will be stored in memory until the bulk operation is performed.")
+public class PutElasticsearchJson extends AbstractPutElasticsearch {
+    static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("put-es-json-id-attr")
+            .displayName("Identifier Attribute")
+            .description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", "
+                    + "this property may be left empty or evaluate to an empty value, in which case the document's identifier will be "
+                    + "auto-generated by Elasticsearch. For all other Index Operations, the attribute must evaluate to a non-empty value.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+        .name("put-es-json-charset")
+        .displayName("Character Set")
+        .description("Specifies the character set of the document data.")
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .defaultValue(StandardCharsets.UTF_8.name())
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor OUTPUT_ERROR_DOCUMENTS = new PropertyDescriptor.Builder()
+        .name("put-es-json-error-documents")
+        .displayName("Output Error Documents")
+        .description("If this configuration property is true, the response from Elasticsearch will be examined for failed documents " +
+                "and the failed documents will be sent to the \"errors\" relationship.")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .required(true)
+        .build();
+
+    static final Relationship REL_FAILED_DOCUMENTS = new Relationship.Builder()
+            .name("errors").description("If \"" + OUTPUT_ERROR_DOCUMENTS.getDisplayName() + "\" is set, " +
+                    "any FlowFile that failed to process the way it was configured will be sent to this relationship " +
+                    "as part of a failed document set.")
+            .autoTerminateDefault(true).build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+        ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, BATCH_SIZE, CHARSET, CLIENT_SERVICE, LOG_ERROR_RESPONSES, OUTPUT_ERROR_DOCUMENTS
+    ));
+    static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+        REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS
+    )));
+
+    private boolean outputErrors;
+    private final ObjectMapper inputMapper = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+
+        this.outputErrors = context.getProperty(OUTPUT_ERROR_DOCUMENTS).asBoolean();
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+
+        final List<FlowFile> flowFiles = session.get(batchSize);
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final String idAttribute = context.getProperty(ID_ATTRIBUTE).getValue();
+
+        final List<FlowFile> originals = new ArrayList<>(flowFiles.size());
+        final List<IndexOperationRequest> operations = new ArrayList<>(flowFiles.size());
+
+        for (FlowFile input : flowFiles) {
+            final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
+            final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+            final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+            final String id = StringUtils.isNotBlank(idAttribute) ? input.getAttribute(idAttribute) : null;
+
+            final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue();
+
+            try (final InputStream inStream = session.read(input)) {
+                final byte[] result = IOUtils.toByteArray(inStream);
+                @SuppressWarnings("unchecked")
+                final Map<String, Object> contentMap = inputMapper.readValue(new String(result, charset), Map.class);
+
+                final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(indexOp);
+                operations.add(new IndexOperationRequest(index, type, id, contentMap, o));
+
+                originals.add(input);
+            } catch (final IOException ioe) {
+                getLogger().error("Could not read FlowFile content valid JSON.", ioe);
+                input = session.putAttribute(input, "elasticsearch.put.error", ioe.getMessage());
+                session.penalize(input);
+                session.transfer(input, REL_FAILURE);
+            } catch (final Exception ex) {
+                getLogger().error("Could not index documents.", ex);
+                input = session.putAttribute(input, "elasticsearch.put.error", ex.getMessage());
+                session.penalize(input);
+                session.transfer(input, REL_FAILURE);
+            }
+        }
+
+        if (!originals.isEmpty()) {
+            try {
+                final List<FlowFile> errorDocuments = indexDocuments(operations, originals, context);
+                session.transfer(errorDocuments, REL_FAILED_DOCUMENTS);
+
+                session.transfer(originals.stream().filter(f -> !errorDocuments.contains(f)).collect(Collectors.toList()), REL_SUCCESS);
+            } catch (final ElasticsearchException ese) {
+                final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
+                        ese.isElastic() ? "Routing to retry." : "Routing to failure");
+                getLogger().error(msg, ese);
+                final Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
+                transferFlowFilesOnException(ese, rel, session, true, originals.toArray(new FlowFile[0]));
+            } catch (final JsonProcessingException jpe) {
+                getLogger().warn("Could not log Elasticsearch operation errors nor determine which documents errored.", jpe);
+                final Relationship rel = outputErrors ? REL_FAILED_DOCUMENTS : REL_FAILURE;
+                transferFlowFilesOnException(jpe, rel, session, true, originals.toArray(new FlowFile[0]));
+            } catch (final Exception ex) {
+                getLogger().error("Could not index documents.", ex);
+                transferFlowFilesOnException(ex, REL_FAILURE, session, false, originals.toArray(new FlowFile[0]));
+                context.yield();
+            }
+        } else {
+            getLogger().warn("No FlowFiles successfully parsed for sending to Elasticsearch");
+        }
+    }
+
+    private List<FlowFile> indexDocuments(final List<IndexOperationRequest> operations, final List<FlowFile> originals, final ProcessContext context) throws JsonProcessingException {
+        final IndexOperationResponse response = clientService.bulk(operations, getUrlQueryParameters(context, originals.get(0)));
+        final List<FlowFile> errorDocuments = new ArrayList<>(response.getItems() == null ? 0 : response.getItems().size());
+        if (response.hasErrors()) {
+            logElasticsearchDocumentErrors(response);
+
+            if (outputErrors) {
+                findElasticsearchErrorIndices(response).forEach(index -> errorDocuments.add(originals.get(index)));
+            }
+        }
+        return errorDocuments;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index fe058c0..0a1fd31 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -17,27 +17,22 @@
 
 package org.apache.nifi.processors.elasticsearch;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
-import org.apache.nifi.elasticsearch.ElasticSearchClientService;
 import org.apache.nifi.elasticsearch.ElasticsearchException;
 import org.apache.nifi.elasticsearch.IndexOperationRequest;
 import org.apache.nifi.elasticsearch.IndexOperationResponse;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -49,6 +44,7 @@ import org.apache.nifi.record.path.RecordPath;
 import org.apache.nifi.record.path.RecordPathResult;
 import org.apache.nifi.record.path.util.RecordPathCache;
 import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
@@ -62,11 +58,11 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -86,7 +82,10 @@ import java.util.Set;
         expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
         description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " +
                 "These parameters will override any matching parameters in the _bulk request body")
-public class PutElasticsearchRecord extends AbstractProcessor implements ElasticsearchRestProcessor {
+@SystemResourceConsideration(
+        resource = SystemResource.MEMORY,
+        description = "The Batch of Records will be stored in memory until the bulk operation is performed.")
+public class PutElasticsearchRecord extends AbstractPutElasticsearch {
     static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
         .name("put-es-record-reader")
         .displayName("Record Reader")
@@ -96,23 +95,9 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         .build();
 
     static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-        .name("put-es-record-batch-size")
-        .displayName("Batch Size")
+        .fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE)
         .description("The number of records to send over in a single batch.")
-        .defaultValue("100")
-        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-        .required(true)
-        .build();
-
-    static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
-        .name("put-es-record-index-op")
-        .displayName("Index Operation")
-        .description("The type of the operation used to index (create, delete, index, update, upsert)")
-        .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-        .defaultValue(IndexOperationRequest.Operation.Index.getValue())
-        .required(true)
         .build();
 
     static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
@@ -245,6 +230,11 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         .required(false)
         .build();
 
+    static final Relationship REL_FAILED_RECORDS = new Relationship.Builder()
+            .name("errors").description("If an Error Record Writer is set, any record that failed to process the way it was " +
+                    "configured will be sent to this relationship as part of a failed record set.")
+            .autoTerminateDefault(true).build();
+
     static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
         INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
         INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
@@ -254,21 +244,10 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
     )));
 
-    static final List<String> ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList(
-            IndexOperationRequest.Operation.Create.getValue().toLowerCase(),
-            IndexOperationRequest.Operation.Delete.getValue().toLowerCase(),
-            IndexOperationRequest.Operation.Index.getValue().toLowerCase(),
-            IndexOperationRequest.Operation.Update.getValue().toLowerCase(),
-            IndexOperationRequest.Operation.Upsert.getValue().toLowerCase()
-    ));
-
     private RecordPathCache recordPathCache;
     private RecordReaderFactory readerFactory;
     private RecordSetWriterFactory writerFactory;
-    private boolean logErrors;
-    private ObjectMapper errorMapper;
 
-    private volatile ElasticSearchClientService clientService;
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
@@ -283,24 +262,13 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         return DESCRIPTORS;
     }
 
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .required(false)
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-                .dynamic(true)
-                .build();
-    }
-
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+
         this.readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
         this.recordPathCache = new RecordPathCache(16);
         this.writerFactory = context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
 
         this.dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
         if (this.dateFormat == null) {
@@ -314,36 +282,6 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         if (this.timestampFormat == null) {
             this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
         }
-
-        if (errorMapper == null && (logErrors || getLogger().isDebugEnabled())) {
-            errorMapper = new ObjectMapper();
-            errorMapper.enable(SerializationFeature.INDENT_OUTPUT);
-        }
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> validationResults = new ArrayList<>();
-
-        final PropertyValue indexOp = validationContext.getProperty(INDEX_OP);
-        final ValidationResult.Builder indexOpValidationResult = new ValidationResult.Builder().subject(INDEX_OP.getName());
-        if (!indexOp.isExpressionLanguagePresent()) {
-            final String indexOpValue = indexOp.evaluateAttributeExpressions().getValue();
-            indexOpValidationResult.input(indexOpValue);
-            if (!ALLOWED_INDEX_OPERATIONS.contains(indexOpValue.toLowerCase())) {
-                indexOpValidationResult.valid(false)
-                        .explanation(String.format("%s must be Expression Language or one of %s",
-                                INDEX_OP.getDisplayName(), ALLOWED_INDEX_OPERATIONS)
-                        );
-            } else {
-                indexOpValidationResult.valid(true);
-            }
-        } else {
-            indexOpValidationResult.valid(true).input(indexOp.getValue()).explanation("Expression Language present");
-        }
-        validationResults.add(indexOpValidationResult.build());
-
-        return validationResults;
     }
 
     @Override
@@ -423,15 +361,18 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
                     ese.isElastic() ? "Routing to retry." : "Routing to failure");
             getLogger().error(msg, ese);
             final Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
-            session.penalize(input);
-            input = session.putAttribute(input, "elasticsearch.put.error", ese.getMessage());
-            session.transfer(input, rel);
+            transferFlowFilesOnException(ese, rel, session, true, input);
+            removeBadRecordFlowFiles(badRecords, session);
+            return;
+        } catch (final IOException | SchemaNotFoundException ex) {
+            getLogger().warn("Could not log Elasticsearch operation errors nor determine which documents errored.", ex);
+            final Relationship rel = writerFactory != null ? REL_FAILED_RECORDS : REL_FAILURE;
+            transferFlowFilesOnException(ex, rel, session, true, input);
             removeBadRecordFlowFiles(badRecords, session);
             return;
         } catch (final Exception ex) {
             getLogger().error("Could not index documents.", ex);
-            input = session.putAttribute(input, "elasticsearch.put.error", ex.getMessage());
-            session.transfer(input, REL_FAILURE);
+            transferFlowFilesOnException(ex, REL_FAILURE, session, false, input);
             context.yield();
             removeBadRecordFlowFiles(badRecords, session);
             return;
@@ -447,19 +388,10 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
         bad.clear();
     }
 
-    private FlowFile indexDocuments(final BulkOperation bundle, final ProcessContext context, final ProcessSession session, final FlowFile input) throws Exception {
+    private FlowFile indexDocuments(final BulkOperation bundle, final ProcessContext context, final ProcessSession session, final FlowFile input) throws IOException, SchemaNotFoundException {
         final IndexOperationResponse response = clientService.bulk(bundle.getOperationList(), getUrlQueryParameters(context, input));
         if (response.hasErrors()) {
-            if (logErrors || getLogger().isDebugEnabled()) {
-                final List<Map<String, Object>> errors = response.getItems();
-                final String output = String.format("An error was encountered while processing bulk operations. Server response below:%n%n%s", errorMapper.writeValueAsString(errors));
-
-                if (logErrors) {
-                    getLogger().error(output);
-                } else {
-                    getLogger().debug(output);
-                }
-            }
+            logElasticsearchDocumentErrors(response);
 
             if (writerFactory != null) {
                 FlowFile errorFF = session.create(input);
@@ -469,17 +401,9 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
                          final RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) {
 
                         writer.beginRecordSet();
-                        for (int index = 0; index < response.getItems().size(); index++) {
-                            final Map<String, Object> current = response.getItems().get(index);
-                            if (!current.isEmpty()) {
-                                final String key = current.keySet().stream().findFirst().orElse(null);
-                                @SuppressWarnings("unchecked")
-                                final Map<String, Object> inner = (Map<String, Object>) current.get(key);
-                                if (inner != null && inner.containsKey("error")) {
-                                    writer.write(bundle.getOriginalRecords().get(index));
-                                    added++;
-                                }
-                            }
+                        for (final int index : findElasticsearchErrorIndices(response)) {
+                            writer.write(bundle.getOriginalRecords().get(index));
+                            added++;
                         }
                         writer.finishRecordSet();
                     }
@@ -489,8 +413,8 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
                     session.transfer(errorFF, REL_FAILED_RECORDS);
 
                     return errorFF;
-                } catch (final Exception ex) {
-                    getLogger().error("", ex);
+                } catch (final IOException | SchemaNotFoundException ex) {
+                    getLogger().error("Unable to write error records", ex);
                     session.remove(errorFF);
                     throw ex;
                 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 69bbf40..a22d16a 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -18,5 +18,7 @@ org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch
 org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch
 org.apache.nifi.processors.elasticsearch.SearchElasticsearch
 org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord
+org.apache.nifi.processors.elasticsearch.PutElasticsearchJson
 org.apache.nifi.processors.elasticsearch.UpdateByQueryElasticsearch
 org.apache.nifi.processors.elasticsearch.GetElasticsearch
+
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
new file mode 100644
index 0000000..403cb7d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
@@ -0,0 +1,44 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutElasticsearchJson</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+<p>
+    This processor is for accessing the Elasticsearch Bulk API. It provides the ability to configure bulk operations on
+    a per-FlowFile basis, which is what separates it from PutElasticsearchRecord.
+</p>
+<p>
+    As part of the Elasticsearch REST API bundle, it uses a controller service to manage connection information and
+    that controller service is built on top of the official Elasticsearch client APIs. That provides features such as
+    automatic master detection against the cluster which is missing in the other bundles.
+</p>
+<p>
+    This processor builds one Elasticsearch Bulk API body per (batch of) FlowFiles. Care should be taken to batch FlowFiles
+    into appropriately-sized chunks so that NiFi does not run out of memory and the requests sent to Elasticsearch are
+    not too large for it to handle. When failures do occur, this processor is capable of attempting to route the FlowFiles
+    that failed to an errors queue so that only failed FlowFiles can be processed downstream or replayed.
+</p>
+<p>
+    The index, operation and (optional) type fields are configured with default values.
+    The ID (optional only if the operation is "index") can be set as an attribute on the FlowFile(s).
+    The content of each FlowFile is the JSON document to be sent; content that cannot be parsed as JSON is routed to failure.
+</p>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
index 76e862e..18da9cb 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
@@ -22,7 +22,7 @@
 <body>
 <p>
     This processor is for accessing the Elasticsearch Bulk API. It provides the ability to configure bulk operations on
-    a per-record basis which is what separates it from PutElasticsearchHttpRecord. For example, it is possible to define
+    a per-record basis which is what separates it from PutElasticsearchJson. For example, it is possible to define
     multiple commands to index documents, followed by deletes, creates and update operations against the same index or
     other indices as desired.
 </p>
@@ -41,7 +41,10 @@
     The index, operation and (optional) type fields are configured with default values that can be overridden using
     record path operations that find an index or type value in the record set.
     The ID and operation type (create, index, update, upsert or delete) can also be extracted in a similar fashion from
-    the record set. The following is an example of a document exercising all of these features:
+    the record set.
+    An "@timestamp" field can be added to the data either using a default or by extracting it from the record set.
+    This is useful if the documents are being indexed into an Elasticsearch Data Stream.
+    The following is an example of a document exercising all of these features:
 </p>
 <pre>
     {
@@ -52,7 +55,8 @@
             "operation": "index"
         },
         "message": "Hello, world",
-        "from": "john.smith"
+        "from": "john.smith",
+        "ts": "2021-12-03T14:00:00.000Z"
     }
 </pre>
 <pre>
@@ -71,6 +75,7 @@
     <li>/metadata/index</li>
     <li>metadata/type</li>
     <li>metadata/operation</li>
+    <li>/ts</li>
 </ul>
 <p>Valid values for "operation" are:</p>
 <ul>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
new file mode 100644
index 0000000..ab979fe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+
+
+import static groovy.json.JsonOutput.prettyPrint
+import static groovy.json.JsonOutput.toJson
+
+import static org.hamcrest.CoreMatchers.containsString
+import static org.hamcrest.MatcherAssert.assertThat
+
+class PutElasticsearchJsonTest {
+    MockBulkLoadClientService clientService
+    TestRunner runner
+
+    static final String flowFileContents = prettyPrint(toJson(
+            [ msg: "Hello, world", from: "john.smith" ]
+    ))
+
+    @Before
+    void setup() {
+        clientService = new MockBulkLoadClientService()
+        runner   = TestRunners.newTestRunner(PutElasticsearchJson.class)
+
+        clientService.response = new IndexOperationResponse(1500)
+
+        runner.addControllerService("clientService", clientService)
+        runner.setProperty(PutElasticsearchJson.ID_ATTRIBUTE, "doc_id")
+        runner.setProperty(PutElasticsearchJson.INDEX_OP, IndexOperationRequest.Operation.Index.getValue())
+        runner.setProperty(PutElasticsearchJson.INDEX, "test_index")
+        runner.setProperty(PutElasticsearchJson.TYPE, "test_type")
+        runner.setProperty(PutElasticsearchJson.BATCH_SIZE, "1")
+        runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_DOCUMENTS, "false")
+        runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "false")
+        runner.setProperty(PutElasticsearchJson.CLIENT_SERVICE, "clientService")
+        runner.enableControllerService(clientService)
+
+        runner.assertValid()
+    }
+
+    void basicTest(int failure, int retry, int success) {
+        def evalClosure = { List<IndexOperationRequest> items ->
+            int nullIdCount = items.findAll { it.id == null }.size()
+            int indexCount = items.findAll { it.index == "test_index" }.size()
+            int typeCount = items.findAll { it.type == "test_type" }.size()
+            int opCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
+            Assert.assertEquals(1, nullIdCount)
+            Assert.assertEquals(1, indexCount)
+            Assert.assertEquals(1, typeCount)
+            Assert.assertEquals(1, opCount)
+        }
+
+        basicTest(failure, retry, success, evalClosure)
+    }
+
+    void basicTest(int failure, int retry, int success, Closure evalClosure) {
+        clientService.evalClosure = evalClosure
+
+        basicTest(failure, retry, success, null)
+    }
+
+    void basicTest(int failure, int retry, int success, Map<String, String> attr) {
+        if (attr != null) {
+            runner.enqueue(flowFileContents, attr)
+        } else {
+            runner.enqueue(flowFileContents)
+        }
+        runner.run()
+
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, failure)
+        runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, retry)
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, success)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0)
+    }
+
+    @Test
+    void simpleTest() {
+        def evalParametersClosure = { Map<String, String> params ->
+            Assert.assertTrue(params.isEmpty())
+        }
+        clientService.evalParametersClosure = evalParametersClosure
+
+        basicTest(0, 0, 1)
+    }
+
+    @Test
+    void simpleTestWithDocIdAndRequestParameters() {
+        runner.setProperty("refresh", "true")
+        runner.setProperty("slices", '${slices}')
+        runner.setVariable("slices", "auto")
+        runner.assertValid()
+
+        def evalParametersClosure = { Map<String, String> params ->
+            Assert.assertEquals(2, params.size())
+            Assert.assertEquals("true", params.get("refresh"))
+            Assert.assertEquals("auto", params.get("slices"))
+        }
+
+        clientService.evalParametersClosure = evalParametersClosure
+
+        def evalClosure = { List<IndexOperationRequest> items ->
+            int idCount = items.findAll { it.id == "123" }.size()
+            int indexCount = items.findAll { it.index == "test_index" }.size()
+            int typeCount = items.findAll { it.type == "test_type" }.size()
+            int opCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
+            Assert.assertEquals(1, idCount)
+            Assert.assertEquals(1, indexCount)
+            Assert.assertEquals(1, typeCount)
+            Assert.assertEquals(1, opCount)
+        }
+
+        clientService.evalClosure = evalClosure
+
+        basicTest(0, 0, 1, [doc_id: "123"])
+    }
+
+    @Test
+    void simpleTestWithRequestParametersFlowFileEL() {
+        runner.setProperty("refresh", "true")
+        runner.setProperty("slices", '${slices}')
+        runner.assertValid()
+
+        def evalParametersClosure = { Map<String, String> params ->
+            Assert.assertEquals(2, params.size())
+            Assert.assertEquals("true", params.get("refresh"))
+            Assert.assertEquals("auto", params.get("slices"))
+        }
+
+        clientService.evalParametersClosure = evalParametersClosure
+
+        basicTest(0, 0, 1, [slices: "auto"])
+    }
+
+    @Test
+    void testFatalError() {
+        clientService.throwFatalError = true
+        basicTest(1, 0, 0)
+    }
+
+    @Test
+    void testRetriable() {
+        clientService.throwRetriableError = true
+        basicTest(0, 1, 0)
+    }
+
+    @Test
+    void testInvalidIndexOperation() {
+        runner.setProperty(PutElasticsearchJson.INDEX_OP, "not-valid")
+        runner.assertNotValid()
+        final AssertionError ae = Assert.assertThrows(AssertionError.class, runner.&run)
+        Assert.assertEquals(String.format("Processor has 1 validation failures:\n'%s' validated against 'not-valid' is invalid because %s must be Expression Language or one of %s\n",
+                PutElasticsearchJson.INDEX_OP.getName(), PutElasticsearchJson.INDEX_OP.getDisplayName(), PutElasticsearchJson.ALLOWED_INDEX_OPERATIONS),
+                ae.getMessage()
+        )
+
+        runner.setProperty(PutElasticsearchJson.INDEX_OP, "\${operation}")
+        runner.assertValid()
+        runner.enqueue(flowFileContents, [
+                "operation": "not-valid2"
+        ])
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1)
+        runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0)
+    }
+
+    @Test
+    void testInputRequired() {
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0)
+    }
+
+    @Test
+    void testBatchingAndErrorRelationship() {
+        runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_DOCUMENTS, "true")
+        runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "true")
+        runner.setProperty(PutElasticsearchJson.BATCH_SIZE, "100")
+
+        clientService.response = IndexOperationResponse.fromJsonResponse(MockBulkLoadClientService.SAMPLE_ERROR_RESPONSE)
+
+        def values = [
+                [ id: "1", field1: 'value1', field2: '20' ],
+                [ id: "2", field1: 'value1', field2: '20' ],
+                [ id: "2", field1: 'value1', field2: '20' ],
+                [ id: "3", field1: 'value1', field2: '20abcd' ]
+        ]
+
+        for (final def val : values) {
+            runner.enqueue(prettyPrint(toJson(val)))
+        }
+        runner.assertValid()
+        runner.run()
+
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 3)
+        runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 1)
+        assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0].getContent(), containsString("20abcd"))
+    }
+
+    @Test
+    void testBatchingAndNoErrorOutput() {
+        runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_DOCUMENTS, "false")
+        runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "false")
+        runner.setProperty(PutElasticsearchJson.BATCH_SIZE, "100")
+
+        clientService.response = IndexOperationResponse.fromJsonResponse(MockBulkLoadClientService.SAMPLE_ERROR_RESPONSE)
+
+        def values = [
+                [ id: "1", field1: 'value1', field2: '20' ],
+                [ id: "2", field1: 'value1', field2: '20' ],
+                [ id: "2", field1: 'value1', field2: '20' ],
+                [ id: "3", field1: 'value1', field2: '20abcd' ]
+        ]
+
+        for (final def val : values) {
+            runner.enqueue(prettyPrint(toJson(val)))
+        }
+        runner.assertValid()
+        runner.run()
+
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 4)
+        runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0)
+    }
+
+    @Test
+    void testInvalidInput() {
+        runner.enqueue("not-json")
+        runner.run()
+
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1)
+        runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 0)
+
+        runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILURE)[0].assertAttributeEquals(
+                "elasticsearch.put.error",
+                "Unrecognized token 'not': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n" +
+                " at [Source: (String)\"not-json\"; line: 1, column: 4]"
+        )
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 2bbd51c..5a2c32d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -135,6 +135,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, failure)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, retry)
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, success)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
     }
 
     @Test
@@ -291,6 +292,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
 
         runner.clearTransferState()
 
@@ -344,6 +346,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
 
         runner.clearTransferState()
 
@@ -384,6 +387,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
 
         runner.clearTransferState()
 
@@ -421,6 +425,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
 
         runner.clearTransferState()
 
@@ -441,6 +446,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
     }
 
     @Test
@@ -515,6 +521,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
 
         runner.clearTransferState()
 
@@ -567,6 +574,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
     }
 
     @Test
@@ -588,12 +596,16 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
     }
 
     @Test
     void testInputRequired() {
         runner.run()
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0) // nothing is enqueued in this test, so every relationship is asserted empty
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
     }
 
     @Test
@@ -631,6 +643,8 @@ class PutElasticsearchRecordTest {
         runner.run()
 
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1)
 
         def errorFF = runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]