You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/06/15 11:49:30 UTC

nifi git commit: NIFI-5221 - Added 'Object Tagging' functionalities to S3 Processors

Repository: nifi
Updated Branches:
  refs/heads/master be63378a1 -> 375239894


NIFI-5221 - Added 'Object Tagging' functionalities to S3 Processors

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2751.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/37523989
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/37523989
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/37523989

Branch: refs/heads/master
Commit: 37523989430e51da95c921e4cda7865d34721352
Parents: be63378
Author: zenfenan <si...@gmail.com>
Authored: Fri Jun 1 17:05:09 2018 +0530
Committer: Pierre Villard <pi...@gmail.com>
Committed: Fri Jun 15 13:49:13 2018 +0200

----------------------------------------------------------------------
 .../nifi-aws-bundle/nifi-aws-processors/pom.xml |  7 --
 .../apache/nifi/processors/aws/s3/ListS3.java   | 40 +++++++++--
 .../nifi/processors/aws/s3/PutS3Object.java     | 62 +++++++++++++++--
 .../nifi/processors/aws/s3/AbstractS3IT.java    | 12 +++-
 .../apache/nifi/processors/aws/s3/ITListS3.java | 33 ++++++++-
 .../nifi/processors/aws/s3/ITPutS3Object.java   | 34 +++++++++-
 .../nifi/processors/aws/s3/TestListS3.java      | 31 ++++++++-
 .../nifi/processors/aws/s3/TestPutS3Object.java | 71 ++++++++++++++------
 8 files changed, 251 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/37523989/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index a1ba775..1901846 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -78,13 +78,6 @@
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk-sts</artifactId>
         </dependency>
-        <!-- Test Dependencies for testing interaction with AWS -->
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-            <version>2.6.6</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/nifi/blob/37523989/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 3e037f8..f5a69ac 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -26,6 +26,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
+import com.amazonaws.services.s3.model.GetObjectTaggingResult;
+import com.amazonaws.services.s3.model.Tag;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -80,7 +83,9 @@ import com.amazonaws.services.s3.model.ListObjectsV2Result;
         @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
         @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
         @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),
-        @WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable")})
+        @WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable"),
+        @WritesAttribute(attribute = "s3.tag.___", description = "If 'Write Object Tags' is set to 'True', the tags associated to the S3 object that is being listed " +
+                "will be written as part of the flowfile attributes")})
 @SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
 public class ListS3 extends AbstractS3Processor {
 
@@ -136,11 +141,20 @@ public class ListS3 extends AbstractS3Processor {
             .defaultValue("0 sec")
             .build();
 
+    // Required true/false switch (default "false") that makes ListS3 add the listed object's tags as attributes.
+    public static final PropertyDescriptor WRITE_OBJECT_TAGS = new PropertyDescriptor.Builder()
+            .name("write-s3-object-tags").displayName("Write Object Tags")
+            .description("If set to 'True', the tags associated with the S3 object will be written as FlowFile attributes")
+            .allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False"))
+            .defaultValue("false")
+            .required(true)
+            .build();
+
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-            Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
+            Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, WRITE_OBJECT_TAGS, CREDENTIALS_FILE,
                     AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
-                    SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD,
-                    DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE));
+                    SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME,
+                    PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE));
 
     public static final Set<Relationship> relationships = Collections.unmodifiableSet(
             new HashSet<>(Collections.singletonList(REL_SUCCESS)));
@@ -263,6 +277,10 @@ public class ListS3 extends AbstractS3Processor {
                     attributes.put("s3.version", versionSummary.getVersionId());
                 }
 
+                if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
+                    attributes.putAll(writeObjectTags(client, versionSummary));
+                }
+
                 // Create the flowfile
                 FlowFile flowFile = session.create();
                 flowFile = session.putAllAttributes(flowFile, attributes);
@@ -307,6 +325,20 @@ public class ListS3 extends AbstractS3Processor {
         return willCommit;
     }
 
+    private Map<String, String> writeObjectTags(AmazonS3 client, S3VersionSummary versionSummary) {
+        // Returns the object's tags as "s3.tag.<key>" -> value entries; empty when the client returns no result.
+        // The version id is passed along so each listed version reports its own tags, not the latest version's.
+        final GetObjectTaggingResult taggingResult = client.getObjectTagging(
+                new GetObjectTaggingRequest(versionSummary.getBucketName(), versionSummary.getKey(), versionSummary.getVersionId()));
+        final Map<String, String> tagMap = new HashMap<>();
+        if (taggingResult != null) {
+            for (final Tag tag : taggingResult.getTagSet()) {
+                tagMap.put("s3.tag." + tag.getKey(), tag.getValue());
+            }
+        }
+        return tagMap;
+    }
+
     private interface S3BucketLister {
         public void setBucketName(String bucketName);
         public void setPrefix(String prefix);

http://git-wip-us.apache.org/repos/asf/nifi/blob/37523989/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index f856b85..49a649e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -39,6 +39,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.amazonaws.services.s3.model.ObjectTagging;
+import com.amazonaws.services.s3.model.Tag;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -49,6 +51,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -205,11 +208,32 @@ public class PutS3Object extends AbstractS3Processor {
             .defaultValue(NO_SERVER_SIDE_ENCRYPTION)
             .build();
 
+    // Optional, expression-language-enabled prefix selecting which FlowFile attributes are sent as S3 object tags.
+    public static final PropertyDescriptor OBJECT_TAGS_PREFIX = new PropertyDescriptor.Builder()
+            .name("s3-object-tags-prefix").displayName("Object Tags Prefix")
+            .description("Specifies the prefix which would be scanned against the incoming FlowFile's attributes and the matching attribute's " +
+                    "name and value would be considered as the outgoing S3 object's Tag name and Tag value respectively. For Ex: If the " +
+                    "incoming FlowFile carries the attributes tagS3country, tagS3PII, the tag prefix to be specified would be 'tagS3'")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .required(false)
+            .build();
+
+    // True/false switch (default "false"): when true, getObjectTags strips the configured prefix from each tag name.
+    public static final PropertyDescriptor REMOVE_TAG_PREFIX = new PropertyDescriptor.Builder()
+            .name("s3-object-remove-tags-prefix").displayName("Remove Tag Prefix")
+            .description("If set to 'True', the value provided for '" + OBJECT_TAGS_PREFIX.getDisplayName() + "' will be removed from " +
+                    "the attribute(s) and then considered as the Tag name. For ex: If the incoming FlowFile carries the attributes tagS3country, " +
+                    "tagS3PII and the prefix is set to 'tagS3' then the corresponding tag names would be 'country' and 'PII'")
+            .allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False"))
+            .defaultValue("false")
+            .build();
+
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-        Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
-            FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE,
-            ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE,
-            SERVER_SIDE_ENCRYPTION, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
+        Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX,
+            STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
+            CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL,
+            MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
 
     final static String S3_BUCKET_KEY = "s3.bucket";
     final static String S3_OBJECT_KEY = "s3.key";
@@ -415,6 +439,7 @@ public class PutS3Object extends AbstractS3Processor {
          * Then
          */
         try {
+            final FlowFile flowFileCopy = flowFile;
             session.read(flowFile, new InputStreamCallback() {
                 @Override
                 public void process(final InputStream rawIn) throws IOException {
@@ -471,6 +496,10 @@ public class PutS3Object extends AbstractS3Processor {
                                 request.withCannedAcl(cannedAcl);
                             }
 
+                            if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
+                                request.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
+                            }
+
                             try {
                                 final PutObjectResult result = s3.putObject(request);
                                 if (result.getVersionId() != null) {
@@ -562,6 +591,11 @@ public class PutS3Object extends AbstractS3Processor {
                                 if (cannedAcl != null) {
                                     initiateRequest.withCannedACL(cannedAcl);
                                 }
+
+                                if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
+                                    initiateRequest.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
+                                }
+
                                 try {
                                     final InitiateMultipartUploadResult initiateResult =
                                             s3.initiateMultipartUpload(initiateRequest);
@@ -785,6 +819,26 @@ public class PutS3Object extends AbstractS3Processor {
         }
     }
 
+    /** Builds one S3 Tag for every FlowFile attribute whose name starts with the evaluated 'Object Tags Prefix'. */
+    private List<Tag> getObjectTags(ProcessContext context, FlowFile flowFile) {
+        final String prefix = context.getProperty(OBJECT_TAGS_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
+        final boolean removePrefix = context.getProperty(REMOVE_TAG_PREFIX).asBoolean();
+        final List<Tag> objectTags = new ArrayList<>();
+
+        for (final Map.Entry<String, String> attribute : flowFile.getAttributes().entrySet()) {
+            final String name = attribute.getKey();
+            if (!name.startsWith(prefix)) {
+                continue;
+            }
+            // Strip only the leading prefix: String.replace would also delete later occurrences of the
+            // prefix text inside the attribute name (e.g. prefix "tagS3" turned "tagS3_tagS3x" into "_x").
+            final String tagKey = removePrefix ? name.substring(prefix.length()) : name;
+            objectTags.add(new Tag(tagKey, attribute.getValue()));
+        }
+
+        return objectTags;
+    }
+
     protected static class MultipartState implements Serializable {
 
         private static final long serialVersionUID = 9006072180563519740L;

http://git-wip-us.apache.org/repos/asf/nifi/blob/37523989/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index eaaaa8a..d59093b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -23,8 +23,11 @@ import com.amazonaws.services.s3.model.CreateBucketRequest;
 import com.amazonaws.services.s3.model.DeleteBucketRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.ObjectTagging;
 import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.Tag;
 import org.apache.nifi.util.file.FileUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -38,6 +41,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.List;
 
 import static org.junit.Assert.fail;
 
@@ -58,7 +62,7 @@ public abstract class AbstractS3IT {
     // when bucket is rapidly added/deleted and consistency propagation causes this error.
     // (Should not be necessary if REGION remains static, but added to prevent future frustration.)
     // [see http://stackoverflow.com/questions/13898057/aws-error-message-a-conflicting-conditional-operation-is-currently-in-progress]
-    protected final static String BUCKET_NAME = "test-bucket-00000000-0000-0000-0000-123456789021-" + REGION;
+    protected final static String BUCKET_NAME = "test-bucket-" + System.currentTimeMillis() + "-" + REGION;
 
     // Static so multiple Tests can use same client
     protected static AmazonS3Client client;
@@ -144,6 +148,12 @@ public abstract class AbstractS3IT {
         client.putObject(putRequest);
     }
 
+    /** Uploads {@code file} to the test bucket under {@code key} with the given tags attached. */
+    protected void putFileWithObjectTag(String key, File file, List<Tag> objectTags) {
+        final ObjectTagging tagging = new ObjectTagging(objectTags);
+        PutObjectResult result = client.putObject(new PutObjectRequest(BUCKET_NAME, key, file).withTagging(tagging));
+    }
+
     protected Path getResourcePath(String resourceName) {
         Path path = null;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/37523989/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
index e7a4482..fbe17ed 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.aws.s3;
 
+import com.amazonaws.services.s3.model.Tag;
 import org.apache.nifi.processors.aws.AbstractAWSProcessor;
 import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
 import org.apache.nifi.util.MockFlowFile;
@@ -24,6 +25,7 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 
@@ -64,7 +66,7 @@ public class ITListS3 extends AbstractS3IT {
 
         runner.addControllerService("awsCredentialsProvider", serviceImpl);
 
-        runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
         runner.enableControllerService(serviceImpl);
         runner.assertValid(serviceImpl);
 
@@ -142,4 +144,33 @@ public class ITListS3 extends AbstractS3IT {
         flowFiles.get(0).assertAttributeEquals("filename", "b/c");
     }
 
+    @Test
+    public void testObjectTagsWritten() {
+        // Upload one object carrying two tags, then list it with 'Write Object Tags' enabled.
+        final List<Tag> uploadedTags = new ArrayList<>();
+        uploadedTags.add(new Tag("dummytag1", "dummyvalue1"));
+        uploadedTags.add(new Tag("dummytag2", "dummyvalue2"));
+
+        putFileWithObjectTag("b/fileWithTag", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), uploadedTags);
+
+        final TestRunner runner = TestRunners.newTestRunner(new ListS3());
+
+        runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
+        runner.setProperty(ListS3.REGION, REGION);
+        runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(ListS3.PREFIX, "b/");
+        runner.setProperty(ListS3.WRITE_OBJECT_TAGS, "true");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
+
+        // Each S3 tag must surface as an "s3.tag.<key>" attribute on the listed FlowFile.
+        final MockFlowFile listed = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
+        listed.assertAttributeEquals("filename", "b/fileWithTag");
+        listed.assertAttributeExists("s3.tag.dummytag1");
+        listed.assertAttributeExists("s3.tag.dummytag2");
+        listed.assertAttributeEquals("s3.tag.dummytag1", "dummyvalue1");
+        listed.assertAttributeEquals("s3.tag.dummytag2", "dummyvalue2");
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/37523989/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index 2832e29..2db139a 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -28,8 +28,11 @@ import java.util.Map;
 import java.util.regex.Pattern;
 
 import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
+import com.amazonaws.services.s3.model.GetObjectTaggingResult;
 import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
 import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.Tag;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -185,7 +188,7 @@ public class ITPutS3Object extends AbstractS3IT {
 
         runner.addControllerService("awsCredentialsProvider", serviceImpl);
 
-        runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
+        runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
         runner.enableControllerService(serviceImpl);
 
         runner.assertValid(serviceImpl);
@@ -840,6 +843,35 @@ public class ITPutS3Object extends AbstractS3IT {
         Assert.assertEquals(0, uploadList.getMultipartUploads().size());
     }
 
+    @Test
+    public void testObjectTags() throws IOException, InterruptedException {
+        // Attributes starting with the configured prefix become S3 object tags; with
+        // 'Remove Tag Prefix' set to true the prefix is stripped from the tag name.
+        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutS3Object.REGION, REGION);
+        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3");
+        runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "true");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "tag-test.txt");
+        attrs.put("tagS3PII", "true");
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+
+        GetObjectTaggingResult result = client.getObjectTagging(new GetObjectTaggingRequest(BUCKET_NAME, "tag-test.txt"));
+        List<Tag> objectTags = result.getTagSet();
+
+        // assertEquals reports the actual tag count on failure, so no debug printing is needed;
+        // only the prefixed attribute may become a tag ('filename' does not match the prefix).
+        Assert.assertEquals(1, objectTags.size());
+        Assert.assertEquals("PII", objectTags.get(0).getKey());
+        Assert.assertEquals("true", objectTags.get(0).getValue());
+    }
+
     private class MockAmazonS3Client extends AmazonS3Client {
         MultipartUploadListing listing;
         public void setListing(MultipartUploadListing newlisting) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/37523989/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index 48210b9..be12d6d 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
@@ -292,16 +293,44 @@ public class TestListS3 {
     }
 
     @Test
+    public void testWriteObjectTags() {
+        // With 'Write Object Tags' enabled, listing one object must trigger exactly one tagging lookup for it.
+        runner.setProperty(ListS3.WRITE_OBJECT_TAGS, "true");
+        runner.setProperty(ListS3.BUCKET, "test-bucket");
+        runner.setProperty(ListS3.REGION, "eu-west-1");
+
+        final S3ObjectSummary summary = new S3ObjectSummary();
+        summary.setBucketName("test-bucket");
+        summary.setKey("a");
+        summary.setLastModified(new Date());
+
+        final ObjectListing listing = new ObjectListing();
+        listing.getObjectSummaries().add(summary);
+        Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(listing);
+
+        runner.run();
+
+        final ArgumentCaptor<GetObjectTaggingRequest> taggingCaptor = ArgumentCaptor.forClass(GetObjectTaggingRequest.class);
+        Mockito.verify(mockS3Client, Mockito.times(1)).getObjectTagging(taggingCaptor.capture());
+        final GetObjectTaggingRequest taggingRequest = taggingCaptor.getValue();
+
+        assertEquals("test-bucket", taggingRequest.getBucketName());
+        assertEquals("a", taggingRequest.getKey());
+        Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
+    }
+
+    @Test
     public void testGetPropertyDescriptors() throws Exception {
         ListS3 processor = new ListS3();
         List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
-        assertEquals("size should be eq", 20, pd.size());
+        assertEquals("size should be eq", 21, pd.size());
         assertTrue(pd.contains(ListS3.ACCESS_KEY));
         assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
         assertTrue(pd.contains(ListS3.BUCKET));
         assertTrue(pd.contains(ListS3.CREDENTIALS_FILE));
         assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
         assertTrue(pd.contains(ListS3.REGION));
+        assertTrue(pd.contains(ListS3.WRITE_OBJECT_TAGS));
         assertTrue(pd.contains(ListS3.SECRET_KEY));
         assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE));
         assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));

http://git-wip-us.apache.org/repos/asf/nifi/blob/37523989/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index 747da00..2c3db81 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.amazonaws.services.s3.model.Tag;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -71,26 +72,8 @@ public class TestPutS3Object {
 
     @Test
     public void testPutSinglePart() {
-        runner.setProperty(PutS3Object.REGION, "ap-northeast-1");
-        runner.setProperty(PutS3Object.BUCKET, "test-bucket");
         runner.setProperty("x-custom-prop", "hello");
-        final Map<String, String> ffAttributes = new HashMap<>();
-        ffAttributes.put("filename", "testfile.txt");
-        runner.enqueue("Test Content", ffAttributes);
-
-        PutObjectResult putObjectResult = Mockito.spy(PutObjectResult.class);
-        Date expiration = new Date();
-        putObjectResult.setExpirationTime(expiration);
-        putObjectResult.setMetadata(new ObjectMetadata());
-        putObjectResult.setVersionId("test-version");
-        Mockito.when(putObjectResult.getETag()).thenReturn("test-etag");
-        Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult);
-        MultipartUploadListing uploadListing = new MultipartUploadListing();
-        Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
-        Mockito.when(mockS3Client.getResourceUrl(Mockito.anyString(), Mockito.anyString())).thenReturn("test-s3-url");
-
-        runner.assertValid();
-        runner.run(1);
+        testBase();
 
         ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
         Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
@@ -98,8 +81,10 @@ public class TestPutS3Object {
         assertEquals("test-bucket", request.getBucketName());
 
         runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+
         List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
         MockFlowFile ff0 = flowFiles.get(0);
+
         ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "testfile.txt");
         ff0.assertAttributeEquals(PutS3Object.S3_ETAG_ATTR_KEY, "test-etag");
         ff0.assertAttributeEquals(PutS3Object.S3_VERSION_ATTR_KEY, "test-version");
@@ -148,10 +133,54 @@ public class TestPutS3Object {
     }
 
     @Test
+    public void testObjectTags() {
+        // With 'Remove Tag Prefix' false, the matching attribute name is used verbatim as the tag key.
+        runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "false");
+        runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3");
+        testBase();
+
+        final ArgumentCaptor<PutObjectRequest> putCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
+        Mockito.verify(mockS3Client, Mockito.times(1)).putObject(putCaptor.capture());
+
+        // testBase() enqueues a FlowFile carrying tagS3PII=true, the only attribute matching the prefix.
+        final List<Tag> tags = putCaptor.getValue().getTagging().getTagSet();
+        assertEquals(1, tags.size());
+        final Tag onlyTag = tags.get(0);
+        assertEquals("tagS3PII", onlyTag.getKey());
+        assertEquals("true", onlyTag.getValue());
+    }
+
+    private void testBase() {
+        // Shared fixture: configures region/bucket, enqueues one FlowFile, stubs the S3 client and runs once.
+        runner.setProperty(PutS3Object.BUCKET, "test-bucket");
+        runner.setProperty(PutS3Object.REGION, "ap-northeast-1");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("tagS3PII", "true");
+        attributes.put("filename", "testfile.txt");
+        runner.enqueue("Test Content", attributes);
+
+        // A spy (not a plain mock) is used here; only getETag() is stubbed on it below.
+        final PutObjectResult stubbedResult = Mockito.spy(PutObjectResult.class);
+        stubbedResult.setVersionId("test-version");
+        stubbedResult.setMetadata(new ObjectMetadata());
+        stubbedResult.setExpirationTime(new Date());
+
+        Mockito.when(stubbedResult.getETag()).thenReturn("test-etag");
+        Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(stubbedResult);
+
+        final MultipartUploadListing multipartListing = new MultipartUploadListing();
+        Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(multipartListing);
+
+        runner.assertValid();
+        runner.run(1);
+    }
+
+    @Test
     public void testGetPropertyDescriptors() throws Exception {
         PutS3Object processor = new PutS3Object();
         List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
-        assertEquals("size should be eq", 31, pd.size());
+        assertEquals("size should be eq", 33, pd.size());
         assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
         assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
         assertTrue(pd.contains(PutS3Object.BUCKET));
@@ -178,6 +207,8 @@ public class TestPutS3Object {
         assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT));
         assertTrue(pd.contains(PutS3Object.PROXY_USERNAME));
         assertTrue(pd.contains(PutS3Object.PROXY_PASSWORD));
+        assertTrue(pd.contains(PutS3Object.OBJECT_TAGS_PREFIX));
+        assertTrue(pd.contains(PutS3Object.REMOVE_TAG_PREFIX));
     }
 
 }