You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2019/08/13 07:15:48 UTC

[nifi] branch master updated: NIFI-6468: Adding AWS S3 'requester pays' to Fetch and List processors.

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 32c46f0  NIFI-6468: Adding AWS S3 'requester pays' to Fetch and List processors.
32c46f0 is described below

commit 32c46f0bdd49f039cabebe7c6652693e6948081f
Author: Joe Gresock <jo...@lmco.com>
AuthorDate: Tue Jul 30 15:11:59 2019 +0000

    NIFI-6468: Adding AWS S3 'requester pays' to Fetch and List processors.
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #3601.
---
 .../nifi/processors/aws/s3/FetchS3Object.java      |  18 +++-
 .../org/apache/nifi/processors/aws/s3/ListS3.java  |  53 +++++++++-
 .../nifi/processors/aws/s3/TestFetchS3Object.java  |  63 +++++++++++-
 .../apache/nifi/processors/aws/s3/TestListS3.java  | 107 +++++++++++++++++++++
 4 files changed, 238 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 122231a..b66468a 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -32,6 +32,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -73,10 +74,23 @@ public class FetchS3Object extends AbstractS3Processor {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
             .build();
+    public static final PropertyDescriptor REQUESTER_PAYS = new PropertyDescriptor.Builder()
+            .name("requester-pays")
+            .displayName("Requester Pays")
+            .required(true)
+            .description("If true, indicates that the requester consents to pay any charges associated with retrieving objects from "
+                    + "the S3 bucket.  This sets the 'x-amz-request-payer' header to 'requester'.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues(new AllowableValue("true", "True", "Indicates that the requester consents to pay any charges associated "
+                    + "with retrieving objects from the S3 bucket."), new AllowableValue("false", "False", "Does not consent to pay "
+                            + "requester charges for retrieving objects from the S3 bucket."))
+            .defaultValue("false")
+            .build();
 
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
             Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
-                SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
+                SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD,
+                REQUESTER_PAYS));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -94,6 +108,7 @@ public class FetchS3Object extends AbstractS3Processor {
         final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
         final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
+        final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
 
         final AmazonS3 client = getClient();
         final GetObjectRequest request;
@@ -102,6 +117,7 @@ public class FetchS3Object extends AbstractS3Processor {
         } else {
             request = new GetObjectRequest(bucket, key, versionId);
         }
+        request.setRequesterPays(requesterPays);
 
         final Map<String, String> attributes = new HashMap<>();
         try (final S3Object s3Object = client.getObject(request)) {
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index fb4e49f..34c1dec 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -52,6 +52,9 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -153,6 +156,19 @@ public class ListS3 extends AbstractS3Processor {
             .allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False"))
             .defaultValue("false")
             .build();
+    public static final PropertyDescriptor REQUESTER_PAYS = new PropertyDescriptor.Builder()
+            .name("requester-pays")
+            .displayName("Requester Pays")
+            .required(true)
+            .description("If true, indicates that the requester consents to pay any charges associated with listing "
+                    + "the S3 bucket.  This sets the 'x-amz-request-payer' header to 'requester'.  Note that this "
+                    + "setting is not applicable when 'Use Versions' is 'true'.")
+            .addValidator(createRequesterPaysValidator())
+            .allowableValues(new AllowableValue("true", "True", "Indicates that the requester consents to pay any charges associated "
+                    + "with listing the S3 bucket."), new AllowableValue("false", "False", "Does not consent to pay "
+                            + "requester charges for listing the S3 bucket."))
+            .defaultValue("false")
+            .build();
 
     public static final PropertyDescriptor WRITE_USER_METADATA = new PropertyDescriptor.Builder()
             .name("write-s3-user-metadata")
@@ -168,7 +184,7 @@ public class ListS3 extends AbstractS3Processor {
             Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, WRITE_OBJECT_TAGS, WRITE_USER_METADATA, CREDENTIALS_FILE,
                     AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
                     SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME,
-                    PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE));
+                    PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE, REQUESTER_PAYS));
 
     public static final Set<Relationship> relationships = Collections.unmodifiableSet(
             new HashSet<>(Collections.singletonList(REL_SUCCESS)));
@@ -180,6 +196,23 @@ public class ListS3 extends AbstractS3Processor {
     private long currentTimestamp = 0L;
     private Set<String> currentKeys;
 
+    private static Validator createRequesterPaysValidator() {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                boolean requesterPays = Boolean.valueOf(input);
+                boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
+                boolean valid = !requesterPays || !useVersions;
+                return new ValidationResult.Builder()
+                        .input(input)
+                        .subject(subject)
+                        .valid(valid)
+                        .explanation(valid ? null : "'Requester Pays' cannot be used when listing object versions.")
+                        .build();
+            }
+        };
+    }
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return properties;
@@ -240,6 +273,7 @@ public class ListS3 extends AbstractS3Processor {
         final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
         final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         final long listingTimestamp = System.currentTimeMillis();
+        final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
 
         final AmazonS3 client = getClient();
         int listCount = 0;
@@ -257,6 +291,7 @@ public class ListS3 extends AbstractS3Processor {
                     : new S3ObjectBucketLister(client);
 
         bucketLister.setBucketName(bucket);
+        bucketLister.setRequesterPays(requesterPays);
 
         if (delimiter != null && !delimiter.isEmpty()) {
             bucketLister.setDelimiter(delimiter);
@@ -386,6 +421,7 @@ public class ListS3 extends AbstractS3Processor {
         public void setBucketName(String bucketName);
         public void setPrefix(String prefix);
         public void setDelimiter(String delimiter);
+        public void setRequesterPays(boolean requesterPays);
         // Versions have a superset of the fields that Objects have, so we'll use
         // them as a common interface
         public VersionListing listVersions();
@@ -418,6 +454,11 @@ public class ListS3 extends AbstractS3Processor {
         }
 
         @Override
+        public void setRequesterPays(boolean requesterPays) {
+            listObjectsRequest.setRequesterPays(requesterPays);
+        }
+
+        @Override
         public VersionListing listVersions() {
             VersionListing versionListing = new VersionListing();
             this.objectListing = client.listObjects(listObjectsRequest);
@@ -474,6 +515,11 @@ public class ListS3 extends AbstractS3Processor {
         }
 
         @Override
+        public void setRequesterPays(boolean requesterPays) {
+            listObjectsRequest.setRequesterPays(requesterPays);
+        }
+
+        @Override
         public VersionListing listVersions() {
             VersionListing versionListing = new VersionListing();
             this.objectListing = client.listObjectsV2(listObjectsRequest);
@@ -530,6 +576,11 @@ public class ListS3 extends AbstractS3Processor {
         }
 
         @Override
+        public void setRequesterPays(boolean requesterPays) {
+            // Not supported in versionListing, so this does nothing.
+        }
+
+        @Override
         public VersionListing listVersions() {
             versionListing = client.listVersions(listVersionsRequest);
             return versionListing;
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index bcfff23..4326ed7 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -43,6 +43,7 @@ import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 
@@ -100,6 +101,65 @@ public class TestFetchS3Object {
         GetObjectRequest request = captureRequest.getValue();
         assertEquals("request-bucket", request.getBucketName());
         assertEquals("request-key", request.getKey());
+        assertFalse(request.isRequesterPays());
+        assertNull(request.getVersionId());
+
+        runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
+        final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
+        MockFlowFile ff = ffs.get(0);
+        ff.assertAttributeEquals("s3.bucket", "response-bucket-name");
+        ff.assertAttributeEquals(CoreAttributes.FILENAME.key(), "file.txt");
+        ff.assertAttributeEquals(CoreAttributes.PATH.key(), "key/path/to");
+        ff.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), "key/path/to/file.txt");
+        ff.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
+        ff.assertAttributeEquals("hash.value", "testMD5hash");
+        ff.assertAttributeEquals("hash.algorithm", "MD5");
+        ff.assertAttributeEquals("s3.etag", "test-etag");
+        ff.assertAttributeEquals("s3.expirationTime", String.valueOf(expiration.getTime()));
+        ff.assertAttributeEquals("s3.expirationTimeRuleId", "testExpirationRuleId");
+        ff.assertAttributeEquals("userKey1", "userValue1");
+        ff.assertAttributeEquals("userKey2", "userValue2");
+        ff.assertAttributeEquals("s3.sseAlgorithm", "testAlgorithm");
+        ff.assertContentEquals("Some Content");
+    }
+
+    @Test
+    public void testGetObjectWithRequesterPays() throws IOException {
+        runner.setProperty(FetchS3Object.REGION, "us-east-1");
+        runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
+        runner.setProperty(FetchS3Object.REQUESTER_PAYS, "true");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "request-key");
+        runner.enqueue(new byte[0], attrs);
+
+        S3Object s3ObjectResponse = new S3Object();
+        s3ObjectResponse.setBucketName("response-bucket-name");
+        s3ObjectResponse.setKey("response-key");
+        s3ObjectResponse.setObjectContent(new StringInputStream("Some Content"));
+        ObjectMetadata metadata = Mockito.spy(ObjectMetadata.class);
+        metadata.setContentDisposition("key/path/to/file.txt");
+        metadata.setContentType("text/plain");
+        metadata.setContentMD5("testMD5hash");
+        Date expiration = new Date();
+        metadata.setExpirationTime(expiration);
+        metadata.setExpirationTimeRuleId("testExpirationRuleId");
+        Map<String, String> userMetadata = new HashMap<>();
+        userMetadata.put("userKey1", "userValue1");
+        userMetadata.put("userKey2", "userValue2");
+        metadata.setUserMetadata(userMetadata);
+        metadata.setSSEAlgorithm("testAlgorithm");
+        Mockito.when(metadata.getETag()).thenReturn("test-etag");
+        s3ObjectResponse.setObjectMetadata(metadata);
+        Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
+
+        runner.run(1);
+
+        ArgumentCaptor<GetObjectRequest> captureRequest = ArgumentCaptor.forClass(GetObjectRequest.class);
+        Mockito.verify(mockS3Client, Mockito.times(1)).getObject(captureRequest.capture());
+        GetObjectRequest request = captureRequest.getValue();
+        assertEquals("request-bucket", request.getBucketName());
+        assertEquals("request-key", request.getKey());
+        assertTrue(request.isRequesterPays());
         assertNull(request.getVersionId());
 
         runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
@@ -179,7 +239,7 @@ public class TestFetchS3Object {
     public void testGetPropertyDescriptors() throws Exception {
         FetchS3Object processor = new FetchS3Object();
         List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
-        assertEquals("size should be eq", 17, pd.size());
+        assertEquals("size should be eq", 18, pd.size());
         assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
         assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
         assertTrue(pd.contains(FetchS3Object.BUCKET));
@@ -197,6 +257,7 @@ public class TestFetchS3Object {
         assertTrue(pd.contains(FetchS3Object.PROXY_HOST_PORT));
         assertTrue(pd.contains(FetchS3Object.PROXY_USERNAME));
         assertTrue(pd.contains(FetchS3Object.PROXY_PASSWORD));
+        assertTrue(pd.contains(FetchS3Object.REQUESTER_PAYS));
 
     }
 }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index 3798d84..86af835 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -50,6 +50,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 
@@ -103,6 +104,7 @@ public class TestListS3 {
         Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture());
         ListObjectsRequest request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
+        assertFalse(request.isRequesterPays());
         Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
 
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
@@ -118,6 +120,62 @@ public class TestListS3 {
     }
 
     @Test
+    public void testListWithRequesterPays() {
+        runner.setProperty(ListS3.REGION, "eu-west-1");
+        runner.setProperty(ListS3.BUCKET, "test-bucket");
+        runner.setProperty(ListS3.REQUESTER_PAYS, "true");
+
+        Date lastModified = new Date();
+        ObjectListing objectListing = new ObjectListing();
+        S3ObjectSummary objectSummary1 = new S3ObjectSummary();
+        objectSummary1.setBucketName("test-bucket");
+        objectSummary1.setKey("a");
+        objectSummary1.setLastModified(lastModified);
+        objectListing.getObjectSummaries().add(objectSummary1);
+        S3ObjectSummary objectSummary2 = new S3ObjectSummary();
+        objectSummary2.setBucketName("test-bucket");
+        objectSummary2.setKey("b/c");
+        objectSummary2.setLastModified(lastModified);
+        objectListing.getObjectSummaries().add(objectSummary2);
+        S3ObjectSummary objectSummary3 = new S3ObjectSummary();
+        objectSummary3.setBucketName("test-bucket");
+        objectSummary3.setKey("d/e");
+        objectSummary3.setLastModified(lastModified);
+        objectListing.getObjectSummaries().add(objectSummary3);
+        Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
+        runner.run();
+
+        ArgumentCaptor<ListObjectsRequest> captureRequest = ArgumentCaptor.forClass(ListObjectsRequest.class);
+        Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture());
+        ListObjectsRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertTrue(request.isRequesterPays());
+        Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
+
+        runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals("filename", "a");
+        ff0.assertAttributeEquals("s3.bucket", "test-bucket");
+        String lastModifiedTimestamp = String.valueOf(lastModified.getTime());
+        ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
+        flowFiles.get(1).assertAttributeEquals("filename", "b/c");
+        flowFiles.get(2).assertAttributeEquals("filename", "d/e");
+        runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
+    }
+
+    @Test
+    public void testListWithRequesterPays_invalid() {
+        runner.setProperty(ListS3.REGION, "eu-west-1");
+        runner.setProperty(ListS3.BUCKET, "test-bucket");
+        runner.setProperty(ListS3.USE_VERSIONS, "true"); // requester pays cannot be used with versions
+        runner.setProperty(ListS3.REQUESTER_PAYS, "true");
+
+        runner.assertNotValid();
+    }
+
+    @Test
     public void testListVersion2() {
         runner.setProperty(ListS3.REGION, "eu-west-1");
         runner.setProperty(ListS3.BUCKET, "test-bucket");
@@ -148,6 +206,54 @@ public class TestListS3 {
         Mockito.verify(mockS3Client, Mockito.times(1)).listObjectsV2(captureRequest.capture());
         ListObjectsV2Request request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
+        assertFalse(request.isRequesterPays());
+        Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
+
+        runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals("filename", "a");
+        ff0.assertAttributeEquals("s3.bucket", "test-bucket");
+        String lastModifiedTimestamp = String.valueOf(lastModified.getTime());
+        ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
+        flowFiles.get(1).assertAttributeEquals("filename", "b/c");
+        flowFiles.get(2).assertAttributeEquals("filename", "d/e");
+        runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
+    }
+
+    @Test
+    public void testListVersion2WithRequesterPays() {
+        runner.setProperty(ListS3.REGION, "eu-west-1");
+        runner.setProperty(ListS3.BUCKET, "test-bucket");
+        runner.setProperty(ListS3.REQUESTER_PAYS, "true");
+        runner.setProperty(ListS3.LIST_TYPE, "2");
+
+        Date lastModified = new Date();
+        ListObjectsV2Result objectListing = new ListObjectsV2Result();
+        S3ObjectSummary objectSummary1 = new S3ObjectSummary();
+        objectSummary1.setBucketName("test-bucket");
+        objectSummary1.setKey("a");
+        objectSummary1.setLastModified(lastModified);
+        objectListing.getObjectSummaries().add(objectSummary1);
+        S3ObjectSummary objectSummary2 = new S3ObjectSummary();
+        objectSummary2.setBucketName("test-bucket");
+        objectSummary2.setKey("b/c");
+        objectSummary2.setLastModified(lastModified);
+        objectListing.getObjectSummaries().add(objectSummary2);
+        S3ObjectSummary objectSummary3 = new S3ObjectSummary();
+        objectSummary3.setBucketName("test-bucket");
+        objectSummary3.setKey("d/e");
+        objectSummary3.setLastModified(lastModified);
+        objectListing.getObjectSummaries().add(objectSummary3);
+        Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(objectListing);
+
+        runner.run();
+
+        ArgumentCaptor<ListObjectsV2Request> captureRequest = ArgumentCaptor.forClass(ListObjectsV2Request.class);
+        Mockito.verify(mockS3Client, Mockito.times(1)).listObjectsV2(captureRequest.capture());
+        ListObjectsV2Request request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertTrue(request.isRequesterPays());
         Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
 
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
@@ -375,5 +481,6 @@ public class TestListS3 {
         assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
         assertTrue(pd.contains(ListS3.PROXY_USERNAME));
         assertTrue(pd.contains(ListS3.PROXY_PASSWORD));
+        assertTrue(pd.contains(ListS3.REQUESTER_PAYS));
     }
 }