You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/08/21 12:51:35 UTC

nifi git commit: NIFI-4250 - Elasticsearch 5 delete processor

Repository: nifi
Updated Branches:
  refs/heads/master 41984bed1 -> 8cb501443


NIFI-4250 - Elasticsearch 5 delete processor

Signed-off-by: Matt Burgess <ma...@apache.org>

This closes #2045


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8cb50144
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8cb50144
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8cb50144

Branch: refs/heads/master
Commit: 8cb501443bbe27fa043572cc3b993529ff245722
Parents: 41984be
Author: mans2singh <ma...@yahoo.com>
Authored: Tue Aug 1 06:07:38 2017 -0700
Committer: Matt Burgess <ma...@apache.org>
Committed: Mon Aug 21 08:49:57 2017 -0400

----------------------------------------------------------------------
 .../elasticsearch/DeleteElasticsearch5.java     | 246 +++++++++++++++
 .../elasticsearch/FetchElasticsearch5.java      |   2 +
 .../elasticsearch/PutElasticsearch5.java        |   2 +
 .../org.apache.nifi.processor.Processor         |   1 +
 .../ITDeleteElasticsearch5Test.java             | 197 ++++++++++++
 .../elasticsearch/TestDeleteElasticsearch5.java | 304 +++++++++++++++++++
 6 files changed, 752 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8cb50144/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java
new file mode 100644
index 0000000..9e77775
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.transport.ReceiveTimeoutTransportException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Processor that deletes a single Elasticsearch 5 document, identified by the
+ * index, type and document id properties evaluated against the incoming FlowFile.
+ *
+ * <p>The FlowFile is routed to {@code success} when the delete response status is
+ * {@link RestStatus#OK}, to {@code not found} when it is {@link RestStatus#NOT_FOUND},
+ * to {@code retry} when the request timed out, and to {@code failure} otherwise.</p>
+ */
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"elasticsearch", "elasticsearch 5", "delete", "remove"})
+@CapabilityDescription("Delete a document from Elasticsearch 5.0 by document id. If the cluster has been configured for authorization and/or secure "
+        + "transport (SSL/TLS), and the X-Pack plugin is available, secure connections can be made.")
+@WritesAttributes({
+        @WritesAttribute(attribute = DeleteElasticsearch5.ES_ERROR_MESSAGE, description = "The message attribute in case of error"),
+        @WritesAttribute(attribute = DeleteElasticsearch5.ES_FILENAME, description = "The filename attribute which is set to the document identifier"),
+        @WritesAttribute(attribute = DeleteElasticsearch5.ES_INDEX, description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = DeleteElasticsearch5.ES_TYPE, description = "The Elasticsearch document type"),
+        @WritesAttribute(attribute = DeleteElasticsearch5.ES_REST_STATUS, description = "The REST status returned by Elasticsearch when the document could not be deleted")
+})
+@SeeAlso({FetchElasticsearch5.class,PutElasticsearch5.class})
+public class DeleteElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
+
+    /** Value written to {@link #ES_ERROR_MESSAGE} when the delete response status is not OK. */
+    public static final String UNABLE_TO_DELETE_DOCUMENT_MESSAGE = "Unable to delete document";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("All FlowFile corresponding to the deleted document from Elasticsearch are routed to this relationship").build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("All FlowFile corresponding to delete document that failed from Elasticsearch are routed to this relationship").build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
+            .description("A FlowFile is routed to this relationship if the document cannot be deleted because of a retryable exception like timeout or node not available")
+            .build();
+
+    public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
+            .description("A FlowFile is routed to this relationship if the specified document was not found in elasticsearch")
+            .build();
+
+    public static final PropertyDescriptor DOCUMENT_ID = new PropertyDescriptor.Builder()
+            .name("el5-delete-document-id")
+            .displayName("Document Identifier")
+            .description("The identifier for the document to be deleted")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("el5-delete-index")
+            .displayName("Index")
+            .description("The name of the index to delete the document from")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+            .name("el5-delete-type")
+            .displayName("Type")
+            .description("The type of this document to be deleted")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    // Names of the FlowFile attributes written by this processor.
+    public static final String ES_ERROR_MESSAGE = "es.error.message";
+    public static final String ES_FILENAME = "filename";
+    public static final String ES_INDEX = "es.index";
+    public static final String ES_TYPE = "es.type";
+    public static final String ES_REST_STATUS = "es.rest.status";
+
+    static {
+        final Set<Relationship> relations = new HashSet<>();
+        relations.add(REL_SUCCESS);
+        relations.add(REL_FAILURE);
+        relations.add(REL_RETRY);
+        relations.add(REL_NOT_FOUND);
+        relationships = Collections.unmodifiableSet(relations);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(CLUSTER_NAME);
+        descriptors.add(HOSTS);
+        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+        descriptors.add(PROP_XPACK_LOCATION);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(PING_TIMEOUT);
+        descriptors.add(SAMPLER_INTERVAL);
+        descriptors.add(DOCUMENT_ID);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    /**
+     * Deletes the document addressed by the incoming FlowFile and routes the FlowFile
+     * according to the outcome.
+     *
+     * <p>A blank index, type or document id (after expression evaluation) sends the
+     * FlowFile to {@link #REL_FAILURE} with {@link #ES_ERROR_MESSAGE} set. Otherwise the
+     * {@link #ES_FILENAME}, {@link #ES_INDEX} and {@link #ES_TYPE} attributes are written
+     * before the delete request is issued.</p>
+     *
+     * @param context provides the processor properties; also yielded on any non-success outcome
+     * @param session supplies the FlowFile and performs the transfer
+     * @throws ProcessException declared by the processor contract
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+
+        // Create the client on first use; the lock keeps concurrent triggers from each running setup.
+        synchronized (esClient) {
+            if (esClient.get() == null) {
+                setup(context);
+            }
+        }
+
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+        final String documentId = context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
+        final String documentType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+
+        final ComponentLog logger = getLogger();
+
+        // The property validators only see the raw expression, so the evaluated values are re-checked here.
+        if (StringUtils.isBlank(index)) {
+            logger.debug("Index is required but was empty {}", new Object[]{index});
+            failValidation(session, flowFile, "Index is required but was empty");
+            return;
+        }
+        if (StringUtils.isBlank(documentType)) {
+            logger.debug("Document type is required but was empty {}", new Object[]{documentType});
+            failValidation(session, flowFile, "Document type is required but was empty");
+            return;
+        }
+        if (StringUtils.isBlank(documentId)) {
+            logger.debug("Document id is required but was empty {}", new Object[]{documentId});
+            failValidation(session, flowFile, "Document id is required but was empty");
+            return;
+        }
+
+        // Declared as HashMap because java.util.Map is not among this file's imports.
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put(ES_FILENAME, documentId);
+        attributes.put(ES_INDEX, index);
+        attributes.put(ES_TYPE, documentType);
+        flowFile = session.putAllAttributes(flowFile, attributes);
+
+        try {
+            logger.debug("Deleting document {}/{}/{} from Elasticsearch", new Object[]{index, documentType, documentId});
+            final DeleteRequestBuilder requestBuilder = prepareDeleteRequest(index, documentId, documentType);
+            final DeleteResponse response = doDelete(requestBuilder);
+            final RestStatus status = response.status();
+
+            if (status == RestStatus.OK) {
+                logger.debug("Elasticsearch document {} deleted", new Object[]{documentId});
+                session.transfer(flowFile, REL_SUCCESS);
+                return;
+            }
+
+            logger.warn("Failed to delete document {}/{}/{} from Elasticsearch: Status {}",
+                    new Object[]{index, documentType, documentId, status});
+            flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, UNABLE_TO_DELETE_DOCUMENT_MESSAGE);
+            flowFile = session.putAttribute(flowFile, ES_REST_STATUS, status.toString());
+            context.yield();
+            session.transfer(flowFile, status == RestStatus.NOT_FOUND ? REL_NOT_FOUND : REL_FAILURE);
+        } catch (final Exception e) {
+            if (e instanceof InterruptedException) {
+                // Restore the flag so the calling framework can still observe the interruption.
+                Thread.currentThread().interrupt();
+            }
+            logger.error("Failed to delete document {} from Elasticsearch due to {}", new Object[]{documentId, e.getLocalizedMessage()}, e);
+            flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, e.getLocalizedMessage());
+            session.transfer(flowFile, isRetryable(e) ? REL_RETRY : REL_FAILURE);
+            context.yield();
+        }
+    }
+
+    /**
+     * Records {@code message} in {@link #ES_ERROR_MESSAGE} and routes the FlowFile to {@link #REL_FAILURE}.
+     */
+    private void failValidation(final ProcessSession session, final FlowFile flowFile, final String message) {
+        session.transfer(session.putAttribute(flowFile, ES_ERROR_MESSAGE, message), REL_FAILURE);
+    }
+
+    /**
+     * Tells whether a failure is a timeout that should be routed to {@link #REL_RETRY}.
+     *
+     * <p>{@link #doDelete(DeleteRequestBuilder)} waits on a future, so a timeout may arrive
+     * wrapped in an {@link ExecutionException}; the wrapper is looked through before the
+     * type check.</p>
+     */
+    private static boolean isRetryable(final Throwable failure) {
+        final Throwable cause = (failure instanceof ExecutionException && failure.getCause() != null)
+                ? failure.getCause()
+                : failure;
+        return cause instanceof ElasticsearchTimeoutException
+                || cause instanceof ReceiveTimeoutTransportException;
+    }
+
+    /**
+     * Builds the delete request for the given document coordinates. Protected so that
+     * subclasses (for example test doubles) can replace it.
+     */
+    protected DeleteRequestBuilder prepareDeleteRequest(final String index, final String documentId, final String documentType) {
+        return esClient.get().prepareDelete(index, documentType, documentId);
+    }
+
+    /**
+     * Executes the delete request and waits for its response.
+     *
+     * @throws InterruptedException if the wait for the response is interrupted
+     * @throws ExecutionException if the request completed with a failure
+     */
+    protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
+            throws InterruptedException, ExecutionException {
+        return requestBuilder.execute().get();
+    }
+
+    /** Delegates to the parent's client shutdown when the processor is stopped. */
+    @Override
+    @OnStopped
+    public void closeClient() {
+        super.closeClient();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8cb50144/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java
index dcae615..ee8674e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java
@@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -65,6 +66,7 @@ import java.util.Set;
         @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
         @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
 })
+@SeeAlso({DeleteElasticsearch5.class,PutElasticsearch5.class})
 public class FetchElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")

http://git-wip-us.apache.org/repos/asf/nifi/blob/8cb50144/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java
index ef70bf0..ded35cf 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java
@@ -21,6 +21,7 @@ import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -62,6 +63,7 @@ import java.util.Set;
         + "the index to insert into and the type of the document. If the cluster has been configured for authorization "
         + "and/or secure transport (SSL/TLS), and the X-Pack plugin is available, secure connections can be made. This processor "
         + "supports Elasticsearch 5.x clusters.")
+@SeeAlso({FetchElasticsearch5.class,DeleteElasticsearch5.class})
 public class PutElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
 
     private static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> {

http://git-wip-us.apache.org/repos/asf/nifi/blob/8cb50144/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 156f7c0..8db7223 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,3 +14,4 @@
 # limitations under the License.
 org.apache.nifi.processors.elasticsearch.FetchElasticsearch5
 org.apache.nifi.processors.elasticsearch.PutElasticsearch5
+org.apache.nifi.processors.elasticsearch.DeleteElasticsearch5

http://git-wip-us.apache.org/repos/asf/nifi/blob/8cb50144/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java
new file mode 100644
index 0000000..f9dfe58
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.rest.RestStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+
+/**
+ * Integration test for the delete processor. Set the hosts, cluster name, index and type
+ * to match a reachable Elasticsearch 5 cluster before running these tests.
+ */
+@Ignore("Comment this out for es delete integration testing and set the appropriate cluster name, hosts, etc")
+public class ITDeleteElasticsearch5Test {
+
+    private static final String TYPE1 = "type1";
+    private static final String INDEX1 = "index1";
+    private static final String HOSTS = "127.0.0.1:9300";
+    private static final String TIMEOUT = "5s";
+    /** FlowFile attribute that carries the id of the document to delete. */
+    private static final String DOCUMENT_ID_ATTRIBUTE = "documentId";
+    protected DeleteResponse deleteResponse;
+    protected RestStatus restStatus;
+    private InputStream inputDocument;
+    protected String clusterName = "elasticsearch";
+    private String documentId;
+
+    /** Opens the sample document and derives a per-test document id from the current time. */
+    @Before
+    public void setUp() throws IOException {
+        final ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+        inputDocument = classloader.getResourceAsStream("DocumentExample.json");
+        documentId = String.valueOf(System.currentTimeMillis());
+    }
+
+    /** Releases the sample document stream opened in {@link #setUp()}. */
+    @After
+    public void teardown() throws IOException {
+        if (inputDocument != null) {
+            inputDocument.close();
+        }
+    }
+
+    @Test
+    public void testPutAndDeleteIntegrationTestSuccess() {
+        final TestRunner runnerPut = TestRunners.newTestRunner(new PutElasticsearch5());
+        configureConnection(runnerPut);
+        runnerPut.setProperty(PutElasticsearch5.INDEX, INDEX1);
+        runnerPut.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
+        runnerPut.setProperty(PutElasticsearch5.TYPE, TYPE1);
+        runnerPut.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "id");
+        runnerPut.assertValid();
+
+        final HashMap<String, String> putAttributes = new HashMap<>();
+        putAttributes.put("id", documentId);
+        runnerPut.enqueue(inputDocument, putAttributes);
+        runnerPut.run(1, true, true);
+
+        runnerPut.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
+
+        final TestRunner runnerDelete = newDeleteRunner(INDEX1, TYPE1);
+        runDeleteOnce(runnerDelete);
+
+        runnerDelete.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testDeleteIntegrationTestDocumentNotFound() {
+        final TestRunner runnerDelete = newDeleteRunner(INDEX1, TYPE1);
+        runDeleteOnce(runnerDelete);
+
+        assertRoutedToNotFound(runnerDelete, INDEX1, TYPE1);
+    }
+
+    @Test
+    public void testDeleteIntegrationTestBadIndex() {
+        // A time-based name makes it very unlikely that the index already exists.
+        final String index = String.valueOf(System.currentTimeMillis());
+        final TestRunner runnerDelete = newDeleteRunner(index, TYPE1);
+        runDeleteOnce(runnerDelete);
+
+        assertRoutedToNotFound(runnerDelete, index, TYPE1);
+    }
+
+    @Test
+    public void testDeleteIntegrationTestBadType() {
+        // A time-based name makes it very unlikely that the type already exists.
+        final String type = String.valueOf(System.currentTimeMillis());
+        final TestRunner runnerDelete = newDeleteRunner(INDEX1, type);
+        runDeleteOnce(runnerDelete);
+
+        assertRoutedToNotFound(runnerDelete, INDEX1, type);
+    }
+
+    /** Applies the cluster connection settings shared by the put and delete runners. */
+    private void configureConnection(final TestRunner runner) {
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName);
+        runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, HOSTS);
+        runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, TIMEOUT);
+        runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, TIMEOUT);
+    }
+
+    /** Creates a validated delete runner that reads the document id from the FlowFile attribute. */
+    private TestRunner newDeleteRunner(final String index, final String type) {
+        final TestRunner runner = TestRunners.newTestRunner(new DeleteElasticsearch5());
+        configureConnection(runner);
+        runner.setProperty(DeleteElasticsearch5.INDEX, index);
+        runner.setProperty(DeleteElasticsearch5.TYPE, type);
+        runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${" + DOCUMENT_ID_ATTRIBUTE + "}");
+        runner.assertValid();
+        return runner;
+    }
+
+    /** Enqueues one empty FlowFile carrying the test's document id and triggers the processor once. */
+    private void runDeleteOnce(final TestRunner runner) {
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put(DOCUMENT_ID_ATTRIBUTE, documentId);
+        runner.enqueue(new byte[] {}, attributes);
+        runner.run(1, true, true);
+    }
+
+    /** Asserts that the single FlowFile went to "not found" with the expected addressing attributes. */
+    private void assertRoutedToNotFound(final TestRunner runner, final String index, final String type) {
+        runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0);
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, index);
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, type);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8cb50144/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java
new file mode 100644
index 0000000..83e8a2d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.rest.RestStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Unit tests for {@link DeleteElasticsearch5} that do not talk to an Elasticsearch cluster.
+ * <p>
+ * The processor under test is replaced by {@link StubDeleteProcessor} (or an anonymous subclass of it),
+ * whose {@code doDelete} either returns the {@link #deleteResponse} prepared by the test or throws the
+ * exception the test wants to simulate. Each test then checks the relationship the flow file is routed to
+ * and the {@code es.*} attributes written on it.
+ */
+public class TestDeleteElasticsearch5 {
+
+    private String documentId;
+    private static final String TYPE1 = "type1";
+    private static final String INDEX1 = "index1";
+    private TestRunner runner;
+    // Response handed back by StubDeleteProcessor#doDelete; stays null for tests that fail before the delete call.
+    protected DeleteResponse deleteResponse;
+    // Status reported by the stubbed deleteResponse; read lazily by its status() override.
+    protected RestStatus restStatus;
+    private DeleteElasticsearch5 mockDeleteProcessor;
+    long currentTimeMillis;
+
+    /**
+     * Processor stub shared by all tests: builds no request, performs no client setup, and answers every
+     * delete with the enclosing test's {@link #deleteResponse}. Tests that need a failing delete subclass
+     * this and override only {@code doDelete}.
+     */
+    private class StubDeleteProcessor extends DeleteElasticsearch5 {
+
+        @Override
+        protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) {
+            return null;
+        }
+
+        @Override
+        protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
+                throws InterruptedException, ExecutionException {
+            return deleteResponse;
+        }
+
+        @Override
+        public void setup(ProcessContext context) {
+            // Intentionally empty: presumably skips the real transport client creation - confirm against the base class.
+        }
+
+    }
+
+    /**
+     * Creates a fresh document id and a fully configured runner around the default stub processor.
+     */
+    @Before
+    public void setUp() throws IOException {
+        currentTimeMillis = System.currentTimeMillis();
+        documentId = String.valueOf(currentTimeMillis);
+        mockDeleteProcessor = new StubDeleteProcessor();
+        runner = newConfiguredRunner(mockDeleteProcessor);
+    }
+
+    @After
+    public void teardown() {
+        runner = null;
+    }
+
+    /**
+     * Builds a {@link TestRunner} for the given processor and applies the configuration every test uses.
+     * Along the way it asserts that the processor is not valid until index, type and document id are all set.
+     *
+     * @param processor the (stubbed) processor to run
+     * @return a runner whose configuration has been asserted valid
+     */
+    private static TestRunner newConfiguredRunner(final DeleteElasticsearch5 processor) {
+        final TestRunner testRunner = TestRunners.newTestRunner(processor);
+
+        testRunner.setValidateExpressionUsage(true);
+        testRunner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
+        testRunner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
+        testRunner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
+        testRunner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
+
+        testRunner.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
+        testRunner.assertNotValid();
+        testRunner.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
+        testRunner.assertNotValid();
+        testRunner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
+        testRunner.assertValid();
+        return testRunner;
+    }
+
+    /**
+     * Enqueues an empty flow file carrying the {@code documentId} attribute that the
+     * {@code ${documentId}} expression of the DOCUMENT_ID property resolves against.
+     */
+    private void enqueueWithDocumentId() {
+        // Plain map instead of double-brace initialization, which would create an extra anonymous class.
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("documentId", documentId);
+        runner.enqueue(new byte [] {}, attributes);
+    }
+
+    /**
+     * Prepares the response returned by the stub processor so that it reports the given status.
+     *
+     * @param status the REST status the stubbed response should report
+     */
+    private void stubDeleteResponse(final RestStatus status) {
+        restStatus = status;
+        deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) {
+
+            @Override
+            public RestStatus status() {
+                return restStatus;
+            }
+
+        };
+    }
+
+    /**
+     * Asserts that the flow file carries the document id, index and type used for the delete.
+     *
+     * @param out the flow file emitted by the processor
+     */
+    private void assertDocumentAttributes(final MockFlowFile out) {
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
+    }
+
+    /** A flow file without a {@code documentId} attribute must fail with the "document id" message. */
+    @Test
+    public void testDeleteWithNoDocumentId() throws IOException {
+
+        runner.enqueue(new byte [] {});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
+        assertNotNull(out);
+        assertEquals("Document id is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+    }
+
+    /** An index expression that evaluates to empty must fail with the "index" message. */
+    @Test
+    public void testDeleteWithNoIndex() throws IOException {
+        runner.setProperty(DeleteElasticsearch5.INDEX, "${index}");
+
+        runner.enqueue(new byte [] {});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
+        assertNotNull(out);
+        assertEquals("Index is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+    }
+
+    /** A type expression that evaluates to empty must fail with the "document type" message. */
+    @Test
+    public void testDeleteWithNoType() throws IOException {
+        runner.setProperty(DeleteElasticsearch5.TYPE, "${type}");
+
+        runner.enqueue(new byte [] {});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
+        assertNotNull(out);
+        assertEquals("Document type is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+    }
+
+    /** A response with status OK must go to success without an error message attribute. */
+    @Test
+    public void testDeleteSuccessful() throws IOException {
+        stubDeleteResponse(RestStatus.OK);
+        enqueueWithDocumentId();
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_ERROR_MESSAGE, null);
+        assertDocumentAttributes(out);
+    }
+
+    /** A response with status NOT_FOUND must go to the not-found relationship with status and message set. */
+    @Test
+    public void testDeleteNotFound() throws IOException {
+        stubDeleteResponse(RestStatus.NOT_FOUND);
+        enqueueWithDocumentId();
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0);
+        assertNotNull(out);
+        assertEquals(DeleteElasticsearch5.UNABLE_TO_DELETE_DOCUMENT_MESSAGE,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, restStatus.toString());
+        assertDocumentAttributes(out);
+    }
+
+    /** A response with status SERVICE_UNAVAILABLE must go to failure with status and message set. */
+    @Test
+    public void testDeleteServerFailure() throws IOException {
+        stubDeleteResponse(RestStatus.SERVICE_UNAVAILABLE);
+        enqueueWithDocumentId();
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
+        assertNotNull(out);
+        assertEquals(DeleteElasticsearch5.UNABLE_TO_DELETE_DOCUMENT_MESSAGE,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, restStatus.toString());
+        assertDocumentAttributes(out);
+    }
+
+    /** An {@link ElasticsearchTimeoutException} from the delete call must route the flow file to retry. */
+    @Test
+    public void testDeleteRetryableException() throws IOException {
+        mockDeleteProcessor = new StubDeleteProcessor() {
+
+            @Override
+            protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
+                    throws InterruptedException, ExecutionException {
+                throw new ElasticsearchTimeoutException("timeout");
+            }
+
+        };
+        // Replace the runner created in setUp with one wrapping the failing processor.
+        runner = newConfiguredRunner(mockDeleteProcessor);
+
+        enqueueWithDocumentId();
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_RETRY, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_RETRY).get(0);
+        assertNotNull(out);
+        assertEquals("timeout",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, null);
+        assertDocumentAttributes(out);
+    }
+
+    /** An {@link InterruptedException} from the delete call must route the flow file to failure. */
+    @Test
+    public void testDeleteNonRetryableException() throws IOException {
+        mockDeleteProcessor = new StubDeleteProcessor() {
+
+            @Override
+            protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
+                    throws InterruptedException, ExecutionException {
+                throw new InterruptedException("exception");
+            }
+
+        };
+        // Replace the runner created in setUp with one wrapping the failing processor.
+        runner = newConfiguredRunner(mockDeleteProcessor);
+
+        enqueueWithDocumentId();
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
+        assertNotNull(out);
+        assertEquals("exception",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, null);
+        assertDocumentAttributes(out);
+    }
+
+}