You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/04/07 20:19:39 UTC
nifi git commit: NIFI-1180: Modify PutS3Object to enable encryption
Repository: nifi
Updated Branches:
refs/heads/master 9235a28f8 -> 2bcc31330
NIFI-1180: Modify PutS3Object to enable encryption
This closes #246.
Signed-off-by: Andy LoPresto <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2bcc3133
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2bcc3133
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2bcc3133
Branch: refs/heads/master
Commit: 2bcc31330cafbba4f342e9ea464399fd6c7b575b
Parents: 9235a28
Author: Adam Lamar <ad...@gmail.com>
Authored: Sat Feb 20 23:12:56 2016 -0700
Committer: Andy LoPresto <al...@apache.org>
Committed: Thu Apr 7 11:19:22 2016 -0700
----------------------------------------------------------------------
.../nifi/processors/aws/s3/FetchS3Object.java | 4 +
.../nifi/processors/aws/s3/PutS3Object.java | 21 ++++-
.../PutS3ObjectTest.groovy | 95 ++++++++++++++++++++
.../nifi/processors/aws/s3/AbstractS3IT.java | 9 ++
.../nifi/processors/aws/s3/ITFetchS3Object.java | 22 +++++
.../nifi/processors/aws/s3/ITPutS3Object.java | 29 +++++-
6 files changed, 178 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/2bcc3133/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 7246bd3..4591a39 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -61,6 +61,7 @@ import com.amazonaws.services.s3.model.S3Object;
@WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
@WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"),
@WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"),
+ @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),})
public class FetchS3Object extends AbstractS3Processor {
@@ -137,6 +138,9 @@ public class FetchS3Object extends AbstractS3Processor {
if (metadata.getUserMetadata() != null) {
attributes.putAll(metadata.getUserMetadata());
}
+ if (metadata.getSSEAlgorithm() != null) {
+ attributes.put("s3.sseAlgorithm", metadata.getSSEAlgorithm());
+ }
if (metadata.getVersionId() != null) {
attributes.put("s3.version", metadata.getVersionId());
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/2bcc3133/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 87f00b6..50d2453 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -113,6 +113,7 @@ import com.amazonaws.services.s3.model.UploadPartResult;
@WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
@WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " +
"the S3 object, if one is set"),
+ @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
@WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of " +
"the S3 object, if any was set")
})
@@ -121,6 +122,7 @@ public class PutS3Object extends AbstractS3Processor {
public static final long MIN_S3_PART_SIZE = 50L * 1024L * 1024L;
public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L;
public static final String PERSISTENCE_ROOT = "conf/state/";
+ public static final String NO_SERVER_SIDE_ENCRYPTION = "None";
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
.name("Expiration Time Rule")
@@ -177,10 +179,20 @@ public class PutS3Object extends AbstractS3Processor {
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
+ public static final PropertyDescriptor SERVER_SIDE_ENCRYPTION = new PropertyDescriptor.Builder()
+ .name("server-side-encryption")
+ .displayName("Server Side Encryption")
+ .description("Specifies the algorithm used for server side encryption.")
+ .required(true)
+ .allowableValues(NO_SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION)
+ .defaultValue(NO_SERVER_SIDE_ENCRYPTION)
+ .build();
+
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE,
- ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, PROXY_HOST, PROXY_HOST_PORT));
+ ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION,
+ PROXY_HOST, PROXY_HOST_PORT));
final static String S3_BUCKET_KEY = "s3.bucket";
final static String S3_OBJECT_KEY = "s3.key";
@@ -194,6 +206,7 @@ public class PutS3Object extends AbstractS3Processor {
final static String S3_API_METHOD_ATTR_KEY = "s3.apimethod";
final static String S3_API_METHOD_PUTOBJECT = "putobject";
final static String S3_API_METHOD_MULTIPARTUPLOAD = "multipartupload";
+ final static String S3_SSE_ALGORITHM = "s3.sseAlgorithm";
final static String S3_PROCESS_UNSCHEDULED_MESSAGE = "Processor unscheduled, stopping upload";
@@ -407,6 +420,12 @@ public class PutS3Object extends AbstractS3Processor {
}
}
+ final String serverSideEncryption = context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
+ if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
+ objectMetadata.setSSEAlgorithm(serverSideEncryption);
+ attributes.put(S3_SSE_ALGORITHM, serverSideEncryption);
+ }
+
if (!userMetadata.isEmpty()) {
objectMetadata.setUserMetadata(userMetadata);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/2bcc3133/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/groovy/org.apache.nifi.processors.aws.s3/PutS3ObjectTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/groovy/org.apache.nifi.processors.aws.s3/PutS3ObjectTest.groovy b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/groovy/org.apache.nifi.processors.aws.s3/PutS3ObjectTest.groovy
new file mode 100644
index 0000000..05ef062
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/groovy/org.apache.nifi.processors.aws.s3/PutS3ObjectTest.groovy
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3
+
+import com.amazonaws.services.s3.model.ObjectMetadata
+import org.junit.After
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+// Unit tests for the server-side-encryption property descriptor exposed by PutS3Object.
+@RunWith(JUnit4.class)
+class PutS3ObjectTest extends GroovyTestCase {
+ private static final Logger logger = LoggerFactory.getLogger(PutS3ObjectTest.class);
+
+ private static long mockFlowFileId = 0
+ private PutS3Object putS3Object
+
+ @BeforeClass
+ static void setUpOnce() {
+ logger.metaClass.methodMissing = { String name, args ->
+ logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+ }
+ }
+
+ @Before
+ void setUp() {
+ super.setUp()
+
+ putS3Object = new PutS3Object()
+ }
+
+ @After
+ void tearDown() {
+ super.tearDown()
+ }
+
+ @Test
+ void testShouldIncludeServerSideEncryptionAlgorithmProperty() {
+ // Arrange (the processor under test is created in setUp)
+
+ // Act: look the descriptor up by its exact name, not by a regex/substring match
+ def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors()
+ def ssePropertyDescriptor = propertyDescriptors.find { it.name == "server-side-encryption" }
+
+ // Assert
+ assert ssePropertyDescriptor
+ assert ssePropertyDescriptor.name == "server-side-encryption"
+ assert ssePropertyDescriptor.displayName == "Server Side Encryption"
+ }
+
+ @Test
+ void testShouldValidateServerSideEncryptionDefaultsToNone() {
+ // Arrange (the processor under test is created in setUp)
+
+ // Act: look the descriptor up by its exact name, not by a regex/substring match
+ def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors()
+ def ssePropertyDescriptor = propertyDescriptors.find { it.name == "server-side-encryption" }
+
+ // Assert: the constant is static, so it is read from the class rather than the instance
+ assert ssePropertyDescriptor
+ assert ssePropertyDescriptor.defaultValue == PutS3Object.NO_SERVER_SIDE_ENCRYPTION
+ }
+
+ @Test
+ void testShouldValidateServerSideEncryptionAllowableValues() {
+ // Arrange (the processor under test is created in setUp)
+
+ // Act: look the descriptor up by its exact name, not by a regex/substring match
+ def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors()
+ def ssePropertyDescriptor = propertyDescriptors.find { it.name == "server-side-encryption" }
+
+ // Assert: allowable values are compared in declaration order
+ assert ssePropertyDescriptor
+ assert ssePropertyDescriptor.allowableValues*.toString() == [PutS3Object.NO_SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION]
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/2bcc3133/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index 11fdffe..697e2f2 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -22,6 +22,7 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CreateBucketRequest;
import com.amazonaws.services.s3.model.DeleteBucketRequest;
import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.nifi.util.file.FileUtils;
@@ -132,6 +133,14 @@ public abstract class AbstractS3IT {
client.putObject(putRequest);
}
+ protected void putTestFileEncrypted(String key, File file) throws AmazonS3Exception, FileNotFoundException {
+ // Upload from the File itself rather than from a FileInputStream that this method would never close.
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+ PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, file).withMetadata(objectMetadata);
+ client.putObject(putRequest);
+ }
+
protected Path getResourcePath(String resourceName) {
Path path = null;
http://git-wip-us.apache.org/repos/asf/nifi/blob/2bcc3133/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
index 8e5eb28..db115bb 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.aws.s3;
+import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
@@ -57,6 +58,27 @@ public class ITFetchS3Object extends AbstractS3IT {
}
@Test
+ public void testSimpleGetEncrypted() throws IOException {
+ putTestFileEncrypted("test-file", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+ final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
+
+ runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+ runner.setProperty(FetchS3Object.REGION, REGION);
+ runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
+
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "test-file");
+ runner.enqueue(new byte[0], attrs);
+
+ runner.run(1);
+
+ runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
+ final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
+ ffs.get(0).assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+ }
+
+ @Test
public void testFetchS3ObjectUsingCredentialsProviderService() throws Throwable {
putTestFile("test-file", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
http://git-wip-us.apache.org/repos/asf/nifi/blob/2bcc3133/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index bc5e7a2..5d7797e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -32,6 +32,7 @@ import java.util.regex.Pattern;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -94,6 +95,31 @@ public class ITPutS3Object extends AbstractS3IT {
}
@Test
+ public void testSimplePutEncrypted() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+
+ runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+ runner.setProperty(PutS3Object.REGION, REGION);
+ runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+ runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+
+ Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
+
+ for (int i = 0; i < 3; i++) {
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", String.valueOf(i) + ".txt");
+ runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+ }
+ runner.run(3);
+
+ runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3);
+ final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+ for (MockFlowFile flowFile : ffs) {
+ flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+ }
+ }
+
+ @Test
public void testPutS3ObjectUsingCredentialsProviderService() throws Throwable {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
@@ -227,7 +253,7 @@ public class ITPutS3Object extends AbstractS3IT {
public void testGetPropertyDescriptors() throws Exception {
PutS3Object processor = new PutS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
- assertEquals("size should be eq", 24, pd.size());
+ assertEquals("size should be eq", 25, pd.size());
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(PutS3Object.BUCKET));
@@ -246,6 +272,7 @@ public class ITPutS3Object extends AbstractS3IT {
assertTrue(pd.contains(PutS3Object.STORAGE_CLASS));
assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST));
assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
+ assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION));
}
@Test