You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2023/02/13 18:22:51 UTC
[nifi] branch main updated: NIFI-10797 add customisable Elasticsearch REST Client config and Elasticsearch Cluster Sniffer
This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6542505a50 NIFI-10797 add customisable Elasticsearch REST Client config and Elasticsearch Cluster Sniffer
6542505a50 is described below
commit 6542505a5071f7bd1152daea5b1763b2af618a74
Author: Chris Sampson <ch...@naimuri.com>
AuthorDate: Fri Nov 11 15:12:05 2022 +0000
NIFI-10797 add customisable Elasticsearch REST Client config and Elasticsearch Cluster Sniffer
Signed-off-by: Joe Gresock <jg...@gmail.com>
This closes #6658.
---
.../elasticsearch/ElasticSearchClientService.java | 116 ++++++-
.../nifi-elasticsearch-client-service/pom.xml | 4 +
.../ElasticSearchClientServiceImpl.java | 334 ++++++++++++++++-----
.../additionalDetails.html | 54 ++++
.../nifi/elasticsearch/SearchResponseTest.java | 24 +-
.../TestElasticSearchClientService.java | 4 +-
.../nifi/elasticsearch/TestSchemaRegistry.java | 6 +-
.../integration/AbstractElasticsearch_IT.java | 11 +
.../integration/ElasticSearchClientService_IT.java | 160 +++++++++-
.../integration/AbstractElasticsearchITBase.java | 57 ++--
nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml | 36 ++-
11 files changed, 656 insertions(+), 150 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 4664ae1a64..7e80f345d9 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -16,8 +16,6 @@
*/
package org.apache.nifi.elasticsearch;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
@@ -32,8 +30,6 @@ import org.apache.nifi.ssl.SSLContextService;
import java.util.List;
import java.util.Map;
-@Tags({"elasticsearch", "client"})
-@CapabilityDescription("A controller service for accessing an Elasticsearch client.")
public interface ElasticSearchClientService extends ControllerService, VerifiableControllerService {
PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder()
.name("el-cs-http-hosts")
@@ -148,6 +144,118 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.required(true)
.build();
+ PropertyDescriptor COMPRESSION = new PropertyDescriptor.Builder()
+ .name("el-cs-enable-compression")
+ .displayName("Enable Compression")
+ .description("Whether the REST client should compress requests using gzip content encoding and add the " +
+ "\"Accept-Encoding: gzip\" header to receive compressed responses")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ PropertyDescriptor SEND_META_HEADER = new PropertyDescriptor.Builder()
+ .name("el-cs-send-meta-header")
+ .displayName("Send Meta Header")
+ .description("Whether to send a \"X-Elastic-Client-Meta\" header that describes the runtime environment. " +
+ "It contains information that is similar to what could be found in User-Agent. " +
+ "Using a separate header allows applications to use User-Agent for their own needs, " +
+ "e.g. to identify application version or other environment information")
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+
+ PropertyDescriptor STRICT_DEPRECATION = new PropertyDescriptor.Builder()
+ .name("el-cs-strict-deprecation")
+ .displayName("Strict Deprecation")
+ .description("Whether the REST client should return any response containing at least one warning header as a failure")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ AllowableValue NODE_SELECTOR_ANY = new AllowableValue("ANY", "Any",
+ "Select any Elasticsearch node to handle requests");
+ AllowableValue NODE_SELECTOR_SKIP_DEDICATED_MASTERS = new AllowableValue("SKIP_DEDICATED_MASTERS", "Skip Dedicated Masters",
+ "Skip dedicated Elasticsearch master nodes for handling request");
+
+ PropertyDescriptor NODE_SELECTOR = new PropertyDescriptor.Builder()
+ .name("el-cs-node-selector")
+ .displayName("Node Selector")
+ .description("Selects Elasticsearch nodes that can receive requests. Used to keep requests away from dedicated Elasticsearch master nodes")
+ .allowableValues(NODE_SELECTOR_ANY, NODE_SELECTOR_SKIP_DEDICATED_MASTERS)
+ .defaultValue(NODE_SELECTOR_ANY.getValue())
+ .required(true)
+ .build();
+
+ PropertyDescriptor PATH_PREFIX = new PropertyDescriptor.Builder()
+ .name("el-cs-path-prefix")
+ .displayName("Path Prefix")
+ .description("Sets the path's prefix for every request used by the http client. " +
+ "For example, if this is set to \"/my/path\", then any client request will become \"/my/path/\" + endpoint. " +
+ "In essence, every request's endpoint is prefixed by this pathPrefix. " +
+ "The path prefix is useful for when Elasticsearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; " +
+ "it is not intended for other purposes and it should not be supplied in other scenarios")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ // Toggles periodic discovery of cluster nodes; the other sniffer properties declare a dependency on this one being "true".
+ PropertyDescriptor SNIFF_CLUSTER_NODES = new PropertyDescriptor.Builder()
+ .name("el-cs-sniff-cluster-nodes")
+ .displayName("Sniff Cluster Nodes")
+ .description("Periodically sniff for nodes within the Elasticsearch cluster via the Elasticsearch Node Info API. " +
+ "If Elasticsearch security features are enabled (default to \"true\" for 8.x+), the Elasticsearch user must " +
+ // the trailing space stops this sentence running straight into "Note that all ..." when the segments are joined
+ "have the \"monitor\" or \"manage\" cluster privilege to use this API. " +
+ "Note that all " + HTTP_HOSTS.getDisplayName() + " (and those that may be discovered within the cluster " +
+ "using the Sniffer) must use the same protocol, e.g. http or https, and be contactable using the same client settings. " +
+ "Finally the Elasticsearch \"network.publish_host\" must match one of the \"network.bind_host\" list entries " +
+ "see https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html for more information")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ PropertyDescriptor SNIFF_ON_FAILURE = new PropertyDescriptor.Builder()
+ .name("el-cs-sniff-failure")
+ .displayName("Sniff on Failure")
+ .description("Enable sniffing on failure, meaning that after each failure the Elasticsearch nodes list gets updated " +
+ "straightaway rather than at the following ordinary sniffing round")
+ .dependsOn(SNIFF_CLUSTER_NODES, "true")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ PropertyDescriptor SNIFFER_INTERVAL = new PropertyDescriptor.Builder()
+ .name("el-cs-sniffer-interval")
+ .displayName("Sniffer Interval")
+ .description("Interval between Cluster sniffer operations")
+ .dependsOn(SNIFF_CLUSTER_NODES, "true")
+ .defaultValue("5 mins")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .required(true)
+ .build();
+
+ PropertyDescriptor SNIFFER_REQUEST_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("el-cs-sniffer-request-timeout")
+ .displayName("Sniffer Request Timeout")
+ .description("Cluster sniffer timeout for node info requests")
+ .dependsOn(SNIFF_CLUSTER_NODES, "true")
+ .defaultValue("1 sec")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .required(true)
+ .build();
+
+ PropertyDescriptor SNIFFER_FAILURE_DELAY = new PropertyDescriptor.Builder()
+ .name("el-cs-sniffer-failure-delay")
+ .displayName("Sniffer Failure Delay")
+ .description("Delay between an Elasticsearch request failure and updating available Cluster nodes using the Sniffer")
+ .dependsOn(SNIFF_ON_FAILURE, "true")
+ .defaultValue("1 min")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .required(true)
+ .build();
+
/**
* Index a document.
*
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index 9da490b8af..3cc3f98bef 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -132,6 +132,10 @@
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-client-sniffer</artifactId>
+ </dependency>
<!-- test dependencies -->
<dependency>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 9658c2ce86..3fceedc8ab 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -32,6 +32,9 @@ import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
@@ -40,19 +43,26 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
+import org.elasticsearch.client.sniff.SniffOnFailureListener;
+import org.elasticsearch.client.sniff.Sniffer;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayOutputStream;
@@ -73,12 +83,23 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+@Tags({"elasticsearch", "elasticsearch6", "elasticsearch7", "elasticsearch8", "client"})
+// Each concatenated segment ends with a space so the rendered description does not fuse words across segment boundaries.
+@CapabilityDescription("A controller service for accessing an Elasticsearch client. " +
+ "Uses the Elasticsearch REST Client (7.13.4, the last version before client connections verify " +
+ "the server is Elastic provided, this should allow for connections to compatible alternatives, e.g. AWS OpenSearch)")
+@DynamicProperty(
+ name = "The name of a Request Header to add",
+ value = "The value of the Header",
+ expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
+ description = "Adds the specified property name/value as a Request Header in the Elasticsearch requests.")
public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
public static final String VERIFICATION_STEP_CONNECTION = "Elasticsearch Connection";
public static final String VERIFICATION_STEP_CLIENT_SETUP = "Elasticsearch Rest Client Setup";
public static final String VERIFICATION_STEP_WARNINGS = "Elasticsearch Warnings";
+ public static final String VERIFICATION_STEP_SNIFFER = "Elasticsearch Sniffer";
private ObjectMapper mapper;
@@ -86,6 +107,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
private RestClient client;
+ private Sniffer sniffer;
+
private String url;
private Charset responseCharset;
private ObjectWriter prettyPrintWriter;
@@ -93,6 +116,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(HTTP_HOSTS);
+ props.add(PATH_PREFIX);
props.add(AUTHORIZATION_SCHEME);
props.add(USERNAME);
props.add(PASSWORD);
@@ -104,6 +128,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
props.add(SOCKET_TIMEOUT);
props.add(CHARSET);
props.add(SUPPRESS_NULLS);
+ props.add(COMPRESSION);
+ props.add(SEND_META_HEADER);
+ props.add(STRICT_DEPRECATION);
+ props.add(NODE_SELECTOR);
+ props.add(SNIFF_CLUSTER_NODES);
+ props.add(SNIFFER_INTERVAL);
+ props.add(SNIFFER_REQUEST_TIMEOUT);
+ props.add(SNIFF_ON_FAILURE);
+ props.add(SNIFFER_FAILURE_DELAY);
properties = Collections.unmodifiableList(props);
}
@@ -113,9 +146,20 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return properties;
}
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+ .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dynamic(true)
+ .build();
+ }
+
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
- final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+ final List<ValidationResult> results = new ArrayList<>(1);
final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(validationContext.getProperty(AUTHORIZATION_SCHEME).getValue());
@@ -126,6 +170,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final boolean apiKeySet = validationContext.getProperty(API_KEY).isSet();
final SSLContextService sslService = validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
if (authorizationScheme == AuthorizationScheme.PKI && (sslService == null || !sslService.isKeyStoreConfigured())) {
results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName()).valid(false)
.explanation(String.format("if '%s' is '%s' then '%s' must be set and specify a Keystore for mutual TLS encryption.",
@@ -146,6 +191,13 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
addAuthorizationPropertiesValidationIssue(results, API_KEY, API_KEY_ID);
}
+ final boolean sniffClusterNodes = validationContext.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
+ final boolean sniffOnFailure = validationContext.getProperty(SNIFF_ON_FAILURE).asBoolean();
+ if (sniffOnFailure && !sniffClusterNodes) {
+ results.add(new ValidationResult.Builder().subject(SNIFF_ON_FAILURE.getName()).valid(false)
+ .explanation(String.format("'%s' cannot be enabled if '%s' is disabled", SNIFF_ON_FAILURE.getDisplayName(), SNIFF_CLUSTER_NODES.getDisplayName())).build());
+ }
+
return results;
}
@@ -160,6 +212,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {
this.client = setupClient(context);
+ this.sniffer = setupSniffer(context, this.client);
responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
// re-create the ObjectMapper in case the SUPPRESS_NULLS property has changed - the JsonInclude settings aren't dynamic
@@ -178,6 +231,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
@OnDisabled
public void onDisabled() throws IOException {
+ if (this.sniffer != null) {
+ this.sniffer.close();
+ this.sniffer = null;
+ }
+
if (this.client != null) {
this.client.close();
this.client = null;
@@ -194,6 +252,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
.verificationStepName(VERIFICATION_STEP_CONNECTION);
final ConfigVerificationResult.Builder warningsResult = new ConfigVerificationResult.Builder()
.verificationStepName(VERIFICATION_STEP_WARNINGS);
+ final ConfigVerificationResult.Builder snifferResult = new ConfigVerificationResult.Builder()
+ .verificationStepName(VERIFICATION_STEP_SNIFFER);
// configure the Rest Client
try (final RestClient verifyClient = setupClient(context)) {
@@ -201,7 +261,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
// try to fetch the Elasticsearch root endpoint (system summary)
verifyRootConnection(verifyClient, connectionResult, warningsResult);
- }catch (final MalformedURLException mue) {
+
+ // try sniffing for cluster nodes
+ verifySniffer(context, verifyClient, snifferResult);
+ } catch (final MalformedURLException mue) {
clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED)
.explanation("Incorrect/invalid " + ElasticSearchClientService.HTTP_HOSTS.getDisplayName());
} catch (final InitializationException ie) {
@@ -219,21 +282,78 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
.explanation("Elasticsearch Rest Client not configured");
warningsResult.outcome(ConfigVerificationResult.Outcome.SKIPPED)
.explanation("Elasticsearch Rest Client not configured");
+ snifferResult.outcome(ConfigVerificationResult.Outcome.SKIPPED)
+ .explanation("Elasticsearch Rest Client not configured");
}
results.add(clientSetup);
results.add(connectionResult.build());
results.add(warningsResult.build());
+ results.add(snifferResult.build());
}
return results;
}
+ /**
+ * Verification step for cluster sniffing: looks up the cluster nodes once, then issues a root request to each
+ * node in turn through the supplied client and records the overall outcome on {@code snifferResult}.
+ * The step is SKIPPED when no Sniffer is configured, FAILED when any discovered node cannot be contacted (or
+ * when sniffing itself throws) and SUCCESSFUL otherwise.
+ *
+ * @param context configuration being verified
+ * @param verifyClient client built for this verification run; its node list is temporarily replaced and then restored
+ * @param snifferResult builder that receives the outcome and explanation
+ */
+ private void verifySniffer(final ConfigurationContext context, final RestClient verifyClient, final ConfigVerificationResult.Builder snifferResult) {
+ try (final Sniffer verifySniffer = setupSniffer(context, verifyClient)) {
+ if (verifySniffer == null) {
+ // NOTE(review): no property called "Sniff on Connection" is visible here; the text presumably means "Sniff Cluster Nodes" - confirm before rewording
+ snifferResult.outcome(ConfigVerificationResult.Outcome.SKIPPED).explanation("Sniff on Connection not enabled");
+ return;
+ }
+
+ // keep the configured hosts so the client can be put back the way the user set it up
+ final List<Node> configuredNodes = verifyClient.getNodes();
+ // the Sniffer does not expose its NodesSniffer, so a separate instance performs the one-off lookup
+ final List<Node> discoveredNodes = setupElasticsearchNodesSniffer(context, verifyClient).sniff();
+
+ // counters are Atomic only because they are updated from inside the lambda
+ final AtomicInteger reachableCount = new AtomicInteger(0);
+ final AtomicInteger warningCount = new AtomicInteger(0);
+ discoveredNodes.forEach(node -> {
+ try {
+ // restrict the client to this single node so the root request must be served by it
+ verifyClient.setNodes(Collections.singletonList(node));
+ final boolean hasWarnings = !getElasticsearchRoot(verifyClient).isEmpty();
+ reachableCount.getAndIncrement();
+ if (hasWarnings) {
+ warningCount.getAndIncrement();
+ }
+ } catch (final Exception ex) {
+ getLogger().warn("Elasticsearch Node {} connection failed", node.getHost().toURI(), ex);
+ }
+ });
+ // reset Nodes list on RestClient to pre-Sniffer state (match user's Verify settings)
+ verifyClient.setNodes(configuredNodes);
+
+ final int found = discoveredNodes.size();
+ final int unreachable = found - reachableCount.get();
+ if (unreachable > 0) {
+ final String explanation = String.format(
+ "Sniffing for Elasticsearch cluster nodes found %d nodes but %d could not be contacted (%d with warnings during connection tests)",
+ found, unreachable, warningCount.get());
+ snifferResult.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(explanation);
+ } else {
+ final String explanation = String.format(
+ "Sniffing for Elasticsearch cluster nodes found %d nodes (%d with warnings during connection tests)",
+ found, warningCount.get());
+ snifferResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(explanation);
+ }
+ } catch (final Exception ex) {
+ getLogger().warn("Unable to sniff for Elasticsearch cluster nodes", ex);
+
+ snifferResult.outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation("Sniffing for Elasticsearch cluster nodes failed");
+ }
+ }
+
+ /**
+ * Performs a {@code GET /} request with the supplied client.
+ *
+ * @param verifyClient client used to issue the request
+ * @return the warnings extracted from the response's warning headers
+ * @throws IOException declared because the request (and the response handling helpers) may fail with it
+ */
+ private List<String> getElasticsearchRoot(final RestClient verifyClient) throws IOException {
+ final Request rootRequest = new Request("GET", "/");
+ final Response rootResponse = verifyClient.performRequest(rootRequest);
+ final List<String> warningHeaders = parseResponseWarningHeaders(rootResponse);
+ // the parsed body is discarded; presumably called only so an unreadable response fails the check - confirm
+ parseResponse(rootResponse);
+
+ return warningHeaders;
+ }
+
private void verifyRootConnection(final RestClient verifyClient, final ConfigVerificationResult.Builder connectionResult, final ConfigVerificationResult.Builder warningsResult) {
try {
- final Response response = verifyClient.performRequest(new Request("GET", "/"));
- final List<String> warnings = parseResponseWarningHeaders(response);
- parseResponse(response);
+ final List<String> warnings = getElasticsearchRoot(verifyClient);
connectionResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
if (warnings.isEmpty()) {
@@ -253,80 +373,121 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
}
+ /**
+ * Builds a RestClient from the configured properties: hosts, authentication/proxy settings, connect and socket
+ * timeouts, compression, meta header, strict deprecation mode, node selector and (when non-blank) path prefix.
+ *
+ * @param context configuration to read property values from
+ * @return a newly built RestClient
+ * @throws MalformedURLException declared for host parsing in getHttpHosts
+ * @throws InitializationException declared for failures in addAuthAndProxy
+ */
private RestClient setupClient(final ConfigurationContext context) throws MalformedURLException, InitializationException {
- final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(context.getProperty(AUTHORIZATION_SCHEME).getValue());
+ final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
+ final Integer socketTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger();
+
+ // anything other than the "ANY" allowable value selects SKIP_DEDICATED_MASTERS
+ final NodeSelector nodeSelector = NODE_SELECTOR_ANY.getValue().equals(context.getProperty(NODE_SELECTOR).getValue())
+ ? NodeSelector.ANY
+ : NodeSelector.SKIP_DEDICATED_MASTERS;
+ final String pathPrefix = context.getProperty(PATH_PREFIX).getValue();
+
+ final boolean compress = context.getProperty(COMPRESSION).asBoolean();
+ final boolean sendMetaHeader = context.getProperty(SEND_META_HEADER).asBoolean();
+ final boolean strictDeprecation = context.getProperty(STRICT_DEPRECATION).asBoolean();
+ final boolean sniffOnFailure = context.getProperty(SNIFF_ON_FAILURE).asBoolean();
+
+ final RestClientBuilder builder = RestClient.builder(getHttpHosts(context));
+ addAuthAndProxy(context, builder)
+ .setRequestConfigCallback(requestConfigBuilder -> {
+ requestConfigBuilder.setConnectTimeout(connectTimeout);
+ requestConfigBuilder.setSocketTimeout(socketTimeout);
+ return requestConfigBuilder;
+ })
+ .setCompressionEnabled(compress)
+ .setMetaHeaderEnabled(sendMetaHeader)
+ .setStrictDeprecationMode(strictDeprecation)
+ .setNodeSelector(nodeSelector);
+
+ // NOTE(review): "sniffer" is the instance field. onEnabled assigns it only after this method has returned and
+ // onDisabled resets it to null, so on enable this branch looks like it can never register the listener.
+ // The Elasticsearch docs create the listener first, build the client, build the Sniffer, then call
+ // listener.setSniffer(...) - confirm and rework together with setupSniffer/onEnabled.
+ if (sniffOnFailure && sniffer != null) {
+ final SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
+ sniffOnFailureListener.setSniffer(sniffer);
+ builder.setFailureListener(sniffOnFailureListener);
+ }
+
+ // the prefix is optional; only applied when the property has a non-blank value
+ if (StringUtils.isNotBlank(pathPrefix)) {
+ builder.setPathPrefix(pathPrefix);
+ }
+ return builder.build();
+ }
+
+ private HttpHost[] getHttpHosts(final ConfigurationContext context) throws MalformedURLException {
final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
- final String[] hostsSplit = hosts.split(",\\s*");
- this.url = hostsSplit[0];
- final SSLContextService sslService =
- context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+ final List<String> hostsSplit = Arrays.stream(hosts.split(",\\s*")).map(String::trim).collect(Collectors.toList());
+ this.url = hostsSplit.get(0);
+ final List<HttpHost> hh = new ArrayList<>(hostsSplit.size());
+ for (final String host : hostsSplit) {
+ final URL u = new URL(host);
+ hh.add(new HttpHost(u.getHost(), u.getPort(), u.getProtocol()));
+ }
+
+ return hh.toArray(new HttpHost[0]);
+ }
+
+ private RestClientBuilder addAuthAndProxy(final ConfigurationContext context, final RestClientBuilder builder) throws InitializationException {
+ final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(context.getProperty(AUTHORIZATION_SCHEME).getValue());
+
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
final String apiKey = context.getProperty(API_KEY).getValue();
- final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
- final Integer socketTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger();
-
+ final SSLContext sslContext = getSSLContext(context);
final ProxyConfigurationService proxyConfigurationService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
- final HttpHost[] hh = new HttpHost[hostsSplit.length];
- for (int x = 0; x < hh.length; x++) {
- final URL u = new URL(hostsSplit[x]);
- hh[x] = new HttpHost(u.getHost(), u.getPort(), u.getProtocol());
- }
-
- final SSLContext sslContext;
- try {
- sslContext = (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
- ? sslService.createContext() : null;
- } catch (final Exception e) {
- getLogger().error("Error building up SSL Context from the supplied configuration.", e);
- throw new InitializationException(e);
- }
+ return builder.setHttpClientConfigCallback(httpClientBuilder -> {
+ if (sslContext != null) {
+ httpClientBuilder.setSSLContext(sslContext);
+ }
- final RestClientBuilder builder = RestClient.builder(hh)
- .setHttpClientConfigCallback(httpClientBuilder -> {
- if (sslContext != null) {
- httpClientBuilder.setSSLContext(sslContext);
- }
+ CredentialsProvider credentialsProvider = null;
+ if (AuthorizationScheme.BASIC == authorizationScheme && username != null && password != null) {
+ credentialsProvider = addBasicAuthCredentials(null, AuthScope.ANY, username, password);
+ }
- CredentialsProvider credentialsProvider = null;
- if (AuthorizationScheme.BASIC == authorizationScheme && username != null && password != null) {
- credentialsProvider = addCredentials(null, AuthScope.ANY, username, password);
- }
+ final List<Header> defaultHeaders = getDefaultHeadersFromDynamicProperties(context);
+ if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId != null && apiKey != null) {
+ defaultHeaders.add(createApiKeyAuthorizationHeader(apiKeyId, apiKey));
+ }
+ if (!defaultHeaders.isEmpty()) {
+ builder.setDefaultHeaders(defaultHeaders.toArray(new Header[0]));
+ }
- if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId != null && apiKey != null) {
- httpClientBuilder.setDefaultHeaders(Collections.singletonList(createApiKeyAuthorizationHeader(apiKeyId, apiKey)));
- }
+ if (proxyConfigurationService != null) {
+ final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
+ if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
+ final HttpHost proxy = new HttpHost(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort(), "http");
+ httpClientBuilder.setProxy(proxy);
- if (proxyConfigurationService != null) {
- final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
- if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
- final HttpHost proxy = new HttpHost(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort(), "http");
- httpClientBuilder.setProxy(proxy);
+ credentialsProvider = addBasicAuthCredentials(credentialsProvider, new AuthScope(proxy), proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
+ }
+ }
- credentialsProvider = addCredentials(credentialsProvider, new AuthScope(proxy), proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
- }
- }
+ if (credentialsProvider != null) {
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ }
- if (credentialsProvider != null) {
- httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
- }
+ return httpClientBuilder;
+ });
+ }
- return httpClientBuilder;
- })
- .setRequestConfigCallback(requestConfigBuilder -> {
- requestConfigBuilder.setConnectTimeout(connectTimeout);
- requestConfigBuilder.setSocketTimeout(socketTimeout);
- return requestConfigBuilder;
- });
+ private SSLContext getSSLContext(final ConfigurationContext context) throws InitializationException {
+ final SSLContextService sslService =
+ context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- return builder.build();
+ try {
+ return (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
+ ? sslService.createContext() : null;
+ } catch (final Exception e) {
+ getLogger().error("Error building up SSL Context from the supplied configuration.", e);
+ throw new InitializationException(e);
+ }
}
- private CredentialsProvider addCredentials(final CredentialsProvider credentialsProvider, final AuthScope authScope, final String username, final String password) {
+ private CredentialsProvider addBasicAuthCredentials(final CredentialsProvider credentialsProvider, final AuthScope authScope,
+ final String username, final String password) {
final CredentialsProvider cp = credentialsProvider != null ? credentialsProvider : new BasicCredentialsProvider();
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
@@ -339,12 +500,48 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return cp;
}
- private BasicHeader createApiKeyAuthorizationHeader(String apiKeyId, String apiKey) {
+ /**
+  * Converts the dynamic properties of the service into default request headers.
+  * The property name becomes the header name and the header value is the property
+  * value after Expression Language evaluation. Properties whose raw value or
+  * evaluated value is blank are skipped.
+  *
+  * @param context configuration whose properties are inspected
+  * @return the headers built from the dynamic properties; empty if none qualify
+  */
+ private List<Header> getDefaultHeadersFromDynamicProperties(final ConfigurationContext context) {
+     return context.getProperties().entrySet().stream()
+             // only dynamic properties with a non-blank raw value are candidates
+             .filter(e -> e.getKey().isDynamic() && StringUtils.isNotBlank(e.getValue()))
+             // evaluate Expression Language exactly once per property while building the Header
+             .map(e -> new BasicHeader(e.getKey().getName(),
+                     context.getProperty(e.getKey()).evaluateAttributeExpressions().getValue()))
+             // drop headers whose evaluated value turned out to be blank
+             .filter(header -> StringUtils.isNotBlank(header.getValue()))
+             .collect(Collectors.toList());
+ }
+
+ private BasicHeader createApiKeyAuthorizationHeader(final String apiKeyId, final String apiKey) {
final String apiKeyCredentials = String.format("%s:%s", apiKeyId, apiKey);
final String apiKeyAuth = Base64.getEncoder().encodeToString((apiKeyCredentials).getBytes(StandardCharsets.UTF_8));
return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth);
}
+ /**
+  * Builds the cluster {@link Sniffer} for the given client when the
+  * {@code SNIFF_CLUSTER_NODES} property is enabled.
+  *
+  * @param context    configuration supplying the sniffer properties
+  * @param restClient client handed to the sniffer builder and to the nodes sniffer
+  * @return the configured sniffer, or {@code null} when cluster sniffing is not enabled
+  */
+ private Sniffer setupSniffer(final ConfigurationContext context, final RestClient restClient) {
+     // Check the switch first (null-safe): the interval properties are only needed,
+     // and only have to be resolvable, when sniffing is actually turned on.
+     if (!Boolean.TRUE.equals(context.getProperty(SNIFF_CLUSTER_NODES).asBoolean())) {
+         return null;
+     }
+
+     final int snifferIntervalMillis = context.getProperty(SNIFFER_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+     final int snifferFailureDelayMillis = context.getProperty(SNIFFER_FAILURE_DELAY).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+     return Sniffer.builder(restClient)
+             .setSniffIntervalMillis(snifferIntervalMillis)
+             .setSniffAfterFailureDelayMillis(snifferFailureDelayMillis)
+             .setNodesSniffer(setupElasticsearchNodesSniffer(context, restClient))
+             .build();
+ }
+
+
+ /**
+  * Creates the {@link ElasticsearchNodesSniffer} used by the cluster sniffer.
+  * The sniffer scheme is HTTPS when the configured URL starts with {@code https://}
+  * (case-insensitive), otherwise HTTP.
+  *
+  * @param context    configuration supplying {@code SNIFFER_REQUEST_TIMEOUT}
+  * @param restClient client the nodes sniffer issues its requests through
+  * @return the nodes sniffer configured with the request timeout and scheme
+  */
+ private ElasticsearchNodesSniffer setupElasticsearchNodesSniffer(final ConfigurationContext context, final RestClient restClient) {
+     final Long snifferRequestTimeoutMillis = context.getProperty(SNIFFER_REQUEST_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+     // Locale.ROOT: a protocol prefix is not natural-language text, so its case mapping
+     // must not depend on the JVM's default locale.
+     final ElasticsearchNodesSniffer.Scheme scheme = this.url.toLowerCase(Locale.ROOT).startsWith("https://")
+             ? ElasticsearchNodesSniffer.Scheme.HTTPS : ElasticsearchNodesSniffer.Scheme.HTTP;
+
+     return new ElasticsearchNodesSniffer(restClient, snifferRequestTimeoutMillis, scheme);
+ }
+
private void appendIndex(final StringBuilder sb, final String index) {
if (StringUtils.isNotBlank(index) && !"/".equals(index)) {
if (!index.startsWith("/")) {
@@ -798,10 +995,6 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
}
if (getLogger().isDebugEnabled()) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- entity.writeTo(out);
- out.close();
-
StringBuilder builder = new StringBuilder(1000);
builder.append("Dumping Elasticsearch REST request...\n")
.append("HTTP Method: ")
@@ -812,11 +1005,18 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
.append("\n")
.append("Parameters: ")
.append(prettyPrintWriter.writeValueAsString(parameters))
- .append("\n")
- .append("Request body: ")
- .append(new String(out.toByteArray()))
.append("\n");
+ if (entity != null) {
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ entity.writeTo(out);
+ out.close();
+
+ builder.append("Request body: ")
+ .append(out)
+ .append("\n");
+ }
+
getLogger().debug(builder.toString());
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html
new file mode 100644
index 0000000000..f0934cafe6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html
@@ -0,0 +1,54 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>ElasticSearchClientServiceImpl</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+
+<h2>Sniffing</h2>
+<p>
+ The Elasticsearch Sniffer can be used to locate Elasticsearch Nodes within a Cluster to which you are connecting.
+ This can be beneficial if your cluster dynamically changes over time, e.g. new Nodes are added to maintain performance during heavy load.
+</p>
+<p>
+ Sniffing can also be used to update the list of Hosts within the Cluster if a connection Failure is encountered during operation.
+ In order to "Sniff on Failure", you <b>must</b> also enable "Sniff Cluster Nodes".
+</p>
+<p>
+ Sniffing is not appropriate in every situation, for example if:
+</p>
+<ul>
+ <li>Elasticsearch is situated behind a load balancer, which dynamically routes connections from NiFi</li>
+ <li>Elasticsearch is on a different network to NiFi</li>
+</ul>
+<p>
+ There may also be a need to set some of the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html">
+ Elasticsearch Networking Advanced Settings</a>, such as <code>network.publish_host</code> to ensure that
+ the HTTP Hosts found by the Sniffer are accessible by NiFi. For example, Elasticsearch may use a network internal
+ <code>publish_host</code> that is inaccessible to NiFi, but instead should use an address/IP that NiFi understands.
+ It may also be necessary to add this same address to Elasticsearch's <code>network.bind_host</code> list.
+</p>
+<p>
+ See <a href="https://www.elastic.co/blog/elasticsearch-sniffing-best-practices-what-when-why-how">
+ Elasticsearch sniffing best practices: What, when, why, how</a> for more details of the best practices.
+</p>
+
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java
index 9f99ec0da8..bf82c8853d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java
@@ -20,7 +20,7 @@ package org.apache.nifi.elasticsearch;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,17 +31,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class SearchResponseTest {
@Test
void test() {
- List<Map<String, Object>> results = new ArrayList<>();
- Map<String, Object> aggs = new HashMap<>();
- String pitId = "pitId";
- String scrollId = "scrollId";
- String searchAfter = "searchAfter";
- int num = 10;
- int took = 100;
- boolean timeout = false;
- List<String> warnings = Arrays.asList("auth");
- SearchResponse response = new SearchResponse(results, aggs, pitId, scrollId, searchAfter, num, took, timeout, warnings);
- String str = response.toString();
+ final List<Map<String, Object>> results = new ArrayList<>();
+ final Map<String, Object> aggs = new HashMap<>();
+ final String pitId = "pitId";
+ final String scrollId = "scrollId";
+ final String searchAfter = "searchAfter";
+ final int num = 10;
+ final int took = 100;
+ final boolean timeout = false;
+ final List<String> warnings = Collections.singletonList("auth");
+ final SearchResponse response = new SearchResponse(results, aggs, pitId, scrollId, searchAfter, num, took, timeout, warnings);
+ final String str = response.toString();
assertEquals(results, response.getHits());
assertEquals(aggs, response.getAggregations());
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java
index 4ee196a9dc..c85879179f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java
@@ -28,10 +28,10 @@ import java.util.List;
import java.util.Map;
public class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService {
- private Map<String, Object> data;
+ private final Map<String, Object> data;
public TestElasticSearchClientService() {
- data = new HashMap<>();
+ data = new HashMap<>(4, 1);
data.put("username", "john.smith");
data.put("password", "testing1234");
data.put("email", "john.smith@test.com");
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java
index 813ce1c5d2..7868c5ccd5 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java
@@ -26,7 +26,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -36,12 +36,12 @@ import static org.apache.nifi.schema.access.SchemaField.SCHEMA_NAME;
public class TestSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
@Override
public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) {
- List<RecordField> fields = Arrays.asList(new RecordField("msg", RecordFieldType.STRING.getDataType()));
+ List<RecordField> fields = Collections.singletonList(new RecordField("msg", RecordFieldType.STRING.getDataType()));
return new SimpleRecordSchema(fields);
}
@Override
public Set<SchemaField> getSuppliedSchemaFields() {
- return new HashSet<>(Arrays.asList(SCHEMA_NAME));
+ return new HashSet<>(Collections.singletonList(SCHEMA_NAME));
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
index 104e504b23..c9dfc752e6 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
@@ -38,6 +38,8 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);
service = new ElasticSearchClientServiceImpl();
runner.addControllerService(CLIENT_SERVICE_NAME, service);
+ runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, CLIENT_SERVICE_NAME);
+
runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, elasticsearchHost);
runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000");
runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000");
@@ -45,6 +47,15 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue());
runner.setProperty(service, ElasticSearchClientService.USERNAME, "elastic");
runner.setProperty(service, ElasticSearchClientService.PASSWORD, ELASTIC_USER_PASSWORD);
+ runner.removeProperty(service, ElasticSearchClientService.API_KEY);
+ runner.removeProperty(service, ElasticSearchClientService.API_KEY_ID);
+ runner.setProperty(service, ElasticSearchClientService.COMPRESSION, "false");
+ runner.setProperty(service, ElasticSearchClientService.SEND_META_HEADER, "true");
+ runner.setProperty(service, ElasticSearchClientService.STRICT_DEPRECATION, "false");
+ runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "false");
+ runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "false");
+ runner.removeProperty(service, ElasticSearchClientService.PATH_PREFIX);
+ runner.setProperty(service, ElasticSearchClientService.NODE_SELECTOR, ElasticSearchClientService.NODE_SELECTOR_ANY.getValue());
runner.enableControllerService(service);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index d7f0d39874..402363aece 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -18,7 +18,12 @@
package org.apache.nifi.elasticsearch.integration;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.VerifiableControllerService;
@@ -31,7 +36,6 @@ import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.elasticsearch.MapBuilder;
import org.apache.nifi.elasticsearch.SearchResponse;
-import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
import org.apache.nifi.elasticsearch.UpdateOperationResponse;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
@@ -39,11 +43,15 @@ import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockControllerServiceLookup;
import org.apache.nifi.util.MockVariableRegistry;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -82,13 +90,38 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
runner.getLogger(),
Collections.emptyMap()
);
- assertEquals(3, results.size());
+ assertEquals(4, results.size());
assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
+ assertVerifySnifferSkipped(results);
+ }
+
+ /**
+ * Runs configuration verification with SNIFF_CLUSTER_NODES enabled, first with
+ * SNIFF_ON_FAILURE set to "false" and then to "true"; each configuration is
+ * checked via assertVerifySniffer().
+ */
+ @Test
+ void testVerifySniffer() {
+ // the service is disabled before its properties are changed, then re-enabled
+ runner.disableControllerService(service);
+ runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "true");
+ runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "false");
+ runner.enableControllerService(service);
+ assertVerifySniffer();
+
+ // same check again, this time with "sniff on failure" switched on as well
+ runner.disableControllerService(service);
+ runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "true");
+ runner.enableControllerService(service);
+ assertVerifySniffer();
+ }
+
+ /**
+ * Calls verify() on the service with its current properties and asserts that
+ * four verification results are returned and that all four are SUCCESSFUL.
+ */
+ private void assertVerifySniffer() {
+ final List<ConfigVerificationResult> results = ((VerifiableControllerService) service).verify(
+ new MockConfigurationContext(service, getClientServiceProperties(), runner.getProcessContext().getControllerServiceLookup(), new MockVariableRegistry()),
+ runner.getLogger(),
+ Collections.emptyMap()
+ );
+ assertEquals(4, results.size());
+ // results.toString() is the failure message, so a failing run shows every step's outcome
+ assertEquals(4, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
+ }
@Test
void testVerifySuccessWithApiKeyAuth() throws IOException {
- final Pair<String, String> apiKey = createApiKeyForIndex("*");
+ final Pair<String, String> apiKey = createApiKeyForIndex();
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
@@ -103,8 +136,9 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
runner.getLogger(),
Collections.emptyMap()
);
- assertEquals(3, results.size());
+ assertEquals(4, results.size());
assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
+ assertVerifySnifferSkipped(results);
}
@Test
@@ -117,8 +151,8 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
runner.getLogger(),
Collections.emptyMap()
);
- assertEquals(3, results.size());
- assertEquals(2, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+ assertEquals(4, results.size());
+ assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(1, results.stream().filter(
result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CLIENT_SETUP)
&& Objects.equals(result.getExplanation(), "Incorrect/invalid " + ElasticSearchClientService.HTTP_HOSTS.getDisplayName())
@@ -146,8 +180,8 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
runner.getLogger(),
Collections.emptyMap()
);
- assertEquals(3, results.size());
- assertEquals(2, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+ assertEquals(4, results.size());
+ assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(1, results.stream().filter(
result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CLIENT_SETUP)
&& Objects.equals(result.getExplanation(), "Incorrect/invalid " + ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE.getDisplayName())
@@ -167,9 +201,9 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
runner.getLogger(),
Collections.emptyMap()
);
- assertEquals(3, results.size());
+ assertEquals(4, results.size());
assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
- assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+ assertEquals(2, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(1, results.stream().filter(
result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CONNECTION)
&& Objects.equals(result.getExplanation(), "Unable to retrieve system summary from Elasticsearch root endpoint")
@@ -193,9 +227,9 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
runner.getLogger(),
Collections.emptyMap()
);
- assertEquals(3, results.size());
+ assertEquals(4, results.size());
assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
- assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+ assertEquals(2, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(1, results.stream().filter(
result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CONNECTION)
&& Objects.equals(result.getExplanation(), "Unable to retrieve system summary from Elasticsearch root endpoint")
@@ -623,6 +657,71 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
assertFalse(service.exists("index-does-not-exist", null), "index exists");
}
+ /**
+ * Re-enables the service with COMPRESSION set to "true", asserts the service is
+ * valid and that the test index is still reported as existing.
+ */
+ @Test
+ void testCompression() {
+ runner.disableControllerService(service);
+ runner.setProperty(service, ElasticSearchClientService.COMPRESSION, "true");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ assertTrue(service.exists(INDEX, null), "index does not exist");
+ }
+
+ /**
+ * Re-enables the service with SEND_META_HEADER set to "false", asserts the service
+ * is valid and that the test index is still reported as existing.
+ */
+ @Test
+ void testNoMetaHeader() {
+ runner.disableControllerService(service);
+ runner.setProperty(service, ElasticSearchClientService.SEND_META_HEADER, "false");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ assertTrue(service.exists(INDEX, null), "index does not exist");
+ }
+
+ /**
+ * Re-enables the service with STRICT_DEPRECATION set to "true", asserts the service
+ * is valid and that the test index is still reported as existing.
+ */
+ @Test
+ void testStrictDeprecation() {
+ runner.disableControllerService(service);
+ runner.setProperty(service, ElasticSearchClientService.STRICT_DEPRECATION, "true");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ assertTrue(service.exists(INDEX, null), "index does not exist");
+ }
+
+ /**
+ * Re-enables the service with NODE_SELECTOR set to the
+ * NODE_SELECTOR_SKIP_DEDICATED_MASTERS value, asserts the service is valid and
+ * that the test index is still reported as existing.
+ */
+ @Test
+ void testNodeSelector() {
+ runner.disableControllerService(service);
+ runner.setProperty(service, ElasticSearchClientService.NODE_SELECTOR, ElasticSearchClientService.NODE_SELECTOR_SKIP_DEDICATED_MASTERS.getValue());
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ assertTrue(service.exists(INDEX, null), "index does not exist");
+ }
+
+ /**
+ * Sets two properties by plain name ("User-Agent" and "X-Extra_header") on the
+ * service, re-enables it, asserts the service is valid and that the test index
+ * is still reported as existing.
+ */
+ @Test
+ void testRestClientRequestHeaders() {
+ runner.disableControllerService(service);
+ // NOTE(review): these string-named properties are presumably treated as dynamic properties
+ // and sent as request headers by the service - confirm against the implementation
+ runner.setProperty(service, "User-Agent", "NiFi Integration Tests");
+ runner.setProperty(service, "X-Extra_header", "Request should still work");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ assertTrue(service.exists(INDEX, null), "index does not exist");
+ }
+
+ /**
+ * Asserts that SNIFF_ON_FAILURE "true" combined with SNIFF_CLUSTER_NODES "false"
+ * makes the service invalid, then sets SNIFF_CLUSTER_NODES to "true" and asserts
+ * that the enabled service is valid and reports the test index as existing.
+ */
+ @Test
+ void testSniffer() {
+ runner.disableControllerService(service);
+ runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "false");
+ runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "true");
+ // validity is checked while the service is still disabled
+ runner.assertNotValid(service);
+
+ runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "true");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ assertTrue(service.exists(INDEX, null), "index does not exist");
+ }
+
@Test
void testNullSuppression() throws InterruptedException {
final Map<String, Object> doc = new HashMap<>();
@@ -659,13 +758,12 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
}
private void suppressNulls(final boolean suppressNulls) {
- runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service");
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, suppressNulls
? ElasticSearchClientService.ALWAYS_SUPPRESS.getValue()
: ElasticSearchClientService.NEVER_SUPPRESS.getValue());
runner.enableControllerService(service);
- runner.assertValid();
+ runner.assertValid(service);
}
@Test
@@ -823,4 +921,38 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
Thread.sleep(1000);
}
+ /**
+  * Asserts that exactly one verification result is the sniffer step, carries the
+  * "Sniff on Connection not enabled" explanation and has the SKIPPED outcome.
+  *
+  * @param results verification results to inspect; their string form is used as the failure message
+  */
+ private void assertVerifySnifferSkipped(final List<ConfigVerificationResult> results) {
+     final long skippedSnifferSteps = results.stream()
+             .filter(candidate -> {
+                 if (!Objects.equals(candidate.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_SNIFFER)) {
+                     return false;
+                 }
+                 if (!Objects.equals(candidate.getExplanation(), "Sniff on Connection not enabled")) {
+                     return false;
+                 }
+                 return candidate.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED;
+             })
+             .count();
+
+     assertEquals(1, skippedSnifferSteps, results.toString());
+ }
+
+ /**
+  * Creates an Elasticsearch API key named "test-api-key" by POSTing to the
+  * {@code _security/api_key} endpoint. The request body grants the "all" cluster
+  * privilege and the "all" index privilege on every index ("*").
+  *
+  * @return a pair of the "id" and "api_key" fields read from the JSON response
+  * @throws IOException if the request fails or the response cannot be read or parsed
+  */
+ protected Pair<String, String> createApiKeyForIndex() throws IOException {
+     final String body = prettyJson(new MapBuilder()
+             .of("name", "test-api-key")
+             .of("role_descriptors", new MapBuilder()
+                     .of("test-role", new MapBuilder()
+                             .of("cluster", Collections.singletonList("all"))
+                             .of("index", Collections.singletonList(new MapBuilder()
+                                     .of("names", Collections.singletonList("*"))
+                                     .of("privileges", Collections.singletonList("all"))
+                                     .build()))
+                             .build())
+                     .build())
+             .build());
+     final String endpoint = String.format("%s/%s", elasticsearchHost, "_security/api_key");
+     final Request request = new Request("POST", endpoint);
+     final HttpEntity jsonBody = new NStringEntity(body, ContentType.APPLICATION_JSON);
+     request.setEntity(jsonBody);
+
+     final Response response = testDataManagementClient.performRequest(request);
+     final byte[] result;
+     // try-with-resources: the response stream is closed even when reading it throws
+     try (InputStream inputStream = response.getEntity().getContent()) {
+         result = IOUtils.toByteArray(inputStream);
+     }
+     final Map<String, String> ret = MAPPER.readValue(new String(result, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>() {});
+     return Pair.of(ret.get("id"), ret.get("api_key"));
+ }
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 7df9de2f07..c839a3f090 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -18,18 +18,18 @@ package org.apache.nifi.elasticsearch.integration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.HostConfig;
+import com.github.dockerjava.api.model.PortBinding;
+import com.github.dockerjava.api.model.Ports;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.entity.NStringEntity;
-import org.apache.nifi.elasticsearch.MapBuilder;
import org.apache.nifi.util.TestRunner;
import org.elasticsearch.client.Request;
-import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.BeforeAll;
@@ -39,30 +39,38 @@ import org.testcontainers.utility.DockerImageName;
import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
import static org.apache.http.auth.AuthScope.ANY;
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
- .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.5.0"));
+ .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.6.1"));
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
+ private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)
.withPassword(ELASTIC_USER_PASSWORD)
.withEnv("xpack.security.enabled", "true")
// enable API Keys for integration-tests (6.x & 7.x don't enable SSL and therefore API Keys by default, so use a trial license and explicitly enable API Keys)
.withEnv("xpack.license.self_generated.type", "trial")
- .withEnv("xpack.security.authc.api_key.enabled", "true");
+ .withEnv("xpack.security.authc.api_key.enabled", "true")
+ // use a "special address" to ensure the publish_host is in the bind_host list, otherwise the Sniffer won't work
+ // https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#network-interface-values
+ // TestContainers makes Elasticsearch available via localhost/127.0.0.1; Elasticsearch uses the IP Address in publish_host
+ .withEnv("network.bind_host", "_local_,_site_")
+ .withEnv("network.publish_host", "127.0.0.1")
+ // pin the Elasticsearch port (typically 9200 but not guaranteed), also bind that to 9200 on the host so the network.publish_host is accessible
+ .withEnv("http.port", String.valueOf(PORT))
+ .withExposedPorts(PORT)
+ .withCreateContainerCmdModifier(cmd -> cmd.withHostConfig(
+ new HostConfig().withPortBindings(new PortBinding(Ports.Binding.bindPort(PORT), new ExposedPort(PORT)))
+ ));
protected static final String CLIENT_SERVICE_NAME = "Client Service";
protected static final String INDEX = "messages";
@@ -88,7 +96,7 @@ public abstract class AbstractElasticsearchITBase {
protected static String type;
- private static RestClient testDataManagementClient;
+ static RestClient testDataManagementClient;
protected static void stopTestcontainer() {
if (ENABLE_TEST_CONTAINERS) {
@@ -98,7 +106,6 @@ public abstract class AbstractElasticsearchITBase {
@BeforeAll
static void beforeAll() throws IOException {
-
startTestcontainer();
type = getElasticMajorVersion() == 6 ? "_doc" : "";
System.out.printf("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nIMAGE: %s:%s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n",
@@ -173,32 +180,6 @@ public abstract class AbstractElasticsearchITBase {
return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o);
}
- protected Pair<String, String> createApiKeyForIndex(String index) throws IOException {
- final String body = prettyJson(new MapBuilder()
- .of("name", "test-api-key")
- .of("role_descriptors", new MapBuilder()
- .of("test-role", new MapBuilder()
- .of("cluster", Collections.singletonList("all"))
- .of("index", Collections.singletonList(new MapBuilder()
- .of("names", Collections.singletonList(index))
- .of("privileges", Collections.singletonList("all"))
- .build()))
- .build())
- .build())
- .build());
- final String endpoint = String.format("%s/%s", elasticsearchHost, "_security/api_key");
- final Request request = new Request("POST", endpoint);
- final HttpEntity jsonBody = new NStringEntity(body, ContentType.APPLICATION_JSON);
- request.setEntity(jsonBody);
-
- final Response response = testDataManagementClient.performRequest(request);
- final InputStream inputStream = response.getEntity().getContent();
- final byte[] result = IOUtils.toByteArray(inputStream);
- inputStream.close();
- final Map<String, String> ret = MAPPER.readValue(new String(result, StandardCharsets.UTF_8), Map.class);
- return Pair.of(ret.get("id"), ret.get("api_key"));
- }
-
private static List<SetupAction> readSetupActions(final String scriptPath) throws IOException {
final List<SetupAction> actions = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(scriptPath))))) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index 128260533f..f906a9a4cd 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -31,6 +31,17 @@ language governing permissions and limitations under the License. -->
<module>nifi-elasticsearch-restapi-processors</module>
</modules>
+ <properties>
+ <!-- pinned at 7.13.4 as it is the last version prior to Elastic forcing the client to check it is connecting
+ to an Elastic-provided Elasticsearch instead of an instance provided by someone else (e.g. AWS OpenSearch)
+ see: https://opensearch.org/blog/community/2021/08/community-clients/ for more info.
+
+ Note: the low-level elasticsearch-rest-client remains licensed with Apache 2.0
+ (https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_license.html) even after the move
+ of the main Elasticsearch product and elasticsearch-rest-high-level-client to Elastic 2.0/SSPL 1.0 in v7.11.0+ -->
+ <elasticsearch.client.version>7.13.4</elasticsearch.client.version>
+ </properties>
+
<dependencyManagement>
<dependencies>
<dependency>
@@ -53,14 +64,7 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
- <!-- pinned at 7.13.4 as it is the last version prior to Elastic forcing the client to check it is connecting
- to an Elastic-provided Elasticsearch instead of an instance provided by someone else (e.g. AWS OpenSearch)
- see: https://opensearch.org/blog/community/2021/08/community-clients/ for more info.
-
- Note: the low-level elasticsearch-rest-client remains licensed with Apache 2.0
- (https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_license.html) even after the move
- of the main Elasticsearch product and elasticsearch-rest-high-level-client to Elastic 2.0/SSPL 1.0 in v7.11.0+ -->
- <version>7.13.4</version>
+ <version>${elasticsearch.client.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -73,6 +77,18 @@ language governing permissions and limitations under the License. -->
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-client-sniffer</artifactId>
+ <version>${elasticsearch.client.version}</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -85,7 +101,7 @@ language governing permissions and limitations under the License. -->
</activation>
<properties>
<!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
- <elasticsearch_docker_image>8.5.3</elasticsearch_docker_image>
+ <elasticsearch_docker_image>8.6.1</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties>
<build>
@@ -116,7 +132,7 @@ language governing permissions and limitations under the License. -->
<profile>
<id>elasticsearch7</id>
<properties>
- <elasticsearch_docker_image>7.17.8</elasticsearch_docker_image>
+ <elasticsearch_docker_image>7.17.9</elasticsearch_docker_image>
</properties>
</profile>
</profiles>