You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by br...@apache.org on 2017/06/02 17:28:37 UTC

nifi git commit: NIFI-4002: Add PutElasticsearchHttpRecord processor

Repository: nifi
Updated Branches:
  refs/heads/master de6a98618 -> 0bddcfe73


NIFI-4002: Add PutElasticsearchHttpRecord processor

This closes #1878

Signed-off-by: Bryan Rosander <br...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0bddcfe7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0bddcfe7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0bddcfe7

Branch: refs/heads/master
Commit: 0bddcfe730841ab838a9539421d4aaf15fa57a49
Parents: de6a986
Author: Matt Burgess <ma...@apache.org>
Authored: Thu Jun 1 12:28:28 2017 -0400
Committer: Bryan Rosander <br...@apache.org>
Committed: Fri Jun 2 13:01:13 2017 -0400

----------------------------------------------------------------------
 .../nifi-elasticsearch-processors/pom.xml       |  17 +
 .../IdentifierNotFoundException.java            |  42 ++
 .../PutElasticsearchHttpRecord.java             | 565 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../TestPutElasticsearchHttpRecord.java         | 480 ++++++++++++++++
 5 files changed, 1105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0bddcfe7/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
index 08a2b7b..9949524 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
@@ -42,6 +42,18 @@ language governing permissions and limitations under the License. -->
             <artifactId>nifi-utils</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
@@ -56,6 +68,11 @@ language governing permissions and limitations under the License. -->
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
             <version>${es.version}</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bddcfe7/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java
new file mode 100644
index 0000000..35402d7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+/**
+ * A domain-specific exception for when a valid Elasticsearch document identifier is expected but not found.
+ * <p>
+ * This is a checked exception; every constructor simply forwards its arguments to the matching
+ * {@link Exception} constructor.
+ */
+public class IdentifierNotFoundException extends Exception {
+
+    // Exception is Serializable, so declare the version explicitly rather than relying on a compiler-generated one
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates an exception with no detail message and no cause.
+     */
+    public IdentifierNotFoundException() {
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public IdentifierNotFoundException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause   the underlying cause
+     */
+    public IdentifierNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause the underlying cause
+     */
+    public IdentifierNotFoundException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message            the detail message
+     * @param cause              the underlying cause
+     * @param enableSuppression  whether suppression is enabled
+     * @param writableStackTrace whether the stack trace should be writable
+     */
+    public IdentifierNotFoundException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bddcfe7/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
new file mode 100644
index 0000000..24d0057
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StringUtils;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.commons.lang3.StringUtils.trimToEmpty;
+
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http", "record"})
+@CapabilityDescription("Writes the records from a FlowFile into to Elasticsearch, using the specified parameters such as "
+        + "the index to insert into and the type of the document, as well as the operation type (index, upsert, delete, etc.). Note: The Bulk API is used to "
+        + "send the records. This means that the entire contents of the incoming flow file are read into memory, and each record is transformed into a JSON document "
+        + "which is added to a single HTTP request body. For very large flow files (files with a large number of records, e.g.), this could cause memory usage issues.")
+public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcessor {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("put-es-record-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
+            .name("put-es-record-id-path")
+            .displayName("Identifier Record Path")
+            .description("A RecordPath pointing to a field in the record(s) that contains the identifier for the document. If the Index Operation is \"index\", "
+                    + "this property may be left empty or evaluate to an empty value, in which case the document's identifier will be "
+                    + "auto-generated by Elasticsearch. For all other Index Operations, the field's value must be non-empty.")
+            .required(false)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("put-es-record-index")
+            .displayName("Index")
+            .description("The name of the index to insert into")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
+                    AttributeExpression.ResultType.STRING, true))
+            .build();
+
+    static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+            .name("put-es-record-type")
+            .displayName("Type")
+            .description("The type of this document (used by Elasticsearch for indexing and searching)")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
+            .name("put-es-record-index-op")
+            .displayName("Index Operation")
+            .description("The type of the operation used to index (index, update, upsert, delete)")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .defaultValue("index")
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    private volatile RecordPathCache recordPathCache;
+
+    private final JsonFactory factory = new JsonFactory();
+
+    // Populates the immutable property list and relationship set shared by every instance of this processor
+    static {
+        // Insertion order is preserved by the list, so keep connection settings first and record settings last
+        final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
+        supportedProperties.add(ES_URL);
+        supportedProperties.add(PROP_SSL_CONTEXT_SERVICE);
+        supportedProperties.add(USERNAME);
+        supportedProperties.add(PASSWORD);
+        supportedProperties.add(CONNECT_TIMEOUT);
+        supportedProperties.add(RESPONSE_TIMEOUT);
+        supportedProperties.add(RECORD_READER);
+        supportedProperties.add(ID_RECORD_PATH);
+        supportedProperties.add(INDEX);
+        supportedProperties.add(TYPE);
+        supportedProperties.add(INDEX_OP);
+        propertyDescriptors = Collections.unmodifiableList(supportedProperties);
+
+        final Set<Relationship> supportedRelationships = new HashSet<>();
+        supportedRelationships.add(REL_SUCCESS);
+        supportedRelationships.add(REL_FAILURE);
+        supportedRelationships.add(REL_RETRY);
+        relationships = Collections.unmodifiableSet(supportedRelationships);
+    }
+
+    /**
+     * Returns the relationships this processor can route to.
+     *
+     * @return the unmodifiable set built in the static initializer (success, failure and retry)
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Returns the properties this processor supports.
+     *
+     * @return the unmodifiable list built in the static initializer, in the order the descriptors were added
+     */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+        // Since Expression Language is allowed for index operation, we can't guarantee that we can catch
+        // all invalid configurations, but we should catch them as soon as we can. For example, if the
+        // Identifier Record Path property is empty, the Index Operation must evaluate to "index".
+        String idPath = validationContext.getProperty(ID_RECORD_PATH).getValue();
+        String indexOp = validationContext.getProperty(INDEX_OP).getValue();
+
+        if (StringUtils.isEmpty(idPath)) {
+            switch (indexOp.toLowerCase()) {
+                case "update":
+                case "upsert":
+                case "delete":
+                case "":
+                    problems.add(new ValidationResult.Builder()
+                            .valid(false)
+                            .subject(INDEX_OP.getDisplayName())
+                            .explanation("If Identifier Record Path is not set, Index Operation must evaluate to \"index\"")
+                            .build());
+                    break;
+                default:
+                    break;
+            }
+        }
+        return problems;
+    }
+
+    /**
+     * Invoked when the processor is scheduled: runs the superclass setup and then replaces the
+     * record path cache with a new instance.
+     *
+     * @param context the process context, passed through to the superclass setup
+     */
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        super.setup(context);
+        // NOTE(review): 10 is presumably the maximum number of compiled RecordPaths kept - confirm against RecordPathCache
+        recordPathCache = new RecordPathCache(10);
+    }
+
+    /**
+     * Reads every record of the incoming FlowFile, turns each one into a Bulk API action (plus a JSON
+     * document where the operation needs one), sends the whole batch to {@code <ES URL>/_bulk} in a single
+     * request and routes the FlowFile according to the outcome:
+     * <ul>
+     *   <li>success - the request succeeded and the response reports no item errors</li>
+     *   <li>retry - Elasticsearch answered with a 5xx status (the processor also yields)</li>
+     *   <li>failure - invalid index/operation, missing identifier, unreadable records, request or
+     *       response-parsing errors, item errors, or any other non-success status</li>
+     * </ul>
+     * The complete request body is assembled in memory before it is sent.
+     *
+     * @param context the process context used to evaluate properties
+     * @param session the session the FlowFile is taken from and transferred with
+     * @throws ProcessException if the configured Elasticsearch URL cannot be turned into a bulk URL
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+        // Authentication
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
+
+        OkHttpClient okHttpClient = getClient();
+        final ComponentLog logger = getLogger();
+
+        final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
+        final URL url;
+        try {
+            url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk");
+        } catch (MalformedURLException mue) {
+            // Since we have a URL validator, something has gone very wrong, throw a ProcessException
+            context.yield();
+            throw new ProcessException(mue);
+        }
+
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+        if (StringUtils.isEmpty(index)) {
+            logger.error("No value for index in for {}, transferring to failure", new Object[]{flowFile});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+        final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+        if (StringUtils.isEmpty(indexOp)) {
+            logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{flowFile});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        // Classify the operation once with equalsIgnoreCase, which does not depend on the default locale
+        // (a locale-sensitive toLowerCase() can turn "INDEX" into a string that no longer equals "index")
+        final boolean isIndex = indexOp.equalsIgnoreCase("index");
+        final boolean isUpdate = indexOp.equalsIgnoreCase("update");
+        final boolean isUpsert = indexOp.equalsIgnoreCase("upsert");
+        final boolean isDelete = indexOp.equalsIgnoreCase("delete");
+        if (!(isIndex || isUpdate || isUpsert || isDelete)) {
+            logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, flowFile});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        final RecordPath recordPath = StringUtils.isEmpty(idPath) ? null : recordPathCache.getCompiled(idPath);
+        final StringBuilder sb = new StringBuilder();
+
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            Record record;
+            while ((record = reader.nextRecord()) != null) {
+
+                final String id;
+                if (recordPath != null) {
+                    Optional<FieldValue> idPathValue = recordPath.evaluate(record).getSelectedFields().findFirst();
+                    if (!idPathValue.isPresent() || idPathValue.get().getValue() == null) {
+                        throw new IdentifierNotFoundException("Identifier Record Path specified but no value was found, transferring {} to failure.");
+                    }
+                    id = idPathValue.get().getValue().toString();
+                } else {
+                    id = null;
+                }
+
+                // The ID must be non-empty for all operations except "index". For that case,
+                // a missing ID indicates one is to be auto-generated by Elasticsearch.
+                // The "{}" placeholder is filled with the flow file when the message is logged below;
+                // indexOp is one of the four validated operation names, so it is safe to concatenate.
+                if (StringUtils.isEmpty(id) && !isIndex) {
+                    throw new IdentifierNotFoundException("Index operation " + indexOp
+                            + " requires a non-empty identifier value from the Identifier Record Path, transferring {} to failure.");
+                }
+
+                if (isIndex) {
+                    sb.append(buildActionLine("index", index, docType, id)).append("\n");
+                    sb.append(serializeRecord(record)).append("\n");
+                } else if (isDelete) {
+                    // A delete action has no document line, so the record itself is not serialized
+                    sb.append(buildActionLine("delete", index, docType, id)).append("\n");
+                } else {
+                    // "update" and "upsert" both use the update action; they differ only in doc_as_upsert
+                    sb.append(buildActionLine("update", index, docType, id)).append("\n");
+                    sb.append("{\"doc\": ");
+                    sb.append(serializeRecord(record));
+                    sb.append(", \"doc_as_upsert\": ");
+                    sb.append(isUpsert);
+                    sb.append(" }\n");
+                }
+            }
+        } catch (IdentifierNotFoundException infe) {
+            logger.error(infe.getMessage(), new Object[]{flowFile});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+
+        } catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
+            logger.error("Could not parse incoming data", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), sb.toString());
+        final Response getResponse;
+        try {
+            getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
+        } catch (final Exception e) {
+            logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        // Close the response on every path, including unexpected runtime exceptions while handling it
+        try {
+            final int statusCode = getResponse.code();
+
+            if (isSuccess(statusCode)) {
+                ResponseBody responseBody = getResponse.body();
+                try {
+                    final byte[] bodyBytes = responseBody.bytes();
+
+                    JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
+                    boolean errors = responseJson.get("errors").asBoolean(false);
+                    // ES has no rollback, so if errors occur, log them and route the whole flow file to failure
+                    if (errors) {
+                        ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
+                        // All items are returned whether they succeeded or failed, so walk the item array
+                        // and log each failed one
+                        for (int i = 0; i < itemNodeArray.size(); i++) {
+                            JsonNode itemNode = itemNodeArray.get(i);
+                            int status = itemNode.findPath("status").asInt();
+                            if (!isSuccess(status)) {
+                                // findPath() matches a field name anywhere below the node; it is not a path expression
+                                String reason = itemNode.findPath("reason").asText();
+                                if (StringUtils.isEmpty(reason)) {
+                                    // Fall back to a plain-text "error" field when there is no nested reason
+                                    reason = itemNode.findPath("error").asText();
+                                }
+                                logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
+                                        new Object[]{flowFile, reason});
+                            }
+                        }
+                        session.transfer(flowFile, REL_FAILURE);
+                    } else {
+                        session.transfer(flowFile, REL_SUCCESS);
+                        session.getProvenanceReporter().send(flowFile, url.toString());
+                    }
+
+                } catch (IOException ioe) {
+                    // Something went wrong when parsing the response, log the error and route to failure
+                    logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe);
+                    session.transfer(flowFile, REL_FAILURE);
+                    context.yield();
+                }
+            } else if (statusCode / 100 == 5) {
+                // 5xx -> RETRY, but a server error might last a while, so yield
+                logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...",
+                        new Object[]{statusCode, getResponse.message()});
+                session.transfer(flowFile, REL_RETRY);
+                context.yield();
+            } else {  // 1xx, 3xx, 4xx, etc. -> NO RETRY
+                logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
+                session.transfer(flowFile, REL_FAILURE);
+            }
+        } finally {
+            getResponse.close();
+        }
+    }
+
+    /**
+     * Builds one Bulk API action/metadata line, e.g. {@code {"update":{"_index":"i","_type":"t","_id":"1"}}}.
+     * The line is produced with the JSON generator so that index, type and identifier values are escaped
+     * and both objects are always closed.
+     *
+     * @param action  the bulk action name ("index", "update" or "delete")
+     * @param index   the target index
+     * @param docType the document type
+     * @param id      the document identifier; omitted from the line when null or empty
+     * @return the action line without a trailing newline
+     * @throws IOException if the JSON cannot be generated
+     */
+    private String buildActionLine(final String action, final String index, final String docType, final String id) throws IOException {
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try (final JsonGenerator generator = factory.createJsonGenerator(out)) {
+            generator.writeStartObject();
+            generator.writeObjectFieldStart(action);
+            generator.writeStringField("_index", index);
+            generator.writeStringField("_type", docType);
+            if (!StringUtils.isEmpty(id)) {
+                generator.writeStringField("_id", id);
+            }
+            generator.writeEndObject();
+            generator.writeEndObject();
+        }
+        // The generator writes UTF-8 to the stream, so decode with UTF-8 rather than the platform charset
+        return out.toString("UTF-8");
+    }
+
+    /**
+     * Serializes a record to a single-line JSON document using the record's own schema.
+     *
+     * @param record the record to serialize
+     * @return the JSON text of the record
+     * @throws IOException if the JSON cannot be generated
+     */
+    private String serializeRecord(final Record record) throws IOException {
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try (final JsonGenerator generator = factory.createJsonGenerator(out)) {
+            writeRecord(record, record.getSchema(), generator);
+        }
+        // The generator writes UTF-8 to the stream, so decode with UTF-8 rather than the platform charset
+        return out.toString("UTF-8");
+    }
+
+    /**
+     * Writes a record to the generator as one JSON object, one member per field of the record's own schema.
+     * Fields whose value is null are written as explicit JSON nulls; all others are delegated to
+     * {@code writeValue} together with the data type the schema declares for them.
+     *
+     * @param record      the record to serialize
+     * @param writeSchema not consulted by this implementation; the schema returned by the record is used
+     * @param generator   the JSON generator receiving the output
+     * @throws IOException if the generator fails to write
+     */
+    private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator)
+            throws IOException {
+        final RecordSchema recordSchema = record.getSchema();
+        final int fieldCount = recordSchema.getFieldCount();
+
+        generator.writeStartObject();
+        for (int idx = 0; idx < fieldCount; idx++) {
+            final RecordField recordField = recordSchema.getField(idx);
+            final String name = recordField.getFieldName();
+            final Object fieldValue = record.getValue(recordField);
+
+            if (fieldValue != null) {
+                generator.writeFieldName(name);
+                writeValue(generator, fieldValue, name, recordSchema.getDataType(name).get());
+            } else {
+                generator.writeNullField(name);
+            }
+        }
+        generator.writeEndObject();
+    }
+
+    /**
+     * Writes a single value to the generator according to its declared data type.
+     * <p>
+     * A CHOICE type is first resolved to a concrete type for the given value, and the value is then coerced
+     * to that type. Null input, or a value that coerces to null, is written as JSON null. Records and maps
+     * become JSON objects, arrays become JSON arrays, and date/time/timestamp values become a number when
+     * their string form is long-compatible and a string otherwise.
+     *
+     * @param generator the JSON generator receiving the output
+     * @param value     the raw value to write; may be null
+     * @param fieldName the field name, used for conversion error reporting and nested map keys
+     * @param dataType  the declared data type of the value
+     * @throws IOException if the generator fails to write
+     */
+    @SuppressWarnings("unchecked")
+    private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType) throws IOException {
+        if (value == null) {
+            generator.writeNull();
+            return;
+        }
+
+        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
+        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
+        if (coercedValue == null) {
+            generator.writeNull();
+            return;
+        }
+
+        switch (chosenDataType.getFieldType()) {
+            case DATE:
+                writeDateBasedValue(generator, coercedValue, fieldName, RecordFieldType.DATE.getDefaultFormat());
+                break;
+            case TIME:
+                writeDateBasedValue(generator, coercedValue, fieldName, RecordFieldType.TIME.getDefaultFormat());
+                break;
+            case TIMESTAMP:
+                writeDateBasedValue(generator, coercedValue, fieldName, RecordFieldType.TIMESTAMP.getDefaultFormat());
+                break;
+            case DOUBLE:
+                generator.writeNumber(DataTypeUtils.toDouble(coercedValue, fieldName));
+                break;
+            case FLOAT:
+                generator.writeNumber(DataTypeUtils.toFloat(coercedValue, fieldName));
+                break;
+            case LONG:
+                generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
+                break;
+            case INT:
+            case BYTE:
+            case SHORT:
+                generator.writeNumber(DataTypeUtils.toInteger(coercedValue, fieldName));
+                break;
+            case CHAR:
+            case STRING:
+                generator.writeString(coercedValue.toString());
+                break;
+            case BIGINT:
+                if (coercedValue instanceof Long) {
+                    generator.writeNumber((Long) coercedValue);
+                } else {
+                    generator.writeNumber((BigInteger) coercedValue);
+                }
+                break;
+            case BOOLEAN: {
+                // Anything that is not literally true/false is preserved as a string rather than dropped
+                final String stringValue = coercedValue.toString();
+                if ("true".equalsIgnoreCase(stringValue)) {
+                    generator.writeBoolean(true);
+                } else if ("false".equalsIgnoreCase(stringValue)) {
+                    generator.writeBoolean(false);
+                } else {
+                    generator.writeString(stringValue);
+                }
+                break;
+            }
+            case RECORD: {
+                final Record record = (Record) coercedValue;
+                final RecordDataType recordDataType = (RecordDataType) chosenDataType;
+                final RecordSchema childSchema = recordDataType.getChildSchema();
+                writeRecord(record, childSchema, generator);
+                break;
+            }
+            case MAP: {
+                final MapDataType mapDataType = (MapDataType) chosenDataType;
+                final DataType valueDataType = mapDataType.getValueType();
+                final Map<String, ?> map = (Map<String, ?>) coercedValue;
+                generator.writeStartObject();
+                for (final Map.Entry<String, ?> entry : map.entrySet()) {
+                    final String mapKey = entry.getKey();
+                    final Object mapValue = entry.getValue();
+                    generator.writeFieldName(mapKey);
+                    writeValue(generator, mapValue, fieldName + "." + mapKey, valueDataType);
+                }
+                generator.writeEndObject();
+                break;
+            }
+            case ARRAY:
+            default:
+                if (coercedValue instanceof Object[]) {
+                    final Object[] values = (Object[]) coercedValue;
+                    // Cast the resolved type, not the declared one: for a CHOICE field the declared type is a
+                    // ChoiceDataType and casting it to ArrayDataType would throw ClassCastException
+                    final ArrayDataType arrayDataType = (ArrayDataType) chosenDataType;
+                    final DataType elementType = arrayDataType.getElementType();
+                    writeArray(values, fieldName, generator, elementType);
+                } else {
+                    generator.writeString(coercedValue.toString());
+                }
+                break;
+        }
+    }
+
+    /**
+     * Writes a date, time or timestamp value: as a JSON number when its string form is long-compatible,
+     * otherwise as a JSON string formatted with the given default format.
+     *
+     * @param generator     the JSON generator receiving the output
+     * @param coercedValue  the already-coerced, non-null value
+     * @param fieldName     the field name, used for conversion error reporting
+     * @param defaultFormat the default format of the field type, used to render the string form
+     * @throws IOException if the generator fails to write
+     */
+    private void writeDateBasedValue(final JsonGenerator generator, final Object coercedValue, final String fieldName, final String defaultFormat) throws IOException {
+        final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(defaultFormat));
+        if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
+            generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
+        } else {
+            generator.writeString(stringValue);
+        }
+    }
+
+    /**
+     * Writes the given values as a JSON array, delegating every element to {@code writeValue}
+     * with the shared element type and the enclosing field's name.
+     *
+     * @param values      the array elements, written in order
+     * @param fieldName   the name of the field the array belongs to
+     * @param generator   the JSON generator receiving the output
+     * @param elementType the data type applied to each element
+     * @throws IOException if the generator fails to write
+     */
+    private void writeArray(final Object[] values, final String fieldName, final JsonGenerator generator, final DataType elementType) throws IOException {
+        generator.writeStartArray();
+        for (int position = 0; position < values.length; position++) {
+            writeValue(generator, values[position], fieldName, elementType);
+        }
+        generator.writeEndArray();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bddcfe7/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a6cd087..11a66e6 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,5 +16,6 @@ org.apache.nifi.processors.elasticsearch.FetchElasticsearch
 org.apache.nifi.processors.elasticsearch.PutElasticsearch
 org.apache.nifi.processors.elasticsearch.FetchElasticsearchHttp
 org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp
+org.apache.nifi.processors.elasticsearch.PutElasticsearchHttpRecord
 org.apache.nifi.processors.elasticsearch.QueryElasticsearchHttp
 org.apache.nifi.processors.elasticsearch.ScrollElasticsearchHttp

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bddcfe7/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
new file mode 100644
index 0000000..e931236
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestPutElasticsearchHttpRecord {
+
+    private TestRunner runner;
+
+    @After
+    public void teardown() {
+        runner = null; // drop the runner after each test; the unit tests assign a new one to this field
+    }
+
+    /**
+     * Runs the processor without setting an index operation and with an ID record path.
+     * Expects the flow file on the success relationship with its attribute intact,
+     * and exactly one SEND provenance event.
+     */
+    @Test
+    public void testPutElasticSearchOnTriggerIndex() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("doc_id", "28039652140");
+        runner.enqueue(new byte[0], attributes);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile result = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(result);
+        result.assertAttributeEquals("doc_id", "28039652140");
+
+        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+        assertNotNull(events);
+        assertEquals(1, events.size());
+        assertEquals(ProvenanceEventType.SEND, events.get(0).getEventType());
+    }
+
+    /**
+     * Runs the processor with the index operation set to "Update".
+     * Expects the flow file on the success relationship with its attribute intact.
+     */
+    @Test
+    public void testPutElasticSearchOnTriggerUpdate() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update");
+
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("doc_id", "28039652140");
+        runner.enqueue(new byte[0], attributes);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile result = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(result);
+        result.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    /**
+     * Runs the processor with the index operation set to "DELETE".
+     * Expects the flow file on the success relationship with its attribute intact.
+     */
+    @Test
+    public void testPutElasticSearchOnTriggerDelete() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE");
+
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("doc_id", "28039652140");
+        runner.enqueue(new byte[0], attributes);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile result = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(result);
+        result.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    /**
+     * Supplies the ES URL and the connect timeout as expression-language references
+     * that are backed by runner variables. Expects the configuration to validate and
+     * the flow file to arrive on the success relationship with its attribute intact.
+     */
+    @Test
+    public void testPutElasticSearchOnTriggerEL() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+
+        // Properties that reference variables are set, and validated, before the variables are defined
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
+        runner.assertValid();
+
+        runner.setVariable("es.url", "http://127.0.0.1:9200");
+        runner.setVariable("connect.timeout", "5s");
+
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("doc_id", "28039652140");
+        runner.enqueue(new byte[0], attributes);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile result = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(result);
+        result.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    /**
+     * Sets the index operation to an expression that references an attribute
+     * ("no.attr") which the enqueued flow file does not carry.
+     * Expects the flow file on the failure relationship with its attribute intact.
+     */
+    @Test
+    public void testPutElasticSearchOnTriggerBadIndexOp() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "${no.attr}");
+
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("doc_id", "28039652140");
+        runner.enqueue(new byte[0], attributes);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        final MockFlowFile failed = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+        assertNotNull(failed);
+        failed.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testPutElasticSearchInvalidConfig() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData(); // registers the mock record reader and sets RECORD_READER
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.assertNotValid(); // expected invalid while TYPE is still unset
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.assertValid(); // expected valid once TYPE is set
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "");
+        runner.assertNotValid(); // an empty index operation is expected to be rejected
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "index");
+        runner.assertValid(); // "index" is expected to be accepted
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "upsert");
+        runner.assertNotValid(); // "upsert" is expected to be rejected
+    }
+
+    /**
+     * Uses a mocked bulk response that reports failures and checks the routing for
+     * two HTTP status codes: 100 is expected on the failure relationship, 500 on retry.
+     */
+    @Test
+    public void testPutElasticSearchOnTriggerWithFailures() throws IOException {
+        final PutElasticsearchHttpRecordTestProcessor failingProcessor = new PutElasticsearchHttpRecordTestProcessor(true);
+        failingProcessor.setStatus(100, "Should fail");
+        runner = TestRunners.newTestRunner(failingProcessor); // simulate failures
+        generateTestData();
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+
+        // First pass: status 100
+        final HashMap<String, String> firstAttributes = new HashMap<>();
+        firstAttributes.put("doc_id", "28039652140");
+        runner.enqueue(new byte[0], firstAttributes);
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        runner.clearTransferState();
+
+        // Second pass: status 500
+        failingProcessor.setStatus(500, "Should retry");
+        final HashMap<String, String> secondAttributes = new HashMap<>();
+        secondAttributes.put("doc_id", "28039652140");
+        runner.enqueue(new byte[0], secondAttributes);
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_RETRY, 1);
+    }
+
+    /**
+     * Sets the mock status to -1 so that executing the HTTP call throws a
+     * ConnectException. Expects the flow file on the failure relationship.
+     */
+    @Test
+    public void testPutElasticSearchOnTriggerWithConnectException() throws IOException {
+        final PutElasticsearchHttpRecordTestProcessor unreachableProcessor = new PutElasticsearchHttpRecordTestProcessor(true);
+        unreachableProcessor.setStatus(-1, "Connection Exception");
+        runner = TestRunners.newTestRunner(unreachableProcessor); // simulate failures
+        generateTestData();
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("doc_id", "28039652140");
+        runner.enqueue(new byte[0], attributes);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+    }
+
+    /**
+     * Points the ID record path at "/none", a field the test records do not have.
+     * Expects one flow file on the failure relationship and none on success.
+     */
+    @Test
+    public void testPutElasticsearchOnTriggerWithNoIdPath() throws Exception {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false));
+        generateTestData();
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/none"); // Field does not exist
+
+        final byte[] emptyContent = new byte[0];
+        runner.enqueue(emptyContent);
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
+    }
+
+    /**
+     * Leaves the ID record path unset and uses a mocked bulk response that reports
+     * failures. Expects one flow file on the failure relationship and none on success.
+     */
+    @Test
+    public void testPutElasticsearchOnTriggerWithNoIdField() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(true)); // simulate failures
+        generateTestData();
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+
+        final byte[] emptyContent = new byte[0];
+        runner.enqueue(emptyContent);
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
+    }
+
+    /**
+     * Takes the index and type from flow file attributes via expression language.
+     * A flow file carrying both attributes is expected on the success relationship;
+     * a flow file without the "i" attribute is expected on the failure relationship.
+     */
+    @Test
+    public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false));
+        generateTestData();
+        runner.setValidateExpressionUsage(false);
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "${i}");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "${type}");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+
+        final HashMap<String, String> completeAttributes = new HashMap<>();
+        completeAttributes.put("doc_id", "28039652144");
+        completeAttributes.put("i", "doc");
+        completeAttributes.put("type", "status");
+        runner.enqueue(new byte[0], completeAttributes);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        final MockFlowFile succeeded = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        assertNotNull(succeeded);
+        runner.clearTransferState();
+
+        // Now try an empty attribute value, should fail
+        final HashMap<String, String> attributesWithoutIndex = new HashMap<>();
+        attributesWithoutIndex.put("doc_id", "28039652144");
+        attributesWithoutIndex.put("type", "status");
+        runner.enqueue(new byte[0], attributesWithoutIndex);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        final MockFlowFile failed = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+        assertNotNull(failed);
+    }
+
+    /**
+     * Configures the index operation "index_fail", which the test expects to pass
+     * validation but to send the flow file to the failure relationship when run.
+     */
+    @Test
+    public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        generateTestData();
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        // Validity is checked after each property is added
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.assertNotValid();
+
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.assertValid();
+
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.assertValid();
+
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "index_fail");
+        runner.assertValid();
+
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("doc_id", "28039652140");
+        runner.enqueue(new byte[0], attributes);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        final MockFlowFile failed = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+        assertNotNull(failed);
+    }
+
+    /**
+     * A test class that extends the processor in order to inject/mock behavior.
+     * The HTTP client is replaced with a Mockito mock whose calls either return a canned
+     * bulk response or, when the status code is -1, throw a {@link ConnectException}.
+     */
+    private static class PutElasticsearchHttpRecordTestProcessor extends PutElasticsearchHttpRecord {
+        private final boolean responseHasFailures;
+        private OkHttpClient client;
+        private int statusCode = 200;
+        private String statusMessage = "OK";
+
+        /**
+         * @param responseHasFailures whether the canned bulk response reports errors and
+         *                            contains an additional failed item
+         */
+        PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
+            this.responseHasFailures = responseHasFailures;
+        }
+
+        /**
+         * Sets the HTTP status used for subsequent mocked calls.
+         *
+         * @param code    the HTTP status code, or -1 to make the call throw a ConnectException
+         * @param message the HTTP status message of the mocked response
+         */
+        void setStatus(int code, String message) {
+            statusCode = code;
+            statusMessage = message;
+        }
+
+        /**
+         * Installs a mocked client instead of creating a real one. The status and body are
+         * evaluated when a call is created, so {@link #setStatus(int, String)} may be changed between runs.
+         */
+        @Override
+        protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
+            client = mock(OkHttpClient.class);
+
+            when(client.newCall(any(Request.class))).thenAnswer(invocationOnMock -> {
+                final Call call = mock(Call.class);
+                if (statusCode == -1) {
+                    when(call.execute()).thenThrow(ConnectException.class);
+                    return call;
+                }
+
+                final Request realRequest = (Request) invocationOnMock.getArguments()[0];
+                final Response mockResponse = new Response.Builder()
+                        .request(realRequest)
+                        .protocol(Protocol.HTTP_1_1)
+                        .code(statusCode)
+                        .message(statusMessage)
+                        .body(ResponseBody.create(MediaType.parse("application/json"), buildBulkResponseJson()))
+                        .build();
+                when(call.execute()).thenReturn(mockResponse);
+                return call;
+            });
+        }
+
+        /**
+         * Builds the canned bulk response body. Note that "errors" is emitted as a quoted
+         * string ("true"/"false") and the failed item's status as the string "400".
+         *
+         * @return the JSON text of the mocked bulk response
+         */
+        private String buildBulkResponseJson() {
+            final StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
+            sb.append(responseHasFailures);
+            sb.append("\", \"items\": [");
+            if (responseHasFailures) {
+                // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside
+                sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
+                sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\",");
+                sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at ");
+                sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},");
+            }
+            // The last item always carries the configured status code
+            sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
+            sb.append(statusCode);
+            sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}");
+
+            sb.append("]}");
+            return sb.toString();
+        }
+
+        /**
+         * @return the mocked client installed by {@link #createElasticsearchClient(ProcessContext)}
+         */
+        @Override
+        protected OkHttpClient getClient() {
+            return client;
+        }
+    }
+
+    /////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Integration test section below
+    //
+    // The tests below are meant to run on real ES instances, and are thus annotated with @Ignore during normal
+    // test execution. However, if you wish to execute them as part of a test phase, comment out the @Ignore
+    // line for each desired test.
+    /////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Tests basic ES functionality against a local or test ES cluster.
+     * Sends a single flow file and expects it on the success relationship together
+     * with exactly one SEND provenance event.
+     *
+     * @throws IOException if the mock record reader cannot be registered
+     */
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testPutElasticSearchBasic() throws IOException {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        // Assign the shared field rather than a shadowing local: generateTestData() configures
+        // the record reader on the field, and the other tests never run without one.
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
+        generateTestData();
+        runner.setValidateExpressionUsage(false);
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.assertValid();
+
+        // Only one flow file is enqueued, matching the single iteration and the expected count of 1
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("doc_id", "28039652140");
+        runner.enqueue(new byte[0], attributes);
+
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
+        assertNotNull(provEvents);
+        assertEquals(1, provEvents.size());
+        assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
+    }
+
+    /**
+     * Sends 100 flow files with consecutive "doc_id" attributes to a local or test ES
+     * cluster and expects all of them on the success relationship.
+     *
+     * @throws IOException if the mock record reader cannot be registered
+     */
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testPutElasticSearchBatch() throws IOException {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        // Assign the shared field rather than a shadowing local: generateTestData() configures
+        // the record reader on the field, and the other tests never run without one.
+        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
+        generateTestData();
+        runner.setValidateExpressionUsage(false);
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.assertValid();
+
+        final int flowFileCount = 100;
+        for (int i = 0; i < flowFileCount; i++) {
+            final HashMap<String, String> attributes = new HashMap<>();
+            attributes.put("doc_id", Long.toString(28039652140L + i));
+            runner.enqueue(new byte[0], attributes);
+        }
+        // A bare run() triggers the processor only once, so trigger it once per queued flow file.
+        // NOTE(review): assumes each trigger consumes a single flow file - confirm against the processor.
+        runner.run(flowFileCount);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, flowFileCount);
+    }
+
+    /**
+     * Registers and enables a mock record parser as the "parser" controller service,
+     * selects it as the processor's record reader, and loads it with four records
+     * that have the fields id (INT), name (STRING) and code (INT).
+     *
+     * @throws IOException wrapping the InitializationException if the service cannot be added
+     */
+    private void generateTestData() throws IOException {
+        final MockRecordParser recordParser = new MockRecordParser();
+        try {
+            runner.addControllerService("parser", recordParser);
+        } catch (InitializationException initFailure) {
+            throw new IOException(initFailure);
+        }
+        runner.enableControllerService(recordParser);
+        runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "parser");
+
+        recordParser.addSchemaField("id", RecordFieldType.INT);
+        recordParser.addSchemaField("name", RecordFieldType.STRING);
+        recordParser.addSchemaField("code", RecordFieldType.INT);
+
+        // Records (1, "rec1", 101) through (4, "rec4", 104)
+        for (int id = 1; id <= 4; id++) {
+            recordParser.addRecord(id, "rec" + id, 100 + id);
+        }
+    }
+}