You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/08/03 16:31:07 UTC

[nifi] branch main updated: NIFI-7664 Add Content Disposition property to PutS3Object processor

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ddb304a  NIFI-7664 Add Content Disposition property to PutS3Object processor
ddb304a is described below

commit ddb304a927b372c518ae84572315f07b75733a28
Author: Kent Nguyen <nt...@gmail.com>
AuthorDate: Thu Jul 30 21:45:48 2020 +0700

    NIFI-7664 Add Content Disposition property to PutS3Object processor
    
    Add new property 'Content Disposition' to allow user
    to set the content-disposition http header on the S3 object.
    
    Allowed values are 'inline' (default) and 'attachment'.
    If 'attachment' is selected, the filename will be set to the S3 Object key.
    
    Remove default value and keep backward compatibility
    Update fetchS3Object filename attribute setting
    Update constant names
    Update order of if-else condition
    NIFI-7664 Update condition in FetchS3Processor
    NIFI-7664 Undo the unexpected indent
    NIFI-7664 Update international chars unit test
    NIFI-7664 Set fetchS3 file path name
    NIFI-7664 Update code style
    
    This closes #4423.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../nifi/processors/aws/s3/FetchS3Object.java      | 22 +++++---
 .../apache/nifi/processors/aws/s3/PutS3Object.java | 30 +++++++++-
 .../nifi/processors/aws/s3/ITPutS3Object.java      | 64 +++++++++++++++++++++-
 .../nifi/processors/aws/s3/TestPutS3Object.java    |  8 ++-
 4 files changed, 111 insertions(+), 13 deletions(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 4f68a98..f985e4d 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -165,14 +165,12 @@ public class FetchS3Object extends AbstractS3Processor {
 
             final ObjectMetadata metadata = s3Object.getObjectMetadata();
             if (metadata.getContentDisposition() != null) {
-                final String fullyQualified = metadata.getContentDisposition();
-                final int lastSlash = fullyQualified.lastIndexOf("/");
-                if (lastSlash > -1 && lastSlash < fullyQualified.length() - 1) {
-                    attributes.put(CoreAttributes.PATH.key(), fullyQualified.substring(0, lastSlash));
-                    attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), fullyQualified);
-                    attributes.put(CoreAttributes.FILENAME.key(), fullyQualified.substring(lastSlash + 1));
+                final String contentDisposition = metadata.getContentDisposition();
+
+                if (contentDisposition.equals(PutS3Object.CONTENT_DISPOSITION_INLINE) || contentDisposition.startsWith("attachment; filename=")) {
+                    setFilePathAttributes(attributes, key);
                 } else {
-                    attributes.put(CoreAttributes.FILENAME.key(), metadata.getContentDisposition());
+                    setFilePathAttributes(attributes, contentDisposition);
                 }
             }
             if (metadata.getContentMD5() != null) {
@@ -231,4 +229,14 @@ public class FetchS3Object extends AbstractS3Processor {
         session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
     }
 
+    protected void setFilePathAttributes(Map<String, String> attributes, String filePathName) {
+        final int lastSlash = filePathName.lastIndexOf("/");
+        if (lastSlash > -1 && lastSlash < filePathName.length() - 1) {
+            attributes.put(CoreAttributes.PATH.key(), filePathName.substring(0, lastSlash));
+            attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), filePathName);
+            attributes.put(CoreAttributes.FILENAME.key(), filePathName.substring(lastSlash + 1));
+        } else {
+            attributes.put(CoreAttributes.FILENAME.key(), filePathName);
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index aa2ced6..c36c858 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -117,6 +117,7 @@ import com.amazonaws.services.s3.model.UploadPartResult;
     @WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"),
     @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
     @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
+    @WritesAttribute(attribute = "s3.contentdisposition", description = "The content disposition of the S3 Object that put in S3"),
     @WritesAttribute(attribute = "s3.cachecontrol", description = "The cache-control header of the S3 Object"),
     @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
     @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " +
@@ -131,6 +132,8 @@ public class PutS3Object extends AbstractS3Processor {
     public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L;
     public static final String PERSISTENCE_ROOT = "conf/state/";
     public static final String NO_SERVER_SIDE_ENCRYPTION = "None";
+    public static final String CONTENT_DISPOSITION_INLINE = "inline";
+    public static final String CONTENT_DISPOSITION_ATTACHMENT = "attachment";
 
     public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
         .name("Expiration Time Rule")
@@ -153,6 +156,16 @@ public class PutS3Object extends AbstractS3Processor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
+    public static final PropertyDescriptor CONTENT_DISPOSITION = new PropertyDescriptor.Builder()
+            .name("Content Disposition")
+            .displayName("Content Disposition")
+            .description("Sets the Content-Disposition HTTP header indicating if the content is intended to be displayed inline or should be downloaded.\n " +
+                    "Possible values are 'inline' or 'attachment'. If this property is not specified, object's content-disposition will be set to filename. " +
+                    "When 'attachment' is selected, '; filename=' plus object key are automatically appended to form final value 'attachment; filename=\"filename.jpg\"'.")
+            .required(false)
+            .allowableValues(CONTENT_DISPOSITION_INLINE, CONTENT_DISPOSITION_ATTACHMENT)
+            .build();
+
     public static final PropertyDescriptor CACHE_CONTROL = new PropertyDescriptor.Builder()
             .name("Cache Control")
             .displayName("Cache Control")
@@ -242,7 +255,7 @@ public class PutS3Object extends AbstractS3Processor {
             .build();
 
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-        Arrays.asList(KEY, BUCKET, CONTENT_TYPE, CACHE_CONTROL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX,
+        Arrays.asList(KEY, BUCKET, CONTENT_TYPE, CONTENT_DISPOSITION, CACHE_CONTROL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX,
             STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
             CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL,
             MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, USE_CHUNKED_ENCODING, USE_PATH_STYLE_ACCESS,
@@ -251,6 +264,7 @@ public class PutS3Object extends AbstractS3Processor {
     final static String S3_BUCKET_KEY = "s3.bucket";
     final static String S3_OBJECT_KEY = "s3.key";
     final static String S3_CONTENT_TYPE = "s3.contenttype";
+    final static String S3_CONTENT_DISPOSITION = "s3.contentdisposition";
     final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
     final static String S3_VERSION_ATTR_KEY = "s3.version";
     final static String S3_ETAG_ATTR_KEY = "s3.etag";
@@ -462,7 +476,6 @@ public class PutS3Object extends AbstractS3Processor {
                 public void process(final InputStream rawIn) throws IOException {
                     try (final InputStream in = new BufferedInputStream(rawIn)) {
                         final ObjectMetadata objectMetadata = new ObjectMetadata();
-                        objectMetadata.setContentDisposition(URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), "UTF-8"));
                         objectMetadata.setContentLength(ff.getSize());
 
                         final String contentType = context.getProperty(CONTENT_TYPE)
@@ -479,6 +492,19 @@ public class PutS3Object extends AbstractS3Processor {
                             attributes.put(S3_CACHE_CONTROL, cacheControl);
                         }
 
+                        final String contentDisposition = context.getProperty(CONTENT_DISPOSITION).getValue();
+                        String fileName = URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), "UTF-8");
+                        if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) {
+                            objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE);
+                            attributes.put(S3_CONTENT_DISPOSITION, CONTENT_DISPOSITION_INLINE);
+                        } else if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) {
+                             String contentDispositionValue = CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\"";
+                             objectMetadata.setContentDisposition(contentDispositionValue);
+                             attributes.put(S3_CONTENT_DISPOSITION, contentDispositionValue);
+                        } else {
+                            objectMetadata.setContentDisposition(fileName);
+                        }
+
                         final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
                                 .evaluateAttributeExpressions(ff).getValue();
                         if (expirationRule != null) {
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index 8f93c76..fac1e86 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -293,11 +293,73 @@ public class ITPutS3Object extends AbstractS3IT {
     }
 
     @Test
+    public void testContentDispositionInline() throws IOException {
+        TestRunner runner = initTestRunner();
+
+        runner.setProperty(PutS3Object.CONTENT_DISPOSITION, PutS3Object.CONTENT_DISPOSITION_INLINE);
+
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        MockFlowFile ff1 = flowFiles.get(0);
+        ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_DISPOSITION, PutS3Object.CONTENT_DISPOSITION_INLINE);
+    }
+
+    @Test
+    public void testContentDispositionNull() throws IOException {
+        // Put
+        TestRunner runner = initTestRunner();
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename",  "filename-on-s3.txt");
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+        List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+
+        // Fetch
+        runner = TestRunners.newTestRunner(new FetchS3Object());
+
+        runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(FetchS3Object.REGION, REGION);
+        runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
+
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
+        ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
+        MockFlowFile ff = ffs.get(0);
+        ff.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        ff.assertAttributeNotExists(PutS3Object.S3_CONTENT_DISPOSITION);
+    }
+
+    @Test
+    public void testContentDispositionAttachment() throws IOException {
+        TestRunner runner = initTestRunner();
+
+        runner.setProperty(PutS3Object.CONTENT_DISPOSITION, PutS3Object.CONTENT_DISPOSITION_ATTACHMENT);
+
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        MockFlowFile ff1 = flowFiles.get(0);
+        ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_DISPOSITION, "attachment; filename=\"hello.txt\"");
+    }
+
+    @Test
     public void testCacheControl() throws IOException {
         TestRunner runner = initTestRunner();
 
         runner.setProperty(PutS3Object.CACHE_CONTROL, "no-cache");
-
         runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
 
         runner.run();
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index c66b767..3e3abdf 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -17,7 +17,7 @@
 package org.apache.nifi.processors.aws.s3;
 
 import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
+import java.net.URLEncoder;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -64,6 +64,7 @@ public class TestPutS3Object {
     public void setUp() {
         mockS3Client = Mockito.mock(AmazonS3Client.class);
         putS3Object = new PutS3Object() {
+            @Override
             protected AmazonS3Client getClient() {
                 return mockS3Client;
             }
@@ -176,7 +177,7 @@ public class TestPutS3Object {
         PutObjectRequest request = captureRequest.getValue();
 
         ObjectMetadata objectMetadata = request.getMetadata();
-        assertEquals("Iñtërnâtiônàližætiøn.txt", URLDecoder.decode(objectMetadata.getContentDisposition(), "UTF-8"));
+        assertEquals(URLEncoder.encode("Iñtërnâtiônàližætiøn.txt", "UTF-8"), objectMetadata.getContentDisposition());
     }
 
     private void prepareTest() {
@@ -209,7 +210,7 @@ public class TestPutS3Object {
     public void testGetPropertyDescriptors() {
         PutS3Object processor = new PutS3Object();
         List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
-        assertEquals("size should be eq", 37, pd.size());
+        assertEquals("size should be eq", 38, pd.size());
         assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
         assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
         assertTrue(pd.contains(PutS3Object.BUCKET));
@@ -242,6 +243,7 @@ public class TestPutS3Object {
         assertTrue(pd.contains(PutS3Object.OBJECT_TAGS_PREFIX));
         assertTrue(pd.contains(PutS3Object.REMOVE_TAG_PREFIX));
         assertTrue(pd.contains(PutS3Object.CONTENT_TYPE));
+        assertTrue(pd.contains(PutS3Object.CONTENT_DISPOSITION));
         assertTrue(pd.contains(PutS3Object.CACHE_CONTROL));
         assertTrue(pd.contains(PutS3Object.MULTIPART_THRESHOLD));
         assertTrue(pd.contains(PutS3Object.MULTIPART_PART_SIZE));