You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/04/07 20:19:39 UTC

nifi git commit: NIFI-1180: Modify PutS3Object to enable encryption

Repository: nifi
Updated Branches:
  refs/heads/master 9235a28f8 -> 2bcc31330


NIFI-1180: Modify PutS3Object to enable encryption

This closes #246.

Signed-off-by: Andy LoPresto <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2bcc3133
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2bcc3133
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2bcc3133

Branch: refs/heads/master
Commit: 2bcc31330cafbba4f342e9ea464399fd6c7b575b
Parents: 9235a28
Author: Adam Lamar <ad...@gmail.com>
Authored: Sat Feb 20 23:12:56 2016 -0700
Committer: Andy LoPresto <al...@apache.org>
Committed: Thu Apr 7 11:19:22 2016 -0700

----------------------------------------------------------------------
 .../nifi/processors/aws/s3/FetchS3Object.java   |  4 +
 .../nifi/processors/aws/s3/PutS3Object.java     | 21 ++++-
 .../PutS3ObjectTest.groovy                      | 95 ++++++++++++++++++++
 .../nifi/processors/aws/s3/AbstractS3IT.java    |  9 ++
 .../nifi/processors/aws/s3/ITFetchS3Object.java | 22 +++++
 .../nifi/processors/aws/s3/ITPutS3Object.java   | 29 +++++-
 6 files changed, 178 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2bcc3133/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 7246bd3..4591a39 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -61,6 +61,7 @@ import com.amazonaws.services.s3.model.S3Object;
     @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
     @WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"),
     @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"),
+    @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
     @WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),})
 public class FetchS3Object extends AbstractS3Processor {
 
@@ -137,6 +138,9 @@ public class FetchS3Object extends AbstractS3Processor {
             if (metadata.getUserMetadata() != null) {
                 attributes.putAll(metadata.getUserMetadata());
             }
+            if (metadata.getSSEAlgorithm() != null) {
+                attributes.put("s3.sseAlgorithm", metadata.getSSEAlgorithm());
+            }
             if (metadata.getVersionId() != null) {
                 attributes.put("s3.version", metadata.getVersionId());
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2bcc3133/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 87f00b6..50d2453 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -113,6 +113,7 @@ import com.amazonaws.services.s3.model.UploadPartResult;
     @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
     @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " +
             "the S3 object, if one is set"),
+    @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
     @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of " +
             "the S3 object, if any was set")
 })
@@ -121,6 +122,7 @@ public class PutS3Object extends AbstractS3Processor {
     public static final long MIN_S3_PART_SIZE = 50L * 1024L * 1024L;
     public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L;
     public static final String PERSISTENCE_ROOT = "conf/state/";
+    public static final String NO_SERVER_SIDE_ENCRYPTION = "None";
 
     public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
         .name("Expiration Time Rule")
@@ -177,10 +179,20 @@ public class PutS3Object extends AbstractS3Processor {
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor SERVER_SIDE_ENCRYPTION = new PropertyDescriptor.Builder()
+            .name("server-side-encryption")
+            .displayName("Server Side Encryption")
+            .description("Specifies the algorithm used for server side encryption.")
+            .required(true)
+            .allowableValues(NO_SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION)
+            .defaultValue(NO_SERVER_SIDE_ENCRYPTION)
+            .build();
+
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
         Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
             FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE,
-            ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, PROXY_HOST, PROXY_HOST_PORT));
+            ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION,
+            PROXY_HOST, PROXY_HOST_PORT));
 
     final static String S3_BUCKET_KEY = "s3.bucket";
     final static String S3_OBJECT_KEY = "s3.key";
@@ -194,6 +206,7 @@ public class PutS3Object extends AbstractS3Processor {
     final static String S3_API_METHOD_ATTR_KEY = "s3.apimethod";
     final static String S3_API_METHOD_PUTOBJECT = "putobject";
     final static String S3_API_METHOD_MULTIPARTUPLOAD = "multipartupload";
+    final static String S3_SSE_ALGORITHM = "s3.sseAlgorithm";
 
     final static String S3_PROCESS_UNSCHEDULED_MESSAGE = "Processor unscheduled, stopping upload";
 
@@ -407,6 +420,12 @@ public class PutS3Object extends AbstractS3Processor {
                             }
                         }
 
+                        final String serverSideEncryption = context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
+                        if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
+                            objectMetadata.setSSEAlgorithm(serverSideEncryption);
+                            attributes.put(S3_SSE_ALGORITHM, serverSideEncryption);
+                        }
+
                         if (!userMetadata.isEmpty()) {
                             objectMetadata.setUserMetadata(userMetadata);
                         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2bcc3133/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/groovy/org.apache.nifi.processors.aws.s3/PutS3ObjectTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/groovy/org.apache.nifi.processors.aws.s3/PutS3ObjectTest.groovy b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/groovy/org.apache.nifi.processors.aws.s3/PutS3ObjectTest.groovy
new file mode 100644
index 0000000..05ef062
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/groovy/org.apache.nifi.processors.aws.s3/PutS3ObjectTest.groovy
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3
+
+import com.amazonaws.services.s3.model.ObjectMetadata
+import org.junit.After
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+@RunWith(JUnit4.class)
+class PutS3ObjectTest extends GroovyTestCase {
+    private static final Logger logger = LoggerFactory.getLogger(PutS3ObjectTest.class);
+
+    private static long mockFlowFileId = 0
+    private PutS3Object putS3Object
+
+    @BeforeClass
+    static void setUpOnce() {
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+    }
+
+    @Before
+    void setUp() {
+        super.setUp()
+
+        putS3Object = new PutS3Object()
+
+    }
+
+    @After
+    void tearDown() {
+
+    }
+
+    @Test
+    void testShouldIncludeServerSideEncryptionAlgorithmProperty() {
+        // Arrange
+
+        // Act
+        def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors()
+        def ssePropertyDescriptor = propertyDescriptors.find { it.name =~ "server-side-encryption" }
+
+        // Assert
+        assert ssePropertyDescriptor
+        assert ssePropertyDescriptor.name == "server-side-encryption"
+        assert ssePropertyDescriptor.displayName == "Server Side Encryption"
+    }
+
+    @Test
+    void testShouldValidateServerSideEncryptionDefaultsToNone() {
+        // Arrange
+
+        // Act
+        def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors()
+        def ssePropertyDescriptor = propertyDescriptors.find { it.name =~ "server-side-encryption" }
+
+        // Assert
+        assert ssePropertyDescriptor
+        assert ssePropertyDescriptor.defaultValue == putS3Object.NO_SERVER_SIDE_ENCRYPTION
+    }
+
+    @Test
+    void testShouldValidateServerSideEncryptionAllowableValues() {
+        // Arrange
+
+        // Act
+        def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors()
+        def ssePropertyDescriptor = propertyDescriptors.find { it.name =~ "server-side-encryption" }
+
+        // Assert
+        assert ssePropertyDescriptor
+        assert ssePropertyDescriptor.allowableValues*.toString() == [putS3Object.NO_SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION]
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2bcc3133/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index 11fdffe..697e2f2 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -22,6 +22,7 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CreateBucketRequest;
 import com.amazonaws.services.s3.model.DeleteBucketRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.apache.nifi.util.file.FileUtils;
@@ -132,6 +133,14 @@ public abstract class AbstractS3IT {
         client.putObject(putRequest);
     }
 
+    protected void putTestFileEncrypted(String key, File file) throws AmazonS3Exception, FileNotFoundException {
+        ObjectMetadata objectMetadata = new ObjectMetadata();
+        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+        PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, new FileInputStream(file), objectMetadata);
+
+        client.putObject(putRequest);
+    }
+
     protected Path getResourcePath(String resourceName) {
         Path path = null;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/2bcc3133/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
index 8e5eb28..db115bb 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.aws.s3;
 
+import com.amazonaws.services.s3.model.ObjectMetadata;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processors.aws.AbstractAWSProcessor;
 import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
@@ -57,6 +58,27 @@ public class ITFetchS3Object extends AbstractS3IT {
     }
 
     @Test
+    public void testSimpleGetEncrypted() throws IOException {
+        putTestFileEncrypted("test-file", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
+
+        runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(FetchS3Object.REGION, REGION);
+        runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "test-file");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
+        final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
+        ffs.get(0).assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+    }
+
+    @Test
     public void testFetchS3ObjectUsingCredentialsProviderService() throws Throwable {
         putTestFile("test-file", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/2bcc3133/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index bc5e7a2..5d7797e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -32,6 +32,7 @@ import java.util.regex.Pattern;
 
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -94,6 +95,31 @@ public class ITPutS3Object extends AbstractS3IT {
     }
 
     @Test
+    public void testSimplePutEncrypted() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+
+        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutS3Object.REGION, REGION);
+        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+
+        Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
+
+        for (int i = 0; i < 3; i++) {
+            final Map<String, String> attrs = new HashMap<>();
+            attrs.put("filename", String.valueOf(i) + ".txt");
+            runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        }
+        runner.run(3);
+
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3);
+        final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        for (MockFlowFile flowFile : ffs) {
+            flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+        }
+    }
+
+    @Test
     public void testPutS3ObjectUsingCredentialsProviderService() throws Throwable {
         final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
 
@@ -227,7 +253,7 @@ public class ITPutS3Object extends AbstractS3IT {
     public void testGetPropertyDescriptors() throws Exception {
         PutS3Object processor = new PutS3Object();
         List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
-        assertEquals("size should be eq", 24, pd.size());
+        assertEquals("size should be eq", 25, pd.size());
         assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
         assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
         assertTrue(pd.contains(PutS3Object.BUCKET));
@@ -246,6 +272,7 @@ public class ITPutS3Object extends AbstractS3IT {
         assertTrue(pd.contains(PutS3Object.STORAGE_CLASS));
         assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST));
         assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
+        assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION));
     }
 
     @Test