You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2019/10/04 22:57:30 UTC
[nifi] branch master updated: NIFI-6734: Fixed S3 multipart upload
in case of SSE S3 and CSE* encryptions. Removed unnecessary code from S3
CSE* encryptions. S3 Encryption Service documentation fixes and
improvements. Renamed region property of StandardS3EncryptionService to
kms-region. Renamed Client-side Customer Master Key in
StandardS3EncryptionService. Use Client-side Customer Key on the GUI /
documentation (similar to Server-side Customer Key). Use C suffix in
constants and class names (similar to SSE_C). Fixed / [...]
This is an automated email from the ASF dual-hosted git repository.
alopresto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new ba14169 NIFI-6734: Fixed S3 multipart upload in case of SSE S3 and CSE* encryptions. Removed unnecessary code from S3 CSE* encryptions. S3 Encryption Service documentation fixes and improvements. Renamed region property of StandardS3EncryptionService to kms-region. Renamed Client-side Customer Master Key in StandardS3EncryptionService. Use Client-side Customer Key on the GUI / documentation (similar to Server-side Customer Key). Use C suffix in constants and class names (similar [...]
ba14169 is described below
commit ba141690c50a019939cb12af17b3bcbecf577b36
Author: Peter Turcsanyi <tu...@cloudera.com>
AuthorDate: Tue Oct 1 12:08:57 2019 +0200
NIFI-6734: Fixed S3 multipart upload in case of SSE S3 and CSE* encryptions.
Removed unnecessary code from S3 CSE* encryptions.
S3 Encryption Service documentation fixes and improvements.
Renamed region property of StandardS3EncryptionService to kms-region.
Renamed Client-side Customer Master Key in StandardS3EncryptionService.
Use Client-side Customer Key on the GUI / documentation (similar to
Server-side Customer Key).
Use C suffix in constants and class names (similar to SSE_C).
Fixed / extended StandardS3EncryptionService validation.
FetchS3Object encryption strategy changes.
Disable SSE S3 and SSE KMS for FetchS3Object. In case of fetching the
S3 object, these strategies are handled implicitly / automatically.
Set the encryption strategy on the fetched FF that was used to store
the S3 object, instead of the one that is used to read the object (eg.
non-encrypted or SSE S3 encrypted objects can be fetched with a CSE client).
Typo fix.
This closes #3787.
Signed-off-by: Andy LoPresto <al...@apache.org>
---
.../processors/aws/s3/AbstractS3Processor.java | 5 +-
.../nifi/processors/aws/s3/FetchS3Object.java | 36 ++-
.../apache/nifi/processors/aws/s3/PutS3Object.java | 13 +-
...egy.java => ClientSideCEncryptionStrategy.java} | 60 ++--
.../ClientSideKMSEncryptionStrategy.java | 14 +-
.../aws/s3/encryption/S3EncryptionStrategy.java | 2 +-
...egy.java => ServerSideCEncryptionStrategy.java} | 39 ++-
.../encryption/ServerSideS3EncryptionStrategy.java | 6 +
.../s3/encryption/StandardS3EncryptionService.java | 120 +++++---
.../additionalDetails.html | 21 +-
.../nifi/processors/aws/s3/ITPutS3Object.java | 168 +++++++----
.../aws/s3/encryption/S3EncryptionTestUtil.java} | 30 +-
...ClientSideCEncryptionStrategyKeyValidation.java | 98 +++++++
.../s3/encryption/TestS3EncryptionStrategies.java | 26 +-
...ServerSideCEncryptionStrategyKeyValidation.java | 80 ++++++
.../TestStandardS3EncryptionService.java | 13 +-
.../TestStandardS3EncryptionServiceValidation.java | 307 +++++++++++++++++++++
.../aws/s3/AmazonS3EncryptionService.java | 16 +-
18 files changed, 878 insertions(+), 176 deletions(-)
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index 5e8ff32..3f01543 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -130,8 +130,9 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
public static final PropertyDescriptor ENCRYPTION_SERVICE = new PropertyDescriptor.Builder()
.name("encryption-service")
.displayName("Encryption Service")
- .description("Specifies the Encryption Service Controller used configure requests. "
- + "For backward compatibility, this value is ignored when 'Server Side Encryption' is set.")
+ .description("Specifies the Encryption Service Controller used to configure requests. " +
+ "PutS3Object: For backward compatibility, this value is ignored when 'Server Side Encryption' is set. " +
+ "FetchS3Object: Only needs to be configured in case of Server-side Customer Key, Client-side KMS and Client-side Customer Key encryptions.")
.required(false)
.identifiesControllerService(AmazonS3EncryptionService.class)
.build();
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index aa6233d..4f68a98 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -17,13 +17,16 @@
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import com.amazonaws.services.s3.model.SSEAlgorithm;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -35,6 +38,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -67,7 +72,7 @@ import com.amazonaws.services.s3.model.S3Object;
@WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"),
@WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),
- @WritesAttribute(attribute = "s3.encryptionStrategy", description = "The name of the encryption strategy, if any was set"),})
+ @WritesAttribute(attribute = "s3.encryptionStrategy", description = "The name of the encryption strategy that was used to store the S3 object (if it is encrypted)"),})
public class FetchS3Object extends AbstractS3Processor {
public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
@@ -101,6 +106,27 @@ public class FetchS3Object extends AbstractS3Processor {
}
@Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+ AmazonS3EncryptionService encryptionService = validationContext.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+ if (encryptionService != null) {
+ String strategyName = encryptionService.getStrategyName();
+ if (strategyName.equals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3) || strategyName.equals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS)) {
+ problems.add(new ValidationResult.Builder()
+ .subject(ENCRYPTION_SERVICE.getDisplayName())
+ .valid(false)
+ .explanation(encryptionService.getStrategyDisplayName() + " is not a valid encryption strategy for fetching objects. Decryption will be handled automatically " +
+ "during the fetch of S3 objects encrypted with " + encryptionService.getStrategyDisplayName())
+ .build()
+ );
+ }
+ }
+
+ return problems;
+ }
+
+ @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
@@ -169,7 +195,13 @@ public class FetchS3Object extends AbstractS3Processor {
attributes.putAll(metadata.getUserMetadata());
}
if (metadata.getSSEAlgorithm() != null) {
- attributes.put("s3.sseAlgorithm", metadata.getSSEAlgorithm());
+ String sseAlgorithmName = metadata.getSSEAlgorithm();
+ attributes.put("s3.sseAlgorithm", sseAlgorithmName);
+ if (sseAlgorithmName.equals(SSEAlgorithm.AES256.getAlgorithm())) {
+ attributes.put("s3.encryptionStrategy", AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3);
+ } else if (sseAlgorithmName.equals(SSEAlgorithm.KMS.getAlgorithm())) {
+ attributes.put("s3.encryptionStrategy", AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS);
+ }
}
if (metadata.getVersionId() != null) {
attributes.put("s3.version", metadata.getVersionId());
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 81c55b6..1c2bc01 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -664,6 +664,7 @@ public class PutS3Object extends AbstractS3Processor {
// upload parts
//------------------------------------------------------------
long thisPartSize;
+ boolean isLastPart;
for (int part = currentState.getPartETags().size() + 1;
currentState.getFilePosition() < currentState.getContentLength(); part++) {
if (!PutS3Object.this.isScheduled()) {
@@ -672,13 +673,15 @@ public class PutS3Object extends AbstractS3Processor {
}
thisPartSize = Math.min(currentState.getPartSize(),
(currentState.getContentLength() - currentState.getFilePosition()));
+ isLastPart = currentState.getContentLength() == currentState.getFilePosition() + thisPartSize;
UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(bucket)
.withKey(key)
.withUploadId(currentState.getUploadId())
.withInputStream(in)
.withPartNumber(part)
- .withPartSize(thisPartSize);
+ .withPartSize(thisPartSize)
+ .withLastPart(isLastPart);
if (encryptionService != null) {
encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
}
@@ -692,8 +695,14 @@ public class PutS3Object extends AbstractS3Processor {
getLogger().info("Exception saving cache state processing flow file: " +
e.getMessage());
}
+ int available = 0;
+ try {
+ available = in.available();
+ } catch (IOException e) {
+ // in case of the last part, the stream is already closed
+ }
getLogger().info("Success uploading part flowfile={} part={} available={} " +
- "etag={} uploadId={}", new Object[]{ffFilename, part, in.available(),
+ "etag={} uploadId={}", new Object[]{ffFilename, part, available,
uploadPartResult.getETag(), currentState.getUploadId()});
} catch (AmazonClientException e) {
getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={} " +
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCMKEncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCEncryptionStrategy.java
similarity index 64%
rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCMKEncryptionStrategy.java
rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCEncryptionStrategy.java
index d157ea5..45fdc5e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCMKEncryptionStrategy.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCEncryptionStrategy.java
@@ -18,11 +18,8 @@ package org.apache.nifi.processors.aws.s3.encryption;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.regions.Region;
-import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3EncryptionClient;
-import com.amazonaws.services.s3.model.CryptoConfiguration;
import com.amazonaws.services.s3.model.EncryptionMaterials;
import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
import org.apache.commons.codec.binary.Base64;
@@ -38,58 +35,65 @@ import javax.crypto.spec.SecretKeySpec;
* See https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingClientSideEncryption.html#client-side-encryption-client-side-master-key-intro
*
*/
-public class ClientSideCMKEncryptionStrategy implements S3EncryptionStrategy {
+public class ClientSideCEncryptionStrategy implements S3EncryptionStrategy {
/**
* Create an encryption client.
*
* @param credentialsProvider AWS credentials provider.
* @param clientConfiguration Client configuration
- * @param region AWS region
+ * @param kmsRegion not used by this encryption strategy
* @param keyIdOrMaterial client master key, always base64 encoded
* @return AWS S3 client
*/
@Override
- public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String region, String keyIdOrMaterial) throws SecurityException {
- if (!validateKey(keyIdOrMaterial).isValid()) {
- throw new SecurityException("Invalid client key; ensure key material is base64 encoded.");
+ public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String kmsRegion, String keyIdOrMaterial) {
+ ValidationResult keyValidationResult = validateKey(keyIdOrMaterial);
+ if (!keyValidationResult.isValid()) {
+ throw new IllegalArgumentException("Invalid client key; " + keyValidationResult.getExplanation());
}
byte[] keyMaterial = Base64.decodeBase64(keyIdOrMaterial);
SecretKeySpec symmetricKey = new SecretKeySpec(keyMaterial, "AES");
StaticEncryptionMaterialsProvider encryptionMaterialsProvider = new StaticEncryptionMaterialsProvider(new EncryptionMaterials(symmetricKey));
- boolean haveRegion = StringUtils.isNotBlank(region);
- CryptoConfiguration cryptoConfig = new CryptoConfiguration();
- Region awsRegion = null;
- if (haveRegion) {
- awsRegion = Region.getRegion(Regions.fromName(region));
- cryptoConfig.setAwsKmsRegion(awsRegion);
- }
-
- AmazonS3EncryptionClient client = new AmazonS3EncryptionClient(credentialsProvider, encryptionMaterialsProvider, cryptoConfig);
- if (haveRegion && awsRegion != null) {
- client.setRegion(awsRegion);
- }
+ AmazonS3EncryptionClient client = new AmazonS3EncryptionClient(credentialsProvider, encryptionMaterialsProvider);
return client;
}
+ @Override
public ValidationResult validateKey(String keyValue) {
- if (StringUtils.isBlank(keyValue) || !Base64.isBase64(keyValue)) {
- return new ValidationResult.Builder().valid(false).build();
+ if (StringUtils.isBlank(keyValue)) {
+ return new ValidationResult.Builder()
+ .subject("Key Material")
+ .valid(false)
+ .explanation("it is empty")
+ .build();
}
- boolean decoded = false;
- boolean sized = false;
byte[] keyMaterial;
try {
+ if (!Base64.isBase64(keyValue)) {
+ throw new Exception();
+ }
keyMaterial = Base64.decodeBase64(keyValue);
- decoded = true;
- sized = keyMaterial.length == 32 || keyMaterial.length == 24 || keyMaterial.length == 16;
- } catch (final Exception ignored) {
+ } catch (Exception e) {
+ return new ValidationResult.Builder()
+ .subject("Key Material")
+ .valid(false)
+ .explanation("it is not in Base64 encoded form")
+ .build();
+ }
+
+ if (!(keyMaterial.length == 32 || keyMaterial.length == 24 || keyMaterial.length == 16)) {
+ return new ValidationResult.Builder()
+ .subject("Key Material")
+ .valid(false)
+ .explanation("it is not a Base64 encoded AES-256, AES-192 or AES-128 key")
+ .build();
}
- return new ValidationResult.Builder().valid(decoded && sized).build();
+ return new ValidationResult.Builder().valid(true).build();
}
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java
index e6d75e4..1da5dbb 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java
@@ -39,26 +39,22 @@ public class ClientSideKMSEncryptionStrategy implements S3EncryptionStrategy {
*
* @param credentialsProvider AWS credentials provider.
* @param clientConfiguration Client configuration
- * @param region AWS region
+ * @param kmsRegion AWS KMS region
* @param keyIdOrMaterial KMS key id
* @return AWS S3 client
*/
@Override
- public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String region, String keyIdOrMaterial) {
+ public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String kmsRegion, String keyIdOrMaterial) {
KMSEncryptionMaterialsProvider materialProvider = new KMSEncryptionMaterialsProvider(keyIdOrMaterial);
- boolean haveRegion = StringUtils.isNotBlank(region);
- Region awsRegion = null;
+ boolean haveKmsRegion = StringUtils.isNotBlank(kmsRegion);
CryptoConfiguration cryptoConfig = new CryptoConfiguration();
- if (haveRegion) {
- awsRegion = Region.getRegion(Regions.fromName(region));
+ if (haveKmsRegion) {
+ Region awsRegion = Region.getRegion(Regions.fromName(kmsRegion));
cryptoConfig.setAwsKmsRegion(awsRegion);
}
AmazonS3EncryptionClient client = new AmazonS3EncryptionClient(credentialsProvider, materialProvider, cryptoConfig);
- if (haveRegion) {
- client.setRegion(awsRegion);
- }
return client;
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java
index 677fc0e..a0012cb 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java
@@ -76,7 +76,7 @@ public interface S3EncryptionStrategy {
* @param clientConfiguration Client configuration.
* @return {@link AmazonS3Client}, perhaps an {@link com.amazonaws.services.s3.AmazonS3EncryptionClient}
*/
- default AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String region, String keyIdOrMaterial) throws SecurityException {
+ default AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String kmsRegion, String keyIdOrMaterial) {
return null;
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEKEncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEncryptionStrategy.java
similarity index 69%
rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEKEncryptionStrategy.java
rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEncryptionStrategy.java
index 231a5c8..574675c 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEKEncryptionStrategy.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEncryptionStrategy.java
@@ -22,8 +22,9 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.SSECustomerKey;
import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.ValidationResult;
-import org.bouncycastle.util.encoders.Base64;
/**
* This strategy uses a customer key to perform server-side encryption. Use this strategy when you want the server to perform the encryption,
@@ -32,7 +33,7 @@ import org.bouncycastle.util.encoders.Base64;
* See https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
*
*/
-public class ServerSideCEKEncryptionStrategy implements S3EncryptionStrategy {
+public class ServerSideCEncryptionStrategy implements S3EncryptionStrategy {
@Override
public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
SSECustomerKey customerKey = new SSECustomerKey(keyValue);
@@ -59,17 +60,37 @@ public class ServerSideCEKEncryptionStrategy implements S3EncryptionStrategy {
@Override
public ValidationResult validateKey(String keyValue) {
- boolean decoded = false;
- boolean sized = false;
+ if (StringUtils.isBlank(keyValue)) {
+ return new ValidationResult.Builder()
+ .subject("Key Material")
+ .valid(false)
+ .explanation("it is empty")
+ .build();
+ }
+
byte[] keyMaterial;
try {
- keyMaterial = Base64.decode(keyValue);
- decoded = true;
- sized = (keyMaterial.length > 0) && (keyMaterial.length % 32) == 0;
- } catch (final Exception ignored) {
+ if (!org.apache.commons.codec.binary.Base64.isBase64(keyValue)) {
+ throw new Exception();
+ }
+ keyMaterial = Base64.decodeBase64(keyValue);
+ } catch (Exception e) {
+ return new ValidationResult.Builder()
+ .subject("Key Material")
+ .valid(false)
+ .explanation("it is not in Base64 encoded form")
+ .build();
+ }
+
+ if (keyMaterial.length != 32) {
+ return new ValidationResult.Builder()
+ .subject("Key Material")
+ .valid(false)
+ .explanation("it is not a Base64 encoded AES-256 key")
+ .build();
}
- return new ValidationResult.Builder().valid(decoded && sized).build();
+ return new ValidationResult.Builder().valid(true).build();
}
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
index 0845e12..89f1637 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.aws.s3.encryption;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
@@ -33,4 +34,9 @@ public class ServerSideS3EncryptionStrategy implements S3EncryptionStrategy {
public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
}
+
+ @Override
+ public void configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, ObjectMetadata objectMetadata, String keyValue) {
+ objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+ }
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
index 21d8d9e..90c50e8 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
@@ -30,17 +30,18 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService;
import org.apache.nifi.processors.aws.s3.AbstractS3Processor;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,39 +53,41 @@ import java.util.List;
import java.util.Map;
-@Tags({"service", "encryption", "encrypt", "decryption", "decrypt", "key"})
+@Tags({"service", "aws", "s3", "encryption", "encrypt", "decryption", "decrypt", "key"})
@CapabilityDescription("Adds configurable encryption to S3 Put and S3 Fetch operations.")
public class StandardS3EncryptionService extends AbstractControllerService implements AmazonS3EncryptionService {
private static final Logger logger = LoggerFactory.getLogger(StandardS3EncryptionService.class);
- public static final String STRATEGY_NAME_NONE = "NONE";
- public static final String STRATEGY_NAME_SSE_S3 = "SSE_S3";
- public static final String STRATEGY_NAME_SSE_KMS = "SSE_KMS";
- public static final String STRATEGY_NAME_SSE_C = "SSE_C";
- public static final String STRATEGY_NAME_CSE_KMS = "CSE_KMS";
- public static final String STRATEGY_NAME_CSE_CMK = "CSE_CMK";
-
- private static final Map<String, S3EncryptionStrategy> namedStrategies = new HashMap<String, S3EncryptionStrategy>() {{
+ private static final Map<String, S3EncryptionStrategy> NAMED_STRATEGIES = new HashMap<String, S3EncryptionStrategy>() {{
put(STRATEGY_NAME_NONE, new NoOpEncryptionStrategy());
put(STRATEGY_NAME_SSE_S3, new ServerSideS3EncryptionStrategy());
put(STRATEGY_NAME_SSE_KMS, new ServerSideKMSEncryptionStrategy());
- put(STRATEGY_NAME_SSE_C, new ServerSideCEKEncryptionStrategy());
+ put(STRATEGY_NAME_SSE_C, new ServerSideCEncryptionStrategy());
put(STRATEGY_NAME_CSE_KMS, new ClientSideKMSEncryptionStrategy());
- put(STRATEGY_NAME_CSE_CMK, new ClientSideCMKEncryptionStrategy());
+ put(STRATEGY_NAME_CSE_C, new ClientSideCEncryptionStrategy());
}};
private static final AllowableValue NONE = new AllowableValue(STRATEGY_NAME_NONE, "None","No encryption.");
private static final AllowableValue SSE_S3 = new AllowableValue(STRATEGY_NAME_SSE_S3, "Server-side S3","Use server-side, S3-managed encryption.");
private static final AllowableValue SSE_KMS = new AllowableValue(STRATEGY_NAME_SSE_KMS, "Server-side KMS","Use server-side, KMS key to perform encryption.");
- private static final AllowableValue SSE_C = new AllowableValue(STRATEGY_NAME_SSE_C, "Server-side Customer Key","Use server-side, customer-supplied key for encryption.");
+ private static final AllowableValue SSE_C = new AllowableValue(STRATEGY_NAME_SSE_C, "Server-side Customer Key","Use server-side, customer-supplied key to perform encryption.");
private static final AllowableValue CSE_KMS = new AllowableValue(STRATEGY_NAME_CSE_KMS, "Client-side KMS","Use client-side, KMS key to perform encryption.");
- private static final AllowableValue CSE_CMK = new AllowableValue(STRATEGY_NAME_CSE_CMK, "Client-side Customer Master Key","Use client-side, customer-supplied master key to perform encryption.");
+ private static final AllowableValue CSE_C = new AllowableValue(STRATEGY_NAME_CSE_C, "Client-side Customer Key","Use client-side, customer-supplied key to perform encryption.");
+
+ public static final Map<String, AllowableValue> ENCRYPTION_STRATEGY_ALLOWABLE_VALUES = new HashMap<String, AllowableValue>() {{
+ put(STRATEGY_NAME_NONE, NONE);
+ put(STRATEGY_NAME_SSE_S3, SSE_S3);
+ put(STRATEGY_NAME_SSE_KMS, SSE_KMS);
+ put(STRATEGY_NAME_SSE_C, SSE_C);
+ put(STRATEGY_NAME_CSE_KMS, CSE_KMS);
+ put(STRATEGY_NAME_CSE_C, CSE_C);
+ }};
public static final PropertyDescriptor ENCRYPTION_STRATEGY = new PropertyDescriptor.Builder()
.name("encryption-strategy")
.displayName("Encryption Strategy")
.description("Strategy to use for S3 data encryption and decryption.")
- .allowableValues(NONE, SSE_S3, SSE_KMS, SSE_C, CSE_KMS, CSE_CMK)
+ .allowableValues(NONE, SSE_S3, SSE_KMS, SSE_C, CSE_KMS, CSE_C)
.required(true)
.defaultValue(NONE.getValue())
.build();
@@ -92,34 +95,38 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
public static final PropertyDescriptor ENCRYPTION_VALUE = new PropertyDescriptor.Builder()
.name("key-id-or-key-material")
.displayName("Key ID or Key Material")
- .description("For Server-side CEK and Client-side CMK, this is base64-encoded Key Material. For all others (except 'None'), it is the KMS Key ID.")
+ .description("For None and Server-side S3: not used. For Server-side KMS and Client-side KMS: the KMS Key ID must be configured. " +
+ "For Server-side Customer Key and Client-side Customer Key: the Key Material must be specified in Base64 encoded form. " +
+ "In case of Server-side Customer Key, the key must be an AES-256 key. In case of Client-side Customer Key, it can be an AES-256, AES-192 or AES-128 key.")
.required(false)
.sensitive(true)
- .addValidator(new StandardValidators.StringLengthValidator(0, 4096))
+ .addValidator((subject, input, context) -> new ValidationResult.Builder().valid(true).build()) // will be validated in customValidate()
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
- public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
- .name("region")
+ public static final PropertyDescriptor KMS_REGION = new PropertyDescriptor.Builder()
+ .name("kms-region")
+ .displayName("KMS Region")
+ .description("The Region of the AWS Key Management Service. Only used in case of Client-side KMS.")
.required(false)
.allowableValues(AbstractS3Processor.getAvailableRegions())
.defaultValue(AbstractS3Processor.createAllowableValue(Regions.DEFAULT_REGION).getValue())
.build();
private String keyValue = "";
- private String region = "";
+ private String kmsRegion = "";
private S3EncryptionStrategy encryptionStrategy = new NoOpEncryptionStrategy();
private String strategyName = STRATEGY_NAME_NONE;
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException {
final String newStrategyName = context.getProperty(ENCRYPTION_STRATEGY).getValue();
- final String newKeyValue = context.getProperty(ENCRYPTION_VALUE).getValue();
- final S3EncryptionStrategy newEncryptionStrategy = namedStrategies.get(newStrategyName);
- String newRegion = null;
+ final String newKeyValue = context.getProperty(ENCRYPTION_VALUE).evaluateAttributeExpressions().getValue();
+ final S3EncryptionStrategy newEncryptionStrategy = NAMED_STRATEGIES.get(newStrategyName);
+ String newKmsRegion = null;
- if (context.getProperty(REGION) != null ) {
- newRegion = context.getProperty(REGION).getValue();
+ if (context.getProperty(KMS_REGION) != null ) {
+ newKmsRegion = context.getProperty(KMS_REGION).getValue();
}
if (newEncryptionStrategy == null) {
@@ -131,13 +138,59 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
strategyName = newStrategyName;
encryptionStrategy = newEncryptionStrategy;
keyValue = newKeyValue;
- region = newRegion;
+ kmsRegion = newKmsRegion;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
Collection<ValidationResult> validationResults = new ArrayList<>();
- validationResults.add(encryptionStrategy.validateKey(validationContext.getProperty(ENCRYPTION_VALUE).getValue()));
+
+ String encryptionStrategyName = validationContext.getProperty(ENCRYPTION_STRATEGY).getValue();
+ String encryptionStrategyDisplayName = ENCRYPTION_STRATEGY_ALLOWABLE_VALUES.get(encryptionStrategyName).getDisplayName();
+ PropertyValue encryptionValueProperty = validationContext.getProperty(ENCRYPTION_VALUE);
+ String encryptionValue = encryptionValueProperty.evaluateAttributeExpressions().getValue();
+
+ switch (encryptionStrategyName) {
+ case STRATEGY_NAME_NONE:
+ case STRATEGY_NAME_SSE_S3:
+ if (encryptionValueProperty.isSet()) {
+ validationResults.add(new ValidationResult.Builder()
+ .subject(ENCRYPTION_VALUE.getDisplayName())
+ .valid(false)
+ .explanation("the property cannot be specified for encryption strategy " + encryptionStrategyDisplayName)
+ .build()
+ );
+ }
+ break;
+ case STRATEGY_NAME_SSE_KMS:
+ case STRATEGY_NAME_CSE_KMS:
+ if (StringUtils.isEmpty(encryptionValue)) {
+ validationResults.add(new ValidationResult.Builder()
+ .subject(ENCRYPTION_VALUE.getDisplayName())
+ .valid(false)
+ .explanation("a non-empty Key ID must be specified for encryption strategy " + encryptionStrategyDisplayName)
+ .build()
+ );
+ }
+ break;
+ case STRATEGY_NAME_SSE_C:
+ case STRATEGY_NAME_CSE_C:
+ if (StringUtils.isEmpty(encryptionValue)) {
+ validationResults.add(new ValidationResult.Builder()
+ .subject(ENCRYPTION_VALUE.getDisplayName())
+ .valid(false)
+ .explanation("a non-empty Key Material must be specified for encryption strategy " + encryptionStrategyDisplayName)
+ .build()
+ );
+ } else {
+ S3EncryptionStrategy encryptionStrategy = NAMED_STRATEGIES.get(encryptionStrategyName);
+ String keyIdOrMaterial = validationContext.getProperty(ENCRYPTION_VALUE).evaluateAttributeExpressions().getValue();
+
+ validationResults.add(encryptionStrategy.validateKey(keyIdOrMaterial));
+ }
+ break;
+ }
+
return validationResults;
}
@@ -146,7 +199,7 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ENCRYPTION_STRATEGY);
properties.add(ENCRYPTION_VALUE);
- properties.add(REGION);
+ properties.add(KMS_REGION);
return Collections.unmodifiableList(properties);
}
@@ -172,18 +225,23 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
@Override
public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration) {
- return encryptionStrategy.createEncryptionClient(credentialsProvider, clientConfiguration, region, keyValue);
+ return encryptionStrategy.createEncryptionClient(credentialsProvider, clientConfiguration, kmsRegion, keyValue);
}
@Override
- public String getRegion() {
- return region;
+ public String getKmsRegion() {
+ return kmsRegion;
}
@Override
public String getStrategyName() {
return strategyName;
}
+
+ @Override
+ public String getStrategyDisplayName() {
+ return ENCRYPTION_STRATEGY_ALLOWABLE_VALUES.get(strategyName).getDisplayName();
+ }
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.S3EncryptionService/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService/additionalDetails.html
similarity index 70%
rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.S3EncryptionService/additionalDetails.html
rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService/additionalDetails.html
index 108a0b7..b53cffe 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.S3EncryptionService/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService/additionalDetails.html
@@ -21,15 +21,15 @@
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
-
+<h2>Description</h2>
<div>
- The <code>S3EncryptionService</code> manages an encryption strategy and applies that strategy to various S3 operations.
+ The <code>StandardS3EncryptionService</code> manages an encryption strategy and applies that strategy to various S3 operations.
<br>
- <b>Note:</b> this service has no effect when a processor has the <code>SERVER_SIDE_ENCRYPTION</code> property set. To use
+ <b>Note:</b> This service has no effect when a processor has the <code>Server Side Encryption</code> property set. To use
this service with processors so configured, first create a service instance, set the <code>Encryption Strategy</code> to <code>Server-side S3</code>,
- disable the <code>SERVER_SIDE_ENCRYPTION</code> processor setting, and finally, associate the processor with the service.
+ disable the <code>Server Side Encryption</code> processor setting, and finally, associate the processor with the service.
</div>
@@ -44,27 +44,26 @@
<li><code>Server-side S3</code> - encryption and decryption is managed by S3; no keys are required.</li>
<li><code>Server-side KMS</code> - encryption and decryption are performed by S3 using the configured KMS key.</li>
<li><code>Server-side Customer Key</code> - encryption and decryption are performed by S3 using the supplied customer key.</li>
- <li><code>Client-side KMS</code> - like the server-side KMS strategy, with the encryption and decryption performed by the client.</li>
- <li><code>Client-side Customer Master Key</code> - like the server-side CEK strategy, with the encryption and decryption performed by the client.</li>
+ <li><code>Client-side KMS</code> - like the Server-side KMS strategy, with the encryption and decryption performed by the client.</li>
+ <li><code>Client-side Customer Key</code> - like the Server-side Customer Key strategy, with the encryption and decryption performed by the client.</li>
</ul>
</div>
<h3>Key ID or Key Material</h3>
<p>
- When configured for either the server-side or client-side KMS strategy, this field should contain the ID or alias
- of that key.
+ When configured for either the Server-side or Client-side KMS strategies, this field should contain the KMS Key ID.
</p>
<p>
- When configured for either the server-side or client-side customer key strategies, this field should contain the key
+ When configured for either the Server-side or Client-side Customer Key strategies, this field should contain the key
material, and that material must be base64 encoded.
</p>
<p>
All other encryption strategies ignore this field.
</p>
-<h3>Region</h3>
+<h3>KMS Region</h3>
<div>
- KMS key region, if any. This value must match the actual region of the KMS key if supplied.
+ KMS key region, if any. This value must match the actual region of the KMS key if supplied.
</div>
</body>
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index 7a2cf3c..dd5f990 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -946,12 +946,23 @@ public class ITPutS3Object extends AbstractS3IT {
}
@Test
- public void testEncryptionServiceWithServerSideS3ManagedEncryptionStrategy() throws IOException, InitializationException {
- TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_S3, "");
+ public void testEncryptionServiceWithServerSideS3EncryptionStrategyUsingSingleUpload() throws IOException, InitializationException {
+ byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+ testEncryptionServiceWithServerSideS3EncryptionStrategy(smallData);
+ }
+
+ @Test
+ public void testEncryptionServiceWithServerSideS3EncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
+ byte[] largeData = new byte[51 * 1024 * 1024];
+ testEncryptionServiceWithServerSideS3EncryptionStrategy(largeData);
+ }
+
+ private void testEncryptionServiceWithServerSideS3EncryptionStrategy(byte[] data) throws IOException, InitializationException {
+ TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3, null);
Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "test.txt");
- runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+ runner.enqueue(data, attrs);
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
@@ -960,21 +971,32 @@ public class ITPutS3Object extends AbstractS3IT {
Assert.assertEquals(1, flowFiles.size());
Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
MockFlowFile putSuccess = flowFiles.get(0);
- Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_SSE_S3);
+ Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3);
- MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_S3, "");
- flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+ MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3, null);
+ flowFile.assertContentEquals(data);
flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
- flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_S3);
+ flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3);
}
@Test
- public void testEncryptionServiceWithServerSideKMSEncryptionStrategy() throws IOException, InitializationException {
- TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
+ public void testEncryptionServiceWithServerSideKMSEncryptionStrategyUsingSingleUpload() throws IOException, InitializationException {
+ byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+ testEncryptionServiceWithServerSideKMSEncryptionStrategy(smallData);
+ }
+
+ @Test
+ public void testEncryptionServiceWithServerSideKMSEncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
+ byte[] largeData = new byte[51 * 1024 * 1024];
+ testEncryptionServiceWithServerSideKMSEncryptionStrategy(largeData);
+ }
+
+ private void testEncryptionServiceWithServerSideKMSEncryptionStrategy(byte[] data) throws IOException, InitializationException {
+ TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "test.txt");
- runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+ runner.enqueue(data, attrs);
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
@@ -983,21 +1005,32 @@ public class ITPutS3Object extends AbstractS3IT {
Assert.assertEquals(1, flowFiles.size());
Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
MockFlowFile putSuccess = flowFiles.get(0);
- Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS);
+ Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS);
- MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
- flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+ MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
+ flowFile.assertContentEquals(data);
flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, "aws:kms");
- flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS);
+ flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS);
}
@Test
- public void testEncryptionServiceWithServerSideCPEKEncryptionStrategy() throws IOException, InitializationException {
- TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
+ public void testEncryptionServiceWithServerSideCEncryptionStrategyUsingSingleUpload() throws IOException, InitializationException {
+ byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+ testEncryptionServiceWithServerSideCEncryptionStrategy(smallData);
+ }
+
+ @Test
+ public void testEncryptionServiceWithServerSideCEncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
+ byte[] largeData = new byte[51 * 1024 * 1024];
+ testEncryptionServiceWithServerSideCEncryptionStrategy(largeData);
+ }
+
+ private void testEncryptionServiceWithServerSideCEncryptionStrategy(byte[] data) throws IOException, InitializationException {
+ TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "test.txt");
- runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+ runner.enqueue(data, attrs);
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
@@ -1006,23 +1039,34 @@ public class ITPutS3Object extends AbstractS3IT {
Assert.assertEquals(1, flowFiles.size());
Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
MockFlowFile putSuccess = flowFiles.get(0);
- Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_SSE_C);
+ Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_SSE_C);
- MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
- flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+ MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
+ flowFile.assertContentEquals(data);
// successful fetch does not indicate type of original encryption:
flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, null);
// but it does indicate it via our specific attribute:
- flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_C);
+ flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_SSE_C);
+ }
+
+ @Test
+ public void testEncryptionServiceWithClientSideKMSEncryptionStrategyUsingSingleUpload() throws IOException, InitializationException {
+ byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+ testEncryptionServiceWithClientSideKMSEncryptionStrategy(smallData);
}
@Test
- public void testEncryptionServiceWithClientSideKMSEncryptionStrategy() throws InitializationException, IOException {
- TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
+ public void testEncryptionServiceWithClientSideKMSEncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
+ byte[] largeData = new byte[51 * 1024 * 1024];
+ testEncryptionServiceWithClientSideKMSEncryptionStrategy(largeData);
+ }
+
+ private void testEncryptionServiceWithClientSideKMSEncryptionStrategy(byte[] data) throws InitializationException, IOException {
+ TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "test.txt");
- runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+ runner.enqueue(data, attrs);
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
@@ -1031,21 +1075,32 @@ public class ITPutS3Object extends AbstractS3IT {
Assert.assertEquals(1, flowFiles.size());
Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
MockFlowFile putSuccess = flowFiles.get(0);
- Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS);
+ Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS);
- MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
- flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+ MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
+ flowFile.assertContentEquals(data);
flowFile.assertAttributeEquals("x-amz-wrap-alg", "kms");
- flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS);
+ flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS);
}
@Test
- public void testEncryptionServiceWithClientSideCMKEncryptionStrategy() throws InitializationException, IOException {
- TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK, randomKeyMaterial);
+ public void testEncryptionServiceWithClientSideCEncryptionStrategyUsingSingleUpload() throws InitializationException, IOException {
+ byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+ testEncryptionServiceWithClientSideCEncryptionStrategy(smallData);
+ }
+
+ @Test
+ public void testEncryptionServiceWithClientSideCEncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
+ byte[] largeData = new byte[51 * 1024 * 1024];
+ testEncryptionServiceWithClientSideCEncryptionStrategy(largeData);
+ }
+
+ private void testEncryptionServiceWithClientSideCEncryptionStrategy(byte[] data) throws InitializationException, IOException {
+ TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_CSE_C, randomKeyMaterial);
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "test.txt");
- runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+ runner.enqueue(data, attrs);
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
@@ -1054,11 +1109,11 @@ public class ITPutS3Object extends AbstractS3IT {
Assert.assertEquals(1, flowFiles.size());
Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
MockFlowFile putSuccess = flowFiles.get(0);
- Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK);
+ Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_CSE_C);
- MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK, randomKeyMaterial);
- flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK);
- flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+ MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_CSE_C, randomKeyMaterial);
+ flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_CSE_C);
+ flowFile.assertContentEquals(data);
flowFile.assertAttributeExists("x-amz-key");
flowFile.assertAttributeNotEquals("x-amz-key", "");
@@ -1068,11 +1123,24 @@ public class ITPutS3Object extends AbstractS3IT {
}
private static TestRunner createPutEncryptionTestRunner(String strategyName, String keyIdOrMaterial) throws InitializationException {
- return createEncryptionTestRunner(new PutS3Object(), strategyName, keyIdOrMaterial);
+ TestRunner runner = createEncryptionTestRunner(new PutS3Object(), strategyName, keyIdOrMaterial);
+
+ runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, "50 MB");
+ runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, "50 MB");
+
+ return runner;
+ }
+
+ private static TestRunner createFetchEncryptionTestRunner(String strategyName, String keyIdOrMaterial) throws InitializationException {
+ if (strategyName.equals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3) || strategyName.equals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS)) {
+ strategyName = null;
+ }
+
+ return createEncryptionTestRunner(new FetchS3Object(), strategyName, keyIdOrMaterial);
}
private static MockFlowFile fetchEncryptedFlowFile(Map<String, String> attributes, String strategyName, String keyIdOrMaterial) throws InitializationException {
- final TestRunner runner = createEncryptionTestRunner(new FetchS3Object(), strategyName, keyIdOrMaterial);
+ final TestRunner runner = createFetchEncryptionTestRunner(strategyName, keyIdOrMaterial);
runner.enqueue(new byte[0], attributes);
runner.run(1);
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
@@ -1082,24 +1150,28 @@ public class ITPutS3Object extends AbstractS3IT {
private static TestRunner createEncryptionTestRunner(Processor processor, String strategyName, String keyIdOrMaterial) throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(processor);
- final StandardS3EncryptionService service = new StandardS3EncryptionService();
final ConfigurationContext context = mock(ConfigurationContext.class);
- runner.addControllerService(PutS3Object.ENCRYPTION_SERVICE.getName(), service);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
- runner.setProperty(PutS3Object.ENCRYPTION_SERVICE, service.getIdentifier());
- runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_STRATEGY, strategyName);
- runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_VALUE, keyIdOrMaterial);
- runner.setProperty(service, StandardS3EncryptionService.REGION, REGION);
- when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
- when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
- when(context.getProperty(StandardS3EncryptionService.REGION)).thenReturn(new MockPropertyValue(REGION));
+ if (strategyName != null) {
+ final StandardS3EncryptionService service = new StandardS3EncryptionService();
+ runner.addControllerService(PutS3Object.ENCRYPTION_SERVICE.getName(), service);
+ runner.setProperty(PutS3Object.ENCRYPTION_SERVICE, service.getIdentifier());
+
+ runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_STRATEGY, strategyName);
+ runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_VALUE, keyIdOrMaterial);
+ runner.setProperty(service, StandardS3EncryptionService.KMS_REGION, REGION);
- service.onConfigured(context);
- runner.enableControllerService(service);
+ when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
+ when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
+ when(context.getProperty(StandardS3EncryptionService.KMS_REGION)).thenReturn(new MockPropertyValue(REGION));
+
+ service.onConfigured(context);
+ runner.enableControllerService(service);
+ }
return runner;
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionTestUtil.java
similarity index 53%
copy from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
copy to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionTestUtil.java
index 0845e12..26e2c80 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionTestUtil.java
@@ -16,21 +16,23 @@
*/
package org.apache.nifi.processors.aws.s3.encryption;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
+import java.util.Base64;
+import java.util.Random;
+final class S3EncryptionTestUtil {
-/**
- * This strategy uses S3-managed keys to perform server-side encryption. Use this strategy when you want the server to
- * perform the encryption (meaning you pay the cost of processing) and you want AWS to completely manage the key.
- *
- *
- * See https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
- *
- */
-public class ServerSideS3EncryptionStrategy implements S3EncryptionStrategy {
- @Override
- public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
- objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+ private static Random RANDOM = new Random();
+
+ private S3EncryptionTestUtil() {
+ }
+
+ static String createKey(int keySize) {
+ if (keySize % 8 != 0) {
+ throw new IllegalArgumentException("Invalid test data");
+ }
+
+ byte[] keyMaterial = new byte[keySize / 8];
+ RANDOM.nextBytes(keyMaterial);
+ return new String(Base64.getEncoder().encode(keyMaterial));
}
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestClientSideCEncryptionStrategyKeyValidation.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestClientSideCEncryptionStrategyKeyValidation.java
new file mode 100644
index 0000000..5d716f5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestClientSideCEncryptionStrategyKeyValidation.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import org.apache.nifi.components.ValidationResult;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.nifi.processors.aws.s3.encryption.S3EncryptionTestUtil.createKey;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestClientSideCEncryptionStrategyKeyValidation {
+
+ private ClientSideCEncryptionStrategy strategy;
+
+ @Before
+ public void setUp() {
+ strategy = new ClientSideCEncryptionStrategy();
+ }
+
+ @Test
+ public void testValid256BitKey() {
+ String key = createKey(256);
+
+ ValidationResult result = strategy.validateKey(key);
+
+ assertTrue(result.isValid());
+ }
+
+ @Test
+ public void testValid192BitKey() {
+ String key = createKey(192);
+
+ ValidationResult result = strategy.validateKey(key);
+
+ assertTrue(result.isValid());
+ }
+
+ @Test
+ public void testValid128BitKey() {
+ String key = createKey(128);
+
+ ValidationResult result = strategy.validateKey(key);
+
+ assertTrue(result.isValid());
+ }
+
+ @Test
+ public void testNotSupportedKeySize() {
+ String key = createKey(512);
+
+ ValidationResult result = strategy.validateKey(key);
+
+ assertFalse(result.isValid());
+ }
+
+ @Test
+ public void testNullKey() {
+ String key = null;
+
+ ValidationResult result = strategy.validateKey(key);
+
+ assertFalse(result.isValid());
+ }
+
+ @Test
+ public void testEmptyKey() {
+ String key = "";
+
+ ValidationResult result = strategy.validateKey(key);
+
+ assertFalse(result.isValid());
+ }
+
+ @Test
+ public void testNotBase64EncodedKey() {
+ String key = "NotBase64EncodedKey";
+
+ ValidationResult result = strategy.validateKey(key);
+
+ assertFalse(result.isValid());
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java
index 3261288..ec12230 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java
@@ -33,7 +33,7 @@ public class TestS3EncryptionStrategies {
private String randomKeyMaterial = "";
private String randomKeyId = "mock-key-id";
- private String region = "us-west-1";
+ private String kmsRegion = "us-west-1";
private ObjectMetadata metadata = null;
private PutObjectRequest putObjectRequest = null;
@@ -60,7 +60,7 @@ public class TestS3EncryptionStrategies {
S3EncryptionStrategy strategy = new ClientSideKMSEncryptionStrategy();
// This shows that the strategy builds a client:
- Assert.assertNotNull(strategy.createEncryptionClient(null, null, region, randomKeyMaterial));
+ Assert.assertNotNull(strategy.createEncryptionClient(null, null, kmsRegion, randomKeyMaterial));
// This shows that the strategy does not modify the metadata or any of the requests:
Assert.assertNull(metadata.getSSEAlgorithm());
@@ -76,11 +76,11 @@ public class TestS3EncryptionStrategies {
}
@Test
- public void testClientSideCMKEncryptionStrategy() {
- S3EncryptionStrategy strategy = new ClientSideCMKEncryptionStrategy();
+ public void testClientSideCEncryptionStrategy() {
+ S3EncryptionStrategy strategy = new ClientSideCEncryptionStrategy();
// This shows that the strategy builds a client:
- Assert.assertNotNull(strategy.createEncryptionClient(null, null, region, randomKeyMaterial));
+ Assert.assertNotNull(strategy.createEncryptionClient(null, null, null, randomKeyMaterial));
// This shows that the strategy does not modify the metadata or any of the requests:
Assert.assertNull(metadata.getSSEAlgorithm());
@@ -96,11 +96,11 @@ public class TestS3EncryptionStrategies {
}
@Test
- public void testServerSideCEKEncryptionStrategy() {
- S3EncryptionStrategy strategy = new ServerSideCEKEncryptionStrategy();
+ public void testServerSideCEncryptionStrategy() {
+ S3EncryptionStrategy strategy = new ServerSideCEncryptionStrategy();
// This shows that the strategy does *not* build a client:
- Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+ Assert.assertNull(strategy.createEncryptionClient(null, null, null, ""));
// This shows that the strategy sets the SSE customer key as expected:
strategy.configurePutObjectRequest(putObjectRequest, metadata, randomKeyMaterial);
@@ -130,7 +130,7 @@ public class TestS3EncryptionStrategies {
S3EncryptionStrategy strategy = new ServerSideKMSEncryptionStrategy();
// This shows that the strategy does *not* build a client:
- Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+ Assert.assertNull(strategy.createEncryptionClient(null, null, null, null));
// This shows that the strategy sets the SSE KMS key id as expected:
strategy.configurePutObjectRequest(putObjectRequest, metadata, randomKeyId);
@@ -150,10 +150,14 @@ public class TestS3EncryptionStrategies {
S3EncryptionStrategy strategy = new ServerSideS3EncryptionStrategy();
// This shows that the strategy does *not* build a client:
- Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+ Assert.assertNull(strategy.createEncryptionClient(null, null, null, null));
// This shows that the strategy sets the SSE algorithm field as expected:
- strategy.configurePutObjectRequest(putObjectRequest, metadata, "");
+ strategy.configurePutObjectRequest(putObjectRequest, metadata, null);
+ Assert.assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, metadata.getSSEAlgorithm());
+
+ // Same for InitiateMultipartUploadRequest:
+ strategy.configureInitiateMultipartUploadRequest(initUploadRequest, metadata, null);
Assert.assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, metadata.getSSEAlgorithm());
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestServerSideCEncryptionStrategyKeyValidation.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestServerSideCEncryptionStrategyKeyValidation.java
new file mode 100644
index 0000000..77d64ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestServerSideCEncryptionStrategyKeyValidation.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import org.apache.nifi.components.ValidationResult;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.nifi.processors.aws.s3.encryption.S3EncryptionTestUtil.createKey;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestServerSideCEncryptionStrategyKeyValidation {
+
+ private ServerSideCEncryptionStrategy strategy;
+
+ @Before
+ public void setUp() {
+ strategy = new ServerSideCEncryptionStrategy();
+ }
+
+ @Test
+ public void testValid256BitKey() {
+ String key = createKey(256);
+
+ ValidationResult result = strategy.validateKey(key);
+
+ assertTrue(result.isValid());
+ }
+
+ @Test
+ public void testNotSupportedKeySize() {
+ String key = createKey(512);
+
+ ValidationResult result = strategy.validateKey(key);
+
+ assertFalse(result.isValid());
+ }
+
+ @Test
+ public void testNullKey() {
+ String key = null;
+
+ ValidationResult result = strategy.validateKey(key);
+
+ assertFalse(result.isValid());
+ }
+
+ @Test
+ public void testEmptyKey() {
+ String key = "";
+
+ ValidationResult result = strategy.validateKey(key);
+
+ assertFalse(result.isValid());
+ }
+
+ @Test
+ public void testNotBase64EncodedKey() {
+ String key = "NotBase64EncodedKey";
+
+ ValidationResult result = strategy.validateKey(key);
+
+ assertFalse(result.isValid());
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
index d040d34..1a6c02e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
@@ -23,6 +23,7 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert;
@@ -37,26 +38,26 @@ public class TestStandardS3EncryptionService {
private ConfigurationContext context;
private String strategyName;
private String keyIdOrMaterial;
- private String region;
+ private String kmsRegion;
@Before
public void setup() throws InitializationException {
service = new StandardS3EncryptionService();
context = Mockito.mock(ConfigurationContext.class);
- strategyName = StandardS3EncryptionService.STRATEGY_NAME_NONE;
+ strategyName = AmazonS3EncryptionService.STRATEGY_NAME_NONE;
keyIdOrMaterial = "test-key-id";
- region = "us-west-1";
+ kmsRegion = "us-west-1";
Mockito.when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
Mockito.when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
- Mockito.when(context.getProperty(StandardS3EncryptionService.REGION)).thenReturn(new MockPropertyValue(region));
+ Mockito.when(context.getProperty(StandardS3EncryptionService.KMS_REGION)).thenReturn(new MockPropertyValue(kmsRegion));
service.onConfigured(context);
}
@Test
public void testServiceProperties() {
- Assert.assertEquals(service.getRegion(), region);
+ Assert.assertEquals(service.getKmsRegion(), kmsRegion);
Assert.assertEquals(service.getStrategyName(), strategyName);
}
@@ -97,6 +98,6 @@ public class TestStandardS3EncryptionService {
Assert.assertEquals(properties.get(0).getName(), StandardS3EncryptionService.ENCRYPTION_STRATEGY.getName());
Assert.assertEquals(properties.get(1).getName(), StandardS3EncryptionService.ENCRYPTION_VALUE.getName());
- Assert.assertEquals(properties.get(2).getName(), StandardS3EncryptionService.REGION.getName());
+ Assert.assertEquals(properties.get(2).getName(), StandardS3EncryptionService.KMS_REGION.getName());
}
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionServiceValidation.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionServiceValidation.java
new file mode 100644
index 0000000..2bccc17
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionServiceValidation.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_CSE_C;
+import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS;
+import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_NONE;
+import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_SSE_C;
+import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS;
+import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3;
+import static org.apache.nifi.processors.aws.s3.encryption.S3EncryptionTestUtil.createKey;
+
+public class TestStandardS3EncryptionServiceValidation {
+
+ private TestRunner runner;
+ private StandardS3EncryptionService service;
+
+ @Before
+ public void setUp() throws InitializationException {
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ service = new StandardS3EncryptionService();
+ runner.addControllerService("s3-encryption-service", service);
+ }
+
+
+ // NoOpEncryptionStrategy
+
+ @Test
+ public void testValidNoOpEncryptionStrategy() {
+ configureService(STRATEGY_NAME_NONE, null);
+
+ runner.assertValid(service);
+ }
+
+ @Test
+ public void testInvalidNoOpEncryptionStrategyBecauseKeyIdOrMaterialSpecified() {
+ configureService(STRATEGY_NAME_NONE, "key-id");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidNoOpEncryptionStrategyBecauseKeyIdOrMaterialSpecifiedAsEmptyString() {
+ configureService(STRATEGY_NAME_NONE, "");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidNoOpEncryptionStrategyBecauseKeyIdOrMaterialSpecifiedUsingEL() {
+ configureService(STRATEGY_NAME_NONE, "${key-id-var}");
+
+ runner.assertNotValid(service);
+ }
+
+
+ // ServerSideS3EncryptionStrategy
+
+ @Test
+ public void testValidServerSideS3EncryptionStrategy() {
+ configureService(STRATEGY_NAME_SSE_S3, null);
+
+ runner.assertValid(service);
+ }
+
+ @Test
+ public void testInvalidServerSideS3EncryptionStrategyBecauseKeyIdOrMaterialSpecified() {
+ configureService(STRATEGY_NAME_SSE_S3, "key-id");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidServerSideS3EncryptionStrategyBecauseKeyIdOrMaterialSpecifiedAsEmptyString() {
+ configureService(STRATEGY_NAME_SSE_S3, "");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidServerSideS3EncryptionStrategyBecauseKeyIdOrMaterialSpecifiedUsingEL() {
+ configureService(STRATEGY_NAME_SSE_S3, "${key-id-var}");
+
+ runner.assertNotValid(service);
+ }
+
+
+ // ServerSideKMSEncryptionStrategy
+
+ @Test
+ public void testValidServerSideKMSEncryptionStrategy() {
+ configureService(STRATEGY_NAME_SSE_KMS, "key-id");
+
+ runner.assertValid(service);
+ }
+
+ @Test
+ public void testValidServerSideKMSEncryptionStrategyUsingEL() {
+ configureService(STRATEGY_NAME_SSE_KMS, "${key-id-var}");
+ configureVariable("key-id-var", "key-id");
+
+ runner.assertValid(service);
+ }
+
+ @Test
+ public void testInvalidServerSideKMSEncryptionStrategyBecauseKeyIdNotSpecified() {
+ configureService(STRATEGY_NAME_SSE_KMS, null);
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidServerSideKMSEncryptionStrategyBecauseKeyIdSpecifiedAsEmptyString() {
+ configureService(STRATEGY_NAME_SSE_KMS, "");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidServerSideKMSEncryptionStrategyBecauseKeyIdEvaluatedToNull() {
+ configureService(STRATEGY_NAME_SSE_KMS, "${key-id-var}");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidServerSideKMSEncryptionStrategyBecauseKeyIdEvaluatedToEmptyString() {
+ configureService(STRATEGY_NAME_SSE_KMS, "${key-id-var}");
+ configureVariable("key-id-var", "");
+
+ runner.assertNotValid(service);
+ }
+
+
+ // ServerSideCEncryptionStrategy
+
+ @Test
+ public void testValidServerSideCEncryptionStrategy() {
+ configureService(STRATEGY_NAME_SSE_C, createKey(256));
+
+ runner.assertValid(service);
+ }
+
+ @Test
+ public void testValidServerSideCEncryptionStrategyUsingEL() {
+ configureService(STRATEGY_NAME_SSE_C, "${key-material-var}");
+ configureVariable("key-material-var", createKey(256));
+
+ runner.assertValid(service);
+ }
+
+ @Test
+ public void testInvalidServerSideCEncryptionStrategyBecauseKeyMaterialNotSpecified() {
+ configureService(STRATEGY_NAME_SSE_C, null);
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidServerSideCEncryptionStrategyBecauseKeyMaterialSpecifiedAsEmptyString() {
+ configureService(STRATEGY_NAME_SSE_C, "");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidServerSideCEncryptionStrategyBecauseKeyMaterialEvaluatedToNull() {
+ configureService(STRATEGY_NAME_SSE_C, "${key-material-var}");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidServerSideCEncryptionStrategyBecauseKeyMaterialEvaluatedToEmptyString() {
+ configureService(STRATEGY_NAME_SSE_C, "${key-material-var}");
+ configureVariable("key-material-var", "");
+
+ runner.assertNotValid(service);
+ }
+
+
+ // ClientSideKMSEncryptionStrategy
+
+ @Test
+ public void testValidClientSideKMSEncryptionStrategy() {
+ configureService(STRATEGY_NAME_CSE_KMS, "key-id");
+
+ runner.assertValid(service);
+ }
+
+ @Test
+ public void testValidClientSideKMSEncryptionStrategyUsingEL() {
+ configureService(STRATEGY_NAME_CSE_KMS, "${key-id-var}");
+ configureVariable("key-id-var", "key-id");
+
+ runner.assertValid(service);
+ }
+
+ @Test
+ public void testInvalidClientSideKMSEncryptionStrategyBecauseKeyIdNotSpecified() {
+ configureService(STRATEGY_NAME_CSE_KMS, null);
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidClientSideKMSEncryptionStrategyBecauseKeyIdSpecifiedAsEmptyString() {
+ configureService(STRATEGY_NAME_CSE_KMS, "");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidClientSideKMSEncryptionStrategyBecauseKeyIdEvaluatedToNull() {
+ configureService(STRATEGY_NAME_CSE_KMS, "${key-id-var}");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidClientSideKMSEncryptionStrategyBecauseKeyIdEvaluatedToEmptyString() {
+ configureService(STRATEGY_NAME_CSE_KMS, "${key-id-var}");
+ configureVariable("key-id-var", "");
+
+ runner.assertNotValid(service);
+ }
+
+
+ // ClientSideCEncryptionStrategy
+
+ @Test
+ public void testValidClientSideCEncryptionStrategy() {
+ configureService(STRATEGY_NAME_CSE_C, createKey(256));
+
+ runner.assertValid(service);
+ }
+
+ @Test
+ public void testValidClientSideCEncryptionStrategyUsingEL() {
+ configureService(STRATEGY_NAME_CSE_C, "${key-material-var}");
+ configureVariable("key-material-var", createKey(256));
+
+ runner.assertValid(service);
+ }
+
+ @Test
+ public void testInvalidClientSideCEncryptionStrategyBecauseKeyMaterialNotSpecified() {
+ configureService(STRATEGY_NAME_CSE_C, null);
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidClientSideCEncryptionStrategyBecauseKeyMaterialSpecifiedAsEmptyString() {
+ configureService(STRATEGY_NAME_CSE_C, "");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidClientSideCEncryptionStrategyBecauseKeyMaterialEvaluatedToNull() {
+ configureService(STRATEGY_NAME_CSE_C, "${key-material-var}");
+
+ runner.assertNotValid(service);
+ }
+
+ @Test
+ public void testInvalidClientSideCEncryptionStrategyBecauseKeyMaterialEvaluatedToEmptyString() {
+ configureService(STRATEGY_NAME_CSE_C, "${key-material-var}");
+ configureVariable("key-material-var", "");
+
+ runner.assertNotValid(service);
+ }
+
+
+ private void configureService(String encryptionStrategy, String keyIdOrMaterial) {
+ runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_STRATEGY, encryptionStrategy);
+ if (keyIdOrMaterial != null) {
+ runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_VALUE, keyIdOrMaterial);
+ }
+ }
+
+ private void configureVariable(String name, String value) {
+ runner.setVariable(name, value);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java
index 946a1cc..81d52e6 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-service-api/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java
@@ -31,6 +31,13 @@ import org.apache.nifi.controller.ControllerService;
*/
public interface AmazonS3EncryptionService extends ControllerService {
+ String STRATEGY_NAME_NONE = "NONE";
+ String STRATEGY_NAME_SSE_S3 = "SSE_S3";
+ String STRATEGY_NAME_SSE_KMS = "SSE_KMS";
+ String STRATEGY_NAME_SSE_C = "SSE_C";
+ String STRATEGY_NAME_CSE_KMS = "CSE_KMS";
+ String STRATEGY_NAME_CSE_C = "CSE_C";
+
/**
* Configure a {@link PutObjectRequest} for encryption.
* @param request the request to configure.
@@ -69,12 +76,17 @@ public interface AmazonS3EncryptionService extends ControllerService {
AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration);
/**
- * @return The region associated with the service, as a String.
+ * @return The KMS region associated with the service, as a String.
*/
- String getRegion();
+ String getKmsRegion();
/**
* @return The name of the encryption strategy associated with the service.
*/
String getStrategyName();
+
+ /**
+ * @return The display name of the encryption strategy associated with the service.
+ */
+ String getStrategyDisplayName();
}