You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/11/19 18:54:58 UTC
[nifi] branch master updated: NIFI-5248 Added new Elasticsearch
processors. NIFI-5248 Fixed a few stray 1.7.0 references. NIFI-5248 Removed
build helper plugin. NIFI-5248 Made changes requested in a review.
NIFI-5248 Updated dummy service. NIFI-5248 Made a few changes from a code
review. NIFI-5248 Added logic for removing nulls so record paths can be
removed when no longer needed. NIFI-5248 Switched from variable registry to
flowfile level EL. NIFI-5248 Added JsonPath code to remove index,
id and type path statements. [...]
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 4c79ff0 NIFI-5248 Added new Elasticsearch processors. NIFI-5248 Fixed a few stray 1.7.0 references. NIFI-5248 Removed build helper plugin. NIFI-5248 Made changes requested in a review. NIFI-5248 Updated dummy service. NIFI-5248 Made a few changes from a code review. NIFI-5248 Added logic for removing nulls so record paths can be removed when no longer needed. NIFI-5248 Switched from variable registry to flowfile level EL. NIFI-5248 Added JsonPath code to remove index, id and typ [...]
4c79ff0 is described below
commit 4c79ff057638d703cedad1f33855bc066c71f357
Author: Mike Thomsen <mi...@gmail.com>
AuthorDate: Sun Jul 8 14:51:12 2018 -0400
NIFI-5248 Added new Elasticsearch processors.
NIFI-5248 Fixed a few stray 1.7.0 references.
NIFI-5248 Removed build helper plugin.
NIFI-5248 Made changes requested in a review.
NIFI-5248 Updated dummy service.
NIFI-5248 Made a few changes from a code review.
NIFI-5248 Added logic for removing nulls so record paths can be removed when no longer needed.
NIFI-5248 Switched from variable registry to flowfile level EL.
NIFI-5248 Added JsonPath code to remove index, id and type path statements.
NIFI-5248 Updated validation.
NIFI-5248 Set the field to null instead of empty string when nulling records.
NIFI-5248 Fixed TestElasticSearchClientService.
NIFI-5248 Removed high level client and switched over to low level client for everything.
NIFI-5248 Added profiles for ES 6 and ES 7 integration testing.
NIFI-5248 Updated integration tests to support 5 and 6.
NIFI-5248 Fixed some style check breaks.
NIFI-5248 Added create operation type.
NIFI-5248 Updated documentation.
NIFI-5248 Added error handling to PutElasticsearchRecord.
NIFI-5248 Added error logging to PutElasticsearchJson.
NIFI-5248 Added split failed records option to PutElasticsearchJson.
NIFI-5248 Added documentation for PutElasticsearchRecord.
NIFI-5248 Updated import to not use * import.
NIFI-5248 Removed processor that is no longer relevant due to schema inference.
NIFI-5248 Renamed ElasticSearch instances to Elasticsearch where we can within API guidelines.
NIFI-5248 Added groovy-json test dependency.
NIFI-5248 Updated PutElasticsearchRecord to only do index operations.
NIFI-5248 Added batch size property and refactored the way relationships and properties are added.
NIFI-5248 Added batch processing support.
NIFI-5248 Updated error handling.
NIFI-5248 Updated to 1.11.0-SNAPSHOT.
NIFI-5248 Made changes requested in a code review.
NIFI-5248 Made a few more changes from a code review.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2861
---
.../nifi-elasticsearch-client-service-api/pom.xml | 6 +
.../elasticsearch/ElasticSearchClientService.java | 38 ++-
...erationRequest.java => ElasticsearchError.java} | 49 ++-
.../nifi/elasticsearch/IndexOperationRequest.java | 31 +-
.../nifi/elasticsearch/IndexOperationResponse.java | 34 +-
.../nifi-elasticsearch-client-service/pom.xml | 43 ++-
.../ElasticSearchClientServiceImpl.java | 216 +++++++++----
.../nifi/elasticsearch/SearchResponseTest.groovy | 42 +++
.../ElasticSearch5ClientService_IT.groovy | 104 +++++-
.../ElasticSearchLookupService_IT.groovy | 10 +-
.../TestElasticSearchClientService.groovy | 19 +-
.../src/test/resources/setup-5.script | 45 +++
.../src/test/resources/setup-6.script | 46 +++
.../src/test/resources/setup-7.script | 45 +++
.../src/test/resources/setup.script | 41 ---
.../nifi-elasticsearch-restapi-processors/pom.xml | 45 +++
.../elasticsearch/DeleteByQueryElasticsearch.java | 6 +-
...cessor.java => ElasticsearchRestProcessor.java} | 37 ++-
.../elasticsearch/JsonQueryElasticsearch.java | 9 +-
.../elasticsearch/PutElasticsearchRecord.java | 355 +++++++++++++++++++++
.../elasticsearch/api/BulkOperation.java | 48 +++
.../put/FlowFileJsonDescription.java} | 35 +-
.../elasticsearch/put/JsonProcessingError.java} | 20 +-
.../additionalDetails.html | 48 +++
.../additionalDetails.html | 64 ++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../additionalDetails.html | 2 +-
.../additionalDetails.html | 4 +-
.../additionalDetails.html | 41 +++
.../DeleteByQueryElasticsearchTest.groovy | 143 +++++++++
.../JsonQueryElasticsearchTest.groovy} | 266 ++++++++-------
.../PutElasticsearchRecordTest.groovy | 261 +++++++++++++++
.../TestElasticsearchClientService.groovy} | 103 +++---
.../mock/AbstractMockElasticsearchClient.groovy} | 46 ++-
.../mock/MockBulkLoadClientService.groovy | 118 +++++++
.../mock/MockElasticsearchError.groovy} | 23 +-
.../src/test/java/.gitignore | 1 +
.../DeleteByQueryElasticsearchTest.java | 134 --------
38 files changed, 2029 insertions(+), 550 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
index a39c6cd..29b1590 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
@@ -43,5 +43,11 @@
<version>1.11.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.9.8</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 57f359d..d35c784 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -30,12 +30,12 @@ import java.util.List;
import java.util.Map;
@Tags({"elasticsearch", "client"})
-@CapabilityDescription("A controller service for accessing an ElasticSearch client.")
+@CapabilityDescription("A controller service for accessing an Elasticsearch client.")
public interface ElasticSearchClientService extends ControllerService {
PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder()
.name("el-cs-http-hosts")
.displayName("HTTP Hosts")
- .description("A comma-separated list of HTTP hosts that host ElasticSearch query nodes.")
+ .description("A comma-separated list of HTTP hosts that host Elasticsearch query nodes.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -93,7 +93,7 @@ public interface ElasticSearchClientService extends ControllerService {
PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("el-cs-charset")
.displayName("Charset")
- .description("The charset to use for interpreting the response from ElasticSearch.")
+ .description("The charset to use for interpreting the response from Elasticsearch.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
@@ -106,16 +106,26 @@ public interface ElasticSearchClientService extends ControllerService {
* @return IndexOperationResponse if successful
* @throws IOException thrown when there is an error.
*/
- IndexOperationResponse add(IndexOperationRequest operation) throws IOException;
+ IndexOperationResponse add(IndexOperationRequest operation);
/**
- * Index multiple documents.
+ * Bulk process multiple documents.
*
- * @param operations A list of documents to index.
+ * @param operations A list of index operations.
* @return IndexOperationResponse if successful.
* @throws IOException thrown when there is an error.
*/
- IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException;
+ IndexOperationResponse bulk(List<IndexOperationRequest> operations);
+
+ /**
+ * Count the documents that match the criteria.
+ *
+ * @param query A query in the JSON DSL syntax
+ * @param index The index to target.
+ * @param type The type to target.
+ * @return The number of documents that match the query.
+ */
+ Long count(String query, String index, String type);
/**
* Delete a document by its ID from an index.
@@ -125,7 +135,7 @@ public interface ElasticSearchClientService extends ControllerService {
* @param id The document ID to remove from the selected index.
* @return A DeleteOperationResponse object if successful.
*/
- DeleteOperationResponse deleteById(String index, String type, String id) throws IOException;
+ DeleteOperationResponse deleteById(String index, String type, String id);
/**
@@ -136,7 +146,7 @@ public interface ElasticSearchClientService extends ControllerService {
* @return A DeleteOperationResponse object if successful.
* @throws IOException thrown when there is an error.
*/
- DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException;
+ DeleteOperationResponse deleteById(String index, String type, List<String> ids);
/**
* Delete documents by query.
@@ -146,7 +156,7 @@ public interface ElasticSearchClientService extends ControllerService {
* @param type The type to target within the index. Optional.
* @return A DeleteOperationResponse object if successful.
*/
- DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException;
+ DeleteOperationResponse deleteByQuery(String query, String index, String type);
/**
* Get a document by ID.
@@ -157,23 +167,23 @@ public interface ElasticSearchClientService extends ControllerService {
* @return Map if successful, null if not found.
* @throws IOException thrown when there is an error.
*/
- Map<String, Object> get(String index, String type, String id) throws IOException;
+ Map<String, Object> get(String index, String type, String id);
/**
* Perform a search using the JSON DSL.
*
* @param query A JSON string reprensenting the query.
* @param index The index to target. Optional.
- * @param type The type to target. Optional. Will not be used in future versions of ElasticSearch.
+ * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch.
* @return A SearchResponse object if successful.
*/
- SearchResponse search(String query, String index, String type) throws IOException;
+ SearchResponse search(String query, String index, String type);
/**
* Build a transit URL to use with the provenance reporter.
* @param index Index targeted. Optional.
* @param type Type targeted. Optional
- * @return a URL describing the ElasticSearch cluster.
+ * @return a URL describing the Elasticsearch cluster.
*/
String getTransitUrl(String index, String type);
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
similarity index 51%
copy from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
copy to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
index 9281adb..c383c8d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchError.java
@@ -17,34 +17,29 @@
package org.apache.nifi.elasticsearch;
-import java.util.Map;
-
-public class IndexOperationRequest {
- private String index;
- private String type;
- private String id;
- private Map<String, Object> fields;
-
- public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields) {
- this.index = index;
- this.type = type;
- this.id = id;
- this.fields = fields;
- }
-
- public String getIndex() {
- return index;
- }
-
- public String getType() {
- return type;
- }
-
- public String getId() {
- return id;
+import java.util.HashSet;
+import java.util.Set;
+
+public class ElasticsearchError extends RuntimeException {
+ /**
+ * These are names of common Elasticsearch exceptions where it is safe to assume
+ * that it's OK to retry the operation instead of just sending it to an error relationship.
+ */
+ public static final Set<String> ELASTIC_ERROR_NAMES = new HashSet<String>(){{
+ add("NoNodeAvailableException");
+ add("ElasticsearchTimeoutException");
+ add("ReceiveTimeoutTransportException");
+ add("NodeClosedException");
+ }};
+
+ protected boolean isElastic;
+
+ public ElasticsearchError(Exception ex) {
+ super(ex);
+ isElastic = ELASTIC_ERROR_NAMES.contains(ex.getClass().getSimpleName());
}
- public Map<String, Object> getFields() {
- return fields;
+ public boolean isElastic() {
+ return isElastic;
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
index 9281adb..df0679a 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
@@ -19,17 +19,23 @@ package org.apache.nifi.elasticsearch;
import java.util.Map;
+/**
+ * A POJO that represents an "operation on an index." It should not be confused with just indexing documents, as it
+ * covers all CRUD-related operations that can be executed against an Elasticsearch index with documents.
+ */
public class IndexOperationRequest {
private String index;
private String type;
private String id;
private Map<String, Object> fields;
+ private Operation operation;
- public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields) {
+ public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields, Operation operation) {
this.index = index;
this.type = type;
this.id = id;
this.fields = fields;
+ this.operation = operation;
}
public String getIndex() {
@@ -47,4 +53,25 @@ public class IndexOperationRequest {
public Map<String, Object> getFields() {
return fields;
}
-}
+
+ public Operation getOperation() {
+ return operation;
+ }
+
+ public enum Operation {
+ Create("create"),
+ Delete("delete"),
+ Index("index"),
+ Update("update"),
+ Upsert("upsert");
+ String value;
+
+ Operation(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
index a22b7aa..c001f69 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
@@ -17,20 +17,44 @@
package org.apache.nifi.elasticsearch;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
public class IndexOperationResponse {
private long took;
- private long ingestTook;
+ private boolean hasErrors;
+ private List<Map<String, Object>> items;
- public IndexOperationResponse(long took, long ingestTook) {
+ public IndexOperationResponse(long took) {
this.took = took;
- this.ingestTook = ingestTook;
}
public long getTook() {
return took;
}
- public long getIngestTook() {
- return ingestTook;
+ public boolean hasErrors() {
+ return hasErrors;
+ }
+
+ public static IndexOperationResponse fromJsonResponse(String response) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ Map<String, Object> parsedResponse = mapper.readValue(response, Map.class);
+ int took = (int) parsedResponse.get("took");
+ boolean hasErrors = (boolean) parsedResponse.get("errors");
+ List<Map<String, Object>> items = (List<Map<String, Object>>)parsedResponse.get("items");
+
+ IndexOperationResponse retVal = new IndexOperationResponse(took);
+ retVal.hasErrors = hasErrors;
+ retVal.items = items;
+
+ return retVal;
+ }
+
+ public List<Map<String, Object>> getItems() {
+ return items;
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index e63333e..2873972 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -25,6 +25,12 @@
<artifactId>nifi-elasticsearch-client-service</artifactId>
<packaging>jar</packaging>
+ <properties>
+ <es.int.version>5.6.15</es.int.version>
+ <script.name>setup-5.script</script.name>
+ <type.name>faketype</type.name>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -159,26 +165,57 @@
<artifactId>groovy-json</artifactId>
<version>${nifi.groovy.version}</version>
<scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-client</artifactId>
+ <version>5.6.16</version>
</dependency>
</dependencies>
<profiles>
<profile>
+ <id>integration-6</id>
+ <properties>
+ <es.int.version>6.7.1</es.int.version>
+ <type.name>_doc</type.name>
+ <script.name>setup-6.script</script.name>
+ </properties>
+ </profile>
+ <profile>
+ <id>integration-7</id>
+ <properties>
+ <es.int.version>7.0.0</es.int.version>
+ <script.name>setup-7.script</script.name>
+ <type.name>_doc</type.name>
+ </properties>
+ </profile>
+ <profile>
<id>integration-tests</id>
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>3.0.0-M3</version>
+ <configuration>
+ <systemPropertyVariables>
+ <type_name>${type.name}</type_name>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>com.github.alexcojocaru</groupId>
<artifactId>elasticsearch-maven-plugin</artifactId>
- <version>6.0</version>
+ <version>6.13</version>
<configuration>
<clusterName>testCluster</clusterName>
<transportPort>9500</transportPort>
<httpPort>9400</httpPort>
- <version>5.6.2</version>
+ <version>${es.int.version}</version>
<timeout>90</timeout>
<logLevel>ERROR</logLevel>
- <pathInitScript>${project.basedir}/src/test/resources/setup.script</pathInitScript>
+ <pathInitScript>${project.basedir}/src/test/resources/${script.name}</pathInitScript>
</configuration>
<executions>
<execution>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index b37ba0c..f856c76 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -17,9 +17,9 @@
package org.apache.nifi.elasticsearch;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
-import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
@@ -27,25 +27,21 @@ import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.support.WriteRequest;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
import javax.net.ssl.SSLContext;
import java.io.IOException;
@@ -59,6 +55,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
private ObjectMapper mapper = new ObjectMapper();
@@ -66,7 +63,6 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
static final private List<PropertyDescriptor> properties;
private RestClient client;
- private RestHighLevelClient highLevelClient;
private String url;
private Charset charset;
@@ -158,10 +154,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
.setMaxRetryTimeoutMillis(retryTimeout);
this.client = builder.build();
- this.highLevelClient = new RestHighLevelClient(client);
}
- private Response runQuery(String endpoint, String query, String index, String type) throws IOException {
+ private Response runQuery(String endpoint, String query, String index, String type) {
StringBuilder sb = new StringBuilder()
.append("/")
.append(index);
@@ -174,68 +169,153 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
- return client.performRequest("POST", sb.toString(), Collections.emptyMap(), queryEntity);
+ try {
+ return client.performRequest("POST", sb.toString(), Collections.emptyMap(), queryEntity);
+ } catch (Exception e) {
+ throw new ElasticsearchError(e);
+ }
}
- private Map<String, Object> parseResponse(Response response) throws IOException {
+ private Map<String, Object> parseResponse(Response response) {
final int code = response.getStatusLine().getStatusCode();
- if (code >= 200 & code < 300) {
- InputStream inputStream = response.getEntity().getContent();
- byte[] result = IOUtils.toByteArray(inputStream);
- inputStream.close();
- return mapper.readValue(new String(result, charset), Map.class);
- } else {
- String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s",
- response.getStatusLine().getReasonPhrase());
- throw new IOException(errorMessage);
+ try {
+ if (code >= 200 & code < 300) {
+ InputStream inputStream = response.getEntity().getContent();
+ byte[] result = IOUtils.toByteArray(inputStream);
+ inputStream.close();
+ return mapper.readValue(new String(result, charset), Map.class);
+ } else {
+ String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s",
+ response.getStatusLine().getReasonPhrase());
+ throw new IOException(errorMessage);
+ }
+ } catch (Exception ex) {
+ throw new ElasticsearchError(ex);
}
}
@Override
- public IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
- return add(Arrays.asList(operation));
+ public IndexOperationResponse add(IndexOperationRequest operation) {
+ return bulk(Arrays.asList(operation));
}
- @Override
- public IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException {
- BulkRequest bulkRequest = new BulkRequest();
- for (int index = 0; index < operations.size(); index++) {
- IndexOperationRequest or = operations.get(index);
- IndexRequest indexRequest = new IndexRequest(or.getIndex(), or.getType(), or.getId())
- .source(or.getFields());
- bulkRequest.add(indexRequest);
+ private String flatten(String str) {
+ return str.replaceAll("[\\n\\r]", "\\\\n");
+ }
+
+ private String buildBulkHeader(IndexOperationRequest request) throws JsonProcessingException {
+ String operation = request.getOperation().equals(IndexOperationRequest.Operation.Upsert)
+ ? "update"
+ : request.getOperation().getValue();
+ return buildBulkHeader(operation, request.getIndex(), request.getType(), request.getId());
+ }
+
+ private String buildBulkHeader(String operation, String index, String type, String id) throws JsonProcessingException {
+ Map<String, Object> header = new HashMap<String, Object>() {{
+ put(operation, new HashMap<String, Object>() {{
+ put("_index", index);
+ put("_id", id);
+ put("_type", type);
+ }});
+ }};
+
+ return flatten(mapper.writeValueAsString(header));
+ }
+
+ protected void buildRequest(IndexOperationRequest request, StringBuilder builder) throws JsonProcessingException {
+ String header = buildBulkHeader(request);
+ builder.append(header).append("\n");
+ if (request.getOperation().equals(IndexOperationRequest.Operation.Index)) {
+ String indexDocument = mapper.writeValueAsString(request.getFields());
+ builder.append(indexDocument).append("\n");
+ } else if (request.getOperation().equals(IndexOperationRequest.Operation.Update)
+ || request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
+ Map<String, Object> doc = new HashMap<String, Object>() {{
+ put("doc", request.getFields());
+ if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
+ put("doc_as_upsert", true);
+ }
+ }};
+ String update = flatten(mapper.writeValueAsString(doc)).trim();
+ builder.append(String.format("%s\n", update));
}
+ }
- bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+ @Override
+ public IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
+ try {
+ StringBuilder payload = new StringBuilder();
+ for (int index = 0; index < operations.size(); index++) {
+ IndexOperationRequest or = operations.get(index);
+ buildRequest(or, payload);
+ }
+
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug(payload.toString());
+ }
+ HttpEntity entity = new NStringEntity(payload.toString(), ContentType.APPLICATION_JSON);
+ StopWatch watch = new StopWatch();
+ watch.start();
+ Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
+ watch.stop();
+
+ String rawResponse = IOUtils.toString(response.getEntity().getContent(), "UTF-8");
+
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug(String.format("Response was: %s", rawResponse));
+ }
+
+ IndexOperationResponse retVal = IndexOperationResponse.fromJsonResponse(rawResponse);
+
+ return retVal;
+ } catch (Exception ex) {
+ throw new ElasticsearchError(ex);
+ }
+ }
- BulkResponse response = highLevelClient.bulk(bulkRequest);
- IndexOperationResponse retVal = new IndexOperationResponse(response.getTookInMillis(), response.getIngestTookInMillis());
+ @Override
+ public Long count(String query, String index, String type) {
+ Response response = runQuery("_count", query, index, type);
+ Map<String, Object> parsed = parseResponse(response);
- return retVal;
+ return ((Integer)parsed.get("count")).longValue();
}
@Override
- public DeleteOperationResponse deleteById(String index, String type, String id) throws IOException {
+ public DeleteOperationResponse deleteById(String index, String type, String id) {
return deleteById(index, type, Arrays.asList(id));
}
@Override
- public DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException {
- BulkRequest bulk = new BulkRequest();
- for (int idx = 0; idx < ids.size(); idx++) {
- DeleteRequest request = new DeleteRequest(index, type, ids.get(idx));
- bulk.add(request);
+ public DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
+ try {
+ StringBuilder sb = new StringBuilder();
+ for (int idx = 0; idx < ids.size(); idx++) {
+ String header = buildBulkHeader("delete", index, type, ids.get(idx));
+ sb.append(header).append("\n");
+ }
+ HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
+ StopWatch watch = new StopWatch();
+ watch.start();
+ Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
+ watch.stop();
+
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug(String.format("Response for bulk delete: %s",
+ IOUtils.toString(response.getEntity().getContent(), "UTF-8")));
+ }
+
+ DeleteOperationResponse dor = new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
+
+ return dor;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
}
- BulkResponse response = highLevelClient.bulk(bulk);
-
- DeleteOperationResponse dor = new DeleteOperationResponse(response.getTookInMillis());
-
- return dor;
}
@Override
- public DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException {
+ public DeleteOperationResponse deleteByQuery(String query, String index, String type) {
long start = System.currentTimeMillis();
Response response = runQuery("_delete_by_query", query, index, type);
long end = System.currentTimeMillis();
@@ -245,14 +325,40 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
}
@Override
- public Map<String, Object> get(String index, String type, String id) throws IOException {
- GetRequest get = new GetRequest(index, type, id);
- GetResponse resp = highLevelClient.get(get, new Header[]{});
- return resp.getSource();
+ public Map<String, Object> get(String index, String type, String id) {
+ // The document path is "index/id", with "/type" inserted between them when a type is given.
+ final StringBuilder endpoint = new StringBuilder();
+ endpoint.append(index);
+ if (!StringUtils.isEmpty(type)) {
+ endpoint.append("/").append(type);
+ }
+ endpoint.append("/").append(id);
+ try {
+ Response response = client.performRequest("GET", endpoint.toString(), new BasicHeader("Content-Type", "application/json"));
+ String body = IOUtils.toString(response.getEntity().getContent(), "UTF-8");
+ return (Map<String, Object>) mapper.readValue(body, Map.class).get("_source");
+ } catch (Exception ex) {
+ // Best effort: every failure is logged with the requested endpoint and reported to the caller as null.
+ getLogger().error(String.format("Failed to fetch document from %s", endpoint), ex);
+ return null;
+ }
+ }
+
+ /*
+ * Pre-7.X Elasticsearch returns the hit total as a bare number; 7.X and later return a map
+ * holding the number under "value". Any Number is accepted in either form. A total too large
+ * for an int raises ArithmeticException; any other input raises ProcessException.
+ */
+ private int handleSearchCount(Object raw) {
+ Object total = raw instanceof Map ? ((Map) raw).get("value") : raw;
+ if (total instanceof Number) {
+ return Math.toIntExact(((Number) total).longValue());
+ }
+ throw new ProcessException("Unknown type for hit count.");
}
@Override
- public SearchResponse search(String query, String index, String type) throws IOException {
+ public SearchResponse search(String query, String index, String type) {
Response response = runQuery("_search", query, index, type);
Map<String, Object> parsed = parseResponse(response);
@@ -261,7 +367,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
Map<String, Object> aggregations = parsed.get("aggregations") != null
? (Map<String, Object>)parsed.get("aggregations") : new HashMap<>();
Map<String, Object> hitsParent = (Map<String, Object>)parsed.get("hits");
- int count = (Integer)hitsParent.get("total");
+ int count = handleSearchCount(hitsParent.get("total"));
List<Map<String, Object>> hits = (List<Map<String, Object>>)hitsParent.get("hits");
SearchResponse esr = new SearchResponse(hits, aggregations, count, took, timedOut);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy
new file mode 100644
index 0000000..78e641d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch
+
+import org.junit.Assert
+import org.junit.Test
+
+class SearchResponseTest {
+ // Round-trips the constructor arguments through the properties and spot-checks toString().
+ @Test
+ void test() {
+ def expectedHits = []
+ def expectedAggregations = [:]
+ def expectedCount = 10
+ def expectedTook = 100
+ def expectedTimedOut = false
+ def searchResponse = new SearchResponse(expectedHits, expectedAggregations, expectedCount, expectedTook, expectedTimedOut)
+ def rendered = searchResponse.toString()
+
+ Assert.assertEquals(expectedHits, searchResponse.hits)
+ Assert.assertEquals(expectedAggregations, searchResponse.aggregations)
+ Assert.assertEquals(expectedCount, searchResponse.numberOfHits)
+ Assert.assertEquals(expectedTook, searchResponse.took)
+ Assert.assertEquals(expectedTimedOut, searchResponse.timedOut)
+ ["aggregations", "hits", "numberOfHits"].each { Assert.assertTrue(rendered.contains(it)) }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy
index fd71d61..d2104aa 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy
@@ -20,6 +20,8 @@ package org.apache.nifi.elasticsearch.integration
import org.apache.nifi.elasticsearch.DeleteOperationResponse
import org.apache.nifi.elasticsearch.ElasticSearchClientService
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse
import org.apache.nifi.ssl.StandardSSLContextService
import org.apache.nifi.util.TestRunner
@@ -37,8 +39,8 @@ class ElasticSearch5ClientService_IT {
private TestRunner runner
private ElasticSearchClientServiceImpl service
- static final String INDEX = "messages"
- static final String TYPE = "message"
+ static String INDEX = "messages"
+ static String TYPE = System.getProperty("type_name")
@Before
void before() throws Exception {
@@ -80,7 +82,7 @@ class ElasticSearch5ClientService_IT {
]))
- SearchResponse response = service.search(query, "messages", "message")
+ SearchResponse response = service.search(query, "messages", TYPE)
Assert.assertNotNull("Response was null", response)
Assert.assertEquals("Wrong count", 15, response.numberOfHits)
@@ -138,6 +140,7 @@ class ElasticSearch5ClientService_IT {
@Test
void testGet() throws IOException {
Map old
+ System.out.println("\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n" + "TYPE: " + TYPE + "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n")
1.upto(15) { index ->
String id = String.valueOf(index)
def doc = service.get(INDEX, TYPE, id)
@@ -171,4 +174,99 @@ class ElasticSearch5ClientService_IT {
runner.assertValid()
}
+
+ // Bulk-indexes 20 documents alternating between bulk_a and bulk_b plus 5 documents into bulk_c,
+ // then verifies the per-index counts and the combined count over the bulk_* wildcard.
+ @Test
+ void testBulkAddTwoIndexes() throws Exception {
+ List<IndexOperationRequest> payload = new ArrayList<>()
+ // Even ids go to bulk_a, odd ids to bulk_b: 10 documents each.
+ for (int x = 0; x < 20; x++) {
+ String index = x % 2 == 0 ? "bulk_a": "bulk_b"
+ payload.add(new IndexOperationRequest(index, TYPE, String.valueOf(x), new HashMap<String, Object>(){{
+ put("msg", "test")
+ }}, IndexOperationRequest.Operation.Index))
+ }
+ for (int x = 0; x < 5; x++) {
+ payload.add(new IndexOperationRequest("bulk_c", TYPE, String.valueOf(x), new HashMap<String, Object>(){{
+ put("msg", "test")
+ }}, IndexOperationRequest.Operation.Index))
+ }
+ IndexOperationResponse response = service.bulk(payload)
+ Assert.assertNotNull(response)
+ Assert.assertTrue(response.getTook() > 0)
+ // NOTE(review): presumably waits for the new documents to become searchable before counting - confirm.
+ Thread.sleep(2000)
+
+ /*
+ * Now, check to ensure that all three indexes got populated appropriately.
+ */
+ String query = "{ \"query\": { \"match_all\": {}}}"
+ Long indexA = service.count(query, "bulk_a", TYPE)
+ Long indexB = service.count(query, "bulk_b", TYPE)
+ Long indexC = service.count(query, "bulk_c", TYPE)
+
+ Assert.assertNotNull(indexA)
+ Assert.assertNotNull(indexB)
+ Assert.assertNotNull(indexC)
+ Assert.assertEquals(indexA, indexB)
+ Assert.assertEquals(10, indexA.intValue())
+ Assert.assertEquals(10, indexB.intValue())
+ Assert.assertEquals(5, indexC.intValue())
+
+ // The wildcard count must equal the sum of the three indexes (10 + 10 + 5).
+ Long total = service.count(query, "bulk_*", TYPE)
+ Assert.assertNotNull(total)
+ Assert.assertEquals(25, total.intValue())
+ }
+
+ // Indexes a document, applies an Update to it, Upserts a second document, then bulk-deletes both
+ // and expects get() to return null for each deleted id.
+ @Test
+ void testUpdateAndUpsert() {
+ final String TEST_ID = "update-test"
+ Map<String, Object> doc = new HashMap<>()
+ doc.put("msg", "Buongiorno, mondo")
+ service.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, doc, IndexOperationRequest.Operation.Index))
+ Map<String, Object> result = service.get(INDEX, TYPE, TEST_ID)
+ Assert.assertEquals("Not the same", doc, result)
+
+ // The Update request carries only the new field; the expected result is the original document plus that field.
+ Map<String, Object> updates = new HashMap<>()
+ updates.put("from", "john.smith")
+ Map<String, Object> merged = new HashMap<>()
+ merged.putAll(updates)
+ merged.putAll(doc)
+ IndexOperationRequest request = new IndexOperationRequest(INDEX, TYPE, TEST_ID, updates, IndexOperationRequest.Operation.Update)
+ service.add(request)
+ result = service.get(INDEX, TYPE, TEST_ID)
+ Assert.assertTrue(result.containsKey("from"))
+ Assert.assertTrue(result.containsKey("msg"))
+ Assert.assertEquals("Not the same after update.", merged, result)
+
+ // Upsert under an id this test has not written before; the stored document must equal the supplied fields.
+ final String UPSERTED_ID = "upsert-ftw"
+ Map<String, Object> upsertItems = new HashMap<>()
+ upsertItems.put("upsert_1", "hello")
+ upsertItems.put("upsert_2", 1)
+ upsertItems.put("upsert_3", true)
+ request = new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, upsertItems, IndexOperationRequest.Operation.Upsert)
+ service.add(request)
+ result = service.get(INDEX, TYPE, UPSERTED_ID)
+ Assert.assertEquals(upsertItems, result)
+
+ // Clean up both documents through the bulk API; Delete operations carry no field map.
+ List<IndexOperationRequest> deletes = new ArrayList<>()
+ deletes.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, null, IndexOperationRequest.Operation.Delete))
+ deletes.add(new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, null, IndexOperationRequest.Operation.Delete))
+ service.bulk(deletes)
+ Assert.assertNull(service.get(INDEX, TYPE, TEST_ID))
+ Assert.assertNull(service.get(INDEX, TYPE, UPSERTED_ID))
+ }
+
+ // Submits four bulk operations and expects the response to flag errors, with exactly two of the
+ // returned items carrying an "error" entry.
+ // NOTE(review): presumably the two failures are meant to be the repeated Create of id "2" and the
+ // non-integer intField value - confirm. The setup scripts define an "error_handler" index with an
+ // integer intField mapping, but this test writes to INDEX; verify which index was intended.
+ @Test
+ void testGetBulkResponsesWithErrors() {
+ def ops = [
+ new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "Hi", intField: 1], IndexOperationRequest.Operation.Index),
+ new IndexOperationRequest(INDEX, TYPE, "2", [ "msg": "Hi", intField: 1], IndexOperationRequest.Operation.Create),
+ new IndexOperationRequest(INDEX, TYPE, "2", [ "msg": "Hi", intField: 1], IndexOperationRequest.Operation.Create),
+ new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "Hi", intField: "notaninteger"], IndexOperationRequest.Operation.Index)
+ ]
+ def response = service.bulk(ops)
+ assert response.hasErrors()
+ // Each item is a map with one top-level key; the item failed when the value under that key has "error".
+ assert response.items.findAll {
+ def key = it.keySet().stream().findFirst().get()
+ it[key].containsKey("error")
+ }.size() == 2
+ }
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy
index 2ce9573..f8d4ef5 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy
@@ -39,6 +39,8 @@ class ElasticSearchLookupService_IT {
private ElasticSearchClientService service
private ElasticSearchLookupService lookupService
+ static String TYPE = System.getProperty("type_name")
+
@Before
void before() throws Exception {
runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class)
@@ -54,7 +56,7 @@ class ElasticSearchLookupService_IT {
runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "Lookup Service")
runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "Client Service")
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "user_details")
- runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "details")
+ runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
try {
runner.enableControllerService(service)
@@ -141,7 +143,7 @@ class ElasticSearchLookupService_IT {
runner.disableControllerService(lookupService)
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested")
- runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "nested_complex")
+ runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
runner.enableControllerService(lookupService)
Optional<Record> response = lookupService.lookup(coordinates)
@@ -162,7 +164,7 @@ class ElasticSearchLookupService_IT {
void testDetectedSchema() throws LookupFailureException {
runner.disableControllerService(lookupService)
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "complex")
- runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "complex")
+ runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
runner.enableControllerService(lookupService)
def coordinates = ["_id": "1" ]
@@ -196,7 +198,7 @@ class ElasticSearchLookupService_IT {
runner.setProperty(lookupService, "\$.subField.longField", "/longField2")
runner.setProperty(lookupService, '$.subField.dateField', '/dateField2')
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested")
- runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "nested_complex")
+ runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
runner.enableControllerService(lookupService)
def coordinates = ["msg": "Hello, world"]
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
index 899cedb..6dfc5e4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
@@ -33,37 +33,42 @@ class TestElasticSearchClientService extends AbstractControllerService implement
]
@Override
- IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
+ IndexOperationResponse add(IndexOperationRequest operation) {
return null
}
@Override
- IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException {
+ IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
return null
}
@Override
- DeleteOperationResponse deleteById(String index, String type, String id) throws IOException {
+ Long count(String query, String index, String type) {
return null
}
@Override
- DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException {
+ DeleteOperationResponse deleteById(String index, String type, String id) {
return null
}
@Override
- DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException {
+ DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
return null
}
@Override
- Map<String, Object> get(String index, String type, String id) throws IOException {
+ DeleteOperationResponse deleteByQuery(String query, String index, String type) {
+ return null
+ }
+
+ @Override
+ Map<String, Object> get(String index, String type, String id) {
return data
}
@Override
- SearchResponse search(String query, String index, String type) throws IOException {
+ SearchResponse search(String query, String index, String type) {
List hits = [[
"_source": data
]]
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-5.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-5.script
new file mode 100644
index 0000000..bac18d4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-5.script
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#create mapping
+PUT:user_details/:{ "mappings":{"faketype":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}}
+PUT:messages/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:complex/:{"mappings":{"faketype":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"}}}}}}}
+PUT:nested/:{"mappings":{"faketype":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"},"deeper":{"type":"nested","properties":{"secretz":{"type":"keyword"},"deepest":{"type":"nested","properties":{"super_secret":{"type":"keyword"}}}}}}}}}}}
+PUT:bulk_a/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:bulk_b/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:bulk_c/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:error_handler:{ "mappings": { "faketype": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}}
+#add document
+PUT:messages/faketype/1:{ "msg":"one" }
+PUT:messages/faketype/2:{ "msg":"two" }
+PUT:messages/faketype/3:{ "msg":"two" }
+PUT:messages/faketype/4:{ "msg":"three" }
+PUT:messages/faketype/5:{ "msg":"three" }
+PUT:messages/faketype/6:{ "msg":"three" }
+PUT:messages/faketype/7:{ "msg":"four" }
+PUT:messages/faketype/8:{ "msg":"four" }
+PUT:messages/faketype/9:{ "msg":"four" }
+PUT:messages/faketype/10:{ "msg":"four" }
+PUT:messages/faketype/11:{ "msg":"five" }
+PUT:messages/faketype/12:{ "msg":"five" }
+PUT:messages/faketype/13:{ "msg":"five" }
+PUT:messages/faketype/14:{ "msg":"five" }
+PUT:messages/faketype/15:{ "msg":"five" }
+PUT:complex/faketype/1:{"msg":"Hello, world","subField":{"longField":100000,"dateField":"2018-04-10T12:18:05Z"}}
+PUT:user_details/faketype/1:{ "email": "john.smith@company.com", "phone": "123-456-7890", "accessKey": "ABCDE"}
+PUT:user_details/faketype/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
+PUT:nested/faketype/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
+PUT:nested/faketype/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
+PUT:nested/faketype/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script
new file mode 100644
index 0000000..703ec18
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#create mapping
+PUT:user_details/:{ "mappings":{"_doc":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}}
+PUT:messages/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:complex/:{"mappings":{"_doc":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"}}}}}}}
+PUT:nested/:{"mappings":{"_doc":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"},"deeper":{"type":"nested","properties":{"secretz":{"type":"keyword"},"deepest":{"type":"nested","properties":{"super_secret":{"type":"keyword"}}}}}}}}}}}
+PUT:bulk_a/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:bulk_b/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:bulk_c/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
+PUT:error_handler:{ "mappings": { "_doc": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}}
+
+#add document
+PUT:messages/_doc/1:{ "msg":"one" }
+PUT:messages/_doc/2:{ "msg":"two" }
+PUT:messages/_doc/3:{ "msg":"two" }
+PUT:messages/_doc/4:{ "msg":"three" }
+PUT:messages/_doc/5:{ "msg":"three" }
+PUT:messages/_doc/6:{ "msg":"three" }
+PUT:messages/_doc/7:{ "msg":"four" }
+PUT:messages/_doc/8:{ "msg":"four" }
+PUT:messages/_doc/9:{ "msg":"four" }
+PUT:messages/_doc/10:{ "msg":"four" }
+PUT:messages/_doc/11:{ "msg":"five" }
+PUT:messages/_doc/12:{ "msg":"five" }
+PUT:messages/_doc/13:{ "msg":"five" }
+PUT:messages/_doc/14:{ "msg":"five" }
+PUT:messages/_doc/15:{ "msg":"five" }
+PUT:complex/_doc/1:{"msg":"Hello, world","subField":{"longField":100000,"dateField":"2018-04-10T12:18:05Z"}}
+PUT:user_details/_doc/1:{ "email": "john.smith@company.com", "phone": "123-456-7890", "accessKey": "ABCDE"}
+PUT:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
+PUT:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
+PUT:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
+PUT:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script
new file mode 100644
index 0000000..0b240eb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#create mapping
+PUT:user_details/:{ "mappings":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}
+PUT:messages/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
+PUT:complex/:{"mappings":{ "properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"}}}}}}
+PUT:nested/:{"mappings":{ "properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"},"deeper":{"type":"nested","properties":{"secretz":{"type":"keyword"},"deepest":{"type":"nested","properties":{"super_secret":{"type":"keyword"}}}}}}}}}}
+PUT:bulk_a/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
+PUT:bulk_b/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
+PUT:bulk_c/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
+PUT:error_handler:{ "mappings": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}
+#add document
+POST:messages/_doc/1:{ "msg":"one" }
+POST:messages/_doc/2:{ "msg":"two" }
+POST:messages/_doc/3:{ "msg":"two" }
+POST:messages/_doc/4:{ "msg":"three" }
+POST:messages/_doc/5:{ "msg":"three" }
+POST:messages/_doc/6:{ "msg":"three" }
+POST:messages/_doc/7:{ "msg":"four" }
+POST:messages/_doc/8:{ "msg":"four" }
+POST:messages/_doc/9:{ "msg":"four" }
+POST:messages/_doc/10:{ "msg":"four" }
+POST:messages/_doc/11:{ "msg":"five" }
+POST:messages/_doc/12:{ "msg":"five" }
+POST:messages/_doc/13:{ "msg":"five" }
+POST:messages/_doc/14:{ "msg":"five" }
+POST:messages/_doc/15:{ "msg":"five" }
+POST:complex/_doc/1:{"msg":"Hello, world","subField":{"longField":100000,"dateField":"2018-04-10T12:18:05Z"}}
+POST:user_details/_doc/1:{ "email": "john.smith@company.com", "phone": "123-456-7890", "accessKey": "ABCDE"}
+POST:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
+POST:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
+POST:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
+POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script
deleted file mode 100644
index a17a039..0000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#create mapping
-PUT:user_details/:{ "mappings":{"details":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}}
-PUT:messages/:{ "mappings":{"message":{ "properties":{ "msg":{"type":"keyword"}}}}}
-PUT:complex/:{"mappings":{"complex":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"}}}}}}}
-PUT:nested/:{"mappings":{"nested_complex":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"},"deeper":{"type":"nested","properties":{"secretz":{"type":"keyword"},"deepest":{"type":"nested","properties":{"super_secret":{"type":"keyword"}}}}}}}}}}}
-#add document
-PUT:messages/message/1:{ "msg":"one" }
-PUT:messages/message/2:{ "msg":"two" }
-PUT:messages/message/3:{ "msg":"two" }
-PUT:messages/message/4:{ "msg":"three" }
-PUT:messages/message/5:{ "msg":"three" }
-PUT:messages/message/6:{ "msg":"three" }
-PUT:messages/message/7:{ "msg":"four" }
-PUT:messages/message/8:{ "msg":"four" }
-PUT:messages/message/9:{ "msg":"four" }
-PUT:messages/message/10:{ "msg":"four" }
-PUT:messages/message/11:{ "msg":"five" }
-PUT:messages/message/12:{ "msg":"five" }
-PUT:messages/message/13:{ "msg":"five" }
-PUT:messages/message/14:{ "msg":"five" }
-PUT:messages/message/15:{ "msg":"five" }
-PUT:complex/complex/1:{"msg":"Hello, world","subField":{"longField":100000,"dateField":"2018-04-10T12:18:05Z"}}
-PUT:user_details/details/1:{ "email": "john.smith@company.com", "phone": "123-456-7890", "accessKey": "ABCDE"}
-PUT:user_details/details/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
-PUT:nested/nested_complex/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
-PUT:nested/nested_complex/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
-PUT:nested/nested_complex/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
index 8270d0b..5d7ef89 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
@@ -57,6 +57,11 @@ language governing permissions and limitations under the License. -->
<version>1.11.0-SNAPSHOT</version>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
@@ -94,6 +99,46 @@ language governing permissions and limitations under the License. -->
<version>1.11.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ <version>2.4.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-path</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy-json</artifactId>
+ <version>${nifi.groovy.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
index 609d8ed..f191778 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
@@ -43,14 +43,14 @@ import java.util.Map;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
-@CapabilityDescription("Delete from an ElasticSearch index using a query. The query can be loaded from a flowfile body " +
+@CapabilityDescription("Delete from an Elasticsearch index using a query. The query can be loaded from a flowfile body " +
"or from the Query parameter.")
@Tags({ "elastic", "elasticsearch", "delete", "query"})
@WritesAttributes({
@WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."),
- @WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by ElasticSearch if there is an error running the delete.")
+ @WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by Elasticsearch if there is an error running the delete.")
})
-public class DeleteByQueryElasticsearch extends AbstractProcessor implements ElasticSearchRestProcessor {
+public class DeleteByQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("If the delete by query fails, and a flowfile was read, it will be sent to this relationship.").build();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
similarity index 67%
rename from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java
rename to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
index ccf04ca..a52d2ca 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
@@ -25,12 +25,15 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-public interface ElasticSearchRestProcessor {
+public interface ElasticsearchRestProcessor {
+ String ATTR_RECORD_COUNT = "record.count";
+
PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("el-rest-fetch-index")
.displayName("Index")
@@ -43,7 +46,7 @@ public interface ElasticSearchRestProcessor {
PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("el-rest-type")
.displayName("Type")
- .description("The type of this document (used by Elasticsearch for indexing and searching)")
+ .description("The type of this document (used by Elasticsearch for indexing and searching).")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -70,11 +73,39 @@ public interface ElasticSearchRestProcessor {
PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("el-rest-client-service")
.displayName("Client Service")
- .description("An ElasticSearch client service to use for running queries.")
+ .description("An Elasticsearch client service to use for running queries.")
.identifiesControllerService(ElasticSearchClientService.class)
.required(true)
.build();
+ PropertyDescriptor LOG_ERROR_RESPONSES = new PropertyDescriptor.Builder()
+ .name("put-es-record-log-error-responses")
+ .displayName("Log Error Responses")
+ .description("If this is enabled, errors will be logged to the NiFi logs at the error log level. Otherwise, they will " +
+ "only be logged if debug logging is enabled on NiFi as a whole. The purpose of this option is to give the user " +
+ "the ability to debug failed operations without having to turn on debug logging.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("All flowfiles that fail for reasons unrelated to server availability go to this relationship.")
+ .build();
+ Relationship REL_RETRY = new Relationship.Builder()
+ .name("retry")
+ .description("All flowfiles that fail due to server/cluster availability go to this relationship.")
+ .build();
+ Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All flowfiles that succeed in being transferred into Elasticsearch go here.")
+ .build();
+    // Destination for record sets made up of the records that could not be processed;
+    // auto-terminated by default.
+    Relationship REL_FAILED_RECORDS = new Relationship.Builder()
+            .name("errors")
+            .description("If an error record writer is set, any record that failed to process the way it was " +
+                    "configured will be sent to this relationship as part of a record set of failed records.")
+            .autoTerminateDefault(true)
+            .build();
+
default String getQuery(FlowFile input, ProcessContext context, ProcessSession session) throws IOException {
String retVal = null;
if (context.getProperty(QUERY).isSet()) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
index 40d6118..903a124 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
@@ -56,18 +56,15 @@ import java.util.Set;
@EventDriven
@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"})
@CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " +
- "ElasticSearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " +
+ "Elasticsearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " +
"processor, it will use the flowfile's content for the query. Care should be taken on the size of the query because the entire response " +
- "from ElasticSearch will be loaded into memory all at once and converted into the resulting flowfiles.")
-public class JsonQueryElasticsearch extends AbstractProcessor implements ElasticSearchRestProcessor {
+ "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class JsonQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
.description("All original flowfiles that don't cause an error to occur go to this relationship. " +
"This applies even if you select the \"split up hits\" option to send individual hits to the " +
"\"hits\" relationship.").build();
- public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
- .description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build();
-
public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
.description("Search hits are routed to this relationship.")
.build();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
new file mode 100644
index 0000000..bbc86e8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchError;
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.elasticsearch.IndexOperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.api.BulkOperation;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", "index", "record"})
+@CapabilityDescription("A record-aware Elasticsearch put processor that uses the official Elastic REST client libraries.")
+public class PutElasticsearchRecord extends AbstractProcessor implements ElasticsearchRestProcessor {
+ // Required controller service that creates the reader used to parse each incoming flowfile into records.
+ static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .name("put-es-record-reader")
+ .displayName("Record Reader")
+ .description("The record reader to use for reading incoming records from flowfiles.")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ // Number of operations accumulated before a bulk request is sent; evaluated per flowfile, defaults to 100.
+ static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("put-es-record-batch-size")
+ .displayName("Batch Size")
+ .description("The number of records to send over in a single batch.")
+ .defaultValue("100")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ // Record path selecting the document ID; when unset, onTrigger passes a null ID with each operation.
+ // NOTE(review): unlike the index/type path descriptors below, required(false) is not set explicitly here —
+ // presumably the builder default applies; confirm.
+ static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
+ .name("put-es-record-id-path")
+ .displayName("ID Record Path")
+ .description("A record path expression to retrieve the ID field for use with Elasticsearch. If left blank " +
+ "the ID will be automatically generated by Elasticsearch.")
+ .addValidator(new RecordPathValidator())
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ // Optional record path selecting a per-record index name; the INDEX property value is the fallback.
+ static final PropertyDescriptor INDEX_RECORD_PATH = new PropertyDescriptor.Builder()
+ .name("put-es-record-index-record-path")
+ .displayName("Index Record Path")
+ .description("A record path expression to retrieve the index field for use with Elasticsearch. If left blank " +
+ "the index will be determined using the main index property.")
+ .addValidator(new RecordPathValidator())
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ // Optional record path selecting a per-record document type; the TYPE property value is the fallback.
+ static final PropertyDescriptor TYPE_RECORD_PATH = new PropertyDescriptor.Builder()
+ .name("put-es-record-type-record-path")
+ .displayName("Type Record Path")
+ .description("A record path expression to retrieve the type field for use with Elasticsearch. If left blank " +
+ "the type will be determined using the main type property.")
+ .addValidator(new RecordPathValidator())
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ // Optional writer service; indexDocuments only builds an error flowfile when this is configured.
+ static final PropertyDescriptor ERROR_RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("put-es-record-error-writer")
+ .displayName("Error Record Writer")
+ .description("If this configuration property is set, the response from Elasticsearch will be examined for failed records " +
+ "and the failed records will be written to a record set with this record writer service and sent to the \"errors\" " +
+ "relationship.")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .addValidator(Validator.VALID)
+ .required(false)
+ .build();
+
+ // Immutable list of every property this processor exposes: the shared interface properties
+ // (INDEX, TYPE, CLIENT_SERVICE, LOG_ERROR_RESPONSES) plus the record-specific ones declared above.
+ static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+ INDEX, TYPE, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH,
+ LOG_ERROR_RESPONSES, ERROR_RECORD_WRITER
+ ));
+ // Immutable set of the relationships this processor routes to.
+ static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
+ )));
+
+ /**
+ * @return the fixed, unmodifiable set of relationships held in RELATIONSHIPS
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ /**
+ * @return the fixed, unmodifiable list of property descriptors held in DESCRIPTORS
+ */
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+    // State captured in onScheduled and read by onTrigger and its helpers.
+    private RecordReaderFactory readerFactory;
+    private RecordPathCache recordPathCache;
+    private ElasticSearchClientService clientService;
+    // Null-checked before use in indexDocuments, so presumably null when no error writer is configured.
+    private RecordSetWriterFactory writerFactory;
+    private boolean logErrors;
+
+    /**
+     * Resolves the configured controller services, creates a fresh record path cache (capacity 16)
+     * and reads the error-logging flag from the processor configuration.
+     *
+     * @param context the process context the property values are read from
+     */
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+        recordPathCache = new RecordPathCache(16);
+        writerFactory = context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
+    }
+
+
+    /**
+     * Reads the incoming flowfile as records and indexes them into Elasticsearch in batches.
+     * <p>
+     * Each record becomes one index operation. The target index and type come from the matching
+     * record path when it selects a non-null value and from the Index/Type properties otherwise;
+     * the document ID comes from the ID record path or is left {@code null}.
+     * <p>
+     * Routing of the input flowfile: {@code success} once every batch has been submitted;
+     * {@code retry} or {@code failure} (penalized, chosen by {@link ElasticsearchError#isElastic()})
+     * when an {@link ElasticsearchError} is raised; {@code failure} for any other exception. On both
+     * error paths the error-record flowfiles produced by earlier batches are removed.
+     *
+     * @param context source of the property values, evaluated against the incoming flowfile
+     * @param session session used to read, create and route flowfiles
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+        final String idPath = context.getProperty(ID_RECORD_PATH).isSet()
+                ? context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
+                : null;
+        final String indexPath = context.getProperty(INDEX_RECORD_PATH).isSet()
+                ? context.getProperty(INDEX_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
+                : null;
+        final String typePath = context.getProperty(TYPE_RECORD_PATH).isSet()
+                ? context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
+                : null;
+
+        RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null;
+        RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null;
+        RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;
+
+        int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger();
+        // Error flowfiles created so far; removed again if the flowfile as a whole ends up failing.
+        List<FlowFile> badRecords = new ArrayList<>();
+
+        try (final InputStream inStream = session.read(input);
+             final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) {
+            Record record;
+            List<IndexOperationRequest> operationList = new ArrayList<>();
+            List<Record> originals = new ArrayList<>();
+
+            while ((record = reader.nextRecord()) != null) {
+                final String idx = getFromRecordPath(record, iPath, index);
+                final String t = getFromRecordPath(record, tPath, type);
+                final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.Index;
+                // getFromRecordPath already returns the fallback (null here) when no path is configured.
+                final String id = getFromRecordPath(record, path, null);
+
+                Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+
+                removeEmpty(contentMap);
+
+                operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
+                originals.add(record);
+
+                if (operationList.size() == batchSize) {
+                    BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
+                    FlowFile bad = indexDocuments(bundle, session, input);
+                    if (bad != null) {
+                        badRecords.add(bad);
+                    }
+
+                    // The bundle shares these lists, so they are only reset after it has been sent.
+                    operationList.clear();
+                    originals.clear();
+                }
+            }
+
+            // Flush the final, partially filled batch.
+            if (operationList.size() > 0) {
+                BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
+                FlowFile bad = indexDocuments(bundle, session, input);
+                if (bad != null) {
+                    badRecords.add(bad);
+                }
+            }
+        } catch (ElasticsearchError ese) {
+            String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
+                    ese.isElastic() ? "Moving to retry." : "Moving to failure");
+            getLogger().error(msg, ese);
+            Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
+            session.penalize(input);
+            session.transfer(input, rel);
+            removeBadRecordFlowFiles(badRecords, session);
+            return;
+        } catch (Exception ex) {
+            getLogger().error("Could not index documents.", ex);
+            session.transfer(input, REL_FAILURE);
+            removeBadRecordFlowFiles(badRecords, session);
+            return;
+        }
+
+        // Route to success only after try-with-resources has closed the content stream: a flowfile
+        // must not be transferred while a stream obtained from session.read(FlowFile) is still open,
+        // and doing so inside the try would send an already indexed flowfile to failure.
+        session.transfer(input, REL_SUCCESS);
+    }
+
+    /**
+     * Removes every given error flowfile from the session and then empties the list.
+     *
+     * @param bad error flowfiles created while processing the current input; cleared on return
+     * @param session the session the flowfiles belong to
+     */
+    private void removeBadRecordFlowFiles(List<FlowFile> bad, ProcessSession session) {
+        bad.forEach(errorFlowFile -> session.remove(errorFlowFile));
+        bad.clear();
+    }
+
+    /**
+     * Sends one batch to Elasticsearch. When the response reports errors it is optionally logged
+     * and, if an error record writer is configured, the rejected records are written to a new
+     * flowfile that is routed to the "errors" relationship.
+     * <p>
+     * Response items are matched to the original records by position.
+     *
+     * @param bundle the operations to send, the records they were built from and their schema
+     * @param session session used to create, write and route the error flowfile
+     * @param input the flowfile being processed; parent of any error flowfile created here
+     * @return the error flowfile (already transferred), or {@code null} when the response has no
+     *         errors or no error record writer is configured
+     * @throws Exception anything raised by the client service, by serializing the response for
+     *         logging, or while producing the error flowfile; in the last case the error flowfile
+     *         is removed from the session before the exception is rethrown
+     */
+    private FlowFile indexDocuments(BulkOperation bundle, ProcessSession session, FlowFile input) throws Exception {
+        IndexOperationResponse response = clientService.bulk(bundle.getOperationList());
+        if (!response.hasErrors()) {
+            return null;
+        }
+
+        if (logErrors || getLogger().isDebugEnabled()) {
+            List<Map<String, Object>> errors = response.getItems();
+            ObjectMapper mapper = new ObjectMapper();
+            mapper.enable(SerializationFeature.INDENT_OUTPUT);
+            String output = String.format("An error was encountered while processing bulk operations. Server response below:\n\n%s", mapper.writeValueAsString(errors));
+
+            if (logErrors) {
+                getLogger().error(output);
+            } else {
+                getLogger().debug(output);
+            }
+        }
+
+        if (writerFactory == null) {
+            return null;
+        }
+
+        FlowFile errorFF = session.create(input);
+        try {
+            int added = 0;
+            // Scope the writer and stream to the inner block so both are closed exactly once
+            // before the attribute is set and the flowfile is transferred.
+            try (OutputStream os = session.write(errorFF);
+                 RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os)) {
+                writer.beginRecordSet();
+                List<Map<String, Object>> items = response.getItems();
+                for (int index = 0; index < items.size(); index++) {
+                    Map<String, Object> current = items.get(index);
+                    if (current.isEmpty()) {
+                        continue;
+                    }
+
+                    // Only the first entry is inspected; presumably each item holds a single entry
+                    // keyed by the operation name, as in the Elasticsearch bulk response — confirm.
+                    Object result = current.get(current.keySet().iterator().next());
+                    if (result instanceof Map && ((Map<?, ?>) result).containsKey("error")) {
+                        writer.write(bundle.getOriginalRecords().get(index));
+                        added++;
+                    }
+                }
+                writer.finishRecordSet();
+            }
+
+            errorFF = session.putAttribute(errorFF, ATTR_RECORD_COUNT, String.valueOf(added));
+            session.transfer(errorFF, REL_FAILED_RECORDS);
+
+            return errorFF;
+        } catch (Exception ex) {
+            getLogger().error("Could not write the records rejected by Elasticsearch to an error flowfile.", ex);
+            session.remove(errorFF);
+            throw ex;
+        }
+    }
+
+    /**
+     * Removes, in place, every entry whose value is {@code null} or whose string form is blank,
+     * then recurses into nested maps and into maps that are direct elements of list values.
+     * Non-map list elements are left untouched, and a nested map that ends up empty is kept.
+     * <p>
+     * Entries are removed directly from the given map, so the order of the remaining keys is
+     * preserved.
+     *
+     * @param input the map to clean; must support removal
+     */
+    private void removeEmpty(Map<String, Object> input) {
+        input.values().removeIf(value -> value == null || StringUtils.isBlank(value.toString()));
+
+        for (Object value : input.values()) {
+            if (value instanceof Map) {
+                removeEmpty((Map<String, Object>) value);
+            } else if (value instanceof List) {
+                for (Object element : (List<?>) value) {
+                    if (element instanceof Map) {
+                        removeEmpty((Map<String, Object>) element);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Resolves a string from the record through the given record path.
+     * <p>
+     * Side effect: when a value is found, the selected field is set to {@code null} via
+     * {@code updateValue(null)} before the value is returned.
+     *
+     * @param record the record to evaluate the path against
+     * @param path the compiled record path, or {@code null} when none is configured
+     * @param fallback returned when there is no path, no selected field, or the selected value is null
+     * @return the string form of the first selected field's value, or {@code fallback}
+     * @throws ProcessException if the selected field is not declared as a string
+     */
+    private String getFromRecordPath(Record record, RecordPath path, final String fallback) {
+        if (path == null) {
+            return fallback;
+        }
+
+        final Optional<FieldValue> selected = path.evaluate(record).getSelectedFields().findFirst();
+        if (!selected.isPresent() || selected.get().getValue() == null) {
+            return fallback;
+        }
+
+        final FieldValue fieldValue = selected.get();
+        final RecordFieldType declaredType = fieldValue.getField().getDataType().getFieldType();
+        if (!declaredType.equals(RecordFieldType.STRING)) {
+            throw new ProcessException(
+                    String.format("Field referenced by %s must be a string.", path.getPath())
+            );
+        }
+
+        // NOTE(review): the value is read after updateValue(null); this relies on
+        // FieldValue.getValue() still returning the originally selected value — confirm.
+        fieldValue.updateValue(null);
+
+        return fieldValue.getValue().toString();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/BulkOperation.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/BulkOperation.java
new file mode 100644
index 0000000..f54aa60
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/BulkOperation.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.util.List;
+
+/**
+ * Groups one batch of index operations with the records they were built from and the schema
+ * of those records.
+ * <p>
+ * The lists passed to the constructor are stored by reference, not copied, so later changes to
+ * them by the caller are visible through this object.
+ */
+public class BulkOperation {
+    private final List<IndexOperationRequest> operationList;
+    private final List<Record> originalRecords;
+    private final RecordSchema schema;
+
+    /**
+     * @param operationList the operations that make up the batch
+     * @param originalRecords the records the operations were built from
+     * @param schema the schema associated with the records
+     */
+    public BulkOperation(List<IndexOperationRequest> operationList, List<Record> originalRecords, RecordSchema schema) {
+        this.operationList = operationList;
+        this.originalRecords = originalRecords;
+        this.schema = schema;
+    }
+
+    /** @return the operations that make up the batch */
+    public List<IndexOperationRequest> getOperationList() {
+        return operationList;
+    }
+
+    /** @return the records the operations were built from */
+    public List<Record> getOriginalRecords() {
+        return originalRecords;
+    }
+
+    /** @return the schema associated with the records */
+    public RecordSchema getSchema() {
+        return schema;
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/FlowFileJsonDescription.java
similarity index 57%
copy from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
copy to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/FlowFileJsonDescription.java
index 9281adb..0312e11 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/FlowFileJsonDescription.java
@@ -15,21 +15,28 @@
* limitations under the License.
*/
-package org.apache.nifi.elasticsearch;
+package org.apache.nifi.processors.elasticsearch.put;
-import java.util.Map;
+import com.jayway.jsonpath.JsonPath;
-public class IndexOperationRequest {
+public class FlowFileJsonDescription {
private String index;
private String type;
private String id;
- private Map<String, Object> fields;
+ private String content;
- public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields) {
+ private JsonPath idJsonPath;
+ private JsonPath typeJsonPath;
+ private JsonPath indexJsonPath;
+
+ public FlowFileJsonDescription(String index, String type, String id, String content, JsonPath idJsonPath, JsonPath typeJsonPath, JsonPath indexJsonPath) {
this.index = index;
this.type = type;
this.id = id;
- this.fields = fields;
+ this.content = content;
+ this.idJsonPath = idJsonPath;
+ this.typeJsonPath = typeJsonPath;
+ this.indexJsonPath = indexJsonPath;
}
public String getIndex() {
@@ -44,7 +51,19 @@ public class IndexOperationRequest {
return id;
}
- public Map<String, Object> getFields() {
- return fields;
+ public String getContent() {
+ return content;
+ }
+
+ public JsonPath getIdJsonPath() {
+ return idJsonPath;
+ }
+
+ public JsonPath getTypeJsonPath() {
+ return typeJsonPath;
+ }
+
+ public JsonPath getIndexJsonPath() {
+ return indexJsonPath;
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/JsonProcessingError.java
similarity index 68%
copy from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
copy to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/JsonProcessingError.java
index a22b7aa..2d7a508 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/put/JsonProcessingError.java
@@ -15,22 +15,10 @@
* limitations under the License.
*/
-package org.apache.nifi.elasticsearch;
+package org.apache.nifi.processors.elasticsearch.put;
-public class IndexOperationResponse {
- private long took;
- private long ingestTook;
-
- public IndexOperationResponse(long took, long ingestTook) {
- this.took = took;
- this.ingestTook = ingestTook;
- }
-
- public long getTook() {
- return took;
- }
-
- public long getIngestTook() {
- return ingestTook;
+public class JsonProcessingError extends Exception {
+ public JsonProcessingError(String message) {
+ super(message);
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
new file mode 100644
index 0000000..2034d64
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
@@ -0,0 +1,48 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>PutElasticsearchJson</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+ <p>This processor is designed to take any valid JSON and index it into Elasticsearch 5.x or newer. Unlike the record-aware
+ alternative, it only does index operations, not updates, upserts or deletes.</p>
+ <p>The ID for each document can be set in one of three ways:</p>
+ <ul>
+ <li>FlowFile attribute - only possible if the flowfile contains only a single document. Arrays of documents
+ will cause an error to be raised.</li>
+ <li>JsonPath operation - searches within the document for a particular field.</li>
+ <li>Default - Elasticsearch will generate one. This will be used when neither the attribute nor JsonPath configuration
+ fields are used.</li>
+ </ul>
+ <p>Example document:</p>
+ <pre>
+ {
+ "index": "messages",
+ "type": "message",
+ "message": "Hello, world",
+ "from": "john.smith"
+ }
+ </pre>
+ <p>The following JsonPath operations will extract the index and type:</p>
+ <ul>
+ <li>$.index</li>
+ <li>$.type</li>
+ </ul>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
new file mode 100644
index 0000000..9206785
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
@@ -0,0 +1,64 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>PutElasticsearchRecord</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+ <p>This is a record-aware processor built for Elasticsearch 5 and later. The index and type fields are configured
+ with default values that can be overridden using record path operations that find an index or type value in the
+ record set. The ID and operation type (index, update, upsert or delete) can also be extracted in a similar fashion from
+ the record set. The following are examples of documents exercising these features:</p>
+ <pre>
+ {
+ "metadata": {
+ "id": "12345",
+ "index": "test",
+ "type": "message",
+ "operation": "index"
+ },
+ "message": "Hello, world",
+ "from": "john.smith"
+ }
+ </pre>
+ <pre>
+ {
+ "metadata": {
+ "id": "12345",
+ "index": "test",
+ "type": "message",
+ "operation": "delete"
+ }
+ }
+ </pre>
+ <p>The record path operations below would extract the relevant data:</p>
+ <ul>
+ <li>/metadata/id</li>
+ <li>/metadata/index</li>
+ <li>/metadata/type</li>
+ <li>/metadata/operation</li>
+ </ul>
+ <p>Valid values for "operation" are:</p>
+ <ul>
+ <li>delete</li>
+ <li>index</li>
+ <li>update</li>
+ <li>upsert</li>
+ </ul>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index ac8c915..84e00b4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,3 +15,4 @@
org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch
org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch
+org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch/additionalDetails.html
index 3a5331f..05c19ad 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch/additionalDetails.html
@@ -21,7 +21,7 @@
</head>
<body>
<p>This processor executes a delete operation against one or more indices using the _delete_by_query handler. The
- query should be a valid ElasticSearch JSON DSL query (Lucene syntax is not supported). An example query:</p>
+ query should be a valid Elasticsearch JSON DSL query (Lucene syntax is not supported). An example query:</p>
<pre>
{
"query": {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html
index 3430cde..52f5f3d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch/additionalDetails.html
@@ -20,8 +20,8 @@
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
- <p>This processor is intended for use with the ElasticSearch JSON DSL and ElasticSearch 5.X and newer. It is designed
- to be able to take a query from Kibana and execute it as-is against an ElasticSearch cluster. Like all processors in the
+ <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
+ to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster. Like all processors in the
"restapi" bundle, it uses the official Elastic client APIs, so it supports leader detection.</p>
<p>The query to execute can be provided either in the Query configuration property or in an attribute on a flowfile. In
the latter case, the name of the attribute (Expression Language is supported here) must be provided in the Query Attribute
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
new file mode 100644
index 0000000..0ac19b2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
@@ -0,0 +1,41 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>PutElasticsearchRecord</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+<p>
+ This processor is for accessing the Elasticsearch Bulk API. It provides the ability to configure bulk operations on
+ a per-record basis which is what separates it from PutElasticsearchHttpRecord. For example, it is possible to define
+ multiple commands to index documents, followed by deletes, creates and update operations against the same index or
+ other indices as desired.
+</p>
+<p>
+ As part of the Elasticsearch REST API bundle, it uses a controller service to manage connection information and
+ that controller service is built on top of the official Elasticsearch client APIs. That provides features such as
+ automatic master detection against the cluster which is missing in the other bundles.
+</p>
+<p>
+ This processor builds one Elasticsearch Bulk API body per record set. Care should be taken to split up record sets
+ into appropriately-sized chunks so that NiFi does not run out of memory and the requests sent to Elasticsearch are
+ not too large for it to handle. When failures do occur, this processor is capable of attempting to write the records
+ that failed to an output record writer so that only failed records can be processed downstream or replayed.
+</p>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy
new file mode 100644
index 0000000..8e913ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.groovy
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Test
+
+class DeleteByQueryElasticsearchTest {
+ private static final String INDEX = "test_idx"
+ private static final String TYPE = "test_type"
+ private static final String QUERY_ATTR = "es.delete.query"
+ private static final String CLIENT_NAME = "clientService"
+
+ private TestElasticsearchClientService client
+
+ private void initClient(TestRunner runner) throws Exception {
+ client = new TestElasticsearchClientService(true)
+ runner.addControllerService(CLIENT_NAME, client)
+ runner.enableControllerService(client)
+ runner.setProperty(DeleteByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME)
+ }
+
+ private void postTest(TestRunner runner, String queryParam) { // shared success-path checks: nothing on failure, one flowfile on success carrying the "took" and query attributes
+ runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 0)
+ runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 1)
+
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_SUCCESS)
+ String attr = flowFiles.get(0).getAttribute(DeleteByQueryElasticsearch.TOOK_ATTRIBUTE)
+ String query = flowFiles.get(0).getAttribute(QUERY_ATTR)
+ Assert.assertNotNull(attr)
+ Assert.assertEquals("100", attr) // JUnit argument order is (expected, actual); keeps failure messages accurate
+ Assert.assertNotNull(query)
+ Assert.assertEquals(queryParam, query) // the query attribute must match the query the caller expects to have been executed
+ }
+
+ @Test
+ void testWithFlowfileInput() throws Exception {
+ String query = "{ \"query\": { \"match_all\": {} }}"
+ TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
+ runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
+ runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
+ initClient(runner)
+ runner.assertValid()
+ runner.enqueue(query)
+ runner.run()
+
+ postTest(runner, query)
+ }
+
+ @Test
+ void testWithQuery() throws Exception {
+ String query = "{\n" +
+ "\t\"query\": {\n" +
+ "\t\t\"match\": {\n" +
+ "\t\t\t\"\${field.name}.keyword\": \"test\"\n" +
+ "\t\t}\n" +
+ "\t}\n" +
+ "}"
+ Map<String, String> attrs = new HashMap<String, String>(){{
+ put("field.name", "test_field")
+ }}
+ TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
+ runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
+ runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
+ initClient(runner)
+ runner.assertValid()
+ runner.enqueue("", attrs)
+ runner.run()
+
+ postTest(runner, query.replace('${field.name}', "test_field"))
+
+ runner.clearTransferState()
+
+ query = "{\n" +
+ "\t\"query\": {\n" +
+ "\t\t\"match\": {\n" +
+ "\t\t\t\"test_field.keyword\": \"test\"\n" +
+ "\t\t}\n" +
+ "\t}\n" +
+ "}"
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
+ runner.setIncomingConnection(false)
+ runner.assertValid()
+ runner.run()
+ postTest(runner, query)
+ }
+
+ @Test
+ void testErrorAttribute() throws Exception {
+ String query = "{ \"query\": { \"match_all\": {} }}"
+ TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
+ runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
+ runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
+ initClient(runner)
+ client.setThrowErrorInDelete(true)
+ runner.assertValid()
+ runner.enqueue("")
+ runner.run()
+
+ runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 0)
+ runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 1)
+
+ MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_FAILURE).get(0)
+ String attr = mockFlowFile.getAttribute(DeleteByQueryElasticsearch.ERROR_ATTRIBUTE)
+ Assert.assertNotNull(attr)
+ }
+
+ @Test
+ void testInputHandling() {
+ String query = "{ \"query\": { \"match_all\": {} }}"
+ TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
+ runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
+ runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
+ runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
+ initClient(runner)
+ runner.assertValid()
+ runner.run()
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy
similarity index 50%
rename from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java
rename to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy
index cd7ac19..33a26de 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.groovy
@@ -3,7 +3,7 @@
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -15,54 +15,54 @@
* limitations under the License.
*/
-package org.apache.nifi.processors.elasticsearch;
+package org.apache.nifi.processors.elasticsearch
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Test
-import java.util.List;
+import java.util.List
-public class JsonQueryElasticsearchTest {
- private static final String INDEX_NAME = "messages";
+class JsonQueryElasticsearchTest {
+ private static final String INDEX_NAME = "messages"
- public void testCounts(TestRunner runner, int success, int hits, int failure, int aggregations) {
- runner.assertTransferCount(JsonQueryElasticsearch.REL_ORIGINAL, success);
- runner.assertTransferCount(JsonQueryElasticsearch.REL_HITS, hits);
- runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, failure);
- runner.assertTransferCount(JsonQueryElasticsearch.REL_AGGREGATIONS, aggregations);
+ void testCounts(TestRunner runner, int success, int hits, int failure, int aggregations) {
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_ORIGINAL, success)
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_HITS, hits)
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, failure)
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_AGGREGATIONS, aggregations)
}
@Test
- public void testBasicQuery() throws Exception {
-
- JsonQueryElasticsearch processor = new JsonQueryElasticsearch();
- TestRunner runner = TestRunners.newTestRunner(processor);
- TestElasticSearchClientService service = new TestElasticSearchClientService(false);
- runner.addControllerService("esService", service);
- runner.enableControllerService(service);
- runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService");
- runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME);
- runner.setProperty(JsonQueryElasticsearch.TYPE, "message");
- runner.setValidateExpressionUsage(true);
- runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}");
-
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 1, 1, 0, 0);
-
- runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_HITS, JsonQueryElasticsearch.SPLIT_UP_YES);
- runner.clearProvenanceEvents();
- runner.clearTransferState();
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 1, 10, 0, 0);
+ void testBasicQuery() throws Exception {
+
+ JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+ TestRunner runner = TestRunners.newTestRunner(processor)
+ TestElasticsearchClientService service = new TestElasticsearchClientService(false)
+ runner.addControllerService("esService", service)
+ runner.enableControllerService(service)
+ runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+ runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+ runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+ runner.setValidateExpressionUsage(true)
+ runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}")
+
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 1, 1, 0, 0)
+
+ runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_HITS, JsonQueryElasticsearch.SPLIT_UP_YES)
+ runner.clearProvenanceEvents()
+ runner.clearTransferState()
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 1, 10, 0, 0)
}
@Test
- public void testAggregations() throws Exception {
+ void testAggregations() throws Exception {
String query = "{\n" +
"\t\"query\": {\n" +
"\t\t\"match_all\": {}\n" +
@@ -79,42 +79,42 @@ public class JsonQueryElasticsearchTest {
"\t\t\t}\n" +
"\t\t}\n" +
"\t}\n" +
- "}";
+ "}"
- JsonQueryElasticsearch processor = new JsonQueryElasticsearch();
- TestRunner runner = TestRunners.newTestRunner(processor);
- TestElasticSearchClientService service = new TestElasticSearchClientService(true);
- runner.addControllerService("esService", service);
- runner.enableControllerService(service);
- runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService");
- runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME);
- runner.setProperty(JsonQueryElasticsearch.TYPE, "message");
- runner.setValidateExpressionUsage(true);
- runner.setProperty(JsonQueryElasticsearch.QUERY, query);
+ JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+ TestRunner runner = TestRunners.newTestRunner(processor)
+ TestElasticsearchClientService service = new TestElasticsearchClientService(true)
+ runner.addControllerService("esService", service)
+ runner.enableControllerService(service)
+ runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+ runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+ runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+ runner.setValidateExpressionUsage(true)
+ runner.setProperty(JsonQueryElasticsearch.QUERY, query)
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 1, 1, 0, 1);
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 1, 1, 0, 1)
- runner.clearTransferState();
+ runner.clearTransferState()
//Test with the query parameter and no incoming connection
- runner.setIncomingConnection(false);
- runner.run(1, true, true);
- testCounts(runner, 0, 1, 0, 1);
- runner.setIncomingConnection(true);
+ runner.setIncomingConnection(false)
+ runner.run(1, true, true)
+ testCounts(runner, 0, 1, 0, 1)
+ runner.setIncomingConnection(true)
- runner.clearTransferState();
- runner.clearProvenanceEvents();
- runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_AGGREGATIONS, JsonQueryElasticsearch.SPLIT_UP_YES);
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 1, 1, 0, 2);
+ runner.clearTransferState()
+ runner.clearProvenanceEvents()
+ runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_AGGREGATIONS, JsonQueryElasticsearch.SPLIT_UP_YES)
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 1, 1, 0, 2)
- runner.clearProvenanceEvents();
- runner.clearTransferState();
+ runner.clearProvenanceEvents()
+ runner.clearTransferState()
query = "{\n" +
"\t\"query\": {\n" +
@@ -123,30 +123,30 @@ public class JsonQueryElasticsearchTest {
"\t\"aggs\": {\n" +
"\t\t\"test_agg\": {\n" +
"\t\t\t\"terms\": {\n" +
- "\t\t\t\t\"field\": \"${fieldValue}\"\n" +
+ "\t\t\t\t\"field\": \"\${fieldValue}\"\n" +
"\t\t\t}\n" +
"\t\t},\n" +
"\t\t\"test_agg2\": {\n" +
"\t\t\t\"terms\": {\n" +
- "\t\t\t\t\"field\": \"${fieldValue}\"\n" +
+ "\t\t\t\t\"field\": \"\${fieldValue}\"\n" +
"\t\t\t}\n" +
"\t\t}\n" +
"\t}\n" +
- "}";
- runner.setVariable("fieldValue", "msg");
- runner.setVariable("es.index", INDEX_NAME);
- runner.setVariable("es.type", "msg");
- runner.setProperty(JsonQueryElasticsearch.QUERY, query);
- runner.setProperty(JsonQueryElasticsearch.INDEX, "${es.index}");
- runner.setProperty(JsonQueryElasticsearch.TYPE, "${es.type}");
- runner.setValidateExpressionUsage(true);
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 1, 1, 0, 2);
+ "}"
+ runner.setVariable("fieldValue", "msg")
+ runner.setVariable("es.index", INDEX_NAME)
+ runner.setVariable("es.type", "msg")
+ runner.setProperty(JsonQueryElasticsearch.QUERY, query)
+ runner.setProperty(JsonQueryElasticsearch.INDEX, "\${es.index}")
+ runner.setProperty(JsonQueryElasticsearch.TYPE, "\${es.type}")
+ runner.setValidateExpressionUsage(true)
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 1, 1, 0, 2)
}
@Test
- public void testErrorDuringSearch() throws Exception {
+ void testErrorDuringSearch() throws Exception {
String query = "{\n" +
"\t\"query\": {\n" +
"\t\t\"match_all\": {}\n" +
@@ -163,28 +163,28 @@ public class JsonQueryElasticsearchTest {
"\t\t\t}\n" +
"\t\t}\n" +
"\t}\n" +
- "}";
-
-
- JsonQueryElasticsearch processor = new JsonQueryElasticsearch();
- TestRunner runner = TestRunners.newTestRunner(processor);
- TestElasticSearchClientService service = new TestElasticSearchClientService(true);
- service.setThrowErrorInSearch(true);
- runner.addControllerService("esService", service);
- runner.enableControllerService(service);
- runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService");
- runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME);
- runner.setProperty(JsonQueryElasticsearch.TYPE, "message");
- runner.setValidateExpressionUsage(true);
- runner.setProperty(JsonQueryElasticsearch.QUERY, query);
-
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 0, 0, 1, 0);
+ "}"
+
+
+ JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+ TestRunner runner = TestRunners.newTestRunner(processor)
+ TestElasticsearchClientService service = new TestElasticsearchClientService(true)
+ service.setThrowErrorInSearch(true)
+ runner.addControllerService("esService", service)
+ runner.enableControllerService(service)
+ runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+ runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+ runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+ runner.setValidateExpressionUsage(true)
+ runner.setProperty(JsonQueryElasticsearch.QUERY, query)
+
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 0, 0, 1, 0)
}
@Test
- public void testQueryAttribute() throws Exception {
+ void testQueryAttribute() throws Exception {
final String query = "{\n" +
"\t\"query\": {\n" +
"\t\t\"match_all\": {}\n" +
@@ -201,32 +201,52 @@ public class JsonQueryElasticsearchTest {
"\t\t\t}\n" +
"\t\t}\n" +
"\t}\n" +
- "}";
- final String queryAttr = "es.query";
-
-
- JsonQueryElasticsearch processor = new JsonQueryElasticsearch();
- TestRunner runner = TestRunners.newTestRunner(processor);
- TestElasticSearchClientService service = new TestElasticSearchClientService(true);
- runner.addControllerService("esService", service);
- runner.enableControllerService(service);
- runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService");
- runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME);
- runner.setProperty(JsonQueryElasticsearch.TYPE, "message");
- runner.setValidateExpressionUsage(true);
- runner.setProperty(JsonQueryElasticsearch.QUERY, query);
- runner.setProperty(JsonQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr);
-
- runner.enqueue("test");
- runner.run(1, true, true);
- testCounts(runner, 1, 1, 0, 1);
- List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_AGGREGATIONS);
- flowFiles.addAll(runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_HITS));
+ "}"
+ final String queryAttr = "es.query"
+
+
+ JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+ TestRunner runner = TestRunners.newTestRunner(processor)
+ TestElasticsearchClientService service = new TestElasticsearchClientService(true)
+ runner.addControllerService("esService", service)
+ runner.enableControllerService(service)
+ runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+ runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+ runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+ runner.setValidateExpressionUsage(true)
+ runner.setProperty(JsonQueryElasticsearch.QUERY, query)
+ runner.setProperty(JsonQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr)
+
+ runner.enqueue("test")
+ runner.run(1, true, true)
+ testCounts(runner, 1, 1, 0, 1)
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_AGGREGATIONS)
+ flowFiles.addAll(runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_HITS))
for (MockFlowFile mockFlowFile : flowFiles) {
- String attr = mockFlowFile.getAttribute(queryAttr);
- Assert.assertNotNull("Missing query attribute", attr);
- Assert.assertEquals("Query had wrong value.", query, attr);
+ String attr = mockFlowFile.getAttribute(queryAttr)
+ Assert.assertNotNull("Missing query attribute", attr)
+ Assert.assertEquals("Query had wrong value.", query, attr)
}
}
+
+ @Test
+ void testInputHandling() {
+ JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
+ TestRunner runner = TestRunners.newTestRunner(processor)
+ TestElasticsearchClientService service = new TestElasticsearchClientService(false)
+ runner.addControllerService("esService", service)
+ runner.enableControllerService(service)
+ runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
+ runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
+ runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
+ runner.setValidateExpressionUsage(true)
+ runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}")
+
+ runner.run()
+
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_SUCCESS, 0)
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, 0)
+ runner.assertTransferCount(JsonQueryElasticsearch.REL_RETRY, 0)
+ }
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
new file mode 100644
index 0000000..62f7c1b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.avro.Schema
+import org.apache.nifi.avro.AvroTypeUtil
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+import org.apache.nifi.json.JsonRecordSetWriter
+import org.apache.nifi.json.JsonTreeReader
+import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
+import org.apache.nifi.schema.access.SchemaAccessUtils
+import org.apache.nifi.serialization.record.MockSchemaRegistry
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+
+import static groovy.json.JsonOutput.prettyPrint
+import static groovy.json.JsonOutput.toJson
+
+/**
+ * Unit tests for PutElasticsearchRecord. The processor is driven through a NiFi TestRunner and is wired to
+ * MockBulkLoadClientService, which hands back a canned response instead of contacting Elasticsearch.
+ */
+class PutElasticsearchRecordTest {
+ MockBulkLoadClientService clientService
+ MockSchemaRegistry registry
+ JsonTreeReader reader
+ TestRunner runner
+
+ // Avro schema (as JSON text) for the two-field records in flowFileContents; registered as "simple" in setup().
+ static final String SCHEMA = prettyPrint(toJson([
+ name: "TestSchema",
+ type: "record",
+ fields: [
+ [ name: "msg", type: "string" ],
+ [ name: "from", type: "string" ]
+ ]
+ ]))
+
+ // Default flowfile payload: a JSON array of two records with the fields declared in SCHEMA.
+ static final String flowFileContents = prettyPrint(toJson([
+ [ msg: "Hello, world", from: "john.smith" ],
+ [ msg: "Hi, back at ya!", from: "jane.doe" ]
+ ]))
+
+ /**
+ * Creates a fresh runner, schema registry, record reader and mock client service before each test,
+ * registers the "simple" schema, points the processor at index "test_index" / type "test_type",
+ * and verifies that the resulting configuration is valid.
+ */
+ @Before
+ void setup() {
+ clientService = new MockBulkLoadClientService()
+ registry = new MockSchemaRegistry()
+ reader = new JsonTreeReader()
+ runner = TestRunners.newTestRunner(PutElasticsearchRecord.class)
+
+ registry.addSchema("simple", AvroTypeUtil.createSchema(new Schema.Parser().parse(SCHEMA)))
+
+ // Default canned response returned by the mock's bulk() unless a test replaces it.
+ clientService.response = new IndexOperationResponse(1500)
+
+ runner.addControllerService("registry", registry)
+ runner.addControllerService("reader", reader)
+ runner.addControllerService("clientService", clientService)
+ runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
+ runner.setProperty(PutElasticsearchRecord.RECORD_READER, "reader")
+ runner.setProperty(PutElasticsearchRecord.INDEX, "test_index")
+ runner.setProperty(PutElasticsearchRecord.TYPE, "test_type")
+ runner.setProperty(PutElasticsearchRecord.CLIENT_SERVICE, "clientService")
+ runner.enableControllerService(registry)
+ runner.enableControllerService(reader)
+ runner.enableControllerService(clientService)
+
+ runner.assertValid()
+ }
+
+ /**
+ * Enqueues the default payload with schema.name=simple, runs the processor once and asserts how many
+ * flowfiles were routed to the failure, retry and success relationships.
+ */
+ void basicTest(int failure, int retry, int success) {
+ runner.enqueue(flowFileContents, [ "schema.name": "simple" ])
+ runner.run()
+
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, failure)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, retry)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, success)
+ }
+
+ // Happy path: the mock returns its default response and the flowfile goes to success.
+ @Test
+ void simpleTest() {
+ basicTest(0, 0, 1)
+ }
+
+ // The mock throws MockElasticsearchError(false); the flowfile is expected on failure.
+ @Test
+ void testFatalError() {
+ clientService.throwFatalError = true
+ basicTest(1, 0, 0)
+ }
+
+ // The mock throws MockElasticsearchError(true); the flowfile is expected on retry.
+ @Test
+ void testRetriable() {
+ clientService.throwRetriableError = true
+ basicTest(0, 1, 0)
+ }
+
+ /**
+ * Exercises the ID / INDEX / TYPE record-path properties in three consecutive runs:
+ * every record supplies its own id, index and type; most records supply null index/type and are
+ * expected to use the processor-level "test_index" / "test_type"; and a final run that only checks
+ * the operation of each request. The checks on the generated requests run inside the closure that
+ * the mock client service invokes from bulk().
+ */
+ @Test
+ void testRecordPathFeatures() {
+ def newSchema = prettyPrint(toJson([
+ type: "record",
+ name: "RecordPathTestType",
+ fields: [
+ [ name: "id", type: "string" ],
+ [ name: "index", type: "string" ],
+ [ name: "type", type: "string" ],
+ [ name: "msg", type: "string"]
+ ]
+ ]))
+
+ // Local variable; shadows the static flowFileContents field for the rest of this method.
+ def flowFileContents = prettyPrint(toJson([
+ [ id: "rec-1", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-2", index: "bulk_b", type: "message", msg: "Hello" ],
+ [ id: "rec-3", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-4", index: "bulk_b", type: "message", msg: "Hello" ],
+ [ id: "rec-5", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello" ]
+ ]))
+
+ // Scenario 1: three requests per index, every request has a "rec-" id and type "message".
+ def evalClosure = { List<IndexOperationRequest> items ->
+ def a = items.findAll { it.index == "bulk_a" }.size()
+ def b = items.findAll { it.index == "bulk_b" }.size()
+ items.each {
+ Assert.assertNotNull(it.id)
+ Assert.assertTrue(it.id.startsWith("rec-"))
+ Assert.assertEquals("message", it.type)
+ }
+ Assert.assertEquals(3, a)
+ Assert.assertEquals(3, b)
+ }
+
+ clientService.evalClosure = evalClosure
+
+ registry.addSchema("recordPathTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema)))
+
+ runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id")
+ runner.setProperty(PutElasticsearchRecord.INDEX_RECORD_PATH, "/index")
+ runner.setProperty(PutElasticsearchRecord.TYPE_RECORD_PATH, "/type")
+ runner.enqueue(flowFileContents, [
+ "schema.name": "recordPathTest"
+ ])
+
+ runner.run()
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+
+ runner.clearTransferState()
+
+ // Scenario 2: five records carry null index/type; only the last one supplies its own values.
+ flowFileContents = prettyPrint(toJson([
+ [ id: "rec-1", index: null, type: null, msg: "Hello" ],
+ [ id: "rec-2", index: null, type: null, msg: "Hello" ],
+ [ id: "rec-3", index: null, type: null, msg: "Hello" ],
+ [ id: "rec-4", index: null, type: null, msg: "Hello" ],
+ [ id: "rec-5", index: null, type: null, msg: "Hello" ],
+ [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello" ]
+ ]))
+
+ // The five null records are expected to pick up "test_index" / "test_type" configured in setup().
+ evalClosure = { List<IndexOperationRequest> items ->
+ def testTypeCount = items.findAll { it.type == "test_type" }.size()
+ def messageTypeCount = items.findAll { it.type == "message" }.size()
+ def testIndexCount = items.findAll { it.index == "test_index" }.size()
+ def bulkIndexCount = items.findAll { it.index.startsWith("bulk_") }.size()
+ def indexOperationCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
+ Assert.assertEquals(5, testTypeCount)
+ Assert.assertEquals(1, messageTypeCount)
+ Assert.assertEquals(5, testIndexCount)
+ Assert.assertEquals(1, bulkIndexCount)
+ Assert.assertEquals(6, indexOperationCount)
+ }
+
+ clientService.evalClosure = evalClosure
+ runner.enqueue(flowFileContents, [
+ "schema.name": "recordPathTest"
+ ])
+ runner.run()
+ // Check routing for this scenario too, before the transfer state is wiped; without these
+ // assertions a routing regression on the null-fallback path would go unnoticed.
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+
+ runner.clearTransferState()
+
+ // Scenario 3: same payload as scenario 1; the closure only checks that all six requests use Operation.Index.
+ flowFileContents = prettyPrint(toJson([
+ [ id: "rec-1", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-2", index: "bulk_b", type: "message", msg: "Hello" ],
+ [ id: "rec-3", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-4", index: "bulk_b", type: "message", msg: "Hello" ],
+ [ id: "rec-5", index: "bulk_a", type: "message", msg: "Hello" ],
+ [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello" ]
+ ]))
+
+ clientService.evalClosure = { List<IndexOperationRequest> items ->
+ int index = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
+ Assert.assertEquals(6, index)
+ }
+
+ runner.enqueue(flowFileContents, [
+ "schema.name": "recordPathTest"
+ ])
+ runner.run()
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+ }
+
+ // Running with nothing enqueued must not route anything to success.
+ @Test
+ void testInputRequired() {
+ runner.run()
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 0)
+ }
+
+ /**
+ * Configures an error record writer, makes the mock return SAMPLE_ERROR_RESPONSE (four items, the
+ * last one reporting an error) and expects one flowfile on success plus one on the failed-records
+ * relationship whose record-count attribute is "1".
+ */
+ @Test
+ void testErrorRelationship() {
+ def writer = new JsonRecordSetWriter()
+ runner.addControllerService("writer", writer)
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY)
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
+ runner.enableControllerService(writer)
+ runner.setProperty(PutElasticsearchRecord.ERROR_RECORD_WRITER, "writer")
+
+ def newSchema = prettyPrint(toJson([
+ type: "record",
+ name: "RecordPathTestType",
+ fields: [
+ [ name: "id", type: "string" ],
+ [ name: "field1", type: ["null", "string"]],
+ [ name: "field2", type: "string"]
+ ]
+ ]))
+
+ // Four input records, one per item in the canned response. The mock returns that response
+ // regardless of what is submitted.
+ def values = [
+ [ id: "1", field1: 'value1', field2: '20' ],
+ [ id: "2", field1: 'value1', field2: '20' ],
+ [ id: "2", field1: 'value1', field2: '20' ],
+ [ id: "3", field1: 'value1', field2: '20abcd' ]
+ ]
+
+ clientService.response = IndexOperationResponse.fromJsonResponse(MockBulkLoadClientService.SAMPLE_ERROR_RESPONSE)
+
+ registry.addSchema("errorTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema)))
+ runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 'errorTest' ])
+ runner.setProperty(PutElasticsearchRecord.LOG_ERROR_RESPONSES, "true")
+ runner.assertValid()
+ runner.run()
+
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1)
+
+ def errorFF = runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+ assert errorFF.getAttribute(PutElasticsearchRecord.ATTR_RECORD_COUNT) == "1"
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
similarity index 70%
rename from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java
rename to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
index b1d4220..b1d559b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy
@@ -15,82 +15,79 @@
* limitations under the License.
*/
-package org.apache.nifi.processors.elasticsearch;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.elasticsearch.DeleteOperationResponse;
-import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.elasticsearch.IndexOperationRequest;
-import org.apache.nifi.elasticsearch.IndexOperationResponse;
-import org.apache.nifi.elasticsearch.SearchResponse;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService {
- private boolean returnAggs;
- private boolean throwErrorInSearch;
- private boolean throwErrorInDelete;
-
- public TestElasticSearchClientService(boolean returnAggs) {
- this.returnAggs = returnAggs;
+package org.apache.nifi.processors.elasticsearch
+
+import groovy.json.JsonSlurper
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.elasticsearch.DeleteOperationResponse
+import org.apache.nifi.elasticsearch.ElasticSearchClientService
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+import org.apache.nifi.elasticsearch.SearchResponse
+
+class TestElasticsearchClientService extends AbstractControllerService implements ElasticSearchClientService {
+ private boolean returnAggs
+ private boolean throwErrorInSearch
+ private boolean throwErrorInDelete
+
+ TestElasticsearchClientService(boolean returnAggs) {
+ this.returnAggs = returnAggs
}
@Override
- public IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
- return add(Arrays.asList(operation));
+ IndexOperationResponse add(IndexOperationRequest operation) {
+ // Single-request convenience: wrap the request in a list and delegate to bulk().
+ // The list-taking overload is named bulk() in this class; there is no add(List) to call.
+ return bulk(Arrays.asList(operation))
}
@Override
- public IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException {
- return new IndexOperationResponse(100L, 100L);
+ IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
+ return new IndexOperationResponse(100L, 100L)
}
@Override
- public DeleteOperationResponse deleteById(String index, String type, String id) throws IOException {
- return deleteById(index, type, Arrays.asList(id));
+ Long count(String query, String index, String type) {
+ return null
}
@Override
- public DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException {
+ DeleteOperationResponse deleteById(String index, String type, String id) {
+ return deleteById(index, type, Arrays.asList(id))
+ }
+
+ @Override
+ DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
if (throwErrorInDelete) {
- throw new IOException("Simulated IOException");
+ throw new IOException("Simulated IOException")
}
- return new DeleteOperationResponse(100L);
+ return new DeleteOperationResponse(100L)
}
@Override
- public DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException {
- return deleteById(index, type, Arrays.asList("1"));
+ DeleteOperationResponse deleteByQuery(String query, String index, String type) {
+ return deleteById(index, type, Arrays.asList("1"))
}
@Override
- public Map<String, Object> get(String index, String type, String id) throws IOException {
- return new HashMap<String, Object>(){{
- put("msg", "one");
- }};
+ Map<String, Object> get(String index, String type, String id) {
+ return [ "msg": "one" ]
}
@Override
- public SearchResponse search(String query, String index, String type) throws IOException {
+ SearchResponse search(String query, String index, String type) {
if (throwErrorInSearch) {
- throw new IOException("Simulated IOException");
+ throw new IOException("Simulated IOException")
}
- ObjectMapper mapper = new ObjectMapper();
- List<Map<String, Object>> hits = (List<Map<String, Object>>)mapper.readValue(HITS_RESULT, List.class);
- Map<String, Object> aggs = returnAggs ? (Map<String, Object>)mapper.readValue(AGGS_RESULT, Map.class) : null;
- SearchResponse response = new SearchResponse(hits, aggs, 15, 5, false);
- return response;
+ def mapper = new JsonSlurper()
+ def hits = mapper.parseText(HITS_RESULT)
+ def aggs = returnAggs ? mapper.parseText(AGGS_RESULT) : null
+ SearchResponse response = new SearchResponse(hits, aggs, 15, 5, false)
+ return response
}
@Override
- public String getTransitUrl(String index, String type) {
- return String.format("http://localhost:9400/%s/%s", index, type);
+ String getTransitUrl(String index, String type) {
+ "http://localhost:9400/${index}/${type}"
}
private static final String AGGS_RESULT = "{\n" +
@@ -146,7 +143,7 @@ public class TestElasticSearchClientService extends AbstractControllerService im
" }\n" +
" ]\n" +
" }\n" +
- " }";
+ " }"
private static final String HITS_RESULT = "[\n" +
" {\n" +
@@ -239,13 +236,13 @@ public class TestElasticSearchClientService extends AbstractControllerService im
" \"msg\": \"five\"\n" +
" }\n" +
" }\n" +
- " ]";
+ " ]"
- public void setThrowErrorInSearch(boolean throwErrorInSearch) {
- this.throwErrorInSearch = throwErrorInSearch;
+ void setThrowErrorInSearch(boolean throwErrorInSearch) {
+ this.throwErrorInSearch = throwErrorInSearch
}
- public void setThrowErrorInDelete(boolean throwErrorInDelete) {
- this.throwErrorInDelete = throwErrorInDelete;
+ void setThrowErrorInDelete(boolean throwErrorInDelete) {
+ this.throwErrorInDelete = throwErrorInDelete
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy
similarity index 59%
copy from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
copy to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy
index 899cedb..fdbdac1 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy
@@ -15,63 +15,57 @@
* limitations under the License.
*/
-package org.apache.nifi.elasticsearch.integration
+package org.apache.nifi.processors.elasticsearch.mock
import org.apache.nifi.controller.AbstractControllerService
-import org.apache.nifi.elasticsearch.DeleteOperationResponse
-import org.apache.nifi.elasticsearch.ElasticSearchClientService
-import org.apache.nifi.elasticsearch.IndexOperationRequest
-import org.apache.nifi.elasticsearch.IndexOperationResponse
-import org.apache.nifi.elasticsearch.SearchResponse
+import org.apache.nifi.elasticsearch.*
-class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService {
- Map data = [
- "username": "john.smith",
- "password": "testing1234",
- "email": "john.smith@test.com",
- "position": "Software Engineer"
- ]
+// Base stub of ElasticSearchClientService for processor tests: every operation returns null, so a
+// subclass overrides only the calls a given test needs. Despite the name, the class is not declared abstract.
+class AbstractMockElasticsearchClient extends AbstractControllerService implements ElasticSearchClientService {
+ // Switches a subclass can consult to simulate errors; nothing in this base class reads them.
+ boolean throwRetriableError
+ boolean throwFatalError
@Override
- IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
+ IndexOperationResponse add(IndexOperationRequest operation) {
return null
}
@Override
- IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException {
+ IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
return null
}
@Override
- DeleteOperationResponse deleteById(String index, String type, String id) throws IOException {
+ Long count(String query, String index, String type) {
return null
}
@Override
- DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException {
+ DeleteOperationResponse deleteById(String index, String type, String id) {
return null
}
@Override
- DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException {
+ DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
return null
}
@Override
- Map<String, Object> get(String index, String type, String id) throws IOException {
- return data
+ DeleteOperationResponse deleteByQuery(String query, String index, String type) {
+ return null
}
@Override
- SearchResponse search(String query, String index, String type) throws IOException {
- List hits = [[
- "_source": data
- ]]
- return new SearchResponse(hits, null, 1, 100, false)
+ Map<String, Object> get(String index, String type, String id) {
+ return null
+ }
+
+ @Override
+ SearchResponse search(String query, String index, String type) {
+ return null
}
@Override
String getTransitUrl(String index, String type) {
- return ""
+ return null
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
new file mode 100644
index 0000000..3c4d902
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.mock
+
+
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+
+/**
+ * Mock client service for bulk-load tests. bulk() can simulate a retriable or fatal error, lets the
+ * test inspect the submitted requests through a closure, and otherwise returns a preset response.
+ */
+class MockBulkLoadClientService extends AbstractMockElasticsearchClient {
+ // Response handed back by bulk() when no error is simulated; set by the test.
+ IndexOperationResponse response
+ // Optional hook called with the list of requests passed to bulk(); tests place their assertions here.
+ Closure evalClosure
+
+ /**
+ * Throws MockElasticsearchError(true) when throwRetriableError is set, otherwise
+ * MockElasticsearchError(false) when throwFatalError is set (the retriable flag is checked first).
+ * If neither flag is set, invokes evalClosure with the items (when one is assigned) and returns response.
+ */
+ @Override
+ IndexOperationResponse bulk(List<IndexOperationRequest> items) {
+ if (throwRetriableError) {
+ throw new MockElasticsearchError(true)
+ } else if (throwFatalError) {
+ throw new MockElasticsearchError(false)
+ }
+
+ if (evalClosure) {
+ evalClosure.call(items)
+ }
+
+ response
+ }
+
+ // Canned bulk response with "errors": true and four items: the first three report status 200/201,
+ // the fourth reports status 400 with a mapper_parsing_exception.
+ static final SAMPLE_ERROR_RESPONSE = """
+{
+ "took" : 18,
+ "errors" : true,
+ "items" : [
+ {
+ "index" : {
+ "_index" : "test",
+ "_type" : "_doc",
+ "_id" : "1",
+ "_version" : 4,
+ "result" : "updated",
+ "_shards" : {
+ "total" : 2,
+ "successful" : 1,
+ "failed" : 0
+ },
+ "_seq_no" : 4,
+ "_primary_term" : 1,
+ "status" : 200
+ }
+ },
+ {
+ "create" : {
+ "_index" : "test",
+ "_type" : "_doc",
+ "_id" : "2",
+ "_version" : 1,
+ "result" : "created",
+ "_shards" : {
+ "total" : 2,
+ "successful" : 1,
+ "failed" : 0
+ },
+ "_seq_no" : 1,
+ "_primary_term" : 1,
+ "status" : 201
+ }
+ },
+ {
+ "create" : {
+ "_index" : "test",
+ "_type" : "_doc",
+ "_id" : "3",
+ "_version" : 1,
+ "result" : "created",
+ "_shards" : {
+ "total" : 2,
+ "successful" : 1,
+ "failed" : 0
+ },
+ "_seq_no" : 3,
+ "_primary_term" : 1,
+ "status" : 201
+ }
+ },
+ {
+ "index" : {
+ "_index" : "test",
+ "_type" : "_doc",
+ "_id" : "4",
+ "status" : 400,
+ "error" : {
+ "type" : "mapper_parsing_exception",
+ "reason" : "failed to parse field [field2] of type [integer] in document with id '4'",
+ "caused_by" : {
+ "type" : "number_format_exception",
+ "reason" : "For input string: 20abc"
+ }
+ }
+ }
+ }
+ ]
+}
+ """
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
similarity index 62%
copy from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
copy to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
index a22b7aa..7b1073b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockElasticsearchError.groovy
@@ -3,7 +3,7 @@
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -15,22 +15,17 @@
* limitations under the License.
*/
-package org.apache.nifi.elasticsearch;
+package org.apache.nifi.processors.elasticsearch.mock
-public class IndexOperationResponse {
- private long took;
- private long ingestTook;
+import org.apache.nifi.elasticsearch.ElasticsearchError
- public IndexOperationResponse(long took, long ingestTook) {
- this.took = took;
- this.ingestTook = ingestTook;
+// Test-only ElasticsearchError whose isElastic flag can be chosen directly by the test.
+class MockElasticsearchError extends ElasticsearchError {
+ // Wraps a placeholder Exception, then overrides the inherited isElastic flag with the given value.
+ // MockBulkLoadClientService passes true for its "retriable" case and false for its "fatal" case.
+ MockElasticsearchError(boolean isElastic) {
+ this(new Exception())
+ this.isElastic = isElastic
}
- public long getTook() {
- return took;
- }
-
- public long getIngestTook() {
- return ingestTook;
+ // Plain pass-through to the ElasticsearchError(Exception) constructor.
+ MockElasticsearchError(Exception ex) {
+ super(ex)
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/.gitignore b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/.gitignore
new file mode 100644
index 0000000..74f8eb7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/.gitignore
@@ -0,0 +1 @@
+# Placeholder to force compilation
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java
deleted file mode 100644
index ade1102..0000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearchTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.elasticsearch;
-
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class DeleteByQueryElasticsearchTest {
- private static final String INDEX = "test_idx";
- private static final String TYPE = "test_type";
- private static final String QUERY_ATTR = "es.delete.query";
- private static final String CLIENT_NAME = "clientService";
-
- private TestElasticSearchClientService client;
-
- private void initClient(TestRunner runner) throws Exception {
- client = new TestElasticSearchClientService(true);
- runner.addControllerService(CLIENT_NAME, client);
- runner.enableControllerService(client);
- runner.setProperty(DeleteByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME);
- }
-
- private void postTest(TestRunner runner, String queryParam) {
- runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 0);
- runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 1);
-
- List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_SUCCESS);
- String attr = flowFiles.get(0).getAttribute(DeleteByQueryElasticsearch.TOOK_ATTRIBUTE);
- String query = flowFiles.get(0).getAttribute(QUERY_ATTR);
- Assert.assertNotNull(attr);
- Assert.assertEquals(attr, "100");
- Assert.assertNotNull(query);
- Assert.assertEquals(queryParam, query);
- }
-
- @Test
- public void testWithFlowfileInput() throws Exception {
- String query = "{ \"query\": { \"match_all\": {} }}";
- TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
- runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
- runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
- runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
- initClient(runner);
- runner.assertValid();
- runner.enqueue(query);
- runner.run();
-
- postTest(runner, query);
- }
-
- @Test
- public void testWithQuery() throws Exception {
- String query = "{\n" +
- "\t\"query\": {\n" +
- "\t\t\"match\": {\n" +
- "\t\t\t\"${field.name}.keyword\": \"test\"\n" +
- "\t\t}\n" +
- "\t}\n" +
- "}";
- Map<String, String> attrs = new HashMap<String, String>(){{
- put("field.name", "test_field");
- }};
- TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
- runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
- runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
- runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
- runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
- initClient(runner);
- runner.assertValid();
- runner.enqueue("", attrs);
- runner.run();
-
- postTest(runner, query.replace("${field.name}", "test_field"));
-
- runner.clearTransferState();
-
- query = "{\n" +
- "\t\"query\": {\n" +
- "\t\t\"match\": {\n" +
- "\t\t\t\"test_field.keyword\": \"test\"\n" +
- "\t\t}\n" +
- "\t}\n" +
- "}";
- runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
- runner.setIncomingConnection(false);
- runner.assertValid();
- runner.run();
- postTest(runner, query);
- }
-
- @Test
- public void testErrorAttribute() throws Exception {
- String query = "{ \"query\": { \"match_all\": {} }}";
- TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
- runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
- runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
- runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
- runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
- initClient(runner);
- client.setThrowErrorInDelete(true);
- runner.assertValid();
- runner.enqueue("");
- runner.run();
-
- runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 0);
- runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 1);
-
- MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_FAILURE).get(0);
- String attr = mockFlowFile.getAttribute(DeleteByQueryElasticsearch.ERROR_ATTRIBUTE);
- Assert.assertNotNull(attr);
- }
-}