You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/11/08 22:03:08 UTC
[nifi] branch main updated: NIFI-10760 Add API Key authentication to ElasticSearchClientServiceImpl
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d374c1f399 NIFI-10760 Add API Key authentication to ElasticSearchClientServiceImpl
d374c1f399 is described below
commit d374c1f399a517266c76cc4c924d01c72054bc2f
Author: Nandor Soma Abonyi <ab...@gmail.com>
AuthorDate: Fri Nov 4 12:50:04 2022 +0100
NIFI-10760 Add API Key authentication to ElasticSearchClientServiceImpl
This closes #6619
Signed-off-by: David Handermann <ex...@apache.org>
---
.../nifi/elasticsearch/AuthorizationScheme.java | 47 +++++++++++++++
.../elasticsearch/ElasticSearchClientService.java | 38 ++++++++++++
.../ElasticSearchClientServiceImpl.java | 69 ++++++++++++++++++----
.../integration/AbstractElasticsearch_IT.java | 2 +
.../integration/ElasticSearchClientService_IT.java | 55 +++++++++++++++--
.../integration/AbstractElasticsearchITBase.java | 39 ++++++++++++
6 files changed, 236 insertions(+), 14 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
new file mode 100644
index 0000000000..2098e1cd87
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.elasticsearch;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Authorization schemes that can be selected for authenticating to Elasticsearch.
+ * Each constant reports its enum name as its value and carries a human-readable
+ * display name and description.
+ */
+public enum AuthorizationScheme implements DescribedValue {
+    BASIC("Basic", "Basic authorization scheme."),
+    API_KEY("API Key", "API key authorization scheme.");
+
+    private final String displayName;
+    private final String description;
+
+    AuthorizationScheme(final String label, final String details) {
+        displayName = label;
+        description = details;
+    }
+
+    /** Returns the enum constant name, e.g. {@code BASIC} or {@code API_KEY}. */
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    /** Returns the human-readable name supplied for this constant. */
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    /** Returns the descriptive text supplied for this constant. */
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 15a45b4b7f..5ce690c09d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -43,6 +43,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+
PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("el-cs-ssl-context-service")
.displayName("SSL Context Service")
@@ -53,23 +54,58 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.addValidator(Validator.VALID)
.build();
PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP);
+
+ // Required selector for the authorization scheme; allowable values are taken from the
+ // AuthorizationScheme enum and the default is AuthorizationScheme.BASIC.
+ PropertyDescriptor AUTHORIZATION_SCHEME = new PropertyDescriptor.Builder()
+ .name("authorization-scheme")
+ .displayName("Authorization Scheme")
+ .description("Authorization Scheme used for authenticating to Elasticsearch using the HTTP Authorization header.")
+ .allowableValues(AuthorizationScheme.class)
+ .defaultValue(AuthorizationScheme.BASIC.getValue())
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Optional username; declared dependent on AUTHORIZATION_SCHEME having the BASIC value.
PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("el-cs-username")
.displayName("Username")
.description("The username to use with XPack security.")
+ .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue())
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+
+ // Optional, sensitive password; declared dependent on AUTHORIZATION_SCHEME having the BASIC value.
PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("el-cs-password")
.displayName("Password")
.description("The password to use with XPack security.")
+ .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue())
.required(false)
.sensitive(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+
+ // Optional, non-sensitive identifier of the API key; declared dependent on
+ // AUTHORIZATION_SCHEME having the API_KEY value. No Expression Language scope is declared here.
+ PropertyDescriptor API_KEY_ID = new PropertyDescriptor.Builder()
+ .name("api-key-id")
+ .displayName("API Key ID")
+ .description("Unique identifier of the API key.")
+ .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue())
+ .required(false)
+ .sensitive(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Optional, sensitive API key value; declared dependent on AUTHORIZATION_SCHEME having
+ // the API_KEY value. No Expression Language scope is declared here.
+ PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+ .name("api-key")
+ .displayName("API Key")
+ .description("Encoded API key.")
+ .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue())
+ .required(false)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
.name("el-cs-connect-timeout")
.displayName("Connect timeout")
@@ -78,6 +114,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.defaultValue("5000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
+
PropertyDescriptor SOCKET_TIMEOUT = new PropertyDescriptor.Builder()
.name("el-cs-socket-timeout")
.displayName("Read timeout")
@@ -99,6 +136,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.defaultValue("60000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
+
PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("el-cs-charset")
.displayName("Charset")
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 29ecad5609..3c8cee35ae 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -29,11 +29,14 @@ import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
@@ -60,6 +63,8 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -84,16 +89,19 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
static {
final List<PropertyDescriptor> props = new ArrayList<>();
- props.add(ElasticSearchClientService.HTTP_HOSTS);
- props.add(ElasticSearchClientService.USERNAME);
- props.add(ElasticSearchClientService.PASSWORD);
- props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
- props.add(ElasticSearchClientService.PROXY_CONFIGURATION_SERVICE);
- props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
- props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
- props.add(ElasticSearchClientService.RETRY_TIMEOUT);
- props.add(ElasticSearchClientService.CHARSET);
- props.add(ElasticSearchClientService.SUPPRESS_NULLS);
+ props.add(HTTP_HOSTS);
+ props.add(AUTHORIZATION_SCHEME);
+ props.add(USERNAME);
+ props.add(PASSWORD);
+ props.add(API_KEY_ID);
+ props.add(API_KEY);
+ props.add(PROP_SSL_CONTEXT_SERVICE);
+ props.add(PROXY_CONFIGURATION_SERVICE);
+ props.add(CONNECT_TIMEOUT);
+ props.add(SOCKET_TIMEOUT);
+ props.add(RETRY_TIMEOUT);
+ props.add(CHARSET);
+ props.add(SUPPRESS_NULLS);
properties = Collections.unmodifiableList(props);
}
@@ -103,6 +111,34 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return properties;
}
+    /**
+     * Validates that the credential properties are configured consistently.
+     * <ul>
+     *   <li>Username and Password must be set together or not at all.</li>
+     *   <li>API Key ID and API Key must be set together or not at all.</li>
+     *   <li>Username and API Key ID must not both be set.</li>
+     * </ul>
+     *
+     * @param validationContext context used to look up whether each property is set
+     * @return one invalid result per violated rule; empty when all rules hold
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        final boolean hasUsername = validationContext.getProperty(USERNAME).isSet();
+        final boolean hasPassword = validationContext.getProperty(PASSWORD).isSet();
+
+        // exactly one of the pair is set
+        if (hasUsername != hasPassword) {
+            final String usernameLabel = USERNAME.getDisplayName();
+            final String passwordLabel = PASSWORD.getDisplayName();
+            problems.add(new ValidationResult.Builder()
+                    .subject(String.format("%s and %s", usernameLabel, passwordLabel))
+                    .valid(false)
+                    .explanation(String.format("if '%s' or '%s' is set, both must be set.", usernameLabel, passwordLabel))
+                    .build());
+        }
+
+        final boolean hasApiKeyId = validationContext.getProperty(API_KEY_ID).isSet();
+        final boolean hasApiKey = validationContext.getProperty(API_KEY).isSet();
+
+        // exactly one of the pair is set
+        if (hasApiKeyId != hasApiKey) {
+            final String apiKeyLabel = API_KEY.getDisplayName();
+            final String apiKeyIdLabel = API_KEY_ID.getDisplayName();
+            problems.add(new ValidationResult.Builder()
+                    .subject(String.format("%s and %s", apiKeyLabel, apiKeyIdLabel))
+                    .valid(false)
+                    .explanation(String.format("if '%s' or '%s' is set, both must be set.", apiKeyLabel, apiKeyIdLabel))
+                    .build());
+        }
+
+        if (hasUsername && hasApiKeyId) {
+            final String usernameLabel = USERNAME.getDisplayName();
+            final String apiKeyIdLabel = API_KEY_ID.getDisplayName();
+            problems.add(new ValidationResult.Builder()
+                    .subject(String.format("%s and %s", usernameLabel, apiKeyIdLabel))
+                    .valid(false)
+                    .explanation(String.format("'%s' and '%s' cannot be used together.", usernameLabel, apiKeyIdLabel))
+                    .build());
+        }
+
+        return problems;
+    }
+
+
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {
@@ -206,6 +242,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+ final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
+ final String apiKey = context.getProperty(API_KEY).getValue();
+
final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
final Integer readTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger();
@@ -237,6 +276,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
credentialsProvider = addCredentials(null, AuthScope.ANY, username, password);
}
+ if (apiKeyId != null && apiKey != null) {
+ httpClientBuilder.setDefaultHeaders(Collections.singletonList(createApiKeyAuthorizationHeader(apiKeyId, apiKey)));
+ }
+
if (proxyConfigurationService != null) {
final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
@@ -275,6 +318,12 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return cp;
}
+    /**
+     * Builds the HTTP {@code Authorization} header for API key authentication.
+     * The header value is {@code "ApiKey "} followed by the Base64 encoding of the
+     * UTF-8 bytes of {@code apiKeyId:apiKey}.
+     *
+     * @param apiKeyId identifier of the API key
+     * @param apiKey   API key value
+     * @return the {@code Authorization} header carrying the encoded credentials
+     */
+    private BasicHeader createApiKeyAuthorizationHeader(String apiKeyId, String apiKey) {
+        final byte[] credentialBytes = String.format("%s:%s", apiKeyId, apiKey).getBytes(StandardCharsets.UTF_8);
+        final String encodedCredentials = Base64.getEncoder().encodeToString(credentialBytes);
+        return new BasicHeader("Authorization", "ApiKey " + encodedCredentials);
+    }
+
private void appendIndex(final StringBuilder sb, final String index) {
if (StringUtils.isNotBlank(index) && !"/".equals(index)) {
if (!index.startsWith("/")) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
index 40df462a1a..104e504b23 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.elasticsearch.integration;
+import org.apache.nifi.elasticsearch.AuthorizationScheme;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
@@ -41,6 +42,7 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000");
runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000");
runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS.getValue());
+ runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue());
runner.setProperty(service, ElasticSearchClientService.USERNAME, "elastic");
runner.setProperty(service, ElasticSearchClientService.PASSWORD, ELASTIC_USER_PASSWORD);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index 035b3b9bc4..cd1bd3ca40 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -18,9 +18,11 @@
package org.apache.nifi.elasticsearch.integration;
import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.VerifiableControllerService;
+import org.apache.nifi.elasticsearch.AuthorizationScheme;
import org.apache.nifi.elasticsearch.DeleteOperationResponse;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
@@ -41,6 +43,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -66,10 +69,6 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
service.onDisabled();
}
- private String prettyJson(final Object o) throws JsonProcessingException {
- return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o);
- }
-
private Map<PropertyDescriptor, String> getClientServiceProperties() {
return ((MockControllerServiceLookup) runner.getProcessContext().getControllerServiceLookup())
.getControllerServices().get(CLIENT_SERVICE_NAME).getProperties();
@@ -86,6 +85,27 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
}
+    /**
+     * Verification reports three successful steps when the service is reconfigured
+     * to authenticate with a freshly created API key.
+     */
+    @Test
+    void testVerifySuccessWithApiKeyAuth() throws IOException {
+        final Pair<String, String> idAndKey = createApiKeyForIndex("*");
+
+        // Swap the Basic credentials for the API key while the service is disabled
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
+        runner.removeProperty(service, ElasticSearchClientService.USERNAME);
+        runner.removeProperty(service, ElasticSearchClientService.PASSWORD);
+        runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, idAndKey.getKey());
+        runner.setProperty(service, ElasticSearchClientService.API_KEY, idAndKey.getValue());
+        runner.enableControllerService(service);
+
+        final MockConfigurationContext verificationContext = new MockConfigurationContext(
+                service, getClientServiceProperties(), runner.getProcessContext().getControllerServiceLookup(), new MockVariableRegistry());
+        final List<ConfigVerificationResult> results =
+                ((VerifiableControllerService) service).verify(verificationContext, runner.getLogger(), Collections.emptyMap());
+
+        assertEquals(3, results.size());
+        final long successCount = results.stream()
+                .map(ConfigVerificationResult::getOutcome)
+                .filter(outcome -> outcome == ConfigVerificationResult.Outcome.SUCCESSFUL)
+                .count();
+        assertEquals(3, successCount, results.toString());
+    }
+
+
@Test
void testVerifyFailedURL() {
runner.disableControllerService(service);
@@ -157,6 +177,32 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
);
}
+    /**
+     * Verification with an invalid API key yields one successful step, one skipped step
+     * and one failed connection step carrying the expected explanation.
+     */
+    @Test
+    void testVerifyFailedApiKeyAuth() {
+        // Replace the Basic credentials with a bogus API key while the service is disabled
+        runner.disableControllerService(service);
+        runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
+        runner.removeProperty(service, ElasticSearchClientService.USERNAME);
+        runner.removeProperty(service, ElasticSearchClientService.PASSWORD);
+        runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, "invalid");
+        runner.setProperty(service, ElasticSearchClientService.API_KEY, "not-real");
+        runner.enableControllerService(service);
+
+        final MockConfigurationContext verificationContext = new MockConfigurationContext(
+                service, getClientServiceProperties(), runner.getProcessContext().getControllerServiceLookup(), new MockVariableRegistry());
+        final List<ConfigVerificationResult> results =
+                ((VerifiableControllerService) service).verify(verificationContext, runner.getLogger(), Collections.emptyMap());
+
+        assertEquals(3, results.size());
+
+        final long successCount = results.stream()
+                .filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL)
+                .count();
+        assertEquals(1, successCount, results.toString());
+
+        final long skippedCount = results.stream()
+                .filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED)
+                .count();
+        assertEquals(1, skippedCount, results.toString());
+
+        final long failedConnectionCount = results.stream()
+                .filter(result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CONNECTION))
+                .filter(result -> Objects.equals(result.getExplanation(), "Unable to retrieve system summary from Elasticsearch root endpoint"))
+                .filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.FAILED)
+                .count();
+        assertEquals(1, failedConnectionCount, results.toString());
+    }
+
+
@Test
void testBasicSearch() throws Exception {
final Map<String, Object> temp = new MapBuilder()
@@ -755,4 +801,5 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
private static void waitForIndexRefresh() throws InterruptedException {
Thread.sleep(1000);
}
+
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 989844d3df..6208319a36 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -16,15 +16,20 @@
*/
package org.apache.nifi.elasticsearch.integration;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.entity.NStringEntity;
+import org.apache.nifi.elasticsearch.MapBuilder;
import org.apache.nifi.util.TestRunner;
import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.BeforeAll;
@@ -34,12 +39,16 @@ import org.testcontainers.utility.DockerImageName;
import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import static org.apache.http.auth.AuthScope.ANY;
@@ -150,6 +159,36 @@ public abstract class AbstractElasticsearchITBase {
}
}
+ /**
+  * Serializes the given object to a JSON string using MAPPER's default pretty printer.
+  *
+  * @param o object to serialize
+  * @return the pretty-printed JSON text
+  * @throws JsonProcessingException if MAPPER fails to serialize the object
+  */
+ protected String prettyJson(final Object o) throws JsonProcessingException {
+ return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o);
+ }
+
+    /**
+     * Creates an Elasticsearch API key by POSTing to {@code _security/api_key} with the
+     * test data management client. The key is named {@code test-api-key} and carries a
+     * single role, {@code test-role}, granting {@code all} cluster privileges and
+     * {@code all} privileges on the given index name.
+     *
+     * @param index index name (or pattern) the role's index privileges apply to
+     * @return pair of the response's {@code id} (left) and {@code api_key} (right) fields
+     * @throws IOException if the request fails or the response cannot be read or parsed
+     */
+    protected Pair<String, String> createApiKeyForIndex(String index) throws IOException {
+        final String body = prettyJson(new MapBuilder()
+                .of("name", "test-api-key")
+                .of("role_descriptors", new MapBuilder()
+                        .of("test-role", new MapBuilder()
+                                .of("cluster", Collections.singletonList("all"))
+                                .of("index", Collections.singletonList(new MapBuilder()
+                                        .of("names", Collections.singletonList(index))
+                                        .of("privileges", Collections.singletonList("all"))
+                                        .build()))
+                                .build())
+                        .build())
+                .build());
+        final String endpoint = String.format("%s/%s", elasticsearchHost, "_security/api_key");
+        final Request request = new Request("POST", endpoint);
+        final HttpEntity jsonBody = new NStringEntity(body, ContentType.APPLICATION_JSON);
+        request.setEntity(jsonBody);
+
+        final Response response = testDataManagementClient.performRequest(request);
+        final byte[] result;
+        // try-with-resources closes the response stream even when reading it throws
+        try (InputStream inputStream = response.getEntity().getContent()) {
+            result = IOUtils.toByteArray(inputStream);
+        }
+        final Map<String, String> ret = MAPPER.readValue(new String(result, StandardCharsets.UTF_8), Map.class);
+        return Pair.of(ret.get("id"), ret.get("api_key"));
+    }
+
+
private static List<SetupAction> readSetupActions(final String scriptPath) throws IOException {
final List<SetupAction> actions = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(scriptPath))))) {