You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "exceptionfactory (via GitHub)" <gi...@apache.org> on 2023/05/19 14:14:33 UTC

[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7211: NIFI-8287: Updating SQS processors to use AWS SDK version 2.x

exceptionfactory commented on code in PR #7211:
URL: https://github.com/apache/nifi/pull/7211#discussion_r1198998416


##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java:
##########
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.v2;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider;
+import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.security.util.KeyStoreUtils;
+import org.apache.nifi.security.util.TlsException;
+import org.apache.nifi.ssl.SSLContextService;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import java.io.File;
+import java.net.Proxy;
+import java.net.URI;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for aws processors using the AWS v2 SDK.
+ *
+ * @param <T> client type
+ *
+ * @see <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/AwsCredentialsProvider.html">AwsCredentialsProvider</a>
+ */
+public abstract class AbstractAwsProcessor<T extends SdkClient, U extends AwsSyncClientBuilder<U, T> & AwsClientBuilder<U, T>>
+        extends AbstractProcessor implements VerifiableProcessor, AwsClientProvider<T> {
+
+    protected static final String DEFAULT_USER_AGENT = "NiFi";
+    public static final Region DEFAULT_REGION = Region.US_WEST_2;
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("FlowFiles are routed to success relationship").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("FlowFiles are routed to failure relationship").build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    public static final PropertyDescriptor CREDENTIALS_FILE = CredentialPropertyDescriptors.CREDENTIALS_FILE;
+    public static final PropertyDescriptor ACCESS_KEY = CredentialPropertyDescriptors.ACCESS_KEY;
+    public static final PropertyDescriptor SECRET_KEY = CredentialPropertyDescriptors.SECRET_KEY;

Review Comment:
   Is it necessary to re-declare these property variables? Does that limit the changes required in subclasses?



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml:
##########
@@ -88,6 +98,11 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-utils</artifactId>

Review Comment:
   The `nifi-security-utils` module includes a number of unnecessary items. Instead of using `KeyStoreUtils`, the builders in `nifi-security-ssl` should be used, avoiding this dependency.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java:
##########
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.v2;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider;
+import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.security.util.KeyStoreUtils;
+import org.apache.nifi.security.util.TlsException;
+import org.apache.nifi.ssl.SSLContextService;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import java.io.File;
+import java.net.Proxy;
+import java.net.URI;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for aws processors using the AWS v2 SDK.
+ *
+ * @param <T> client type
+ *
+ * @see <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/AwsCredentialsProvider.html">AwsCredentialsProvider</a>
+ */
+public abstract class AbstractAwsProcessor<T extends SdkClient, U extends AwsSyncClientBuilder<U, T> & AwsClientBuilder<U, T>>
+        extends AbstractProcessor implements VerifiableProcessor, AwsClientProvider<T> {
+
+    protected static final String DEFAULT_USER_AGENT = "NiFi";
+    public static final Region DEFAULT_REGION = Region.US_WEST_2;
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("FlowFiles are routed to success relationship").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("FlowFiles are routed to failure relationship").build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    public static final PropertyDescriptor CREDENTIALS_FILE = CredentialPropertyDescriptors.CREDENTIALS_FILE;
+    public static final PropertyDescriptor ACCESS_KEY = CredentialPropertyDescriptors.ACCESS_KEY;
+    public static final PropertyDescriptor SECRET_KEY = CredentialPropertyDescriptors.SECRET_KEY;
+
+    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
+            .name("Proxy Host")
+            .description("Proxy host name or IP")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder()
+            .name("Proxy Host Port")
+            .description("Proxy host port")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_USERNAME = new PropertyDescriptor.Builder()
+            .name("proxy-user-name")
+            .displayName("Proxy Username")
+            .description("Proxy username")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_PASSWORD = new PropertyDescriptor.Builder()
+            .name("proxy-user-password")
+            .displayName("Proxy Password")
+            .description("Proxy password")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
+            .name("Region")
+            .required(true)
+            .allowableValues(getAvailableRegions())
+            .defaultValue(createAllowableValue(Region.US_WEST_2).getValue())
+            .build();
+
+    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 secs")
+            .build();
+
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
+            .name("Endpoint Override URL")
+            .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
+                    "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
+                    "the selected endpoint URL, allowing use with other S3-compatible endpoints.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    /**
+     * AWS credentials provider service
+     *
+     * @see  <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
+     * @see  <a href="https://sdk.amazonaws.com/java/api/2.0.0/software/amazon/awssdk/auth/credentials/AwsCredentialsProvider.html">AwsCredentialsProvider</a>
+     */
+    public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
+            .name("AWS Credentials Provider service")
+            .displayName("AWS Credentials Provider Service")
+            .description("The Controller Service that is used to obtain AWS credentials provider")
+            .required(false)
+            .identifiesControllerService(AWSCredentialsProviderService.class)
+            .build();
+
+    private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
+    public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
+
+    protected volatile T client;
+    protected volatile Region region;
+
+    private final AwsClientCache<T> awsClientCache = new AwsClientCache<>();
+
+    /**
+     * Construct the AWS SDK client builder and perform any service-specific configuration of the builder.
+     * @param context The process context
+     * @return The SDK client builder
+     */
+    protected abstract U createClientBuilder(final ProcessContext context);
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
+
+        final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet();
+        final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet();
+        if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) {
+            validationResults.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build());
+        }
+
+        final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet();
+        if ((secretKeySet || accessKeySet) && credentialsFileSet) {
+            validationResults.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
+        }
+
+        final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
+        final boolean proxyPortSet = validationContext.getProperty(PROXY_HOST_PORT).isSet();
+        final boolean proxyConfigServiceSet = validationContext.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet();
+
+        if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
+        }
+
+        final boolean proxyUserSet = validationContext.getProperty(PROXY_USERNAME).isSet();
+        final boolean proxyPwdSet = validationContext.getProperty(PROXY_PASSWORD).isSet();
+
+        if ((proxyUserSet && !proxyPwdSet) || (!proxyUserSet && proxyPwdSet)) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy User and Password").valid(false).explanation("If Proxy Username or Proxy Password is set, both must be set").build());
+        }
+
+        if (proxyUserSet && !proxyHostSet) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy Username or Proxy Password").build());
+        }
+
+        ProxyConfiguration.validateProxySpec(validationContext, validationResults, PROXY_SPECS);
+
+        if (proxyHostSet && proxyConfigServiceSet) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy Configuration Service").valid(false)
+                    .explanation("Either Proxy Username and Proxy Password must be set or Proxy Configuration Service but not both").build());
+        }
+
+        return validationResults;
+    }
+
+    @OnShutdown
+    public void onShutDown() {
+        if (this.client != null) {
+            this.client.close();
+        }
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.awsClientCache.clearCache();
+    }
+
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+
+        try {
+            createClient(context);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.SUCCESSFUL)
+                    .verificationStepName("Create Client")
+                    .explanation("Successfully created AWS Client")
+                    .build());
+        } catch (final Exception e) {
+            verificationLogger.error("Failed to create AWS Client", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.FAILED)
+                    .verificationStepName("Create Client")
+                    .explanation("Failed to crete AWS Client: " + e.getMessage())
+                    .build());
+        }
+
+        return results;
+    }
+
+    protected void configureClientBuilder(final U clientBuilder, final ProcessContext context) {
+        clientBuilder.overrideConfiguration(builder -> builder.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, DEFAULT_USER_AGENT));
+        clientBuilder.overrideConfiguration(builder -> builder.retryPolicy(RetryPolicy.none()));
+        clientBuilder.httpClient(createSdkHttpClient(context));
+
+        final Region region = getRegion(context);
+        if (region != null) {
+            clientBuilder.region(region);
+        }
+        configureEndpoint(context, clientBuilder);
+
+        final AwsCredentialsProvider credentialsProvider = getCredentialsProvider(context);
+        clientBuilder.credentialsProvider(credentialsProvider);
+    }
+
+    /**
+     * Creates the AWS SDK client.
+     * @param context The process context
+     * @return The created client
+     */
+    @Override
+    public T createClient(final ProcessContext context) {
+        final U clientBuilder = createClientBuilder(context);
+        this.configureClientBuilder(clientBuilder, context);
+        return clientBuilder.build();
+    }
+
+    /**
+     * Creates an AWS service client from the context or returns an existing client from the cache
+     * @param context The process context
+     * @param  awsClientDetails details of the AWS client
+     * @return The created client
+     */
+    protected T getClient(final ProcessContext context, final AwsClientDetails awsClientDetails) {
+        return this.awsClientCache.getOrCreateClient(context, awsClientDetails, this);
+    }
+
+    protected T getClient(final ProcessContext context) {
+        final AwsClientDetails awsClientDetails = new AwsClientDetails(getRegion(context));
+        return getClient(context, awsClientDetails);
+    }
+
+    private SdkHttpClient createSdkHttpClient(final ProcessContext context) {
+        final ApacheHttpClient.Builder builder = ApacheHttpClient.builder();
+
+        final int communicationsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        builder.connectionTimeout(Duration.ofMillis(communicationsTimeout));
+        builder.socketTimeout(Duration.ofMillis(communicationsTimeout));
+        builder.maxConnections(context.getMaxConcurrentTasks());
+
+        if (this.getSupportedPropertyDescriptors().contains(SSL_CONTEXT_SERVICE)) {
+            final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+            if (sslContextService != null) {
+                final TrustManager[] trustManagers = new TrustManager[] { sslContextService.createTrustManager() };
+                final KeyManagerFactory keyManagerFactory;
+                try {
+                    keyManagerFactory = KeyStoreUtils.loadKeyManagerFactory(sslContextService.createTlsConfiguration());
+                    builder.tlsTrustManagersProvider(() -> trustManagers);
+                    builder.tlsKeyManagersProvider(() -> keyManagerFactory.getKeyManagers());
+                } catch (final TlsException e) {
+                    throw new RuntimeException("Failed to configure TLS Key Manager", e);

Review Comment:
   This should probably be something more specific like `IllegalArgumentException`.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java:
##########
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.v2;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider;
+import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.security.util.KeyStoreUtils;
+import org.apache.nifi.security.util.TlsException;
+import org.apache.nifi.ssl.SSLContextService;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import java.io.File;
+import java.net.Proxy;
+import java.net.URI;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for aws processors using the AWS v2 SDK.
+ *
+ * @param <T> client type
+ *
+ * @see <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/AwsCredentialsProvider.html">AwsCredentialsProvider</a>
+ */
+public abstract class AbstractAwsProcessor<T extends SdkClient, U extends AwsSyncClientBuilder<U, T> & AwsClientBuilder<U, T>>
+        extends AbstractProcessor implements VerifiableProcessor, AwsClientProvider<T> {
+
+    protected static final String DEFAULT_USER_AGENT = "NiFi";
+    public static final Region DEFAULT_REGION = Region.US_WEST_2;
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("FlowFiles are routed to success relationship").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("FlowFiles are routed to failure relationship").build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    public static final PropertyDescriptor CREDENTIALS_FILE = CredentialPropertyDescriptors.CREDENTIALS_FILE;
+    public static final PropertyDescriptor ACCESS_KEY = CredentialPropertyDescriptors.ACCESS_KEY;
+    public static final PropertyDescriptor SECRET_KEY = CredentialPropertyDescriptors.SECRET_KEY;
+
+    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
+            .name("Proxy Host")
+            .description("Proxy host name or IP")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder()
+            .name("Proxy Host Port")
+            .description("Proxy host port")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_USERNAME = new PropertyDescriptor.Builder()
+            .name("proxy-user-name")
+            .displayName("Proxy Username")
+            .description("Proxy username")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_PASSWORD = new PropertyDescriptor.Builder()
+            .name("proxy-user-password")
+            .displayName("Proxy Password")
+            .description("Proxy password")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
+            .name("Region")
+            .required(true)
+            .allowableValues(getAvailableRegions())
+            .defaultValue(createAllowableValue(Region.US_WEST_2).getValue())
+            .build();
+
+    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 secs")
+            .build();
+
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
+            .name("Endpoint Override URL")
+            .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
+                    "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
+                    "the selected endpoint URL, allowing use with other S3-compatible endpoints.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    /**
+     * AWS credentials provider service
+     *
+     * @see  <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
+     * @see  <a href="https://sdk.amazonaws.com/java/api/2.0.0/software/amazon/awssdk/auth/credentials/AwsCredentialsProvider.html">AwsCredentialsProvider</a>
+     */
+    public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
+            .name("AWS Credentials Provider service")
+            .displayName("AWS Credentials Provider Service")
+            .description("The Controller Service that is used to obtain AWS credentials provider")
+            .required(false)
+            .identifiesControllerService(AWSCredentialsProviderService.class)
+            .build();
+
+    private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
+    public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
+
+    protected volatile T client;
+    protected volatile Region region;
+
+    private final AwsClientCache<T> awsClientCache = new AwsClientCache<>();
+
+    /**
+     * Construct the AWS SDK client builder and perform any service-specific configuration of the builder.
+     * @param context The process context
+     * @return The SDK client builder
+     */
+    protected abstract U createClientBuilder(final ProcessContext context);
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
+
+        final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet();
+        final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet();
+        if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) {
+            validationResults.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build());
+        }
+
+        final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet();
+        if ((secretKeySet || accessKeySet) && credentialsFileSet) {
+            validationResults.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
+        }
+
+        final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
+        final boolean proxyPortSet = validationContext.getProperty(PROXY_HOST_PORT).isSet();
+        final boolean proxyConfigServiceSet = validationContext.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet();
+
+        if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
+        }
+
+        final boolean proxyUserSet = validationContext.getProperty(PROXY_USERNAME).isSet();
+        final boolean proxyPwdSet = validationContext.getProperty(PROXY_PASSWORD).isSet();
+
+        if ((proxyUserSet && !proxyPwdSet) || (!proxyUserSet && proxyPwdSet)) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy User and Password").valid(false).explanation("If Proxy Username or Proxy Password is set, both must be set").build());
+        }
+
+        if (proxyUserSet && !proxyHostSet) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy Username or Proxy Password").build());
+        }
+
+        ProxyConfiguration.validateProxySpec(validationContext, validationResults, PROXY_SPECS);
+
+        if (proxyHostSet && proxyConfigServiceSet) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy Configuration Service").valid(false)
+                    .explanation("Either Proxy Username and Proxy Password must be set or Proxy Configuration Service but not both").build());
+        }
+
+        return validationResults;
+    }
+
+    @OnShutdown
+    public void onShutDown() {
+        if (this.client != null) {
+            this.client.close();
+        }
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.awsClientCache.clearCache();
+    }
+
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+
+        try {
+            createClient(context);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.SUCCESSFUL)
+                    .verificationStepName("Create Client")
+                    .explanation("Successfully created AWS Client")
+                    .build());
+        } catch (final Exception e) {
+            verificationLogger.error("Failed to create AWS Client", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.FAILED)
+                    .verificationStepName("Create Client")
+                    .explanation("Failed to crete AWS Client: " + e.getMessage())
+                    .build());
+        }
+
+        return results;
+    }
+
+    protected void configureClientBuilder(final U clientBuilder, final ProcessContext context) {
+        clientBuilder.overrideConfiguration(builder -> builder.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, DEFAULT_USER_AGENT));
+        clientBuilder.overrideConfiguration(builder -> builder.retryPolicy(RetryPolicy.none()));
+        clientBuilder.httpClient(createSdkHttpClient(context));
+
+        final Region region = getRegion(context);
+        if (region != null) {
+            clientBuilder.region(region);
+        }
+        configureEndpoint(context, clientBuilder);
+
+        final AwsCredentialsProvider credentialsProvider = getCredentialsProvider(context);
+        clientBuilder.credentialsProvider(credentialsProvider);
+    }
+
+    /**
+     * Creates the AWS SDK client.
+     * @param context The process context
+     * @return The created client
+     */
+    @Override
+    public T createClient(final ProcessContext context) {
+        final U clientBuilder = createClientBuilder(context);
+        this.configureClientBuilder(clientBuilder, context);
+        return clientBuilder.build();
+    }
+
+    /**
+     * Creates an AWS service client from the context or returns an existing client from the cache
+     * @param context The process context
+     * @param  awsClientDetails details of the AWS client
+     * @return The created client
+     */
+    protected T getClient(final ProcessContext context, final AwsClientDetails awsClientDetails) {
+        return this.awsClientCache.getOrCreateClient(context, awsClientDetails, this);
+    }
+
+    protected T getClient(final ProcessContext context) {
+        final AwsClientDetails awsClientDetails = new AwsClientDetails(getRegion(context));
+        return getClient(context, awsClientDetails);
+    }
+
+    private SdkHttpClient createSdkHttpClient(final ProcessContext context) {
+        final ApacheHttpClient.Builder builder = ApacheHttpClient.builder();
+
+        final int communicationsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        builder.connectionTimeout(Duration.ofMillis(communicationsTimeout));
+        builder.socketTimeout(Duration.ofMillis(communicationsTimeout));
+        builder.maxConnections(context.getMaxConcurrentTasks());
+
+        if (this.getSupportedPropertyDescriptors().contains(SSL_CONTEXT_SERVICE)) {
+            final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+            if (sslContextService != null) {
+                final TrustManager[] trustManagers = new TrustManager[] { sslContextService.createTrustManager() };
+                final KeyManagerFactory keyManagerFactory;
+                try {
+                    keyManagerFactory = KeyStoreUtils.loadKeyManagerFactory(sslContextService.createTlsConfiguration());
+                    builder.tlsTrustManagersProvider(() -> trustManagers);
+                    builder.tlsKeyManagersProvider(() -> keyManagerFactory.getKeyManagers());
+                } catch (final TlsException e) {
+                    throw new RuntimeException("Failed to configure TLS Key Manager", e);
+                }
+
+            }
+        }
+
+        final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
+            if (context.getProperty(PROXY_HOST).isSet()) {
+                final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
+                final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
+                final Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger();
+                final String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue();
+                final String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
+                componentProxyConfig.setProxyType(Proxy.Type.HTTP);
+                componentProxyConfig.setProxyServerHost(proxyHost);
+                componentProxyConfig.setProxyServerPort(proxyPort);
+                componentProxyConfig.setProxyUserName(proxyUsername);
+                componentProxyConfig.setProxyUserPassword(proxyPassword);
+                return componentProxyConfig;
+            } else if (context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet()) {
+                final ProxyConfigurationService configurationService = context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
+                return configurationService.getConfiguration();
+            }
+            return ProxyConfiguration.DIRECT_CONFIGURATION;
+        });
+
+        if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+            final software.amazon.awssdk.http.apache.ProxyConfiguration.Builder proxyConfigBuilder = software.amazon.awssdk.http.apache.ProxyConfiguration.builder()
+                    .endpoint(URI.create(String.format("%s:%s", proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort())));
+
+            if (proxyConfig.hasCredential()) {
+                proxyConfigBuilder.username(proxyConfig.getProxyUserName());
+                proxyConfigBuilder.password(proxyConfig.getProxyUserPassword());
+            }
+            builder.proxyConfiguration(proxyConfigBuilder.build());
+        }
+
+        return builder.build();
+    }
+
+    protected Region getRegion(final ProcessContext context) {
+        final Region region;
+        // if the processor supports REGION, get the configured region.
+        if (getSupportedPropertyDescriptors().contains(REGION)) {
+            final String regionValue = context.getProperty(REGION).getValue();
+            if (regionValue != null) {
+                region = Region.of(regionValue);
+            } else {
+                region = null;
+            }
+        } else {
+            region = null;
+        }
+        return region;
+    }
+
+    protected void configureEndpoint(final ProcessContext context, final U clientBuilder) {
+        // if the endpoint override has been configured, set the endpoint.
+        // (per Amazon docs this should only be configured at client creation)
+        if (getSupportedPropertyDescriptors().contains(ENDPOINT_OVERRIDE)) {
+            final String endpointOverride = StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue());
+
+            if (!endpointOverride.isEmpty()) {
+                getLogger().info("Overriding endpoint with {}", endpointOverride);
+
+                clientBuilder.endpointOverride(URI.create(endpointOverride));
+            }
+        }
+    }
+
+    /**
+     * Get credentials provider using the {@link AwsCredentialsProvider}
+     * @param context the process context
+     * @return AwsCredentialsProvider the credential provider
+     */
+    protected AwsCredentialsProvider getCredentialsProvider(final ProcessContext context) {
+        final AWSCredentialsProviderService awsCredentialsProviderService =
+              context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(AWSCredentialsProviderService.class);
+
+        return awsCredentialsProviderService != null ? awsCredentialsProviderService.getAwsCredentialsProvider() : createStaticCredentialsProvider(context);
+
+    }
+
+    protected AwsCredentialsProvider createStaticCredentialsProvider(final PropertyContext context) {
+        final String accessKey = context.getProperty(ACCESS_KEY).evaluateAttributeExpressions().getValue();
+        final String secretKey = context.getProperty(SECRET_KEY).evaluateAttributeExpressions().getValue();
+
+        final String credentialsFile = context.getProperty(CREDENTIALS_FILE).getValue();
+
+        if (credentialsFile != null) {
+            return new PropertiesCredentialsProvider(new File(credentialsFile));
+        }
+
+        if (accessKey != null && secretKey != null) {
+            return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
+        }
+
+        return AnonymousCredentialsProvider.create();
+    }
+
+    public static AllowableValue createAllowableValue(final Region region) {
+        final String description = region.metadata() != null ? region.metadata().description() : region.id();
+        return new AllowableValue(region.id(), description, "AWS Region Code : " + region.id());
+    }
+
+    public static AllowableValue[] getAvailableRegions() {
+        final List<AllowableValue> values = new ArrayList<>();
+        for (final Region region : Region.regions()) {
+            values.add(createAllowableValue(region));
+        }
+        return values.toArray(new AllowableValue[0]);
+    }

Review Comment:
   It probably makes more sense to move these methods to an enum or utility class.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AwsClientCache.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.v2;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.nifi.processor.ProcessContext;
+import software.amazon.awssdk.core.SdkClient;
+
+public class AwsClientCache<T extends SdkClient> {
+
+    private static final int MAXIMUM_CACHE_SIZE = 10;

Review Comment:
   Any particular reason for the cache size of 10? It would be worth adding a comment of some kind.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java:
##########
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.v2;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider;
+import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.security.util.KeyStoreUtils;
+import org.apache.nifi.security.util.TlsException;
+import org.apache.nifi.ssl.SSLContextService;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import java.io.File;
+import java.net.Proxy;
+import java.net.URI;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for aws processors using the AWS v2 SDK.
+ *
+ * @param <T> client type
+ *
+ * @see <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/AwsCredentialsProvider.html">AwsCredentialsProvider</a>
+ */
+public abstract class AbstractAwsProcessor<T extends SdkClient, U extends AwsSyncClientBuilder<U, T> & AwsClientBuilder<U, T>>
+        extends AbstractProcessor implements VerifiableProcessor, AwsClientProvider<T> {
+
+    protected static final String DEFAULT_USER_AGENT = "NiFi";
+    public static final Region DEFAULT_REGION = Region.US_WEST_2;
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("FlowFiles are routed to success relationship").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("FlowFiles are routed to failure relationship").build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    public static final PropertyDescriptor CREDENTIALS_FILE = CredentialPropertyDescriptors.CREDENTIALS_FILE;
+    public static final PropertyDescriptor ACCESS_KEY = CredentialPropertyDescriptors.ACCESS_KEY;
+    public static final PropertyDescriptor SECRET_KEY = CredentialPropertyDescriptors.SECRET_KEY;
+
+    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
+            .name("Proxy Host")
+            .description("Proxy host name or IP")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder()
+            .name("Proxy Host Port")
+            .description("Proxy host port")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_USERNAME = new PropertyDescriptor.Builder()
+            .name("proxy-user-name")
+            .displayName("Proxy Username")
+            .description("Proxy username")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_PASSWORD = new PropertyDescriptor.Builder()
+            .name("proxy-user-password")
+            .displayName("Proxy Password")
+            .description("Proxy password")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
+            .name("Region")
+            .required(true)
+            .allowableValues(getAvailableRegions())
+            .defaultValue(createAllowableValue(Region.US_WEST_2).getValue())
+            .build();
+
+    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 secs")
+            .build();
+
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
+            .name("Endpoint Override URL")
+            .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
+                    "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
+                    "the selected endpoint URL, allowing use with other S3-compatible endpoints.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    /**
+     * AWS credentials provider service
+     *
+     * @see  <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
+     * @see  <a href="https://sdk.amazonaws.com/java/api/2.0.0/software/amazon/awssdk/auth/credentials/AwsCredentialsProvider.html">AwsCredentialsProvider</a>
+     */
+    public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
+            .name("AWS Credentials Provider service")
+            .displayName("AWS Credentials Provider Service")
+            .description("The Controller Service that is used to obtain AWS credentials provider")
+            .required(false)
+            .identifiesControllerService(AWSCredentialsProviderService.class)
+            .build();
+
+    private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
+    public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
+
+    protected volatile T client;
+    protected volatile Region region;
+
+    private final AwsClientCache<T> awsClientCache = new AwsClientCache<>();
+
+    /**
+     * Construct the AWS SDK client builder and perform any service-specific configuration of the builder.
+     * @param context The process context
+     * @return The SDK client builder
+     */
+    protected abstract U createClientBuilder(final ProcessContext context);
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
+
+        final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet();
+        final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet();
+        if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) {
+            validationResults.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build());
+        }
+
+        final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet();
+        if ((secretKeySet || accessKeySet) && credentialsFileSet) {
+            validationResults.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
+        }
+
+        final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
+        final boolean proxyPortSet = validationContext.getProperty(PROXY_HOST_PORT).isSet();
+        final boolean proxyConfigServiceSet = validationContext.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet();
+
+        if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
+        }
+
+        final boolean proxyUserSet = validationContext.getProperty(PROXY_USERNAME).isSet();
+        final boolean proxyPwdSet = validationContext.getProperty(PROXY_PASSWORD).isSet();
+
+        if ((proxyUserSet && !proxyPwdSet) || (!proxyUserSet && proxyPwdSet)) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy User and Password").valid(false).explanation("If Proxy Username or Proxy Password is set, both must be set").build());
+        }
+
+        if (proxyUserSet && !proxyHostSet) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy Username or Proxy Password").build());
+        }
+
+        ProxyConfiguration.validateProxySpec(validationContext, validationResults, PROXY_SPECS);
+
+        if (proxyHostSet && proxyConfigServiceSet) {
+            validationResults.add(new ValidationResult.Builder().subject("Proxy Configuration Service").valid(false)
+                    .explanation("Either Proxy Username and Proxy Password must be set or Proxy Configuration Service but not both").build());
+        }
+
+        return validationResults;
+    }
+
+    @OnShutdown
+    public void onShutDown() {
+        if (this.client != null) {
+            this.client.close();
+        }
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.awsClientCache.clearCache();
+    }
+
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+
+        try {
+            createClient(context);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.SUCCESSFUL)
+                    .verificationStepName("Create Client")
+                    .explanation("Successfully created AWS Client")
+                    .build());
+        } catch (final Exception e) {
+            verificationLogger.error("Failed to create AWS Client", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.FAILED)
+                    .verificationStepName("Create Client")
+                    .explanation("Failed to crete AWS Client: " + e.getMessage())
+                    .build());
+        }
+
+        return results;
+    }
+
+    protected void configureClientBuilder(final U clientBuilder, final ProcessContext context) {
+        clientBuilder.overrideConfiguration(builder -> builder.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, DEFAULT_USER_AGENT));
+        clientBuilder.overrideConfiguration(builder -> builder.retryPolicy(RetryPolicy.none()));
+        clientBuilder.httpClient(createSdkHttpClient(context));
+
+        final Region region = getRegion(context);
+        if (region != null) {
+            clientBuilder.region(region);
+        }
+        configureEndpoint(context, clientBuilder);
+
+        final AwsCredentialsProvider credentialsProvider = getCredentialsProvider(context);
+        clientBuilder.credentialsProvider(credentialsProvider);
+    }
+
+    /**
+     * Creates the AWS SDK client.
+     * @param context The process context
+     * @return The created client
+     */
+    @Override
+    public T createClient(final ProcessContext context) {
+        final U clientBuilder = createClientBuilder(context);
+        this.configureClientBuilder(clientBuilder, context);
+        return clientBuilder.build();
+    }
+
+    /**
+     * Creates an AWS service client from the context or returns an existing client from the cache
+     * @param context The process context
+     * @param  awsClientDetails details of the AWS client
+     * @return The created client
+     */
+    protected T getClient(final ProcessContext context, final AwsClientDetails awsClientDetails) {
+        return this.awsClientCache.getOrCreateClient(context, awsClientDetails, this);
+    }
+
+    protected T getClient(final ProcessContext context) {
+        final AwsClientDetails awsClientDetails = new AwsClientDetails(getRegion(context));
+        return getClient(context, awsClientDetails);
+    }
+
+    private SdkHttpClient createSdkHttpClient(final ProcessContext context) {
+        final ApacheHttpClient.Builder builder = ApacheHttpClient.builder();
+
+        final int communicationsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        builder.connectionTimeout(Duration.ofMillis(communicationsTimeout));
+        builder.socketTimeout(Duration.ofMillis(communicationsTimeout));
+        builder.maxConnections(context.getMaxConcurrentTasks());
+
+        if (this.getSupportedPropertyDescriptors().contains(SSL_CONTEXT_SERVICE)) {
+            final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+            if (sslContextService != null) {
+                final TrustManager[] trustManagers = new TrustManager[] { sslContextService.createTrustManager() };
+                final KeyManagerFactory keyManagerFactory;
+                try {
+                    keyManagerFactory = KeyStoreUtils.loadKeyManagerFactory(sslContextService.createTlsConfiguration());

Review Comment:
   It looks like AWS has a `FileSToreTlsKeyManagersProvider` that could be used. That should avoid the need for KeyStoreUtils and `nifi-security-utils`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org