You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/11/08 22:03:08 UTC

[nifi] branch main updated: NIFI-10760 Add API Key authentication to ElasticSearchClientServiceImpl

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d374c1f399 NIFI-10760 Add API Key authentication to ElasticSearchClientServiceImpl
d374c1f399 is described below

commit d374c1f399a517266c76cc4c924d01c72054bc2f
Author: Nandor Soma Abonyi <ab...@gmail.com>
AuthorDate: Fri Nov 4 12:50:04 2022 +0100

    NIFI-10760 Add API Key authentication to ElasticSearchClientServiceImpl
    
    This closes #6619
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../nifi/elasticsearch/AuthorizationScheme.java    | 47 +++++++++++++++
 .../elasticsearch/ElasticSearchClientService.java  | 38 ++++++++++++
 .../ElasticSearchClientServiceImpl.java            | 69 ++++++++++++++++++----
 .../integration/AbstractElasticsearch_IT.java      |  2 +
 .../integration/ElasticSearchClientService_IT.java | 55 +++++++++++++++--
 .../integration/AbstractElasticsearchITBase.java   | 39 ++++++++++++
 6 files changed, 236 insertions(+), 14 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
new file mode 100644
index 0000000000..2098e1cd87
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.elasticsearch;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum AuthorizationScheme implements DescribedValue {
+    BASIC("Basic", "Basic authorization scheme."),
+    API_KEY("API Key", "API key authorization scheme.");
+
+    private final String displayName;
+    private final String description;
+
+    AuthorizationScheme(String displayName, String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 15a45b4b7f..5ce690c09d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -43,6 +43,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
+
     PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("el-cs-ssl-context-service")
             .displayName("SSL Context Service")
@@ -53,23 +54,58 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
             .addValidator(Validator.VALID)
             .build();
     PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP);
+
+    PropertyDescriptor AUTHORIZATION_SCHEME = new PropertyDescriptor.Builder()
+            .name("authorization-scheme")
+            .displayName("Authorization Scheme")
+            .description("Authorization Scheme used for authenticating to Elasticsearch using the HTTP Authorization header.")
+            .allowableValues(AuthorizationScheme.class)
+            .defaultValue(AuthorizationScheme.BASIC.getValue())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
             .name("el-cs-username")
             .displayName("Username")
             .description("The username to use with XPack security.")
+            .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue())
             .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
+
     PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
             .name("el-cs-password")
             .displayName("Password")
             .description("The password to use with XPack security.")
+            .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue())
             .required(false)
             .sensitive(true)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
+
+    PropertyDescriptor API_KEY_ID = new PropertyDescriptor.Builder()
+            .name("api-key-id")
+            .displayName("API Key ID")
+            .description("Unique identifier of the API key.")
+            .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue())
+            .required(false)
+            .sensitive(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+            .name("api-key")
+            .displayName("API Key")
+            .description("Encoded API key.")
+            .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue())
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
             .name("el-cs-connect-timeout")
             .displayName("Connect timeout")
@@ -78,6 +114,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
             .defaultValue("5000")
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
+
     PropertyDescriptor SOCKET_TIMEOUT = new PropertyDescriptor.Builder()
             .name("el-cs-socket-timeout")
             .displayName("Read timeout")
@@ -99,6 +136,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
             .defaultValue("60000")
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
+
     PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
             .name("el-cs-charset")
             .displayName("Charset")
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 29ecad5609..3c8cee35ae 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -29,11 +29,14 @@ import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.message.BasicHeader;
 import org.apache.http.nio.entity.NStringEntity;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
@@ -60,6 +63,8 @@ import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -84,16 +89,19 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
 
     static {
         final List<PropertyDescriptor> props = new ArrayList<>();
-        props.add(ElasticSearchClientService.HTTP_HOSTS);
-        props.add(ElasticSearchClientService.USERNAME);
-        props.add(ElasticSearchClientService.PASSWORD);
-        props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
-        props.add(ElasticSearchClientService.PROXY_CONFIGURATION_SERVICE);
-        props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
-        props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
-        props.add(ElasticSearchClientService.RETRY_TIMEOUT);
-        props.add(ElasticSearchClientService.CHARSET);
-        props.add(ElasticSearchClientService.SUPPRESS_NULLS);
+        props.add(HTTP_HOSTS);
+        props.add(AUTHORIZATION_SCHEME);
+        props.add(USERNAME);
+        props.add(PASSWORD);
+        props.add(API_KEY_ID);
+        props.add(API_KEY);
+        props.add(PROP_SSL_CONTEXT_SERVICE);
+        props.add(PROXY_CONFIGURATION_SERVICE);
+        props.add(CONNECT_TIMEOUT);
+        props.add(SOCKET_TIMEOUT);
+        props.add(RETRY_TIMEOUT);
+        props.add(CHARSET);
+        props.add(SUPPRESS_NULLS);
 
         properties = Collections.unmodifiableList(props);
     }
@@ -103,6 +111,34 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         return properties;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(1);
+
+        final boolean usernameSet = validationContext.getProperty(USERNAME).isSet();
+        final boolean passwordSet = validationContext.getProperty(PASSWORD).isSet();
+
+        if ((usernameSet && !passwordSet) || (!usernameSet && passwordSet)) {
+            results.add(new ValidationResult.Builder().subject(String.format("%s and %s", USERNAME.getDisplayName(), PASSWORD.getDisplayName()))
+                    .valid(false).explanation(String.format("if '%s' or '%s' is set, both must be set.", USERNAME.getDisplayName(), PASSWORD.getDisplayName())).build());
+        }
+
+        final boolean apiKeyIdSet = validationContext.getProperty(API_KEY_ID).isSet();
+        final boolean apiKeySet = validationContext.getProperty(API_KEY).isSet();
+
+        if ((apiKeyIdSet && !apiKeySet) || (!apiKeyIdSet && apiKeySet)) {
+            results.add(new ValidationResult.Builder().subject(String.format("%s and %s", API_KEY.getDisplayName(), API_KEY_ID.getDisplayName()))
+                    .valid(false).explanation(String.format("if '%s' or '%s' is set, both must be set.", API_KEY.getDisplayName(), API_KEY_ID.getDisplayName())).build());
+        }
+
+        if (usernameSet && apiKeyIdSet) {
+            results.add(new ValidationResult.Builder().subject(String.format("%s and %s", USERNAME.getDisplayName(), API_KEY_ID.getDisplayName()))
+                    .valid(false).explanation(String.format("'%s' and '%s' cannot be used together.", USERNAME.getDisplayName(), API_KEY_ID.getDisplayName())).build());
+        }
+
+        return results;
+    }
+
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) throws InitializationException {
         try {
@@ -206,6 +242,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
         final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
 
+        final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
+        final String apiKey = context.getProperty(API_KEY).getValue();
+
         final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
         final Integer readTimeout    = context.getProperty(SOCKET_TIMEOUT).asInteger();
 
@@ -237,6 +276,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
                         credentialsProvider = addCredentials(null, AuthScope.ANY, username, password);
                     }
 
+                    if (apiKeyId != null && apiKey != null) {
+                        httpClientBuilder.setDefaultHeaders(Collections.singletonList(createApiKeyAuthorizationHeader(apiKeyId, apiKey)));
+                    }
+
                     if (proxyConfigurationService != null) {
                         final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
                         if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
@@ -275,6 +318,12 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
         return cp;
     }
 
+    private BasicHeader createApiKeyAuthorizationHeader(String apiKeyId, String apiKey) {
+        final String apiKeyCredentials = String.format("%s:%s", apiKeyId, apiKey);
+        final String apiKeyAuth = Base64.getEncoder().encodeToString((apiKeyCredentials).getBytes(StandardCharsets.UTF_8));
+        return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth);
+    }
+
     private void appendIndex(final StringBuilder sb, final String index) {
         if (StringUtils.isNotBlank(index) && !"/".equals(index)) {
             if (!index.startsWith("/")) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
index 40df462a1a..104e504b23 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.elasticsearch.integration;
 
+import org.apache.nifi.elasticsearch.AuthorizationScheme;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
 import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
 import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
@@ -41,6 +42,7 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
         runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000");
         runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000");
         runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS.getValue());
+        runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue());
         runner.setProperty(service, ElasticSearchClientService.USERNAME, "elastic");
         runner.setProperty(service, ElasticSearchClientService.PASSWORD, ELASTIC_USER_PASSWORD);
 
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index 035b3b9bc4..cd1bd3ca40 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -18,9 +18,11 @@
 package org.apache.nifi.elasticsearch.integration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.VerifiableControllerService;
+import org.apache.nifi.elasticsearch.AuthorizationScheme;
 import org.apache.nifi.elasticsearch.DeleteOperationResponse;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
 import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
@@ -41,6 +43,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -66,10 +69,6 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
         service.onDisabled();
     }
 
-    private String prettyJson(final Object o) throws JsonProcessingException {
-        return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o);
-    }
-
     private Map<PropertyDescriptor, String> getClientServiceProperties() {
         return ((MockControllerServiceLookup) runner.getProcessContext().getControllerServiceLookup())
                 .getControllerServices().get(CLIENT_SERVICE_NAME).getProperties();
@@ -86,6 +85,27 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
         assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
     }
 
+    @Test
+    void testVerifySuccessWithApiKeyAuth() throws IOException {
+        final Pair<String, String> apiKey = createApiKeyForIndex("*");
+
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
+        runner.removeProperty(service, ElasticSearchClientService.USERNAME);
+        runner.removeProperty(service, ElasticSearchClientService.PASSWORD);
+        runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, apiKey.getKey());
+        runner.setProperty(service, ElasticSearchClientService.API_KEY, apiKey.getValue());
+        runner.enableControllerService(service);
+
+        final List<ConfigVerificationResult> results = ((VerifiableControllerService) service).verify(
+                new MockConfigurationContext(service, getClientServiceProperties(), runner.getProcessContext().getControllerServiceLookup(), new MockVariableRegistry()),
+                runner.getLogger(),
+                Collections.emptyMap()
+        );
+        assertEquals(3, results.size());
+        assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
+    }
+
     @Test
     void testVerifyFailedURL() {
         runner.disableControllerService(service);
@@ -157,6 +177,32 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
         );
     }
 
+    @Test
+    void testVerifyFailedApiKeyAuth() {
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
+        runner.removeProperty(service, ElasticSearchClientService.USERNAME);
+        runner.removeProperty(service, ElasticSearchClientService.PASSWORD);
+        runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, "invalid");
+        runner.setProperty(service, ElasticSearchClientService.API_KEY, "not-real");
+        runner.enableControllerService(service);
+
+        final List<ConfigVerificationResult> results = ((VerifiableControllerService) service).verify(
+                new MockConfigurationContext(service, getClientServiceProperties(), runner.getProcessContext().getControllerServiceLookup(), new MockVariableRegistry()),
+                runner.getLogger(),
+                Collections.emptyMap()
+        );
+        assertEquals(3, results.size());
+        assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
+        assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
+        assertEquals(1, results.stream().filter(
+                result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CONNECTION)
+                        && Objects.equals(result.getExplanation(), "Unable to retrieve system summary from Elasticsearch root endpoint")
+                        && result.getOutcome() == ConfigVerificationResult.Outcome.FAILED).count(),
+                results.toString()
+        );
+    }
+
     @Test
     void testBasicSearch() throws Exception {
         final Map<String, Object> temp = new MapBuilder()
@@ -755,4 +801,5 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
     private static void waitForIndexRefresh() throws InterruptedException {
         Thread.sleep(1000);
     }
+
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 989844d3df..6208319a36 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -16,15 +16,20 @@
  */
 package org.apache.nifi.elasticsearch.integration;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.nio.entity.NStringEntity;
+import org.apache.nifi.elasticsearch.MapBuilder;
 import org.apache.nifi.util.TestRunner;
 import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.junit.jupiter.api.BeforeAll;
@@ -34,12 +39,16 @@ import org.testcontainers.utility.DockerImageName;
 
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.http.auth.AuthScope.ANY;
 
@@ -150,6 +159,36 @@ public abstract class AbstractElasticsearchITBase {
         }
     }
 
+    protected String prettyJson(final Object o) throws JsonProcessingException {
+        return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o);
+    }
+
+    protected Pair<String, String> createApiKeyForIndex(String index) throws IOException {
+        final String body = prettyJson(new MapBuilder()
+                .of("name", "test-api-key")
+                .of("role_descriptors", new MapBuilder()
+                        .of("test-role", new MapBuilder()
+                                .of("cluster", Collections.singletonList("all"))
+                                .of("index", Collections.singletonList(new MapBuilder()
+                                        .of("names", Collections.singletonList(index))
+                                        .of("privileges", Collections.singletonList("all"))
+                                        .build()))
+                                .build())
+                        .build())
+                .build());
+        final String endpoint = String.format("%s/%s", elasticsearchHost, "_security/api_key");
+        final Request request = new Request("POST", endpoint);
+        final HttpEntity jsonBody = new NStringEntity(body, ContentType.APPLICATION_JSON);
+        request.setEntity(jsonBody);
+
+        final Response response = testDataManagementClient.performRequest(request);
+        final InputStream inputStream = response.getEntity().getContent();
+        final byte[] result = IOUtils.toByteArray(inputStream);
+        inputStream.close();
+        final Map<String, String> ret = MAPPER.readValue(new String(result, StandardCharsets.UTF_8), Map.class);
+        return Pair.of(ret.get("id"), ret.get("api_key"));
+    }
+
     private static List<SetupAction> readSetupActions(final String scriptPath) throws IOException {
         final List<SetupAction> actions = new ArrayList<>();
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(scriptPath))))) {