You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2023/02/13 18:22:51 UTC

[nifi] branch main updated: NIFI-10797 add customisable Elasticsearch REST Client config and Elasticsearch Cluster Sniffer

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 6542505a50 NIFI-10797 add customisable Elasticsearch REST Client config and Elasticsearch Cluster Sniffer
6542505a50 is described below

commit 6542505a5071f7bd1152daea5b1763b2af618a74
Author: Chris Sampson <ch...@naimuri.com>
AuthorDate: Fri Nov 11 15:12:05 2022 +0000

    NIFI-10797 add customisable Elasticsearch REST Client config and Elasticsearch Cluster Sniffer
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #6658.
---
 .../elasticsearch/ElasticSearchClientService.java  | 116 ++++++-
 .../nifi-elasticsearch-client-service/pom.xml      |   4 +
 .../ElasticSearchClientServiceImpl.java            | 334 ++++++++++++++++-----
 .../additionalDetails.html                         |  54 ++++
 .../nifi/elasticsearch/SearchResponseTest.java     |  24 +-
 .../TestElasticSearchClientService.java            |   4 +-
 .../nifi/elasticsearch/TestSchemaRegistry.java     |   6 +-
 .../integration/AbstractElasticsearch_IT.java      |  11 +
 .../integration/ElasticSearchClientService_IT.java | 160 +++++++++-
 .../integration/AbstractElasticsearchITBase.java   |  57 ++--
 nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml |  36 ++-
 11 files changed, 656 insertions(+), 150 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 4664ae1a64..7e80f345d9 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.elasticsearch;
 
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
@@ -32,8 +30,6 @@ import org.apache.nifi.ssl.SSLContextService;
 import java.util.List;
 import java.util.Map;
 
-@Tags({"elasticsearch", "client"})
-@CapabilityDescription("A controller service for accessing an Elasticsearch client.")
 public interface ElasticSearchClientService extends ControllerService, VerifiableControllerService {
     PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder()
             .name("el-cs-http-hosts")
@@ -148,6 +144,118 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
             .required(true)
             .build();
 
+    PropertyDescriptor COMPRESSION = new PropertyDescriptor.Builder()
+            .name("el-cs-enable-compression")
+            .displayName("Enable Compression")
+            .description("Whether the REST client should compress requests using gzip content encoding and add the " +
+                    "\"Accept-Encoding: gzip\" header to receive compressed responses")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    PropertyDescriptor SEND_META_HEADER = new PropertyDescriptor.Builder()
+            .name("el-cs-send-meta-header")
+            .displayName("Send Meta Header")
+            .description("Whether to send a \"X-Elastic-Client-Meta\" header that describes the runtime environment. " +
+                    "It contains information that is similar to what could be found in User-Agent. " +
+                    "Using a separate header allows applications to use User-Agent for their own needs, " +
+                    "e.g. to identify application version or other environment information")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+
+    PropertyDescriptor STRICT_DEPRECATION = new PropertyDescriptor.Builder()
+            .name("el-cs-strict-deprecation")
+            .displayName("Strict Deprecation")
+            .description("Whether the REST client should return any response containing at least one warning header as a failure")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    AllowableValue NODE_SELECTOR_ANY = new AllowableValue("ANY", "Any",
+            "Select any Elasticsearch node to handle requests");
+    AllowableValue NODE_SELECTOR_SKIP_DEDICATED_MASTERS = new AllowableValue("SKIP_DEDICATED_MASTERS", "Skip Dedicated Masters",
+            "Skip dedicated Elasticsearch master nodes for handling request");
+
+    PropertyDescriptor NODE_SELECTOR = new PropertyDescriptor.Builder()
+            .name("el-cs-node-selector")
+            .displayName("Node Selector")
+            .description("Selects Elasticsearch nodes that can receive requests. Used to keep requests away from dedicated Elasticsearch master nodes")
+            .allowableValues(NODE_SELECTOR_ANY, NODE_SELECTOR_SKIP_DEDICATED_MASTERS)
+            .defaultValue(NODE_SELECTOR_ANY.getValue())
+            .required(true)
+            .build();
+
+    PropertyDescriptor PATH_PREFIX = new PropertyDescriptor.Builder()
+            .name("el-cs-path-prefix")
+            .displayName("Path Prefix")
+            .description("Sets the path's prefix for every request used by the http client. " +
+                    "For example, if this is set to \"/my/path\", then any client request will become \"/my/path/\" + endpoint. " +
+                    "In essence, every request's endpoint is prefixed by this pathPrefix. " +
+                    "The path prefix is useful for when Elasticsearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; " +
+                    "it is not intended for other purposes and it should not be supplied in other scenarios")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    PropertyDescriptor SNIFF_CLUSTER_NODES = new PropertyDescriptor.Builder()
+            .name("el-cs-sniff-cluster-nodes")
+            .displayName("Sniff Cluster Nodes")
+            .description("Periodically sniff for nodes within the Elasticsearch cluster via the Elasticsearch Node Info API. " +
+                    "If Elasticsearch security features are enabled (default to \"true\" for 8.x+), the Elasticsearch user must " +
+                    "have the \"monitor\" or \"manage\" cluster privilege to use this API." +
+                    "Note that all " + HTTP_HOSTS.getDisplayName() + " (and those that may be discovered within the cluster " +
+                    "using the Sniffer) must use the same protocol, e.g. http or https, and be contactable using the same client settings. " +
+                    "Finally the Elasticsearch \"network.publish_host\" must match one of the \"network.bind_host\" list entries " +
+                    "see https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html for more information")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    PropertyDescriptor SNIFF_ON_FAILURE = new PropertyDescriptor.Builder()
+            .name("el-cs-sniff-failure")
+            .displayName("Sniff on Failure")
+            .description("Enable sniffing on failure, meaning that after each failure the Elasticsearch nodes list gets updated " +
+                    "straightaway rather than at the following ordinary sniffing round")
+            .dependsOn(SNIFF_CLUSTER_NODES, "true")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    PropertyDescriptor SNIFFER_INTERVAL = new PropertyDescriptor.Builder()
+            .name("el-cs-sniffer-interval")
+            .displayName("Sniffer Interval")
+            .description("Interval between Cluster sniffer operations")
+            .dependsOn(SNIFF_CLUSTER_NODES, "true")
+            .defaultValue("5 mins")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .required(true)
+            .build();
+
+    PropertyDescriptor SNIFFER_REQUEST_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("el-cs-sniffer-request-timeout")
+            .displayName("Sniffer Request Timeout")
+            .description("Cluster sniffer timeout for node info requests")
+            .dependsOn(SNIFF_CLUSTER_NODES, "true")
+            .defaultValue("1 sec")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .required(true)
+            .build();
+
+    PropertyDescriptor SNIFFER_FAILURE_DELAY = new PropertyDescriptor.Builder()
+            .name("el-cs-sniffer-failure-delay")
+            .displayName("Sniffer Failure Delay")
+            .description("Delay between an Elasticsearch request failure and updating available Cluster nodes using the Sniffer")
+            .dependsOn(SNIFF_ON_FAILURE, "true")
+            .defaultValue("1 min")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .required(true)
+            .build();
+
     /**
      * Index a document.
      *
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index 9da490b8af..3cc3f98bef 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -132,6 +132,10 @@
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-client-sniffer</artifactId>
+        </dependency>
 
         <!-- test dependencies -->
         <dependency>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 9658c2ce86..3fceedc8ab 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -32,6 +32,9 @@ import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.nio.entity.NStringEntity;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.ConfigVerificationResult;
@@ -40,19 +43,26 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.proxy.ProxyConfigurationService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.StringUtils;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.NodeSelector;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
+import org.elasticsearch.client.sniff.SniffOnFailureListener;
+import org.elasticsearch.client.sniff.Sniffer;
 
 import javax.net.ssl.SSLContext;
 import java.io.ByteArrayOutputStream;
@@ -73,12 +83,23 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+@Tags({"elasticsearch", "elasticsearch6", "elasticsearch7", "elasticsearch8", "client"})
+@CapabilityDescription("A controller service for accessing an Elasticsearch client. " +
+        "Uses the Elasticsearch REST Client (7.13.4, the last version before client connections verify" +
+        "the server is Elastic provided, this should allow for connections to compatible alternatives, e.g. AWS OpenSearch)")
+@DynamicProperty(
+        name = "The name of a Request Header to add",
+        value = "The value of the Header",
+        expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
+        description = "Adds the specified property name/value as a Request Header in the Elasticsearch requests.")
 public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
     public static final String VERIFICATION_STEP_CONNECTION = "Elasticsearch Connection";
     public static final String VERIFICATION_STEP_CLIENT_SETUP = "Elasticsearch Rest Client Setup";
     public static final String VERIFICATION_STEP_WARNINGS = "Elasticsearch Warnings";
+    public static final String VERIFICATION_STEP_SNIFFER = "Elasticsearch Sniffer";
 
     private ObjectMapper mapper;
 
@@ -86,6 +107,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
 
     private RestClient client;
 
+    private Sniffer sniffer;
+
     private String url;
     private Charset responseCharset;
     private ObjectWriter prettyPrintWriter;
@@ -93,6 +116,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     static {
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(HTTP_HOSTS);
+        props.add(PATH_PREFIX);
         props.add(AUTHORIZATION_SCHEME);
         props.add(USERNAME);
         props.add(PASSWORD);
@@ -104,6 +128,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         props.add(SOCKET_TIMEOUT);
         props.add(CHARSET);
         props.add(SUPPRESS_NULLS);
+        props.add(COMPRESSION);
+        props.add(SEND_META_HEADER);
+        props.add(STRICT_DEPRECATION);
+        props.add(NODE_SELECTOR);
+        props.add(SNIFF_CLUSTER_NODES);
+        props.add(SNIFFER_INTERVAL);
+        props.add(SNIFFER_REQUEST_TIMEOUT);
+        props.add(SNIFF_ON_FAILURE);
+        props.add(SNIFFER_FAILURE_DELAY);
 
         properties = Collections.unmodifiableList(props);
     }
@@ -113,9 +146,20 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         return properties;
     }
 
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                .dynamic(true)
+                .build();
+    }
+
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+        final List<ValidationResult> results = new ArrayList<>(1);
 
         final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(validationContext.getProperty(AUTHORIZATION_SCHEME).getValue());
 
@@ -126,6 +170,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         final boolean apiKeySet = validationContext.getProperty(API_KEY).isSet();
 
         final SSLContextService sslService = validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
         if (authorizationScheme == AuthorizationScheme.PKI && (sslService == null || !sslService.isKeyStoreConfigured())) {
             results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName()).valid(false)
                     .explanation(String.format("if '%s' is '%s' then '%s' must be set and specify a Keystore for mutual TLS encryption.",
@@ -146,6 +191,13 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
             addAuthorizationPropertiesValidationIssue(results, API_KEY, API_KEY_ID);
         }
 
+        final boolean sniffClusterNodes = validationContext.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
+        final boolean sniffOnFailure = validationContext.getProperty(SNIFF_ON_FAILURE).asBoolean();
+        if (sniffOnFailure && !sniffClusterNodes) {
+            results.add(new ValidationResult.Builder().subject(SNIFF_ON_FAILURE.getName()).valid(false)
+                    .explanation(String.format("'%s' cannot be enabled if '%s' is disabled", SNIFF_ON_FAILURE.getDisplayName(), SNIFF_CLUSTER_NODES.getDisplayName())).build());
+        }
+
         return results;
     }
 
@@ -160,6 +212,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     public void onEnabled(final ConfigurationContext context) throws InitializationException {
         try {
             this.client = setupClient(context);
+            this.sniffer = setupSniffer(context, this.client);
             responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
 
             // re-create the ObjectMapper in case the SUPPRESS_NULLS property has changed - the JsonInclude settings aren't dynamic
@@ -178,6 +231,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
 
     @OnDisabled
     public void onDisabled() throws IOException {
+        if (this.sniffer != null) {
+            this.sniffer.close();
+            this.sniffer = null;
+        }
+
         if (this.client != null) {
             this.client.close();
             this.client = null;
@@ -194,6 +252,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
                 .verificationStepName(VERIFICATION_STEP_CONNECTION);
         final ConfigVerificationResult.Builder warningsResult = new ConfigVerificationResult.Builder()
                 .verificationStepName(VERIFICATION_STEP_WARNINGS);
+        final ConfigVerificationResult.Builder snifferResult = new ConfigVerificationResult.Builder()
+                .verificationStepName(VERIFICATION_STEP_SNIFFER);
 
         // configure the Rest Client
         try (final RestClient verifyClient = setupClient(context)) {
@@ -201,7 +261,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
 
             // try to fetch the Elasticsearch root endpoint (system summary)
             verifyRootConnection(verifyClient, connectionResult, warningsResult);
-        }catch (final MalformedURLException mue) {
+
+            // try sniffing for cluster nodes
+            verifySniffer(context, verifyClient, snifferResult);
+        } catch (final MalformedURLException mue) {
             clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED)
                     .explanation("Incorrect/invalid " + ElasticSearchClientService.HTTP_HOSTS.getDisplayName());
         } catch (final InitializationException ie) {
@@ -219,21 +282,78 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
                         .explanation("Elasticsearch Rest Client not configured");
                 warningsResult.outcome(ConfigVerificationResult.Outcome.SKIPPED)
                         .explanation("Elasticsearch Rest Client not configured");
+                snifferResult.outcome(ConfigVerificationResult.Outcome.SKIPPED)
+                        .explanation("Elasticsearch Rest Client not configured");
             }
 
             results.add(clientSetup);
             results.add(connectionResult.build());
             results.add(warningsResult.build());
+            results.add(snifferResult.build());
         }
 
         return results;
     }
 
+    private void verifySniffer(final ConfigurationContext context, final RestClient verifyClient, final ConfigVerificationResult.Builder snifferResult) {
+        try (final Sniffer verifySniffer = setupSniffer(context, verifyClient)) {
+            if (verifySniffer != null) {
+                final List<Node> originalNodes = verifyClient.getNodes();
+                // cannot access the NodesSniffer from the parent Sniffer, so set up a second instance here
+                final ElasticsearchNodesSniffer elasticsearchNodesSniffer = setupElasticsearchNodesSniffer(context, verifyClient);
+                final List<Node> nodes = elasticsearchNodesSniffer.sniff();
+
+                // attempt to connect to each Elasticsearch Node using the RestClient
+                final AtomicInteger successfulInstances = new AtomicInteger(0);
+                final AtomicInteger warningInstances = new AtomicInteger(0);
+                nodes.forEach(n -> {
+                    try {
+                        verifyClient.setNodes(Collections.singletonList(n));
+                        final List<String> warnings = getElasticsearchRoot(verifyClient);
+                        successfulInstances.getAndIncrement();
+                        if (!warnings.isEmpty()) {
+                            warningInstances.getAndIncrement();
+                        }
+                    } catch (final Exception ex) {
+                        getLogger().warn("Elasticsearch Node {} connection failed", n.getHost().toURI(), ex);
+                    }
+                });
+                // reset Nodes list on RestClient to pre-Sniffer state (match user's Verify settings)
+                verifyClient.setNodes(originalNodes);
+
+                if (successfulInstances.get() < nodes.size()) {
+                    snifferResult.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(
+                            String.format("Sniffing for Elasticsearch cluster nodes found %d nodes but %d could not be contacted (%d with warnings during connection tests)",
+                                    nodes.size(), nodes.size() - successfulInstances.get(), warningInstances.get())
+                    );
+                } else {
+                    snifferResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(
+                            String.format("Sniffing for Elasticsearch cluster nodes found %d nodes (%d with warnings during connection tests)",
+                                    nodes.size(), warningInstances.get())
+                    );
+                }
+            } else {
+                snifferResult.outcome(ConfigVerificationResult.Outcome.SKIPPED).explanation("Sniff on Connection not enabled");
+            }
+        } catch (final Exception ex) {
+            getLogger().warn("Unable to sniff for Elasticsearch cluster nodes", ex);
+
+            snifferResult.outcome(ConfigVerificationResult.Outcome.FAILED)
+                    .explanation("Sniffing for Elasticsearch cluster nodes failed");
+        }
+    }
+
+    private List<String> getElasticsearchRoot(final RestClient verifyClient) throws IOException {
+        final Response response = verifyClient.performRequest(new Request("GET", "/"));
+        final List<String> warnings = parseResponseWarningHeaders(response);
+        parseResponse(response);
+
+        return warnings;
+    }
+
     private void verifyRootConnection(final RestClient verifyClient, final ConfigVerificationResult.Builder connectionResult, final ConfigVerificationResult.Builder warningsResult) {
         try {
-            final Response response = verifyClient.performRequest(new Request("GET", "/"));
-            final List<String> warnings = parseResponseWarningHeaders(response);
-            parseResponse(response);
+            final List<String> warnings = getElasticsearchRoot(verifyClient);
 
             connectionResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
             if (warnings.isEmpty()) {
@@ -253,80 +373,121 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
     }
 
     private RestClient setupClient(final ConfigurationContext context) throws MalformedURLException, InitializationException {
-        final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(context.getProperty(AUTHORIZATION_SCHEME).getValue());
+        final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
+        final Integer socketTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger();
+
+        final NodeSelector nodeSelector = NODE_SELECTOR_ANY.getValue().equals(context.getProperty(NODE_SELECTOR).getValue())
+                ? NodeSelector.ANY
+                : NodeSelector.SKIP_DEDICATED_MASTERS;
+        final String pathPrefix = context.getProperty(PATH_PREFIX).getValue();
+
+        final boolean compress = context.getProperty(COMPRESSION).asBoolean();
+        final boolean sendMetaHeader = context.getProperty(SEND_META_HEADER).asBoolean();
+        final boolean strictDeprecation = context.getProperty(STRICT_DEPRECATION).asBoolean();
+        final boolean sniffOnFailure = context.getProperty(SNIFF_ON_FAILURE).asBoolean();
+
+        final RestClientBuilder builder = RestClient.builder(getHttpHosts(context));
+        addAuthAndProxy(context, builder)
+                .setRequestConfigCallback(requestConfigBuilder -> {
+                    requestConfigBuilder.setConnectTimeout(connectTimeout);
+                    requestConfigBuilder.setSocketTimeout(socketTimeout);
+                    return requestConfigBuilder;
+                })
+                .setCompressionEnabled(compress)
+                .setMetaHeaderEnabled(sendMetaHeader)
+                .setStrictDeprecationMode(strictDeprecation)
+                .setNodeSelector(nodeSelector);
+
+        if (sniffOnFailure && sniffer != null) {
+            final SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
+            sniffOnFailureListener.setSniffer(sniffer);
+            builder.setFailureListener(sniffOnFailureListener);
+        }
+
+        if (StringUtils.isNotBlank(pathPrefix)) {
+            builder.setPathPrefix(pathPrefix);
+        }
 
+        return builder.build();
+    }
+
+    private HttpHost[] getHttpHosts(final ConfigurationContext context) throws MalformedURLException {
         final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
-        final String[] hostsSplit = hosts.split(",\\s*");
-        this.url = hostsSplit[0];
-        final SSLContextService sslService =
-                context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+        final List<String> hostsSplit = Arrays.stream(hosts.split(",\\s*")).map(String::trim).collect(Collectors.toList());
+        this.url = hostsSplit.get(0);
+        final List<HttpHost> hh = new ArrayList<>(hostsSplit.size());
+        for (final String host : hostsSplit) {
+            final URL u = new URL(host);
+            hh.add(new HttpHost(u.getHost(), u.getPort(), u.getProtocol()));
+        }
+
+        return hh.toArray(new HttpHost[0]);
+    }
+
+    private RestClientBuilder addAuthAndProxy(final ConfigurationContext context, final RestClientBuilder builder) throws InitializationException {
+        final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(context.getProperty(AUTHORIZATION_SCHEME).getValue());
+
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
         final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
 
         final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
         final String apiKey = context.getProperty(API_KEY).getValue();
 
-        final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
-        final Integer socketTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger();
-
+        final SSLContext sslContext = getSSLContext(context);
         final ProxyConfigurationService proxyConfigurationService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
 
-        final HttpHost[] hh = new HttpHost[hostsSplit.length];
-        for (int x = 0; x < hh.length; x++) {
-            final URL u = new URL(hostsSplit[x]);
-            hh[x] = new HttpHost(u.getHost(), u.getPort(), u.getProtocol());
-        }
-
-        final SSLContext sslContext;
-        try {
-            sslContext = (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
-                    ? sslService.createContext() : null;
-        } catch (final Exception e) {
-            getLogger().error("Error building up SSL Context from the supplied configuration.", e);
-            throw new InitializationException(e);
-        }
+        return builder.setHttpClientConfigCallback(httpClientBuilder -> {
+            if (sslContext != null) {
+                httpClientBuilder.setSSLContext(sslContext);
+            }
 
-        final RestClientBuilder builder = RestClient.builder(hh)
-                .setHttpClientConfigCallback(httpClientBuilder -> {
-                    if (sslContext != null) {
-                        httpClientBuilder.setSSLContext(sslContext);
-                    }
+            CredentialsProvider credentialsProvider = null;
+            if (AuthorizationScheme.BASIC == authorizationScheme && username != null && password != null) {
+                credentialsProvider = addBasicAuthCredentials(null, AuthScope.ANY, username, password);
+            }
 
-                    CredentialsProvider credentialsProvider = null;
-                    if (AuthorizationScheme.BASIC == authorizationScheme && username != null && password != null) {
-                        credentialsProvider = addCredentials(null, AuthScope.ANY, username, password);
-                    }
+            final List<Header> defaultHeaders = getDefaultHeadersFromDynamicProperties(context);
+            if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId != null && apiKey != null) {
+                defaultHeaders.add(createApiKeyAuthorizationHeader(apiKeyId, apiKey));
+            }
+            if (!defaultHeaders.isEmpty()) {
+                builder.setDefaultHeaders(defaultHeaders.toArray(new Header[0]));
+            }
 
-                    if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId != null && apiKey != null) {
-                        httpClientBuilder.setDefaultHeaders(Collections.singletonList(createApiKeyAuthorizationHeader(apiKeyId, apiKey)));
-                    }
+            if (proxyConfigurationService != null) {
+                final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
+                if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
+                    final HttpHost proxy = new HttpHost(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort(), "http");
+                    httpClientBuilder.setProxy(proxy);
 
-                    if (proxyConfigurationService != null) {
-                        final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
-                        if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
-                            final HttpHost proxy = new HttpHost(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort(), "http");
-                            httpClientBuilder.setProxy(proxy);
+                    credentialsProvider = addBasicAuthCredentials(credentialsProvider, new AuthScope(proxy), proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
+                }
+            }
 
-                            credentialsProvider = addCredentials(credentialsProvider, new AuthScope(proxy), proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
-                        }
-                    }
+            if (credentialsProvider != null) {
+                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+            }
 
-                    if (credentialsProvider != null) {
-                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-                    }
+            return httpClientBuilder;
+        });
+    }
 
-                    return httpClientBuilder;
-                })
-                .setRequestConfigCallback(requestConfigBuilder -> {
-                    requestConfigBuilder.setConnectTimeout(connectTimeout);
-                    requestConfigBuilder.setSocketTimeout(socketTimeout);
-                    return requestConfigBuilder;
-                });
+    private SSLContext getSSLContext(final ConfigurationContext context) throws InitializationException {
+        final SSLContextService sslService =
+                context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
 
-        return builder.build();
+        try {
+            return (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
+                    ? sslService.createContext() : null;
+        } catch (final Exception e) {
+            getLogger().error("Error building up SSL Context from the supplied configuration.", e);
+            throw new InitializationException(e);
+        }
     }
 
-    private CredentialsProvider addCredentials(final CredentialsProvider credentialsProvider, final AuthScope authScope, final String username, final String password) {
+    private CredentialsProvider addBasicAuthCredentials(final CredentialsProvider credentialsProvider, final AuthScope authScope,
+                                                        final String username, final String password) {
         final CredentialsProvider cp = credentialsProvider != null ? credentialsProvider : new BasicCredentialsProvider();
 
         if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
@@ -339,12 +500,48 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         return cp;
     }
 
-    private BasicHeader createApiKeyAuthorizationHeader(String apiKeyId, String apiKey) {
+    private List<Header> getDefaultHeadersFromDynamicProperties(final ConfigurationContext context) {
+        return context.getProperties().entrySet().stream()
+                // filter non-null dynamic properties
+                .filter(e -> e.getKey().isDynamic() && StringUtils.isNotBlank(e.getValue())
+                        && StringUtils.isNotBlank(context.getProperty(e.getKey()).evaluateAttributeExpressions().getValue())
+                )
+                // convert to Headers
+                .map(e -> new BasicHeader(e.getKey().getName(),
+                        context.getProperty(e.getKey()).evaluateAttributeExpressions().getValue()))
+                .collect(Collectors.toList());
+    }
+
+    private BasicHeader createApiKeyAuthorizationHeader(final String apiKeyId, final String apiKey) {
         final String apiKeyCredentials = String.format("%s:%s", apiKeyId, apiKey);
         final String apiKeyAuth = Base64.getEncoder().encodeToString((apiKeyCredentials).getBytes(StandardCharsets.UTF_8));
         return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth);
     }
 
+    private Sniffer setupSniffer(final ConfigurationContext context, final RestClient restClient) {
+        final boolean sniffClusterNodes = context.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
+        final int snifferIntervalMillis = context.getProperty(SNIFFER_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int snifferFailureDelayMillis = context.getProperty(SNIFFER_FAILURE_DELAY).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+        if (sniffClusterNodes) {
+            return Sniffer.builder(restClient)
+                    .setSniffIntervalMillis(snifferIntervalMillis)
+                    .setSniffAfterFailureDelayMillis(snifferFailureDelayMillis)
+                    .setNodesSniffer(setupElasticsearchNodesSniffer(context, restClient))
+                    .build();
+        }
+
+        return null;
+    }
+
+    private ElasticsearchNodesSniffer setupElasticsearchNodesSniffer(final ConfigurationContext context, final RestClient restClient) {
+        final Long snifferRequestTimeoutMillis = context.getProperty(SNIFFER_REQUEST_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        final ElasticsearchNodesSniffer.Scheme scheme = this.url.toLowerCase(Locale.getDefault()).startsWith("https://")
+                ? ElasticsearchNodesSniffer.Scheme.HTTPS : ElasticsearchNodesSniffer.Scheme.HTTP;
+
+        return new ElasticsearchNodesSniffer(restClient, snifferRequestTimeoutMillis, scheme);
+    }
+
     private void appendIndex(final StringBuilder sb, final String index) {
         if (StringUtils.isNotBlank(index) && !"/".equals(index)) {
             if (!index.startsWith("/")) {
@@ -798,10 +995,6 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         }
 
         if (getLogger().isDebugEnabled()) {
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            entity.writeTo(out);
-            out.close();
-
             StringBuilder builder = new StringBuilder(1000);
             builder.append("Dumping Elasticsearch REST request...\n")
                     .append("HTTP Method: ")
@@ -812,11 +1005,18 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
                     .append("\n")
                     .append("Parameters: ")
                     .append(prettyPrintWriter.writeValueAsString(parameters))
-                    .append("\n")
-                    .append("Request body: ")
-                    .append(new String(out.toByteArray()))
                     .append("\n");
 
+            if (entity != null) {
+                final ByteArrayOutputStream out = new ByteArrayOutputStream();
+                entity.writeTo(out);
+                out.close();
+
+                builder.append("Request body: ")
+                        .append(out)
+                        .append("\n");
+            }
+
             getLogger().debug(builder.toString());
         }
 
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html
new file mode 100644
index 0000000000..f0934cafe6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html
@@ -0,0 +1,54 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ElasticSearchClientServiceImpl</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+
+<h2>Sniffing</h2>
+<p>
+    The Elasticsearch Sniffer can be used to locate Elasticsearch Nodes within a Cluster to which you are connecting.
+    This can be beneficial if your cluster dynamically changes over time, e.g. new Nodes are added to maintain performance during heavy load.
+</p>
+<p>
+    Sniffing can also be used to update the list of Hosts within the Cluster if a connection Failure is encountered during operation.
+    In order to "Sniff on Failure", you <b>must</b> also enable "Sniff Cluster Nodes".
+</p>
+<p>
+    Not all situations make sense to use Sniffing, for example if:
+    <ul>
+        <li>Elasticsearch is situated behind a load balancer, which dynamically routes connections from NiFi</li>
+        <li>Elasticsearch is on a different network to NiFi</li>
+    </ul>
+</p>
+<p>
+    There may also be need to set some of the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html">
+    Elasticsearch Networking Advanced Settings</a>, such as <code>network.publish_host</code> to ensure that
+    the HTTP Hosts found by the Sniffer are accessible by NiFi. For example, Elasticsearch may use a network internal
+    <code>publish_host</code> that is inaccessible to NiFi, but instead should use an address/IP that NiFi understands.
+    It may also be necessary to add this same address to Elasticsearch's <code>network.bind_host</code> list.
+</p>
+<p>
+    See <a href="https://www.elastic.co/blog/elasticsearch-sniffing-best-practices-what-when-why-how">
+    Elasticsearch sniffing best practices: What, when, why, how</a> for more details of the best practices.
+</p>
+
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java
index 9f99ec0da8..bf82c8853d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/SearchResponseTest.java
@@ -20,7 +20,7 @@ package org.apache.nifi.elasticsearch;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,17 +31,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class SearchResponseTest {
     @Test
     void test() {
-        List<Map<String, Object>> results = new ArrayList<>();
-        Map<String, Object> aggs    = new HashMap<>();
-        String pitId = "pitId";
-        String scrollId = "scrollId";
-        String searchAfter = "searchAfter";
-        int num     = 10;
-        int took    = 100;
-        boolean timeout = false;
-        List<String> warnings = Arrays.asList("auth");
-        SearchResponse response = new SearchResponse(results, aggs, pitId, scrollId, searchAfter, num, took, timeout, warnings);
-        String str = response.toString();
+        final List<Map<String, Object>> results = new ArrayList<>();
+        final Map<String, Object> aggs    = new HashMap<>();
+        final String pitId = "pitId";
+        final String scrollId = "scrollId";
+        final String searchAfter = "searchAfter";
+        final int num     = 10;
+        final int took    = 100;
+        final boolean timeout = false;
+        final List<String> warnings = Collections.singletonList("auth");
+        final SearchResponse response = new SearchResponse(results, aggs, pitId, scrollId, searchAfter, num, took, timeout, warnings);
+        final String str = response.toString();
 
         assertEquals(results, response.getHits());
         assertEquals(aggs, response.getAggregations());
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java
index 4ee196a9dc..c85879179f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java
@@ -28,10 +28,10 @@ import java.util.List;
 import java.util.Map;
 
 public class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService {
-    private Map<String, Object> data;
+    private final Map<String, Object> data;
 
     public TestElasticSearchClientService() {
-        data = new HashMap<>();
+        data = new HashMap<>(4, 1);
         data.put("username", "john.smith");
         data.put("password", "testing1234");
         data.put("email", "john.smith@test.com");
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java
index 813ce1c5d2..7868c5ccd5 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestSchemaRegistry.java
@@ -26,7 +26,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -36,12 +36,12 @@ import static org.apache.nifi.schema.access.SchemaField.SCHEMA_NAME;
 public class TestSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
     @Override
     public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) {
-        List<RecordField> fields = Arrays.asList(new RecordField("msg", RecordFieldType.STRING.getDataType()));
+        List<RecordField> fields = Collections.singletonList(new RecordField("msg", RecordFieldType.STRING.getDataType()));
         return new SimpleRecordSchema(fields);
     }
 
     @Override
     public Set<SchemaField> getSuppliedSchemaFields() {
-        return new HashSet<>(Arrays.asList(SCHEMA_NAME));
+        return new HashSet<>(Collections.singletonList(SCHEMA_NAME));
     }
 }
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
index 104e504b23..c9dfc752e6 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
@@ -38,6 +38,8 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
         runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);
         service = new ElasticSearchClientServiceImpl();
         runner.addControllerService(CLIENT_SERVICE_NAME, service);
+        runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, CLIENT_SERVICE_NAME);
+
         runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, elasticsearchHost);
         runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000");
         runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000");
@@ -45,6 +47,15 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
         runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue());
         runner.setProperty(service, ElasticSearchClientService.USERNAME, "elastic");
         runner.setProperty(service, ElasticSearchClientService.PASSWORD, ELASTIC_USER_PASSWORD);
+        runner.removeProperty(service, ElasticSearchClientService.API_KEY);
+        runner.removeProperty(service, ElasticSearchClientService.API_KEY_ID);
+        runner.setProperty(service, ElasticSearchClientService.COMPRESSION, "false");
+        runner.setProperty(service, ElasticSearchClientService.SEND_META_HEADER, "true");
+        runner.setProperty(service, ElasticSearchClientService.STRICT_DEPRECATION, "false");
+        runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "false");
+        runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "false");
+        runner.removeProperty(service, ElasticSearchClientService.PATH_PREFIX);
+        runner.setProperty(service, ElasticSearchClientService.NODE_SELECTOR, ElasticSearchClientService.NODE_SELECTOR_ANY.getValue());
 
         runner.enableControllerService(service);
 
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index d7f0d39874..402363aece 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -18,7 +18,12 @@
 package org.apache.nifi.elasticsearch.integration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.compress.utils.IOUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.VerifiableControllerService;
@@ -31,7 +36,6 @@ import org.apache.nifi.elasticsearch.IndexOperationRequest;
 import org.apache.nifi.elasticsearch.IndexOperationResponse;
 import org.apache.nifi.elasticsearch.MapBuilder;
 import org.apache.nifi.elasticsearch.SearchResponse;
-import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
 import org.apache.nifi.elasticsearch.UpdateOperationResponse;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
@@ -39,11 +43,15 @@ import org.apache.nifi.ssl.StandardSSLContextService;
 import org.apache.nifi.util.MockConfigurationContext;
 import org.apache.nifi.util.MockControllerServiceLookup;
 import org.apache.nifi.util.MockVariableRegistry;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -82,13 +90,38 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
                 runner.getLogger(),
                 Collections.emptyMap()
         );
-        assertEquals(3, results.size());
+        assertEquals(4, results.size());
         assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
+        assertVerifySnifferSkipped(results);
+    }
+
+    @Test
+    void testVerifySniffer() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "true");
+        runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "false");
+        runner.enableControllerService(service);
+        assertVerifySniffer();
+
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "true");
+        runner.enableControllerService(service);
+        assertVerifySniffer();
+    }
+
+    private void assertVerifySniffer() {
+        final List<ConfigVerificationResult> results = ((VerifiableControllerService) service).verify(
+                new MockConfigurationContext(service, getClientServiceProperties(), runner.getProcessContext().getControllerServiceLookup(), new MockVariableRegistry()),
+                runner.getLogger(),
+                Collections.emptyMap()
+        );
+        assertEquals(4, results.size());
+        assertEquals(4, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
     }
 
     @Test
     void testVerifySuccessWithApiKeyAuth() throws IOException {
-        final Pair<String, String> apiKey = createApiKeyForIndex("*");
+        final Pair<String, String> apiKey = createApiKeyForIndex();
 
         runner.disableControllerService(service);
         runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
@@ -103,8 +136,9 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
                 runner.getLogger(),
                 Collections.emptyMap()
         );
-        assertEquals(3, results.size());
+        assertEquals(4, results.size());
         assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
+        assertVerifySnifferSkipped(results);
     }
 
     @Test
@@ -117,8 +151,8 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
                 runner.getLogger(),
                 Collections.emptyMap()
         );
-        assertEquals(3, results.size());
-        assertEquals(2, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+        assertEquals(4, results.size());
+        assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
         assertEquals(1, results.stream().filter(
                 result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CLIENT_SETUP)
                         && Objects.equals(result.getExplanation(), "Incorrect/invalid " + ElasticSearchClientService.HTTP_HOSTS.getDisplayName())
@@ -146,8 +180,8 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
                 runner.getLogger(),
                 Collections.emptyMap()
         );
-        assertEquals(3, results.size());
-        assertEquals(2, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+        assertEquals(4, results.size());
+        assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
         assertEquals(1, results.stream().filter(
                 result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CLIENT_SETUP)
                         && Objects.equals(result.getExplanation(), "Incorrect/invalid " + ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE.getDisplayName())
@@ -167,9 +201,9 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
                 runner.getLogger(),
                 Collections.emptyMap()
         );
-        assertEquals(3, results.size());
+        assertEquals(4, results.size());
         assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
-        assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+        assertEquals(2, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
         assertEquals(1, results.stream().filter(
                 result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CONNECTION)
                         && Objects.equals(result.getExplanation(), "Unable to retrieve system summary from Elasticsearch root endpoint")
@@ -193,9 +227,9 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
                 runner.getLogger(),
                 Collections.emptyMap()
         );
-        assertEquals(3, results.size());
+        assertEquals(4, results.size());
         assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
-        assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+        assertEquals(2, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
         assertEquals(1, results.stream().filter(
                 result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CONNECTION)
                         && Objects.equals(result.getExplanation(), "Unable to retrieve system summary from Elasticsearch root endpoint")
@@ -623,6 +657,71 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
         assertFalse(service.exists("index-does-not-exist", null), "index exists");
     }
 
+    @Test
+    void testCompression() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.COMPRESSION, "true");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.exists(INDEX, null), "index does not exist");
+    }
+
+    @Test
+    void testNoMetaHeader() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.SEND_META_HEADER, "false");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.exists(INDEX, null), "index does not exist");
+    }
+
+    @Test
+    void testStrictDeprecation() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.STRICT_DEPRECATION, "true");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.exists(INDEX, null), "index does not exist");
+    }
+
+    @Test
+    void testNodeSelector() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.NODE_SELECTOR, ElasticSearchClientService.NODE_SELECTOR_SKIP_DEDICATED_MASTERS.getValue());
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.exists(INDEX, null), "index does not exist");
+    }
+
+    @Test
+    void testRestClientRequestHeaders() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, "User-Agent", "NiFi Integration Tests");
+        runner.setProperty(service, "X-Extra_header", "Request should still work");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.exists(INDEX, null), "index does not exist");
+    }
+
+    @Test
+    void testSniffer() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "false");
+        runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "true");
+        runner.assertNotValid(service);
+
+        runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "true");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        assertTrue(service.exists(INDEX, null), "index does not exist");
+    }
+
     @Test
     void testNullSuppression() throws InterruptedException {
         final Map<String, Object> doc = new HashMap<>();
@@ -659,13 +758,12 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
     }
 
     private void suppressNulls(final boolean suppressNulls) {
-        runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service");
         runner.disableControllerService(service);
         runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, suppressNulls
                 ? ElasticSearchClientService.ALWAYS_SUPPRESS.getValue()
                 : ElasticSearchClientService.NEVER_SUPPRESS.getValue());
         runner.enableControllerService(service);
-        runner.assertValid();
+        runner.assertValid(service);
     }
 
     @Test
@@ -823,4 +921,38 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
         Thread.sleep(1000);
     }
 
+    private void assertVerifySnifferSkipped(final List<ConfigVerificationResult> results) {
+        assertEquals(1, results.stream().filter(
+                        result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_SNIFFER)
+                                && Objects.equals(result.getExplanation(), "Sniff on Connection not enabled")
+                                && result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(),
+                results.toString()
+        );
+    }
+
+    protected Pair<String, String> createApiKeyForIndex() throws IOException {
+        final String body = prettyJson(new MapBuilder()
+                .of("name", "test-api-key")
+                .of("role_descriptors", new MapBuilder()
+                        .of("test-role", new MapBuilder()
+                                .of("cluster", Collections.singletonList("all"))
+                                .of("index", Collections.singletonList(new MapBuilder()
+                                        .of("names", Collections.singletonList("*"))
+                                        .of("privileges", Collections.singletonList("all"))
+                                        .build()))
+                                .build())
+                        .build())
+                .build());
+        final String endpoint = String.format("%s/%s", elasticsearchHost, "_security/api_key");
+        final Request request = new Request("POST", endpoint);
+        final HttpEntity jsonBody = new NStringEntity(body, ContentType.APPLICATION_JSON);
+        request.setEntity(jsonBody);
+
+        final Response response = testDataManagementClient.performRequest(request);
+        final InputStream inputStream = response.getEntity().getContent();
+        final byte[] result = IOUtils.toByteArray(inputStream);
+        inputStream.close();
+        final Map<String, String> ret = MAPPER.readValue(new String(result, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>() {});
+        return Pair.of(ret.get("id"), ret.get("api_key"));
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 7df9de2f07..c839a3f090 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -18,18 +18,18 @@ package org.apache.nifi.elasticsearch.integration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.HostConfig;
+import com.github.dockerjava.api.model.PortBinding;
+import com.github.dockerjava.api.model.Ports;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.nio.entity.NStringEntity;
-import org.apache.nifi.elasticsearch.MapBuilder;
 import org.apache.nifi.util.TestRunner;
 import org.elasticsearch.client.Request;
-import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.junit.jupiter.api.BeforeAll;
@@ -39,30 +39,38 @@ import org.testcontainers.utility.DockerImageName;
 
 import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URL;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 import static org.apache.http.auth.AuthScope.ANY;
 
 public abstract class AbstractElasticsearchITBase {
     // default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
     protected static final DockerImageName IMAGE = DockerImageName
-            .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.5.0"));
+            .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.6.1"));
     protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
+    private static final int PORT = 9200;
     protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)
             .withPassword(ELASTIC_USER_PASSWORD)
             .withEnv("xpack.security.enabled", "true")
             // enable API Keys for integration-tests (6.x & 7.x don't enable SSL and therefore API Keys by default, so use a trial license and explicitly enable API Keys)
             .withEnv("xpack.license.self_generated.type", "trial")
-            .withEnv("xpack.security.authc.api_key.enabled", "true");
+            .withEnv("xpack.security.authc.api_key.enabled", "true")
+            // use a "special address" to ensure the publish_host is in the bind_host list, otherwise the Sniffer won't work
+            // https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#network-interface-values
+            // TestContainers makes Elasticsearch available via localhost/127.0.0.1; Elasticsearch uses the IP Address in publish_host
+            .withEnv("network.bind_host", "_local_,_site_")
+            .withEnv("network.publish_host", "127.0.0.1")
+            // pin the Elasticsearch port (typically 9200 but not guaranteed), also bind that to 9200 on the host so the network.publish_host is accessible
+            .withEnv("http.port", String.valueOf(PORT))
+            .withExposedPorts(PORT)
+            .withCreateContainerCmdModifier(cmd -> cmd.withHostConfig(
+                    new HostConfig().withPortBindings(new PortBinding(Ports.Binding.bindPort(PORT), new ExposedPort(PORT)))
+            ));
     protected static final String CLIENT_SERVICE_NAME = "Client Service";
     protected static final String INDEX = "messages";
 
@@ -88,7 +96,7 @@ public abstract class AbstractElasticsearchITBase {
 
     protected static String type;
 
-    private static RestClient testDataManagementClient;
+    static RestClient testDataManagementClient;
 
     protected static void stopTestcontainer() {
         if (ENABLE_TEST_CONTAINERS) {
@@ -98,7 +106,6 @@ public abstract class AbstractElasticsearchITBase {
 
     @BeforeAll
     static void beforeAll() throws IOException {
-
         startTestcontainer();
         type = getElasticMajorVersion() == 6 ? "_doc" : "";
         System.out.printf("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nIMAGE: %s:%s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n",
@@ -173,32 +180,6 @@ public abstract class AbstractElasticsearchITBase {
         return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o);
     }
 
-    protected Pair<String, String> createApiKeyForIndex(String index) throws IOException {
-        final String body = prettyJson(new MapBuilder()
-                .of("name", "test-api-key")
-                .of("role_descriptors", new MapBuilder()
-                        .of("test-role", new MapBuilder()
-                                .of("cluster", Collections.singletonList("all"))
-                                .of("index", Collections.singletonList(new MapBuilder()
-                                        .of("names", Collections.singletonList(index))
-                                        .of("privileges", Collections.singletonList("all"))
-                                        .build()))
-                                .build())
-                        .build())
-                .build());
-        final String endpoint = String.format("%s/%s", elasticsearchHost, "_security/api_key");
-        final Request request = new Request("POST", endpoint);
-        final HttpEntity jsonBody = new NStringEntity(body, ContentType.APPLICATION_JSON);
-        request.setEntity(jsonBody);
-
-        final Response response = testDataManagementClient.performRequest(request);
-        final InputStream inputStream = response.getEntity().getContent();
-        final byte[] result = IOUtils.toByteArray(inputStream);
-        inputStream.close();
-        final Map<String, String> ret = MAPPER.readValue(new String(result, StandardCharsets.UTF_8), Map.class);
-        return Pair.of(ret.get("id"), ret.get("api_key"));
-    }
-
     private static List<SetupAction> readSetupActions(final String scriptPath) throws IOException {
         final List<SetupAction> actions = new ArrayList<>();
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(scriptPath))))) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index 128260533f..f906a9a4cd 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -31,6 +31,17 @@ language governing permissions and limitations under the License. -->
         <module>nifi-elasticsearch-restapi-processors</module>
     </modules>
 
+    <properties>
+        <!-- pinned at 7.13.4 as it is the last version prior to Elastic forcing the client to check it is connecting
+         to an Elastic-provided Elasticsearch instead of an instance provided by someone else (e.g. AWS OpenSearch)
+         see: https://opensearch.org/blog/community/2021/08/community-clients/ for more info.
+
+         Note: the low-level elasticsearch-rest-client remains licensed with Apache 2.0
+         (https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_license.html) even after the move
+         of the main Elasticsearch product and elasticsearch-rest-high-level-client to Elastic 2.0/SSPL 1.0 in v7.11.0+ -->
+        <elasticsearch.client.version>7.13.4</elasticsearch.client.version>
+    </properties>
+
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -53,14 +64,7 @@ language governing permissions and limitations under the License. -->
             <dependency>
                 <groupId>org.elasticsearch.client</groupId>
                 <artifactId>elasticsearch-rest-client</artifactId>
-                <!-- pinned at 7.13.4 as it is the last version prior to Elastic forcing the client to check it is connecting
-                 to an Elastic-provided Elasticsearch instead of an instance provided by someone else (e.g. AWS OpenSearch)
-                 see: https://opensearch.org/blog/community/2021/08/community-clients/ for more info.
-
-                 Note: the low-level elasticsearch-rest-client remains licensed with Apache 2.0
-                 (https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_license.html) even after the move
-                 of the main Elasticsearch product and elasticsearch-rest-high-level-client to Elastic 2.0/SSPL 1.0 in v7.11.0+ -->
-                <version>7.13.4</version>
+                <version>${elasticsearch.client.version}</version>
                 <scope>compile</scope>
                 <exclusions>
                     <exclusion>
@@ -73,6 +77,18 @@ language governing permissions and limitations under the License. -->
                     </exclusion>
                 </exclusions>
             </dependency>
+            <dependency>
+                <groupId>org.elasticsearch.client</groupId>
+                <artifactId>elasticsearch-rest-client-sniffer</artifactId>
+                <version>${elasticsearch.client.version}</version>
+                <scope>compile</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>commons-logging</groupId>
+                        <artifactId>commons-logging</artifactId>
+                    </exclusion>
+                </exclusions>
+        </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -85,7 +101,7 @@ language governing permissions and limitations under the License. -->
             </activation>
             <properties>
                 <!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
-                <elasticsearch_docker_image>8.5.3</elasticsearch_docker_image>
+                <elasticsearch_docker_image>8.6.1</elasticsearch_docker_image>
                 <elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
             </properties>
             <build>
@@ -116,7 +132,7 @@ language governing permissions and limitations under the License. -->
         <profile>
             <id>elasticsearch7</id>
             <properties>
-                <elasticsearch_docker_image>7.17.8</elasticsearch_docker_image>
+                <elasticsearch_docker_image>7.17.9</elasticsearch_docker_image>
             </properties>
         </profile>
     </profiles>