You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/08/21 12:51:35 UTC
nifi git commit: NIFI-4250 - Elasticsearch 5 delete processor
Repository: nifi
Updated Branches:
refs/heads/master 41984bed1 -> 8cb501443
NIFI-4250 - Elasticsearch 5 delete processor
Signed-off-by: Matt Burgess <ma...@apache.org>
This closes #2045
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8cb50144
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8cb50144
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8cb50144
Branch: refs/heads/master
Commit: 8cb501443bbe27fa043572cc3b993529ff245722
Parents: 41984be
Author: mans2singh <ma...@yahoo.com>
Authored: Tue Aug 1 06:07:38 2017 -0700
Committer: Matt Burgess <ma...@apache.org>
Committed: Mon Aug 21 08:49:57 2017 -0400
----------------------------------------------------------------------
.../elasticsearch/DeleteElasticsearch5.java | 246 +++++++++++++++
.../elasticsearch/FetchElasticsearch5.java | 2 +
.../elasticsearch/PutElasticsearch5.java | 2 +
.../org.apache.nifi.processor.Processor | 1 +
.../ITDeleteElasticsearch5Test.java | 197 ++++++++++++
.../elasticsearch/TestDeleteElasticsearch5.java | 304 +++++++++++++++++++
6 files changed, 752 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/8cb50144/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java
new file mode 100644
index 0000000..9e77775
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.transport.ReceiveTimeoutTransportException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"elasticsearch", "elasticsearch 5", "delete", "remove"})
+@CapabilityDescription("Delete a document from Elasticsearch 5.0 by document id. If the cluster has been configured for authorization and/or secure "
+ + "transport (SSL/TLS), and the X-Pack plugin is available, secure connections can be made.")
+@WritesAttributes({
+ @WritesAttribute(attribute = DeleteElasticsearch5.ES_ERROR_MESSAGE, description = "The message attribute in case of error"),
+ @WritesAttribute(attribute = DeleteElasticsearch5.ES_FILENAME, description = "The filename attribute which is set to the document identifier"),
+ @WritesAttribute(attribute = DeleteElasticsearch5.ES_INDEX, description = "The Elasticsearch index containing the document"),
+ @WritesAttribute(attribute = DeleteElasticsearch5.ES_TYPE, description = "The Elasticsearch document type"),
+ @WritesAttribute(attribute = DeleteElasticsearch5.ES_REST_STATUS, description = "The filename attribute with rest status")
+})
+@SeeAlso({FetchElasticsearch5.class,PutElasticsearch5.class})
+public class DeleteElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
+
+ public static final String UNABLE_TO_DELETE_DOCUMENT_MESSAGE = "Unable to delete document";
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+ .description("All FlowFile corresponding to the deleted document from Elasticsearch are routed to this relationship").build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+ .description("All FlowFile corresponding to delete document that failed from Elasticsearch are routed to this relationship").build();
+
+ public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
+ .description("A FlowFile is routed to this relationship if the document cannot be deleted because or retryable exception like timeout or node not available")
+ .build();
+
+ public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
+ .description("A FlowFile is routed to this relationship if the specified document was not found in elasticsearch")
+ .build();
+
+ public static final PropertyDescriptor DOCUMENT_ID = new PropertyDescriptor.Builder()
+ .name("el5-delete-document-id")
+ .displayName("Document Identifier")
+ .description("The identifier for the document to be deleted")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+ .name("el5-delete-index")
+ .displayName("Index")
+ .description("The name of the index to delete the document from")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+ .name("el5-delete-type")
+ .displayName("Type")
+ .description("The type of this document to be deleted")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ public static final String ES_ERROR_MESSAGE = "es.error.message";
+ public static final String ES_FILENAME = "filename";
+ public static final String ES_INDEX = "es.index";
+ public static final String ES_TYPE = "es.type";
+ public static final String ES_REST_STATUS = "es.rest.status";
+
+ static {
+ final Set<Relationship> relations = new HashSet<>();
+ relations.add(REL_SUCCESS);
+ relations.add(REL_FAILURE);
+ relations.add(REL_RETRY);
+ relations.add(REL_NOT_FOUND);
+ relationships = Collections.unmodifiableSet(relations);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(CLUSTER_NAME);
+ descriptors.add(HOSTS);
+ descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+ descriptors.add(PROP_XPACK_LOCATION);
+ descriptors.add(USERNAME);
+ descriptors.add(PASSWORD);
+ descriptors.add(PING_TIMEOUT);
+ descriptors.add(SAMPLER_INTERVAL);
+ descriptors.add(DOCUMENT_ID);
+ descriptors.add(INDEX);
+ descriptors.add(TYPE);
+
+ propertyDescriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+
+ synchronized (esClient) {
+ if(esClient.get() == null) {
+ setup(context);
+ }
+ }
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+ final String documentId = context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
+ final String documentType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+
+ final ComponentLog logger = getLogger();
+
+ if ( StringUtils.isBlank(index) ) {
+ logger.debug("Index is required but was empty {}", new Object [] { index });
+ flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Index is required but was empty");
+ session.transfer(flowFile,REL_FAILURE);
+ return;
+ }
+ if ( StringUtils.isBlank(documentType) ) {
+ logger.debug("Document type is required but was empty {}", new Object [] { documentType });
+ flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Document type is required but was empty");
+ session.transfer(flowFile,REL_FAILURE);
+ return;
+ }
+ if ( StringUtils.isBlank(documentId) ) {
+ logger.debug("Document id is required but was empty {}", new Object [] { documentId });
+ flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Document id is required but was empty");
+ session.transfer(flowFile,REL_FAILURE);
+ return;
+ }
+
+ flowFile = session.putAllAttributes(flowFile, new HashMap<String, String>() {{
+ put(ES_FILENAME, documentId);
+ put(ES_INDEX, index);
+ put(ES_TYPE, documentType);
+ }});
+
+ try {
+
+ logger.debug("Deleting document {}/{}/{} from Elasticsearch", new Object[]{index, documentType, documentId});
+ DeleteRequestBuilder requestBuilder = prepareDeleteRequest(index, documentId, documentType);
+ final DeleteResponse response = doDelete(requestBuilder);
+
+ if (response.status() != RestStatus.OK) {
+ logger.warn("Failed to delete document {}/{}/{} from Elasticsearch: Status {}",
+ new Object[]{index, documentType, documentId, response.status()});
+ flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, UNABLE_TO_DELETE_DOCUMENT_MESSAGE);
+ flowFile = session.putAttribute(flowFile, ES_REST_STATUS, response.status().toString());
+ context.yield();
+ if ( response.status() == RestStatus.NOT_FOUND ) {
+ session.transfer(flowFile, REL_NOT_FOUND);
+ } else {
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ } else {
+ logger.debug("Elasticsearch document " + documentId + " deleted");
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+ } catch ( ElasticsearchTimeoutException
+ | ReceiveTimeoutTransportException exception) {
+ logger.error("Failed to delete document {} from Elasticsearch due to {}",
+ new Object[]{documentId, exception.getLocalizedMessage()}, exception);
+ flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, exception.getLocalizedMessage());
+ session.transfer(flowFile, REL_RETRY);
+ context.yield();
+
+ } catch (Exception e) {
+ logger.error("Failed to delete document {} from Elasticsearch due to {}", new Object[]{documentId, e.getLocalizedMessage()}, e);
+ flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, e.getLocalizedMessage());
+ session.transfer(flowFile, REL_FAILURE);
+ context.yield();
+ }
+ }
+
+ protected DeleteRequestBuilder prepareDeleteRequest(final String index, final String documentId, final String documentType) {
+ return esClient.get().prepareDelete(index, documentType, documentId);
+ }
+
+ protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
+ throws InterruptedException, ExecutionException {
+ return requestBuilder.execute().get();
+ }
+
+ @OnStopped
+ public void closeClient() {
+ super.closeClient();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/8cb50144/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java
index dcae615..ee8674e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java
@@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
@@ -65,6 +66,7 @@ import java.util.Set;
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
})
+@SeeAlso({DeleteElasticsearch5.class,PutElasticsearch5.class})
public class FetchElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
http://git-wip-us.apache.org/repos/asf/nifi/blob/8cb50144/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java
index ef70bf0..ded35cf 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java
@@ -21,6 +21,7 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
@@ -62,6 +63,7 @@ import java.util.Set;
+ "the index to insert into and the type of the document. If the cluster has been configured for authorization "
+ "and/or secure transport (SSL/TLS), and the X-Pack plugin is available, secure connections can be made. This processor "
+ "supports Elasticsearch 5.x clusters.")
+@SeeAlso({FetchElasticsearch5.class,DeleteElasticsearch5.class})
public class PutElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
private static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> {
http://git-wip-us.apache.org/repos/asf/nifi/blob/8cb50144/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 156f7c0..8db7223 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.nifi.processors.elasticsearch.FetchElasticsearch5
org.apache.nifi.processors.elasticsearch.PutElasticsearch5
+org.apache.nifi.processors.elasticsearch.DeleteElasticsearch5
http://git-wip-us.apache.org/repos/asf/nifi/blob/8cb50144/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java
new file mode 100644
index 0000000..f9dfe58
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.rest.RestStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+
+/**
+ * Integration test for the delete processor. It talks to a real Elasticsearch cluster, so set the hosts,
+ * cluster name, index and type to match your environment before enabling it.
+ */
+@Ignore("Comment this out for es delete integration testing and set the appropriate cluster name, hosts, etc")
+public class ITDeleteElasticsearch5Test {
+
+    private static final String TYPE1 = "type1";
+    private static final String INDEX1 = "index1";
+    /** Host list handed to both processors. */
+    private static final String ES_HOSTS = "127.0.0.1:9300";
+    /** Value used for both the ping timeout and the sampler interval. */
+    private static final String ES_TIMEOUT = "5s";
+    /** FlowFile attribute the delete processor reads the document id from (see {@code ${documentId}}). */
+    private static final String DOCUMENT_ID_ATTRIBUTE = "documentId";
+
+    protected DeleteResponse deleteResponse;
+    protected RestStatus restStatus;
+    private InputStream inputDocument;
+    protected String clusterName = "elasticsearch";
+    private String documentId;
+
+    @Before
+    public void setUp() throws IOException {
+        ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+        inputDocument = classloader.getResourceAsStream("DocumentExample.json");
+        // A timestamp-based id keeps each run from colliding with documents of earlier runs.
+        long currentTimeMillis = System.currentTimeMillis();
+        documentId = String.valueOf(currentTimeMillis);
+    }
+
+    @After
+    public void teardown() {
+    }
+
+    /** Indexes a document with the put processor, then checks that the delete processor removes it. */
+    @Test
+    public void testPutAndDeleteIntegrationTestSuccess() {
+        final TestRunner runnerPut = TestRunners.newTestRunner(new PutElasticsearch5());
+        runnerPut.setValidateExpressionUsage(false);
+        configureConnection(runnerPut);
+        runnerPut.setProperty(PutElasticsearch5.INDEX, INDEX1);
+        runnerPut.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
+        runnerPut.setProperty(PutElasticsearch5.TYPE, TYPE1);
+        runnerPut.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "id");
+        runnerPut.assertValid();
+
+        final HashMap<String, String> putAttributes = new HashMap<>();
+        putAttributes.put("id", documentId);
+        runnerPut.enqueue(inputDocument, putAttributes);
+        runnerPut.run(1, true, true);
+
+        runnerPut.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
+
+        final TestRunner runnerDelete = newDeleteRunner(INDEX1, TYPE1);
+        enqueueDeleteRequest(runnerDelete);
+        runnerDelete.run(1, true, true);
+
+        // Assert against the delete processor's own relationship, not the put processor's.
+        runnerDelete.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_SUCCESS, 1);
+    }
+
+    /** Deleting an id that was never indexed must be routed to "not found". */
+    @Test
+    public void testDeleteIntegrationTestDocumentNotFound() {
+        final TestRunner runnerDelete = newDeleteRunner(INDEX1, TYPE1);
+        enqueueDeleteRequest(runnerDelete);
+        runnerDelete.run(1, true, true);
+
+        assertRoutedToNotFound(runnerDelete, INDEX1, TYPE1);
+    }
+
+    /** Deleting from an index name generated for this run must be routed to "not found". */
+    @Test
+    public void testDeleteIntegrationTestBadIndex() {
+        final String index = String.valueOf(System.currentTimeMillis());
+        final TestRunner runnerDelete = newDeleteRunner(index, TYPE1);
+        enqueueDeleteRequest(runnerDelete);
+        runnerDelete.run(1, true, true);
+
+        assertRoutedToNotFound(runnerDelete, index, TYPE1);
+    }
+
+    /** Deleting with a type name generated for this run must be routed to "not found". */
+    @Test
+    public void testDeleteIntegrationTestBadType() {
+        final String type = String.valueOf(System.currentTimeMillis());
+        final TestRunner runnerDelete = newDeleteRunner(INDEX1, type);
+        enqueueDeleteRequest(runnerDelete);
+        runnerDelete.run(1, true, true);
+
+        assertRoutedToNotFound(runnerDelete, INDEX1, type);
+    }
+
+    /** Applies the cluster connection properties shared by the put and delete runners. */
+    private void configureConnection(final TestRunner runner) {
+        runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName);
+        runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, ES_HOSTS);
+        runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, ES_TIMEOUT);
+        runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, ES_TIMEOUT);
+    }
+
+    /** Creates a validated delete runner that takes the document id from the FlowFile attribute. */
+    private TestRunner newDeleteRunner(final String index, final String type) {
+        final TestRunner runner = TestRunners.newTestRunner(new DeleteElasticsearch5());
+        runner.setValidateExpressionUsage(false);
+        configureConnection(runner);
+        runner.setProperty(DeleteElasticsearch5.INDEX, index);
+        runner.setProperty(DeleteElasticsearch5.TYPE, type);
+        runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${" + DOCUMENT_ID_ATTRIBUTE + "}");
+        runner.assertValid();
+        return runner;
+    }
+
+    /** Enqueues one empty FlowFile carrying this run's document id. */
+    private void enqueueDeleteRequest(final TestRunner runner) {
+        final HashMap<String, String> attributes = new HashMap<>();
+        attributes.put(DOCUMENT_ID_ATTRIBUTE, documentId);
+        runner.enqueue(new byte[] {}, attributes);
+    }
+
+    /** Verifies the single FlowFile went to "not found" and carries the expected id, index and type. */
+    private void assertRoutedToNotFound(final TestRunner runner, final String index, final String type) {
+        runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0);
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, index);
+        out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, type);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/8cb50144/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java
new file mode 100644
index 0000000..83e8a2d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.rest.RestStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestDeleteElasticsearch5 {
+
+ private String documentId;
+ private static final String TYPE1 = "type1";
+ private static final String INDEX1 = "index1";
+ private TestRunner runner;
+ protected DeleteResponse deleteResponse;
+ protected RestStatus restStatus;
+ private DeleteElasticsearch5 mockDeleteProcessor;
+ long currentTimeMillis;
+
+ @Before
+ public void setUp() throws IOException {
+ currentTimeMillis = System.currentTimeMillis();
+ documentId = String.valueOf(currentTimeMillis);
+ mockDeleteProcessor = new DeleteElasticsearch5() {
+
+ @Override
+ protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) {
+ return null;
+ }
+
+ @Override
+ protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
+ throws InterruptedException, ExecutionException {
+ return deleteResponse;
+ }
+
+ @Override
+ public void setup(ProcessContext context) {
+ }
+
+ };
+
+ runner = TestRunners.newTestRunner(mockDeleteProcessor);
+
+ runner.setValidateExpressionUsage(true);
+ runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
+ runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
+ runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
+ runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
+
+ runner.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
+ runner.assertNotValid();
+ runner.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
+ runner.assertNotValid();
+ runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
+ runner.assertValid();
+ }
+
+ @After
+ public void teardown() {
+ runner = null;
+ }
+
+ @Test
+ public void testDeleteWithNoDocumentId() throws IOException {
+
+ runner.enqueue(new byte [] {});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
+ assertNotNull(out);
+ assertEquals("Document id is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+ }
+
+ @Test
+ public void testDeleteWithNoIndex() throws IOException {
+ runner.setProperty(DeleteElasticsearch5.INDEX, "${index}");
+
+ runner.enqueue(new byte [] {});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
+ assertNotNull(out);
+ assertEquals("Index is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+ }
+
+ @Test
+ public void testDeleteWithNoType() throws IOException {
+ runner.setProperty(DeleteElasticsearch5.TYPE, "${type}");
+
+ runner.enqueue(new byte [] {});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
+ assertNotNull(out);
+ assertEquals("Document type is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+ }
+
+ @Test
+ public void testDeleteSuccessful() throws IOException {
+ restStatus = RestStatus.OK;
+ deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) {
+
+ @Override
+ public RestStatus status() {
+ return restStatus;
+ }
+
+ };
+ runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
+ put("documentId", documentId);
+ }});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_SUCCESS).get(0);
+ assertNotNull(out);
+ assertEquals(null,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
+ }
+
+ @Test
+ public void testDeleteNotFound() throws IOException {
+ restStatus = RestStatus.NOT_FOUND;
+ deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) {
+
+ @Override
+ public RestStatus status() {
+ return restStatus;
+ }
+
+ };
+ runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
+ put("documentId", documentId);
+ }});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0);
+ assertNotNull(out);
+ assertEquals(DeleteElasticsearch5.UNABLE_TO_DELETE_DOCUMENT_MESSAGE,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, restStatus.toString());
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
+ }
+
+ @Test
+ public void testDeleteServerFailure() throws IOException {
+ restStatus = RestStatus.SERVICE_UNAVAILABLE;
+ deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) {
+
+ @Override
+ public RestStatus status() {
+ return restStatus;
+ }
+
+ };
+ runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
+ put("documentId", documentId);
+ }});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
+ assertNotNull(out);
+ assertEquals(DeleteElasticsearch5.UNABLE_TO_DELETE_DOCUMENT_MESSAGE,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, restStatus.toString());
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
+ }
+
+ @Test
+ public void testDeleteRetryableException() throws IOException {
+ mockDeleteProcessor = new DeleteElasticsearch5() {
+
+ @Override
+ protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) {
+ return null;
+ }
+
+ @Override
+ protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
+ throws InterruptedException, ExecutionException {
+ throw new ElasticsearchTimeoutException("timeout");
+ }
+
+ @Override
+ public void setup(ProcessContext context) {
+ }
+
+ };
+ runner = TestRunners.newTestRunner(mockDeleteProcessor);
+
+ runner.setValidateExpressionUsage(true);
+ runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
+ runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
+ runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
+ runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
+
+ runner.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
+ runner.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
+ runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
+ runner.assertValid();
+
+ runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
+ put("documentId", documentId);
+ }});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_RETRY, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_RETRY).get(0);
+ assertNotNull(out);
+ assertEquals("timeout",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, null);
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
+ }
+
+ @Test
+ public void testDeleteNonRetryableException() throws IOException {
+ mockDeleteProcessor = new DeleteElasticsearch5() {
+
+ @Override
+ protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) {
+ return null;
+ }
+
+ @Override
+ protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
+ throws InterruptedException, ExecutionException {
+ throw new InterruptedException("exception");
+ }
+
+ @Override
+ public void setup(ProcessContext context) {
+ }
+
+ };
+ runner = TestRunners.newTestRunner(mockDeleteProcessor);
+
+ runner.setValidateExpressionUsage(true);
+ runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
+ runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
+ runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
+ runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
+
+ runner.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
+ runner.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
+ runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
+ runner.assertValid();
+
+ runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
+ put("documentId", documentId);
+ }});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
+ assertNotNull(out);
+ assertEquals("exception",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, null);
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
+ out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
+ }
+
+}