Posted to commits@nifi.apache.org by sz...@apache.org on 2021/03/16 09:20:32 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1400 Implement and initially test base features of ListS3 processor

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new ee1ed50  MINIFICPP-1400 Implement and initially test base features of ListS3 processor
ee1ed50 is described below

commit ee1ed503c82769711ab5ea03c61d65abff1dd46f
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Mar 16 10:19:23 2021 +0100

    MINIFICPP-1400 Implement and initially test base features of ListS3 processor
    
    Closes #975
    
    Signed-off-by: Marton Szasz <sz...@gmail.com>
---
 .github/workflows/ci.yml                           |   4 +-
 PROCESSORS.md                                      |  38 +++
 README.md                                          |   2 +-
 docker/DockerVerify.sh                             |   6 +-
 .../integration/MiNiFi_integration_test_driver.py  |  15 +-
 docker/test/integration/features/s3.feature        |  49 +++-
 .../integration/minifi/core/DockerTestCluster.py   |   1 -
 .../minifi/core/DockerTestDirectoryBindings.py     |   2 +-
 .../integration/minifi/core/FileSystemObserver.py  |  11 +-
 .../minifi/core/SingleNodeDockerCluster.py         |   2 +-
 .../test/integration/minifi/processors/ListS3.py   |  22 ++
 .../integration/minifi/processors/PutS3Object.py   |   4 +-
 .../test/integration/minifi/processors/TailFile.py |   8 +
 .../minifi/validators/MultiFileOutputValidator.py  |  58 ++++
 docker/test/integration/steps/steps.py             |  27 +-
 extensions/aws/AWSLoader.h                         |   4 +
 extensions/aws/processors/DeleteS3Object.cpp       |  22 +-
 extensions/aws/processors/DeleteS3Object.h         |   9 +-
 extensions/aws/processors/FetchS3Object.cpp        |  18 +-
 extensions/aws/processors/FetchS3Object.h          |  13 +-
 extensions/aws/processors/ListS3.cpp               | 295 +++++++++++++++++++
 extensions/aws/processors/ListS3.h                 | 112 ++++++++
 extensions/aws/processors/PutS3Object.cpp          |  21 +-
 extensions/aws/processors/PutS3Object.h            |  15 +-
 extensions/aws/processors/S3Processor.cpp          |  43 +--
 extensions/aws/processors/S3Processor.h            |   9 +-
 extensions/aws/s3/S3ClientRequestSender.cpp        | 130 +++++++++
 .../s3/{S3Wrapper.h => S3ClientRequestSender.h}    |  13 +-
 extensions/aws/s3/S3RequestSender.cpp              |  63 +++++
 extensions/aws/s3/S3RequestSender.h                |  89 ++++++
 extensions/aws/s3/S3Wrapper.cpp                    | 314 +++++++++++++++++++--
 extensions/aws/s3/S3Wrapper.h                      | 181 +++++++++++-
 extensions/aws/s3/S3WrapperBase.cpp                | 196 -------------
 extensions/aws/s3/S3WrapperBase.h                  | 169 -----------
 libminifi/test/aws-tests/DeleteS3ObjectTests.cpp   |  30 +-
 libminifi/test/aws-tests/FetchS3ObjectTests.cpp    |  28 +-
 libminifi/test/aws-tests/ListS3Tests.cpp           | 231 +++++++++++++++
 libminifi/test/aws-tests/MockS3RequestSender.h     | 255 +++++++++++++++++
 libminifi/test/aws-tests/MockS3Wrapper.h           | 101 -------
 libminifi/test/aws-tests/PutS3ObjectTests.cpp      |  62 ++--
 libminifi/test/aws-tests/S3TestsFixture.h          | 166 +++++++----
 41 files changed, 2122 insertions(+), 716 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 77b105c..4fa7a79 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -30,7 +30,7 @@ jobs:
         run: |
           export PATH="/usr/local/opt/lua@5.3/lib:/usr/local/opt/lua@5.3/include:/usr/local/opt/lua@5.3/bin:$PATH"
           export PKG_CONFIG_PATH="/usr/local/opt/lua@5.3/lib/pkgconfig"
-          ./bootstrap.sh -e -t && cd build  && cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_LUA_SCRIPTING=1 -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT .. && cmake --build . --parallel 4 && make test ARGS="--timeout 300 -j4 --output-on-failure" && make linter
+          ./bootstrap.sh -e -t && cd build  && cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_LUA_SCRIPTING=1 -DENABLE_AWS=ON -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT .. && cmake --build . --parallel 4 && make test ARGS="--timeout 300 -j4 --output-on-failure" && make linter
   macos_xcode_12_0:
     name: "macos-xcode12.0"
     runs-on: macos-10.15
@@ -60,7 +60,7 @@ jobs:
         run: |
           export PATH="/usr/local/opt/lua@5.3/lib:/usr/local/opt/lua@5.3/include:/usr/local/opt/lua@5.3/bin:$PATH"
           export PKG_CONFIG_PATH="/usr/local/opt/lua@5.3/lib/pkgconfig"
-          ./bootstrap.sh -e -t && cd build  && cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_LUA_SCRIPTING=1 -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT .. && cmake --build . --parallel 4 && make test ARGS="--timeout 300 -j4 --output-on-failure" && make linter
+          ./bootstrap.sh -e -t && cd build  && cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_LUA_SCRIPTING=1 -DENABLE_AWS=ON -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT .. && cmake --build . --parallel 4 && make test ARGS="--timeout 300 -j4 --output-on-failure" && make linter
   windows_VS2017:
     name: "windows-vs2017"
     runs-on: windows-2016
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 0ca385d..7034be2 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -31,6 +31,7 @@
 - [ListSFTP](#listsftp)
 - [ListenHTTP](#listenhttp)
 - [ListenSyslog](#listensyslog)
+- [ListS3](#lists3)
 - [LogAttribute](#logattribute)
 - [ManipulateArchive](#manipulatearchive)
 - [MergeContent](#mergecontent)
@@ -801,6 +802,43 @@ In the list below, the names of required properties appear in bold. Any other pr
 |success|All files are routed to success|
 
 
+## ListS3
+
+### Description
+
+Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents the object so that it can be fetched in conjunction with FetchS3Object.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Bucket**|||The S3 bucket<br/>**Supports Expression Language: true**|
+|Access Key|||AWS account access key<br/>**Supports Expression Language: true**|
+|Secret Key|||AWS account secret key<br/>**Supports Expression Language: true**|
+|Credentials File|||Path to a file containing AWS access key and secret key in properties file format. Properties used: accessKey and secretKey|
+|AWS Credentials Provider service|||The name of the AWS Credentials Provider controller service that is used to obtain AWS credentials.|
+|**Region**|us-west-2|af-south-1<br/>ap-east-1<br/>ap-northeast-1<br/>ap-northeast-2<br/>ap-northeast-3<br/>ap-south-1<br/>ap-southeast-1<br/>ap-southeast-2<br/>ca-central-1<br/>cn-north-1<br/>cn-northwest-1<br/>eu-central-1<br/>eu-north-1<br/>eu-south-1<br/>eu-west-1<br/>eu-west-2<br/>eu-west-3<br/>me-south-1<br/>sa-east-1<br/>us-east-1<br/>us-east-2<br/>us-gov-east-1<br/>us-gov-west-1<br/>us-west-1<br/>us-west-2|AWS Region|
+|**Communications Timeout**|30 sec||Sets the timeout of the communication between the AWS server and the client|
+|Endpoint Override URL|||Endpoint URL to use instead of the AWS default including scheme, host, port, and path. The AWS libraries select an endpoint URL based on the AWS region, but this property overrides the selected endpoint URL, allowing use with other S3-compatible endpoints.<br/>**Supports Expression Language: true**|
+|Proxy Host|||Proxy host name or IP<br/>**Supports Expression Language: true**|
+|Proxy Port|||The port number of the proxy host<br/>**Supports Expression Language: true**|
+|Proxy Username|||Username to set when authenticating against proxy<br/>**Supports Expression Language: true**|
+|Proxy Password|||Password to set when authenticating against proxy<br/>**Supports Expression Language: true**|
+|Delimiter|||The string used to delimit directories within the bucket. Please consult the AWS documentation for the correct use of this field.|
+|Prefix|||The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').|
+|**Use Versions**|false||Specifies whether to use S3 versions, if applicable. If false, only the latest version of each object will be returned.|
+|**Minimum Object Age**|0 sec||The minimum age that an S3 object must be in order to be considered; any object younger than this amount of time (according to last modification date) will be ignored.|
+|**Write Object Tags**|false||If set to 'true', the tags associated with the S3 object will be written as FlowFile attributes.|
+|**Write User Metadata**|false||If set to 'true', the user defined metadata associated with the S3 object will be added to FlowFile attributes/records.|
+|**Requester Pays**|false||If true, indicates that the requester consents to pay any charges associated with listing the S3 bucket. This sets the 'x-amz-request-payer' header to 'requester'. Note that this setting is only used if Write User Metadata is true.|
+
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|FlowFiles are routed to success relationship|
+
 ## LogAttribute
 
 ### Description
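
The "Minimum Object Age" property documented above filters on the object's last-modified time: anything modified more recently than the configured age is ignored. A minimal sketch of that check, assuming epoch-millisecond timestamps (the helper name is illustrative, not part of the processor):

    import time

    def is_old_enough(last_modified_ms, min_object_age_ms, now_ms=None):
        # An object is eligible for listing only if it was last modified
        # at least min_object_age_ms ago.
        now_ms = int(time.time() * 1000) if now_ms is None else now_ms
        return now_ms - last_modified_ms >= min_object_age_ms
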
diff --git a/README.md b/README.md
index 4a4d4d7..31afe09 100644
--- a/README.md
+++ b/README.md
@@ -75,7 +75,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension
 | Extension Set        | Processors           | CMAKE Flag  |
 | ------------- |:-------------| :-----|
 | Archive Extensions    | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry)      |   -DBUILD_LIBARCHIVE=ON |
-| AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object) | -DENABLE_AWS=ON  |
+| AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) | -DENABLE_AWS=ON  |
 | CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp)  | -DDISABLE_CIVET=ON |
 | CURL | [InvokeHTTP](PROCESSORS.md#invokehttp)      |    -DDISABLE_CURL=ON  |
 | GPS | GetGPS      |    -DENABLE_GPS=ON  |
diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
index afb4077..dc59e64 100755
--- a/docker/DockerVerify.sh
+++ b/docker/DockerVerify.sh
@@ -75,7 +75,7 @@ export PYTHONPATH
 BEHAVE_OPTS="-f pretty --logging-level INFO --logging-clear-handlers"
 
 cd "${docker_dir}/test/integration"
-exec 
+exec
   behave $BEHAVE_OPTS "features/file_system_operations.feature" -n "Get and put operations run in a simple flow" &&
   behave $BEHAVE_OPTS "features/file_system_operations.feature" -n "PutFile does not overwrite a file that already exists" &&
   behave $BEHAVE_OPTS "features/s2s.feature" -n "A MiNiFi instance produces and transfers data to a NiFi instance via s2s" &&
@@ -92,4 +92,6 @@ exec
   behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance can remove s3 bucket objects" &&
   behave $BEHAVE_OPTS "features/s3.feature" -n "Deletion of a s3 object through a proxy-server succeeds" &&
   behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance can download s3 bucket objects directly" &&
-  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance can download s3 bucket objects via a http-proxy"
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance can download s3 bucket objects via a http-proxy" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance can list an S3 bucket directly" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance can list an S3 bucket objects via a http-proxy"
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index ca5f04a..d8207c1 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -19,6 +19,7 @@ from minifi.core.DockerTestDirectoryBindings import DockerTestDirectoryBindings
 from minifi.validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
 from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator
 from minifi.validators.SingleFileOutputValidator import SingleFileOutputValidator
+from minifi.validators.MultiFileOutputValidator import MultiFileOutputValidator
 
 class MiNiFi_integration_test():
     def __init__(self, context):
@@ -55,7 +56,7 @@ class MiNiFi_integration_test():
 
         # The cluster deleter is not reliable for cleaning up
         docker_client = docker.from_env()
-        for container_id in container_ids:    
+        for container_id in container_ids:
             self.delete_docker_container_by_id(container_id)
 
         del self.docker_directory_bindings
@@ -119,7 +120,7 @@ class MiNiFi_integration_test():
         self.connectable_nodes.append(processor)
 
     def get_or_create_node_by_name(self, node_name):
-        node = self.get_node_by_name(node_name) 
+        node = self.get_node_by_name(node_name)
         if node is None:
             if node_name == "RemoteProcessGroup":
                 raise Exception("Trying to register RemoteProcessGroup without an input port or address.")
@@ -152,8 +153,7 @@ class MiNiFi_integration_test():
         input_port_node.set_uuid(uuid.uuid3(remote_process_group.get_uuid(), "input_port"))
         return input_port_node
 
-    def add_test_data(self, path, test_data):
-        file_name = str(uuid.uuid4())
+    def add_test_data(self, path, test_data, file_name=str(uuid.uuid4())):
         self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.encode('utf-8'))
 
     def put_test_resource(self, file_name, contents):
@@ -178,6 +178,11 @@ class MiNiFi_integration_test():
         output_validator.set_output_dir(self.file_system_observer.get_output_dir())
         self.check_output(timeout_seconds, output_validator, 1, subdir)
 
+    def check_for_multiple_files_generated(self, file_count, timeout_seconds, subdir=''):
+        output_validator = MultiFileOutputValidator(file_count, subdir)
+        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
+        self.check_output(timeout_seconds, output_validator, file_count, subdir)
+
     def check_for_multiple_empty_files_generated(self, timeout_seconds, subdir=''):
         output_validator = EmptyFilesOutPutValidator()
         output_validator.set_output_dir(self.file_system_observer.get_output_dir())
@@ -186,7 +191,7 @@ class MiNiFi_integration_test():
     def check_output(self, timeout_seconds, output_validator, max_files, subdir):
         if subdir:
             output_validator.subdir = subdir
-        self.file_system_observer.wait_for_output(timeout_seconds, max_files)
+        self.file_system_observer.wait_for_output(timeout_seconds, output_validator, max_files)
         for cluster in self.clusters.values():
             # Logs for both nifi and minifi, but not other engines
             cluster.log_nifi_output()
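
The new check_for_multiple_files_generated helper backs the "{number_of_files:d} flowfiles are placed in the monitored directory" step added to steps.py further down. A hypothetical extra step showing how it would be called (the step wording here is made up for illustration):

    from behave import then

    @then("two flowfiles appear in the monitored directory within one minute")
    def step_impl(context):
        # Delegate to the new helper on the test driver: wait up to 60 seconds
        # for 2 distinct files under the monitored output directory.
        context.test.check_for_multiple_files_generated(2, 60)
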
diff --git a/docker/test/integration/features/s3.feature b/docker/test/integration/features/s3.feature
index 03f0e11..529d052 100644
--- a/docker/test/integration/features/s3.feature
+++ b/docker/test/integration/features/s3.feature
@@ -37,7 +37,7 @@ Feature: Sending data from MiNiFi-C++ to an AWS server
     And the "success" relationship of the PutS3Object processor is connected to the PutFile
 
     And a s3 server "s3" is set up in correspondence with the PutS3Object
-    And the http proxy server "http-proxy" is set up 
+    And the http proxy server "http-proxy" is set up
     When all instances start up
 
     Then a flowfile with the content "test" is placed in the monitored directory in less than 150 seconds
@@ -98,7 +98,7 @@ Feature: Sending data from MiNiFi-C++ to an AWS server
       | DeleteS3Object | success           | PutFile          |
 
     And a s3 server "s3" is set up in correspondence with the PutS3Object
-    And the http proxy server "http-proxy" is set up 
+    And the http proxy server "http-proxy" is set up
 
     When all instances start up
 
@@ -147,9 +147,52 @@ Feature: Sending data from MiNiFi-C++ to an AWS server
       | FetchS3Object    | success           | PutFile          |
 
     And a s3 server "s3" is set up in correspondence with the PutS3Object
-    And a http proxy server "http-proxy" is set up accordingly 
+    And a http proxy server "http-proxy" is set up accordingly
 
     When all instances start up
 
     Then a flowfile with the content "test" is placed in the monitored directory in less than 150 seconds
     And no errors were generated on the "http-proxy" regarding "http://s3-server:9090/test_bucket/test_object_key"
+
+  Scenario: A MiNiFi instance can list an S3 bucket directly
+    Given a TailFile processor with the "File to Tail" property set to "/tmp/input/test_file.log"
+    And the "Input Delimiter" of the TailFile processor is set to "%"
+    And a file with filename "test_file.log" and content "test_data%" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And the "Object Key" of the PutS3Object processor is set to "${filename}"
+    And the "success" relationship of the TailFile processor is connected to the PutS3Object
+
+    Given a ListS3 processor in the "secondary" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the ListS3 processor is connected to the PutFile
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+
+    When all instances start up
+    And content "test_data2%" is added to file "test_file.log" present in directory "/tmp/input" 5 seconds later
+
+    Then 2 flowfiles are placed in the monitored directory in 60 seconds
+
+  Scenario: A MiNiFi instance can list an S3 bucket objects via a http-proxy
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And the "success" relationship of the GetFile processor is connected to the PutS3Object
+
+    Given a ListS3 processor in the "secondary" flow
+    And these processor properties are set to match the http proxy:
+      | processor name | property name  | property value |
+      | ListS3         | Proxy Host     | http-proxy     |
+      | ListS3         | Proxy Port     | 3128           |
+      | ListS3         | Proxy Username | admin          |
+      | ListS3         | Proxy Password | test101        |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the ListS3 processor is connected to the PutFile
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+    And a http proxy server "http-proxy" is set up accordingly
+
+    When all instances start up
+
+    Then 1 flowfile is placed in the monitored directory in 120 seconds
+    And no errors were generated on the "http-proxy" regarding "http://s3-server:9090/test_bucket/test_object_key"
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index 2b0f13e..3fa5035 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -9,7 +9,6 @@ import uuid
 
 from .SingleNodeDockerCluster import SingleNodeDockerCluster
 from .utils import retry_check
-from .FileSystemObserver import FileSystemObserver
 
 class DockerTestCluster(SingleNodeDockerCluster):
     def __init__(self):
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index 28f9f77..b0d0397 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -74,7 +74,7 @@ class DockerTestDirectoryBindings:
     @staticmethod
     def put_file_contents(file_abs_path, contents):
         logging.info('Writing %d bytes of content to file: %s', len(contents), file_abs_path)
-        with open(file_abs_path, 'wb') as test_input_file:
+        with open(file_abs_path, 'ab') as test_input_file:
             test_input_file.write(contents)
 
     def put_test_resource(self, test_id, file_name, contents):
diff --git a/docker/test/integration/minifi/core/FileSystemObserver.py b/docker/test/integration/minifi/core/FileSystemObserver.py
index b4c0f7f..70ead37 100644
--- a/docker/test/integration/minifi/core/FileSystemObserver.py
+++ b/docker/test/integration/minifi/core/FileSystemObserver.py
@@ -17,7 +17,7 @@ class FileSystemObserver(object):
         self.done_event = Event()
         self.event_handler = OutputEventHandler(self.done_event)
         self.observer = Observer()
-        self.observer.schedule(self.event_handler, self.test_output_dir)
+        self.observer.schedule(self.event_handler, self.test_output_dir, recursive=True)
         self.observer.start()
 
     def get_output_dir(self):
@@ -29,18 +29,19 @@ class FileSystemObserver(object):
 
         self.observer = Observer()
         self.done_event.clear()
-        self.observer.schedule(self.event_handler, self.test_output_dir)
+        self.observer.schedule(self.event_handler, self.test_output_dir, recursive=True)
         self.observer.start()
 
-    def wait_for_output(self, timeout_seconds, max_files):
-        logging.info('Waiting up to %d seconds for test output...', timeout_seconds)
+    def wait_for_output(self, timeout_seconds, output_validator, max_files):
+        logging.info('Waiting up to %d seconds for %d test outputs...', timeout_seconds, max_files)
         self.restart_observer_if_needed()
         wait_start_time = time.perf_counter()
         for i in range(0, max_files):
             # Note: The timing on Event.wait() is inaccurate
             self.done_event.wait(timeout_seconds)
+            self.done_event.clear()
             current_time = time.perf_counter()
-            if timeout_seconds < (current_time - wait_start_time):
+            if timeout_seconds < (current_time - wait_start_time) or output_validator.validate():
                 break
         self.observer.stop()
         self.observer.join()
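
wait_for_output now takes the validator and re-checks it after every filesystem event, so the wait ends as soon as the expected output is complete rather than after a fixed number of events. The pattern, reduced to a self-contained sketch with illustrative names:

    import time

    def wait_until_valid(done_event, validate, timeout_seconds):
        # done_event: a threading.Event signalled by the watchdog handler on
        # each new or modified file; validate: a zero-argument callable (e.g.
        # the validator's validate method). Stop as soon as validation passes
        # or the overall timeout elapses.
        start = time.perf_counter()
        while True:
            done_event.wait(timeout_seconds)
            done_event.clear()
            done = validate()
            if done or time.perf_counter() - start >= timeout_seconds:
                return done
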
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index b4dede7..79ff8ec 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -158,7 +158,7 @@ class SingleNodeDockerCluster(Cluster):
                 network=self.network.name,
                 volumes=self.vols)
         self.network.reload()
-        
+
         logging.info('Started container \'%s\'', container.name)
 
         self.containers[container.name] = container
diff --git a/docker/test/integration/minifi/processors/ListS3.py b/docker/test/integration/minifi/processors/ListS3.py
new file mode 100644
index 0000000..7c14d75
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ListS3.py
@@ -0,0 +1,22 @@
+from ..core.Processor import Processor
+
+
+class ListS3(Processor):
+    def __init__(self,
+                 proxy_host='',
+                 proxy_port='',
+                 proxy_username='',
+                 proxy_password=''):
+        super(ListS3, self).__init__('ListS3',
+                                     properties={
+                                         'Bucket': 'test_bucket',
+                                         'Access Key': 'test_access_key',
+                                         'Secret Key': 'test_secret',
+                                         'Endpoint Override URL': "http://s3-server:9090",
+                                         'Proxy Host': proxy_host,
+                                         'Proxy Port': proxy_port,
+                                         'Proxy Username': proxy_username,
+                                         'Proxy Password': proxy_password,
+                                     },
+                                     schedule={'scheduling period': '3 sec'},
+                                     auto_terminate=['success'])
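
Like the other S3 test-DSL processors, the class above pins test defaults (bucket, keys, endpoint override) and only exposes the proxy settings. A hypothetical direct construction, using the same proxy values the feature file passes in through the property table:

    from minifi.processors.ListS3 import ListS3

    # Illustrative only: the integration steps normally build this via locate().
    list_s3 = ListS3(proxy_host='http-proxy', proxy_port='3128',
                     proxy_username='admin', proxy_password='test101')
    list_s3.set_name('ListS3')  # set_name is used the same way in steps.py
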
diff --git a/docker/test/integration/minifi/processors/PutS3Object.py b/docker/test/integration/minifi/processors/PutS3Object.py
index ab8b1be..bd5fc84 100644
--- a/docker/test/integration/minifi/processors/PutS3Object.py
+++ b/docker/test/integration/minifi/processors/PutS3Object.py
@@ -1,14 +1,16 @@
 from ..core.Processor import Processor
 
+
 class PutS3Object(Processor):
     def __init__(self,
+        object_key='test_object_key',
         proxy_host='',
         proxy_port='',
         proxy_username='',
         proxy_password=''):
             super(PutS3Object, self).__init__('PutS3Object',
             properties = {
-                'Object Key': 'test_object_key',
+                'Object Key': object_key,
                 'Bucket': 'test_bucket',
                 'Access Key': 'test_access_key',
                 'Secret Key': 'test_secret',
diff --git a/docker/test/integration/minifi/processors/TailFile.py b/docker/test/integration/minifi/processors/TailFile.py
new file mode 100644
index 0000000..ab30ad7
--- /dev/null
+++ b/docker/test/integration/minifi/processors/TailFile.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+
+class TailFile(Processor):
+    def __init__(self, filename="/tmp/input/test_file.log"):
+        super(TailFile, self).__init__('TailFile',
+                                       properties={'File to Tail': filename},
+                                       auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/validators/MultiFileOutputValidator.py b/docker/test/integration/minifi/validators/MultiFileOutputValidator.py
new file mode 100644
index 0000000..e97829e
--- /dev/null
+++ b/docker/test/integration/minifi/validators/MultiFileOutputValidator.py
@@ -0,0 +1,58 @@
+import logging
+import os
+import subprocess
+
+from os import listdir
+from os.path import join
+
+from .FileOutputValidator import FileOutputValidator
+
+
+class MultiFileOutputValidator(FileOutputValidator):
+    """
+    Validates the content of multiple files in the given directory, also verifying that the old files are not rewritten.
+    """
+
+    def __init__(self, expected_file_count, subdir=''):
+        self.valid = False
+        self.expected_file_count = expected_file_count
+        self.subdir = subdir
+        self.file_timestamps = dict()
+
+    def validate(self):
+        self.valid = False
+        full_dir = os.path.join(self.output_dir, self.subdir)
+        logging.info("Output folder: %s", full_dir)
+
+        if not os.path.isdir(full_dir):
+            return self.valid
+
+        listing = listdir(full_dir)
+        if not listing:
+            return self.valid
+
+        for out_file_name in listing:
+            logging.info("name:: %s", out_file_name)
+
+            full_path = join(full_dir, out_file_name)
+            if not os.path.isfile(full_path):
+                return self.valid
+
+            with open(full_path, 'r') as out_file:
+                contents = out_file.read()
+                logging.info("dir %s -- name %s", full_dir, out_file_name)
+                logging.info("expected file count %d -- current file count %d", self.expected_file_count, len(self.file_timestamps))
+
+                if full_path in self.file_timestamps and self.file_timestamps[full_path] != os.path.getmtime(full_path):
+                    logging.error("Last modified timestamp changed for %s", full_path)
+                    self.valid = False
+                    return self.valid
+
+                self.file_timestamps[full_path] = os.path.getmtime(full_path)
+                logging.info("New file added %s", full_path)
+
+                if len(self.file_timestamps) == self.expected_file_count:
+                    self.valid = True
+                    return self.valid
+
+        return self.valid
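
MultiFileOutputValidator reports success only once the expected number of distinct files has been observed, and reports failure if the modification time of a previously seen file changes. A standalone usage sketch (the output path is illustrative; set_output_dir is inherited from FileOutputValidator, as used in the test driver):

    from minifi.validators.MultiFileOutputValidator import MultiFileOutputValidator

    validator = MultiFileOutputValidator(expected_file_count=2)
    validator.set_output_dir('/tmp/output')
    if validator.validate():
        print('all expected output files are present and unchanged')
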
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index e376c2f..bfa4feb 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -11,6 +11,7 @@ from minifi.processors.PublishKafka import PublishKafka
 from minifi.processors.PutS3Object import PutS3Object
 from minifi.processors.DeleteS3Object import DeleteS3Object
 from minifi.processors.FetchS3Object import FetchS3Object
+from minifi.processors.ListS3 import ListS3
 
 
 from behave import given, then, when
@@ -40,7 +41,8 @@ def step_impl(context, processor_type, property, property_value, cluster_name):
     logging.info("Acquiring " + cluster_name)
     cluster = context.test.acquire_cluster(cluster_name)
     processor = locate("minifi.processors." + processor_type + "." + processor_type)()
-    processor.set_property(property, property_value)
+    if property:
+        processor.set_property(property, property_value)
     processor.set_name(processor_type)
     context.test.add_node(processor)
     # Assume that the first node declared is primary unless specified otherwise
@@ -48,12 +50,17 @@ def step_impl(context, processor_type, property, property_value, cluster_name):
         cluster.set_name(cluster_name)
         cluster.set_flow(processor)
 
-
 @given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\"")
 def step_impl(context, processor_type, property, property_value):
     context.execute_steps("given a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in the \"{cluster_name}\" flow".
         format(processor_type=processor_type, property=property, property_value=property_value, cluster_name="primary_cluster"))
 
+@given("a {processor_type} processor in the \"{cluster_name}\" flow")
+@given("a {processor_type} processor in a \"{cluster_name}\" flow")
+def step_impl(context, processor_type, cluster_name):
+    context.execute_steps("given a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in the \"{cluster_name}\" flow".
+        format(processor_type=processor_type, property=None, property_value=None, cluster_name=cluster_name))
+
 @given("a set of processors in the \"{cluster_name}\" flow")
 def step_impl(context, cluster_name):
     cluster = context.test.acquire_cluster(cluster_name)
@@ -154,6 +161,10 @@ def step_impl(context):
 def step_impl(context, content, path):
     context.test.add_test_data(path, content)
 
+@given("a file with filename \"{file_name}\" and content \"{content}\" is present in \"{path}\"")
+def step_impl(context, file_name, content, path):
+    context.test.add_test_data(path, content, file_name)
+
 # NiFi setups
 
 @given("a NiFi flow \"{cluster_name}\" receiving data from a RemoteProcessGroup \"{source_name}\" on port {port}")
@@ -188,7 +199,7 @@ def step_impl(context, cluster_name):
     cluster.set_flow(None)
 
 # TLS
-# 
+#
 @given("an ssl context service set up for {producer_name} and {consumer_name}")
 def step_impl(context, producer_name, consumer_name):
     cert, key = gen_cert()
@@ -227,10 +238,20 @@ def step_impl(context, cluster_name):
 def step_impl(context):
     context.test.start()
 
+@when("content \"{content}\" is added to file \"{file_name}\" present in directory \"{path}\" {seconds:d} seconds later")
+def step_impl(context, content, file_name, path, seconds):
+    time.sleep(seconds)
+    context.test.add_test_data(path, content, file_name)
+
 @then("a flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
 def step_impl(context, content, duration):
     context.test.check_for_file_with_content_generated(content, timeparse(duration))
 
+@then("{number_of_files:d} flowfiles are placed in the monitored directory in {duration}")
+@then("{number_of_files:d} flowfile is placed in the monitored directory in {duration}")
+def step_impl(context, number_of_files, duration):
+    context.test.check_for_multiple_files_generated(number_of_files, timeparse(duration))
+
 @then("at least one empty flowfile is placed in the monitored directory in less than {duration}")
 def step_impl(context, duration):
     context.test.check_for_multiple_empty_files_generated(timeparse(duration))
diff --git a/extensions/aws/AWSLoader.h b/extensions/aws/AWSLoader.h
index 6f847b9..6419046 100644
--- a/extensions/aws/AWSLoader.h
+++ b/extensions/aws/AWSLoader.h
@@ -28,6 +28,7 @@
 #include "processors/PutS3Object.h"
 #include "processors/DeleteS3Object.h"
 #include "processors/FetchS3Object.h"
+#include "processors/ListS3.h"
 
 class AWSObjectFactory : public core::ObjectFactory {
  public:
@@ -55,6 +56,7 @@ class AWSObjectFactory : public core::ObjectFactory {
     class_names.push_back("PutS3Object");
     class_names.push_back("DeleteS3Object");
     class_names.push_back("FetchS3Object");
+    class_names.push_back("ListS3");
     return class_names;
   }
 
@@ -67,6 +69,8 @@ class AWSObjectFactory : public core::ObjectFactory {
       return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::aws::processors::DeleteS3Object>());
     } else if (utils::StringUtils::equalsIgnoreCase(class_name, "FetchS3Object")) {
       return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::aws::processors::FetchS3Object>());
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, "ListS3")) {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::aws::processors::ListS3>());
     } else {
       return nullptr;
     }
diff --git a/extensions/aws/processors/DeleteS3Object.cpp b/extensions/aws/processors/DeleteS3Object.cpp
index 6a216fb..fbe9338 100644
--- a/extensions/aws/processors/DeleteS3Object.cpp
+++ b/extensions/aws/processors/DeleteS3Object.cpp
@@ -30,6 +30,11 @@ namespace minifi {
 namespace aws {
 namespace processors {
 
+const core::Property DeleteS3Object::ObjectKey(
+  core::PropertyBuilder::createProperty("Object Key")
+    ->withDescription("The key of the S3 object. If none is given the filename attribute will be used by default.")
+    ->supportsExpressionLanguage(true)
+    ->build());
 const core::Property DeleteS3Object::Version(
   core::PropertyBuilder::createProperty("Version")
     ->withDescription("The Version of the Object to delete")
@@ -41,7 +46,7 @@ const core::Relationship DeleteS3Object::Failure("failure", "FlowFiles are route
 
 void DeleteS3Object::initialize() {
   // Add new supported properties
-  updateSupportedProperties({Version});
+  updateSupportedProperties({ObjectKey, Version});
   // Set the supported relationships
   setSupportedRelationships({Failure, Success});
 }
@@ -60,6 +65,15 @@ void DeleteS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
     return;
   }
 
+  std::string object_key;
+  context->getProperty(ObjectKey, object_key, flow_file);
+  if (object_key.empty() && (!flow_file->getAttribute("filename", object_key) || object_key.empty())) {
+    logger_->log_error("No Object Key is set and default object key 'filename' attribute could not be found!");
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  logger_->log_debug("DeleteS3Object: Object Key [%s]", object_key);
+
   std::string version;
   context->getProperty(Version, version, flow_file);
   logger_->log_debug("DeleteS3Object: Version [%s]", version);
@@ -68,14 +82,14 @@ void DeleteS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
   {
     std::lock_guard<std::mutex> lock(s3_wrapper_mutex_);
     configureS3Wrapper(common_properties.value());
-    delete_succeeded = s3_wrapper_->deleteObject(common_properties->bucket, common_properties->object_key, version);
+    delete_succeeded = s3_wrapper_.deleteObject(common_properties->bucket, object_key, version);
   }
 
   if (delete_succeeded) {
-    logger_->log_debug("Successfully deleted S3 object '%s' from bucket '%s'", common_properties->object_key, common_properties->bucket);
+    logger_->log_debug("Successfully deleted S3 object '%s' from bucket '%s'", object_key, common_properties->bucket);
     session->transfer(flow_file, Success);
   } else {
-    logger_->log_error("Failed to delete S3 object '%s' from bucket '%s'", common_properties->object_key, common_properties->bucket);
+    logger_->log_error("Failed to delete S3 object '%s' from bucket '%s'", object_key, common_properties->bucket);
     session->transfer(flow_file, Failure);
   }
 }
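
DeleteS3Object (and FetchS3Object below) now resolve the object key the same way: use the "Object Key" property when it evaluates to a non-empty string, otherwise fall back to the flow file's 'filename' attribute, and route to failure when neither yields a key. The fallback, restated as a short sketch with hypothetical names:

    def resolve_object_key(object_key_property, flow_file_attributes):
        # Prefer the evaluated "Object Key" property; fall back to the flow
        # file's 'filename' attribute; having neither is an error.
        key = object_key_property or flow_file_attributes.get('filename', '')
        if not key:
            raise ValueError("no Object Key set and no 'filename' attribute found")
        return key
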
diff --git a/extensions/aws/processors/DeleteS3Object.h b/extensions/aws/processors/DeleteS3Object.h
index 6380e1f..d1ddaa5 100644
--- a/extensions/aws/processors/DeleteS3Object.h
+++ b/extensions/aws/processors/DeleteS3Object.h
@@ -43,14 +43,15 @@ class DeleteS3Object : public S3Processor {
   static constexpr char const* ProcessorName = "DeleteS3Object";
 
   // Supported Properties
+  static const core::Property ObjectKey;
   static const core::Property Version;
 
   // Supported Relationships
   static const core::Relationship Failure;
   static const core::Relationship Success;
 
-  explicit DeleteS3Object(std::string name, minifi::utils::Identifier uuid = minifi::utils::Identifier())
-    : S3Processor(std::move(name), uuid, logging::LoggerFactory<DeleteS3Object>::getLogger()) {
+  explicit DeleteS3Object(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+    : S3Processor(name, uuid, logging::LoggerFactory<DeleteS3Object>::getLogger()) {
   }
 
   ~DeleteS3Object() override = default;
@@ -61,8 +62,8 @@ class DeleteS3Object : public S3Processor {
  private:
   friend class ::S3TestsFixture<DeleteS3Object>;
 
-  explicit DeleteS3Object(std::string name, minifi::utils::Identifier uuid, std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
-    : S3Processor(std::move(name), uuid, logging::LoggerFactory<DeleteS3Object>::getLogger(), std::move(s3_wrapper)) {
+  explicit DeleteS3Object(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
+    : S3Processor(name, uuid, logging::LoggerFactory<DeleteS3Object>::getLogger(), std::move(s3_request_sender)) {
   }
 };
 
diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp
index 0871c54..8c77d53 100644
--- a/extensions/aws/processors/FetchS3Object.cpp
+++ b/extensions/aws/processors/FetchS3Object.cpp
@@ -30,6 +30,11 @@ namespace minifi {
 namespace aws {
 namespace processors {
 
+const core::Property FetchS3Object::ObjectKey(
+  core::PropertyBuilder::createProperty("Object Key")
+    ->withDescription("The key of the S3 object. If none is given the filename attribute will be used by default.")
+    ->supportsExpressionLanguage(true)
+    ->build());
 const core::Property FetchS3Object::Version(
   core::PropertyBuilder::createProperty("Version")
     ->withDescription("The Version of the Object to download")
@@ -48,7 +53,7 @@ const core::Relationship FetchS3Object::Failure("failure", "FlowFiles are routed
 
 void FetchS3Object::initialize() {
   // Add new supported properties
-  updateSupportedProperties({Version, RequesterPays});
+  updateSupportedProperties({ObjectKey, Version, RequesterPays});
   // Set the supported relationships
   setSupportedRelationships({Failure, Success});
 }
@@ -76,13 +81,20 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
 
   minifi::aws::s3::GetObjectRequestParameters get_object_params;
   get_object_params.bucket = common_properties->bucket;
-  get_object_params.object_key = common_properties->object_key;
   get_object_params.requester_pays = requester_pays_;
 
+  context->getProperty(ObjectKey, get_object_params.object_key, flow_file);
+  if (get_object_params.object_key.empty() && (!flow_file->getAttribute("filename", get_object_params.object_key) || get_object_params.object_key.empty())) {
+    logger_->log_error("No Object Key is set and default object key 'filename' attribute could not be found!");
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  logger_->log_debug("FetchS3Object: Object Key [%s]", get_object_params.object_key);
+
   context->getProperty(Version, get_object_params.version, flow_file);
   logger_->log_debug("FetchS3Object: Version [%s]", get_object_params.version);
 
-  WriteCallback callback(flow_file->getSize(), get_object_params, s3_wrapper_.get());
+  WriteCallback callback(flow_file->getSize(), get_object_params, s3_wrapper_);
   {
     std::lock_guard<std::mutex> lock(s3_wrapper_mutex_);
     configureS3Wrapper(common_properties.value());
diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h
index 7387867..974be8c 100644
--- a/extensions/aws/processors/FetchS3Object.h
+++ b/extensions/aws/processors/FetchS3Object.h
@@ -44,6 +44,7 @@ class FetchS3Object : public S3Processor {
   static constexpr char const* ProcessorName = "FetchS3Object";
 
   // Supported Properties
+  static const core::Property ObjectKey;
   static const core::Property Version;
   static const core::Property RequesterPays;
 
@@ -51,7 +52,7 @@ class FetchS3Object : public S3Processor {
   static const core::Relationship Failure;
   static const core::Relationship Success;
 
-  explicit FetchS3Object(std::string name, minifi::utils::Identifier uuid = minifi::utils::Identifier())
+  explicit FetchS3Object(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
     : S3Processor(name, uuid, logging::LoggerFactory<FetchS3Object>::getLogger()) {
   }
 
@@ -63,7 +64,7 @@ class FetchS3Object : public S3Processor {
 
   class WriteCallback : public OutputStreamCallback {
    public:
-    WriteCallback(uint64_t flow_size, const minifi::aws::s3::GetObjectRequestParameters& get_object_params, aws::s3::S3WrapperBase* s3_wrapper)
+    WriteCallback(uint64_t flow_size, const minifi::aws::s3::GetObjectRequestParameters& get_object_params, aws::s3::S3Wrapper& s3_wrapper)
       : flow_size_(flow_size)
       , get_object_params_(get_object_params)
       , s3_wrapper_(s3_wrapper) {
@@ -71,7 +72,7 @@ class FetchS3Object : public S3Processor {
 
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       std::vector<uint8_t> buffer;
-      result_ = s3_wrapper_->getObject(get_object_params_, stream);
+      result_ = s3_wrapper_.getObject(get_object_params_, *stream);
       if (!result_) {
         return 0;
       }
@@ -82,7 +83,7 @@ class FetchS3Object : public S3Processor {
     uint64_t flow_size_;
 
     const minifi::aws::s3::GetObjectRequestParameters& get_object_params_;
-    aws::s3::S3WrapperBase* s3_wrapper_;
+    aws::s3::S3Wrapper& s3_wrapper_;
     uint64_t write_size_ = 0;
     minifi::utils::optional<minifi::aws::s3::GetObjectResult> result_ = minifi::utils::nullopt;
   };
@@ -90,8 +91,8 @@ class FetchS3Object : public S3Processor {
  private:
   friend class ::S3TestsFixture<FetchS3Object>;
 
-  explicit FetchS3Object(std::string name, minifi::utils::Identifier uuid, std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
-    : S3Processor(std::move(name), uuid, logging::LoggerFactory<FetchS3Object>::getLogger(), std::move(s3_wrapper)) {
+  explicit FetchS3Object(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
+    : S3Processor(name, uuid, logging::LoggerFactory<FetchS3Object>::getLogger(), std::move(s3_request_sender)) {
   }
 
   bool requester_pays_ = false;
diff --git a/extensions/aws/processors/ListS3.cpp b/extensions/aws/processors/ListS3.cpp
new file mode 100644
index 0000000..5a0b28c
--- /dev/null
+++ b/extensions/aws/processors/ListS3.cpp
@@ -0,0 +1,295 @@
+/**
+ * @file ListS3.cpp
+ * ListS3 class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListS3.h"
+
+#include <tuple>
+#include <algorithm>
+#include <set>
+#include <utility>
+#include <memory>
+
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+const std::string ListS3::LATEST_LISTED_KEY_PREFIX = "listed_key.";
+const std::string ListS3::LATEST_LISTED_KEY_TIMESTAMP = "listed_timestamp";
+
+const core::Property ListS3::Delimiter(
+  core::PropertyBuilder::createProperty("Delimiter")
+    ->withDescription("The string used to delimit directories within the bucket. Please consult the AWS documentation for the correct use of this field.")
+    ->build());
+const core::Property ListS3::Prefix(
+  core::PropertyBuilder::createProperty("Prefix")
+    ->withDescription("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
+    ->build());
+const core::Property ListS3::UseVersions(
+  core::PropertyBuilder::createProperty("Use Versions")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(false)
+    ->withDescription("Specifies whether to use S3 versions, if applicable. If false, only the latest version of each object will be returned.")
+    ->build());
+const core::Property ListS3::MinimumObjectAge(
+  core::PropertyBuilder::createProperty("Minimum Object Age")
+    ->isRequired(true)
+    ->withDefaultValue<core::TimePeriodValue>("0 sec")
+    ->withDescription("The minimum age that an S3 object must be in order to be considered; any object younger than this amount of time (according to last modification date) will be ignored.")
+    ->build());
+const core::Property ListS3::WriteObjectTags(
+  core::PropertyBuilder::createProperty("Write Object Tags")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(false)
+    ->withDescription("If set to 'true', the tags associated with the S3 object will be written as FlowFile attributes.")
+    ->build());
+const core::Property ListS3::WriteUserMetadata(
+  core::PropertyBuilder::createProperty("Write User Metadata")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(false)
+    ->withDescription("If set to 'true', the user defined metadata associated with the S3 object will be added to FlowFile attributes/records.")
+    ->build());
+const core::Property ListS3::RequesterPays(
+  core::PropertyBuilder::createProperty("Requester Pays")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(false)
+    ->withDescription("If true, indicates that the requester consents to pay any charges associated with listing the S3 bucket. This sets the 'x-amz-request-payer' header to 'requester'. "
+                      "Note that this setting is only used if Write User Metadata is true.")
+    ->build());
+
+const core::Relationship ListS3::Success("success", "FlowFiles are routed to success relationship");
+
+void ListS3::initialize() {
+  // Add new supported properties
+  updateSupportedProperties({Delimiter, Prefix, UseVersions, MinimumObjectAge, WriteObjectTags, WriteUserMetadata, RequesterPays});
+  // Set the supported relationships
+  setSupportedRelationships({Success});
+}
+
+void ListS3::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  S3Processor::onSchedule(context, sessionFactory);
+
+  state_manager_ = context->getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
+  auto common_properties = getCommonELSupportedProperties(context, nullptr);
+  if (!common_properties) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required property is not set or invalid");
+  }
+  configureS3Wrapper(common_properties.value());
+  list_request_params_.bucket = common_properties->bucket;
+
+  context->getProperty(Delimiter.getName(), list_request_params_.delimiter);
+  logger_->log_debug("ListS3: Delimiter [%s]", list_request_params_.delimiter);
+
+  context->getProperty(Prefix.getName(), list_request_params_.prefix);
+  logger_->log_debug("ListS3: Prefix [%s]", list_request_params_.prefix);
+
+  context->getProperty(UseVersions.getName(), list_request_params_.use_versions);
+  logger_->log_debug("ListS3: UseVersions [%s]", list_request_params_.use_versions ? "true" : "false");
+
+  std::string min_obj_age_str;
+  if (!context->getProperty(MinimumObjectAge.getName(), min_obj_age_str) || min_obj_age_str.empty() || !core::Property::getTimeMSFromString(min_obj_age_str, list_request_params_.min_object_age)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Minimum Object Age missing or invalid");
+  }
+  logger_->log_debug("ListS3: Minimum Object Age [%s]", min_obj_age_str);
+
+  context->getProperty(WriteObjectTags.getName(), write_object_tags_);
+  logger_->log_debug("ListS3: WriteObjectTags [%s]", write_object_tags_ ? "true" : "false");
+
+  context->getProperty(WriteUserMetadata.getName(), write_user_metadata_);
+  logger_->log_debug("ListS3: WriteUserMetadata [%s]", write_user_metadata_ ? "true" : "false");
+
+  context->getProperty(RequesterPays.getName(), requester_pays_);
+  logger_->log_debug("ListS3: RequesterPays [%s]", requester_pays_ ? "true" : "false");
+}
+
+void ListS3::writeObjectTags(
+    const std::string &bucket,
+    const aws::s3::ListedObjectAttributes &object_attributes,
+    core::ProcessSession &session,
+    const std::shared_ptr<core::FlowFile> &flow_file) {
+  if (!write_object_tags_) {
+    return;
+  }
+
+  auto get_object_tags_result = s3_wrapper_.getObjectTags(bucket, object_attributes.filename, object_attributes.version);
+  if (get_object_tags_result) {
+    for (const auto& tag : get_object_tags_result.value()) {
+      session.putAttribute(flow_file, "s3.tag." + tag.first, tag.second);
+    }
+  } else {
+    logger_->log_warn("Failed to get object tags for object %s in bucket %s", object_attributes.filename, bucket);
+  }
+}
+
+void ListS3::writeUserMetadata(
+    const aws::s3::ListedObjectAttributes &object_attributes,
+    core::ProcessSession &session,
+    const std::shared_ptr<core::FlowFile> &flow_file) {
+  if (!write_user_metadata_) {
+    return;
+  }
+
+  aws::s3::HeadObjectRequestParameters params;
+  params.bucket = list_request_params_.bucket;
+  params.object_key = object_attributes.filename;
+  params.version = object_attributes.version;
+  params.requester_pays = requester_pays_;
+  auto head_object_tags_result = s3_wrapper_.headObject(params);
+  if (head_object_tags_result) {
+    for (const auto& metadata : head_object_tags_result->user_metadata_map) {
+      session.putAttribute(flow_file, "s3.user.metadata." + metadata.first, metadata.second);
+    }
+  } else {
+    logger_->log_warn("Failed to get object metadata for object %s in bucket %s", params.object_key, params.bucket);
+  }
+}
+
+std::vector<std::string> ListS3::getLatestListedKeys(const std::unordered_map<std::string, std::string> &state) {
+  std::vector<std::string> latest_listed_keys;
+  for (const auto& kvp : state) {
+    if (kvp.first.rfind(LATEST_LISTED_KEY_PREFIX, 0) == 0) {
+      latest_listed_keys.push_back(kvp.second);
+    }
+  }
+  return latest_listed_keys;
+}
+
+uint64_t ListS3::getLatestListedKeyTimestamp(const std::unordered_map<std::string, std::string> &state) {
+  std::string stored_listed_key_timestamp_str;
+  auto it = state.find(LATEST_LISTED_KEY_TIMESTAMP);
+  if (it != state.end()) {
+    stored_listed_key_timestamp_str = it->second;
+  }
+
+  int64_t stored_listed_key_timestamp = 0;
+  core::Property::StringToInt(stored_listed_key_timestamp_str, stored_listed_key_timestamp);
+
+  return stored_listed_key_timestamp;
+}
+
+ListS3::ListingState ListS3::getCurrentState(const std::shared_ptr<core::ProcessContext> &context) {
+  ListS3::ListingState current_listing_state;
+  std::unordered_map<std::string, std::string> state;
+  if (!state_manager_->get(state)) {
+    logger_->log_info("No stored state for listed objects was found");
+    return current_listing_state;
+  }
+
+  current_listing_state.listed_key_timestamp = getLatestListedKeyTimestamp(state);
+  logger_->log_debug("Restored previous listed timestamp %lld", current_listing_state.listed_key_timestamp);
+
+  current_listing_state.listed_keys = getLatestListedKeys(state);
+  return current_listing_state;
+}
+
+void ListS3::storeState(const ListS3::ListingState &latest_listing_state) {
+  std::unordered_map<std::string, std::string> state;
+  state[LATEST_LISTED_KEY_TIMESTAMP] = std::to_string(latest_listing_state.listed_key_timestamp);
+  for (std::size_t i = 0; i < latest_listing_state.listed_keys.size(); ++i) {
+    state[LATEST_LISTED_KEY_PREFIX + std::to_string(i)] = latest_listing_state.listed_keys.at(i);
+  }
+  logger_->log_debug("Stored new listed timestamp %lld", latest_listing_state.listed_key_timestamp);
+  state_manager_->set(state);
+}
+
+void ListS3::createNewFlowFile(
+    core::ProcessSession &session,
+    const aws::s3::ListedObjectAttributes &object_attributes) {
+  auto flow_file = session.create();
+  session.putAttribute(flow_file, "s3.bucket", list_request_params_.bucket);
+  session.putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, object_attributes.filename);
+  session.putAttribute(flow_file, "s3.etag", object_attributes.etag);
+  session.putAttribute(flow_file, "s3.isLatest", object_attributes.is_latest ? "true" : "false");
+  session.putAttribute(flow_file, "s3.lastModified", std::to_string(object_attributes.last_modified));
+  session.putAttribute(flow_file, "s3.length", std::to_string(object_attributes.length));
+  session.putAttribute(flow_file, "s3.storeClass", object_attributes.store_class);
+  if (!object_attributes.version.empty()) {
+    session.putAttribute(flow_file, "s3.version", object_attributes.version);
+  }
+  writeObjectTags(list_request_params_.bucket, object_attributes, session, flow_file);
+  writeUserMetadata(object_attributes, session, flow_file);
+
+  session.transfer(flow_file, Success);
+}
+
+void ListS3::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  logger_->log_debug("ListS3 onTrigger");
+
+  auto aws_results = s3_wrapper_.listBucket(list_request_params_);
+  if (!aws_results) {
+    logger_->log_error("Failed to list S3 bucket %s", list_request_params_.bucket);
+    context->yield();
+    return;
+  }
+
+  auto stored_listing_state = getCurrentState(context);
+  auto latest_listing_state = stored_listing_state;
+  std::size_t files_transferred = 0;
+
+  for (const auto& object_attributes : aws_results.value()) {
+    if (stored_listing_state.wasObjectListedAlready(object_attributes)) {
+      continue;
+    }
+
+    createNewFlowFile(*session, object_attributes);
+    ++files_transferred;
+    latest_listing_state.updateState(object_attributes);
+  }
+
+  logger_->log_debug("ListS3 transferred %zu flow files", files_transferred);
+  storeState(latest_listing_state);
+
+  if (files_transferred == 0) {
+    logger_->log_debug("No new S3 objects were found in bucket %s to list", list_request_params_.bucket);
+    context->yield();
+    return;
+  }
+}
+
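+// An object counts as already listed if it is older than the stored timestamp, or equally old with its key recorded in the previous listing.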
+bool ListS3::ListingState::wasObjectListedAlready(const aws::s3::ListedObjectAttributes &object_attributes) const {
+  return listed_key_timestamp > object_attributes.last_modified ||
+      (listed_key_timestamp == object_attributes.last_modified &&
+        std::find(listed_keys.begin(), listed_keys.end(), object_attributes.filename) != listed_keys.end());
+}
+
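+// A newer object resets the tracked timestamp and key list; an object with the same timestamp only appends its key.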
+void ListS3::ListingState::updateState(const aws::s3::ListedObjectAttributes &object_attributes) {
+  if (listed_key_timestamp < object_attributes.last_modified) {
+    listed_key_timestamp = object_attributes.last_modified;
+    listed_keys.clear();
+    listed_keys.push_back(object_attributes.filename);
+  } else if (listed_key_timestamp == object_attributes.last_modified) {
+    listed_keys.push_back(object_attributes.filename);
+  }
+}
+
+}  // namespace processors
+}  // namespace aws
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/aws/processors/ListS3.h b/extensions/aws/processors/ListS3.h
new file mode 100644
index 0000000..699b3e3
--- /dev/null
+++ b/extensions/aws/processors/ListS3.h
@@ -0,0 +1,112 @@
+/**
+ * @file ListS3.h
+ * ListS3 class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <utility>
+#include <tuple>
+#include <memory>
+
+#include "S3Processor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace processors {
+
+class ListS3 : public S3Processor {
+ public:
+  static constexpr char const* ProcessorName = "ListS3";
+  static const std::string LATEST_LISTED_KEY_PREFIX;
+  static const std::string LATEST_LISTED_KEY_TIMESTAMP;
+
+  // Supported Properties
+  static const core::Property Delimiter;
+  static const core::Property Prefix;
+  static const core::Property UseVersions;
+  static const core::Property MinimumObjectAge;
+  static const core::Property WriteObjectTags;
+  static const core::Property WriteUserMetadata;
+  static const core::Property RequesterPays;
+
+  // Supported Relationships
+  static const core::Relationship Success;
+
+  explicit ListS3(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+    : S3Processor(name, uuid, logging::LoggerFactory<ListS3>::getLogger()) {
+  }
+
+  explicit ListS3(const std::string& name, minifi::utils::Identifier uuid, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
+    : S3Processor(name, uuid, logging::LoggerFactory<ListS3>::getLogger(), std::move(s3_request_sender)) {
+  }
+
+  ~ListS3() override = default;
+
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
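+  // Tracks the newest "last modified" timestamp seen so far and the keys sharing it, so repeated listings only emit objects added since the previous run.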
+  struct ListingState {
+    int64_t listed_key_timestamp = 0;
+    std::vector<std::string> listed_keys;
+
+    bool wasObjectListedAlready(const aws::s3::ListedObjectAttributes &object_attributes) const;
+    void updateState(const aws::s3::ListedObjectAttributes &object_attributes);
+  };
+
+  static std::vector<std::string> getLatestListedKeys(const std::unordered_map<std::string, std::string> &state);
+  static uint64_t getLatestListedKeyTimestamp(const std::unordered_map<std::string, std::string> &state);
+
+  void writeObjectTags(
+    const std::string &bucket,
+    const aws::s3::ListedObjectAttributes &object_attributes,
+    core::ProcessSession &session,
+    const std::shared_ptr<core::FlowFile> &flow_file);
+  void writeUserMetadata(
+    const aws::s3::ListedObjectAttributes &object_attributes,
+    core::ProcessSession &session,
+    const std::shared_ptr<core::FlowFile> &flow_file);
+  ListingState getCurrentState(const std::shared_ptr<core::ProcessContext> &context);
+  void storeState(const ListingState &latest_listing_state);
+  void createNewFlowFile(
+    core::ProcessSession &session,
+    const aws::s3::ListedObjectAttributes &object_attributes);
+
+  aws::s3::ListRequestParameters list_request_params_;
+  bool write_object_tags_ = false;
+  bool write_user_metadata_ = false;
+  bool requester_pays_ = false;
+  std::shared_ptr<core::CoreComponentStateManager> state_manager_ = nullptr;
+};
+
+REGISTER_RESOURCE(ListS3, "This Processor retrieves a listing of objects from an Amazon S3 bucket.");
+
+}  // namespace processors
+}  // namespace aws
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/aws/processors/PutS3Object.cpp b/extensions/aws/processors/PutS3Object.cpp
index 1dee1ed..b05f987 100644
--- a/extensions/aws/processors/PutS3Object.cpp
+++ b/extensions/aws/processors/PutS3Object.cpp
@@ -44,6 +44,11 @@ const std::set<std::string> PutS3Object::CANNED_ACLS(minifi::utils::MapUtils::ge
 const std::set<std::string> PutS3Object::STORAGE_CLASSES(minifi::utils::MapUtils::getKeys(minifi::aws::s3::STORAGE_CLASS_MAP));
 const std::set<std::string> PutS3Object::SERVER_SIDE_ENCRYPTIONS(minifi::utils::MapUtils::getKeys(minifi::aws::s3::SERVER_SIDE_ENCRYPTION_MAP));
 
+const core::Property PutS3Object::ObjectKey(
+  core::PropertyBuilder::createProperty("Object Key")
+    ->withDescription("The key of the S3 object. If none is given the filename attribute will be used by default.")
+    ->supportsExpressionLanguage(true)
+    ->build());
 const core::Property PutS3Object::ContentType(
   core::PropertyBuilder::createProperty("Content Type")
     ->withDescription("Sets the Content-Type HTTP header indicating the type of content stored in "
@@ -101,7 +106,7 @@ const core::Relationship PutS3Object::Failure("failure", "FlowFiles are routed t
 
 void PutS3Object::initialize() {
   // Add new supported properties
-  updateSupportedProperties({ContentType, StorageClass, FullControlUserList, ReadPermissionUserList,
+  updateSupportedProperties({ObjectKey, ContentType, StorageClass, FullControlUserList, ReadPermissionUserList,
     ReadACLUserList, WriteACLUserList, CannedACL, ServerSideEncryption});
   // Set the supported relationships
   setSupportedRelationships({Failure, Success});
@@ -202,12 +207,18 @@ minifi::utils::optional<aws::s3::PutObjectRequestParameters> PutS3Object::buildP
     const std::shared_ptr<core::FlowFile> &flow_file,
     const CommonProperties &common_properties) {
   aws::s3::PutObjectRequestParameters params;
-  params.object_key = common_properties.object_key;
   params.bucket = common_properties.bucket;
   params.user_metadata_map = user_metadata_map_;
   params.server_side_encryption = server_side_encryption_;
   params.storage_class = storage_class_;
 
+  context->getProperty(ObjectKey, params.object_key, flow_file);
+  if (params.object_key.empty() && (!flow_file->getAttribute("filename", params.object_key) || params.object_key.empty())) {
+    logger_->log_error("No Object Key is set and default object key 'filename' attribute could not be found!");
+    return minifi::utils::nullopt;
+  }
+  logger_->log_debug("PutS3Object: Object Key [%s]", params.object_key);
+
   context->getProperty(ContentType, params.content_type, flow_file);
   logger_->log_debug("PutS3Object: Content Type [%s]", params.content_type);
 
@@ -235,8 +246,8 @@ void PutS3Object::setAttributes(
   if (!put_object_result.etag.empty()) {
     session->putAttribute(flow_file, "s3.etag", put_object_result.etag);
   }
-  if (!put_object_result.expiration_time.empty()) {
-    session->putAttribute(flow_file, "s3.expiration", put_object_result.expiration_time);
+  if (!put_object_result.expiration.empty()) {
+    session->putAttribute(flow_file, "s3.expiration", put_object_result.expiration);
   }
   if (!put_object_result.ssealgorithm.empty()) {
     session->putAttribute(flow_file, "s3.sseAlgorithm", put_object_result.ssealgorithm);
@@ -263,7 +274,7 @@ void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context
     return;
   }
 
-  PutS3Object::ReadCallback callback(flow_file->getSize(), put_s3_request_params.value(), s3_wrapper_.get());
+  PutS3Object::ReadCallback callback(flow_file->getSize(), put_s3_request_params.value(), s3_wrapper_);
   {
     std::lock_guard<std::mutex> lock(s3_wrapper_mutex_);
     configureS3Wrapper(common_properties.value());
diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index 1251153..4198a5a 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -51,6 +51,7 @@ class PutS3Object : public S3Processor {
   static const std::set<std::string> SERVER_SIDE_ENCRYPTIONS;
 
   // Supported Properties
+  static const core::Property ObjectKey;
   static const core::Property ContentType;
   static const core::Property StorageClass;
   static const core::Property ServerSideEncryption;
@@ -64,8 +65,8 @@ class PutS3Object : public S3Processor {
   static const core::Relationship Failure;
   static const core::Relationship Success;
 
-  explicit PutS3Object(std::string name, minifi::utils::Identifier uuid = minifi::utils::Identifier())
-    : S3Processor(std::move(name), uuid, logging::LoggerFactory<PutS3Object>::getLogger()) {
+  explicit PutS3Object(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+    : S3Processor(name, uuid, logging::LoggerFactory<PutS3Object>::getLogger()) {
   }
 
   ~PutS3Object() override = default;
@@ -79,7 +80,7 @@ class PutS3Object : public S3Processor {
     static const uint64_t MAX_SIZE;
     static const uint64_t BUFFER_SIZE;
 
-    ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3WrapperBase* s3_wrapper)
+    ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper)
       : flow_size_(flow_size)
       , options_(options)
       , s3_wrapper_(s3_wrapper) {
@@ -106,13 +107,13 @@ class PutS3Object : public S3Processor {
           break;
         }
       }
-      result_ = s3_wrapper_->putObject(options_, data_stream);
+      result_ = s3_wrapper_.putObject(options_, data_stream);
       return read_size_;
     }
 
     uint64_t flow_size_;
     const minifi::aws::s3::PutObjectRequestParameters& options_;
-    aws::s3::S3WrapperBase* s3_wrapper_;
+    aws::s3::S3Wrapper& s3_wrapper_;
     uint64_t read_size_ = 0;
     minifi::utils::optional<minifi::aws::s3::PutObjectResult> result_ = minifi::utils::nullopt;
   };
@@ -120,8 +121,8 @@ class PutS3Object : public S3Processor {
  private:
   friend class ::S3TestsFixture<PutS3Object>;
 
-  explicit PutS3Object(std::string name, minifi::utils::Identifier uuid, std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
-    : S3Processor(std::move(name), uuid, logging::LoggerFactory<PutS3Object>::getLogger(), std::move(s3_wrapper)) {
+  explicit PutS3Object(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
+    : S3Processor(name, uuid, logging::LoggerFactory<PutS3Object>::getLogger(), std::move(s3_request_sender)) {
   }
 
   void fillUserMetadata(const std::shared_ptr<core::ProcessContext> &context);
diff --git a/extensions/aws/processors/S3Processor.cpp b/extensions/aws/processors/S3Processor.cpp
index 437c64e..993e031 100644
--- a/extensions/aws/processors/S3Processor.cpp
+++ b/extensions/aws/processors/S3Processor.cpp
@@ -42,11 +42,6 @@ const std::set<std::string> S3Processor::REGIONS({region::AF_SOUTH_1, region::AP
   region::EU_SOUTH_1, region::EU_WEST_1, region::EU_WEST_2, region::EU_WEST_3, region::ME_SOUTH_1, region::SA_EAST_1,
   region::US_EAST_1, region::US_EAST_2, region::US_GOV_EAST_1, region::US_GOV_WEST_1, region::US_WEST_1, region::US_WEST_2});
 
-const core::Property S3Processor::ObjectKey(
-  core::PropertyBuilder::createProperty("Object Key")
-    ->withDescription("The key of the S3 object. If none is given the filename attribute will be used by default.")
-    ->supportsExpressionLanguage(true)
-    ->build());
 const core::Property S3Processor::Bucket(
   core::PropertyBuilder::createProperty("Bucket")
     ->withDescription("The S3 bucket")
@@ -119,19 +114,18 @@ const core::Property S3Processor::UseDefaultCredentials(
     ->isRequired(true)
     ->build());
 
-S3Processor::S3Processor(std::string name, minifi::utils::Identifier uuid, const std::shared_ptr<logging::Logger> &logger)
-  : core::Processor(std::move(name), uuid)
-  , logger_(logger)
-  , s3_wrapper_(minifi::utils::make_unique<aws::s3::S3Wrapper>()) {
-  setSupportedProperties({ObjectKey, Bucket, AccessKey, SecretKey, CredentialsFile, CredentialsFile, AWSCredentialsProviderService, Region, CommunicationsTimeout,
+S3Processor::S3Processor(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<logging::Logger> &logger)
+  : core::Processor(name, uuid)
+  , logger_(logger) {
+  setSupportedProperties({Bucket, AccessKey, SecretKey, CredentialsFile, AWSCredentialsProviderService, Region, CommunicationsTimeout,
                           EndpointOverrideURL, ProxyHost, ProxyPort, ProxyUsername, ProxyPassword, UseDefaultCredentials});
 }
 
-S3Processor::S3Processor(std::string name, minifi::utils::Identifier uuid, const std::shared_ptr<logging::Logger> &logger, std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
-  : core::Processor(std::move(name), uuid)
+S3Processor::S3Processor(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<logging::Logger> &logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
+  : core::Processor(name, uuid)
   , logger_(logger)
-  , s3_wrapper_(std::move(s3_wrapper)) {
-  setSupportedProperties({ObjectKey, Bucket, AccessKey, SecretKey, CredentialsFile, CredentialsFile, AWSCredentialsProviderService, Region, CommunicationsTimeout,
+  , s3_wrapper_(std::move(s3_request_sender)) {
+  setSupportedProperties({Bucket, AccessKey, SecretKey, CredentialsFile, AWSCredentialsProviderService, Region, CommunicationsTimeout,
                           EndpointOverrideURL, ProxyHost, ProxyPort, ProxyUsername, ProxyPassword, UseDefaultCredentials});
 }
 
@@ -205,13 +199,13 @@ void S3Processor::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
   if (!context->getProperty(Region.getName(), value) || value.empty() || REGIONS.count(value) == 0) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Region property missing or invalid");
   }
-  s3_wrapper_->setRegion(value);
+  s3_wrapper_.setRegion(value);
   logger_->log_debug("S3Processor: Region [%s]", value);
 
   uint64_t timeout_val;
   if (context->getProperty(CommunicationsTimeout.getName(), value) && !value.empty() && core::Property::getTimeMSFromString(value, timeout_val)) {
-    s3_wrapper_->setTimeout(timeout_val);
-    logger_->log_debug("S3Processor: Communications Timeout [%d]", timeout_val);
+    s3_wrapper_.setTimeout(timeout_val);
+    logger_->log_debug("S3Processor: Communications Timeout [%llu]", timeout_val);
   } else {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout missing or invalid");
   }
@@ -221,13 +215,6 @@ minifi::utils::optional<CommonProperties> S3Processor::getCommonELSupportedPrope
     const std::shared_ptr<core::ProcessContext> &context,
     const std::shared_ptr<core::FlowFile> &flow_file) {
   CommonProperties properties;
-  context->getProperty(ObjectKey, properties.object_key, flow_file);
-  if (properties.object_key.empty() && (!flow_file->getAttribute("filename", properties.object_key) || properties.object_key.empty())) {
-    logger_->log_error("No Object Key is set and default object key 'filename' attribute could not be found!");
-    return minifi::utils::nullopt;
-  }
-  logger_->log_debug("S3Processor: Object Key [%s]", properties.object_key);
-
   if (!context->getProperty(Bucket, properties.bucket, flow_file) || properties.bucket.empty()) {
     logger_->log_error("Bucket '%s' is invalid or empty!", properties.bucket);
     return minifi::utils::nullopt;
@@ -249,19 +236,19 @@ minifi::utils::optional<CommonProperties> S3Processor::getCommonELSupportedPrope
 
   context->getProperty(EndpointOverrideURL, properties.endpoint_override_url, flow_file);
   if (!properties.endpoint_override_url.empty()) {
-    logger_->log_debug("S3Processor: Endpoint Override URL [%d]", properties.endpoint_override_url);
+    logger_->log_debug("S3Processor: Endpoint Override URL [%s]", properties.endpoint_override_url);
   }
 
   return properties;
 }
 
 void S3Processor::configureS3Wrapper(const CommonProperties &common_properties) {
-  s3_wrapper_->setCredentials(common_properties.credentials);
+  s3_wrapper_.setCredentials(common_properties.credentials);
   if (!common_properties.proxy.host.empty()) {
-    s3_wrapper_->setProxy(common_properties.proxy);
+    s3_wrapper_.setProxy(common_properties.proxy);
   }
   if (!common_properties.endpoint_override_url.empty()) {
-    s3_wrapper_->setEndpointOverrideUrl(common_properties.endpoint_override_url);
+    s3_wrapper_.setEndpointOverrideUrl(common_properties.endpoint_override_url);
   }
 }
 
diff --git a/extensions/aws/processors/S3Processor.h b/extensions/aws/processors/S3Processor.h
index 1eff311..0b76078 100644
--- a/extensions/aws/processors/S3Processor.h
+++ b/extensions/aws/processors/S3Processor.h
@@ -27,7 +27,7 @@
 
 #include "aws/core/auth/AWSCredentialsProvider.h"
 
-#include "S3WrapperBase.h"
+#include "S3Wrapper.h"
 #include "AWSCredentialsProvider.h"
 #include "core/Property.h"
 #include "core/Processor.h"
@@ -85,7 +85,6 @@ class S3Processor : public core::Processor {
   static const std::set<std::string> REGIONS;
 
   // Supported Properties
-  static const core::Property ObjectKey;
   static const core::Property Bucket;
   static const core::Property AccessKey;
   static const core::Property SecretKey;
@@ -100,13 +99,13 @@ class S3Processor : public core::Processor {
   static const core::Property ProxyPassword;
   static const core::Property UseDefaultCredentials;
 
-  explicit S3Processor(std::string name, minifi::utils::Identifier uuid, const std::shared_ptr<logging::Logger> &logger);
+  explicit S3Processor(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<logging::Logger> &logger);
 
   bool supportsDynamicProperties() override { return true; }
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
  protected:
-  explicit S3Processor(std::string name, minifi::utils::Identifier uuid, const std::shared_ptr<logging::Logger> &logger, std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper);
+  explicit S3Processor(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<logging::Logger> &logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender);
 
   minifi::utils::optional<Aws::Auth::AWSCredentials> getAWSCredentialsFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const;
   minifi::utils::optional<Aws::Auth::AWSCredentials> getAWSCredentials(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file);
@@ -115,7 +114,7 @@ class S3Processor : public core::Processor {
   void configureS3Wrapper(const CommonProperties &common_properties);
 
   std::shared_ptr<logging::Logger> logger_;
-  std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper_;
+  aws::s3::S3Wrapper s3_wrapper_;
   std::mutex s3_wrapper_mutex_;
 };
 
diff --git a/extensions/aws/s3/S3ClientRequestSender.cpp b/extensions/aws/s3/S3ClientRequestSender.cpp
new file mode 100644
index 0000000..62cbf50
--- /dev/null
+++ b/extensions/aws/s3/S3ClientRequestSender.cpp
@@ -0,0 +1,130 @@
+/**
+ * @file S3ClientRequestSender.cpp
+ * S3ClientRequestSender class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "S3ClientRequestSender.h"
+
+#include <aws/s3/S3Client.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace s3 {
+
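+// Each request below constructs a fresh S3Client from the currently configured credentials and client configuration, sends a single call and logs its outcome.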
+minifi::utils::optional<Aws::S3::Model::PutObjectResult> S3ClientRequestSender::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) {
+  Aws::S3::S3Client s3_client(credentials_, client_config_);
+  auto outcome = s3_client.PutObject(request);
+
+  if (outcome.IsSuccess()) {
+      logger_->log_debug("Added S3 object '%s' to bucket '%s'", request.GetKey(), request.GetBucket());
+      return outcome.GetResultWithOwnership();
+  } else {
+    logger_->log_error("PutS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
+    return minifi::utils::nullopt;
+  }
+}
+
+bool S3ClientRequestSender::sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) {
+  Aws::S3::S3Client s3_client(credentials_, client_config_);
+  Aws::S3::Model::DeleteObjectOutcome outcome = s3_client.DeleteObject(request);
+
+  if (outcome.IsSuccess()) {
+    logger_->log_debug("Deleted S3 object '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
+    return true;
+  } else if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) {
+    logger_->log_debug("S3 object '%s' was not found in bucket '%s'", request.GetKey(), request.GetBucket());
+    return true;
+  } else {
+    logger_->log_error("DeleteS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
+    return false;
+  }
+}
+
+minifi::utils::optional<Aws::S3::Model::GetObjectResult> S3ClientRequestSender::sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) {
+  Aws::S3::S3Client s3_client(credentials_, client_config_);
+  auto outcome = s3_client.GetObject(request);
+
+  if (outcome.IsSuccess()) {
+    logger_->log_debug("Fetched S3 object '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
+    return outcome.GetResultWithOwnership();
+  } else {
+    logger_->log_error("FetchS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
+    return minifi::utils::nullopt;
+  }
+}
+
+minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> S3ClientRequestSender::sendListObjectsRequest(const Aws::S3::Model::ListObjectsV2Request& request) {
+  Aws::S3::S3Client s3_client(credentials_, client_config_);
+  auto outcome = s3_client.ListObjectsV2(request);
+
+  if (outcome.IsSuccess()) {
+    logger_->log_debug("ListObjectsV2 successful of bucket '%s'", request.GetBucket());
+    return outcome.GetResultWithOwnership();
+  } else {
+    logger_->log_error("ListObjectsV2 failed with the following: '%s'", outcome.GetError().GetMessage());
+    return minifi::utils::nullopt;
+  }
+}
+
+minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> S3ClientRequestSender::sendListVersionsRequest(const Aws::S3::Model::ListObjectVersionsRequest& request) {
+  Aws::S3::S3Client s3_client(credentials_, client_config_);
+  auto outcome = s3_client.ListObjectVersions(request);
+
+  if (outcome.IsSuccess()) {
+    logger_->log_debug("ListObjectVersions successful of bucket '%s'", request.GetBucket());
+    return outcome.GetResultWithOwnership();
+  } else {
+    logger_->log_error("ListObjectVersions failed with the following: '%s'", outcome.GetError().GetMessage());
+    return minifi::utils::nullopt;
+  }
+}
+
+minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> S3ClientRequestSender::sendGetObjectTaggingRequest(const Aws::S3::Model::GetObjectTaggingRequest& request) {
+  Aws::S3::S3Client s3_client(credentials_, client_config_);
+  auto outcome = s3_client.GetObjectTagging(request);
+
+  if (outcome.IsSuccess()) {
+    logger_->log_debug("Got tags for S3 object '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
+    return outcome.GetResultWithOwnership();
+  } else {
+    logger_->log_error("GetObjectTagging failed with the following: '%s'", outcome.GetError().GetMessage());
+    return minifi::utils::nullopt;
+  }
+}
+
+minifi::utils::optional<Aws::S3::Model::HeadObjectResult> S3ClientRequestSender::sendHeadObjectRequest(const Aws::S3::Model::HeadObjectRequest& request) {
+  Aws::S3::S3Client s3_client(credentials_, client_config_);
+  auto outcome = s3_client.HeadObject(request);
+
+  if (outcome.IsSuccess()) {
+    logger_->log_debug("HeadS3Object successful for key '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
+    return outcome.GetResultWithOwnership();
+  } else {
+    logger_->log_error("HeadS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
+    return minifi::utils::nullopt;
+  }
+}
+
+}  // namespace s3
+}  // namespace aws
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3ClientRequestSender.h
similarity index 67%
copy from extensions/aws/s3/S3Wrapper.h
copy to extensions/aws/s3/S3ClientRequestSender.h
index cdcbe9b..be9dd0c 100644
--- a/extensions/aws/s3/S3Wrapper.h
+++ b/extensions/aws/s3/S3ClientRequestSender.h
@@ -19,10 +19,7 @@
  */
 #pragma once
 
-#include <memory>
-
-#include "aws/s3/model/PutObjectResult.h"
-#include "S3WrapperBase.h"
+#include "S3RequestSender.h"
 
 namespace org {
 namespace apache {
@@ -31,11 +28,15 @@ namespace minifi {
 namespace aws {
 namespace s3 {
 
-class S3Wrapper : public S3WrapperBase {
- protected:
+class S3ClientRequestSender : public S3RequestSender {
+ public:
   minifi::utils::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) override;
   bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) override;
   minifi::utils::optional<Aws::S3::Model::GetObjectResult> sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) override;
+  minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> sendListObjectsRequest(const Aws::S3::Model::ListObjectsV2Request& request) override;
+  minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> sendListVersionsRequest(const Aws::S3::Model::ListObjectVersionsRequest& request) override;
+  minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> sendGetObjectTaggingRequest(const Aws::S3::Model::GetObjectTaggingRequest& request) override;
+  minifi::utils::optional<Aws::S3::Model::HeadObjectResult> sendHeadObjectRequest(const Aws::S3::Model::HeadObjectRequest& request) override;
 };
 
 }  // namespace s3
diff --git a/extensions/aws/s3/S3RequestSender.cpp b/extensions/aws/s3/S3RequestSender.cpp
new file mode 100644
index 0000000..859a317
--- /dev/null
+++ b/extensions/aws/s3/S3RequestSender.cpp
@@ -0,0 +1,63 @@
+/**
+ * @file S3RequestSender.cpp
+ * S3RequestSender class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "S3RequestSender.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace s3 {
+
+
+void S3RequestSender::setCredentials(const Aws::Auth::AWSCredentials& cred) {
+  logger_->log_debug("Setting new AWS credentials");
+  credentials_ = cred;
+}
+
+void S3RequestSender::setRegion(const Aws::String& region) {
+  logger_->log_debug("Setting new AWS region [%s]", region);
+  client_config_.region = region;
+}
+
+void S3RequestSender::setTimeout(uint64_t timeout) {
+  logger_->log_debug("Setting AWS client connection timeout [%llu]", timeout);
+  client_config_.connectTimeoutMs = timeout;
+}
+
+void S3RequestSender::setEndpointOverrideUrl(const Aws::String& url) {
+  logger_->log_debug("Setting AWS endpoint url [%s]", url);
+  client_config_.endpointOverride = url;
+}
+
+void S3RequestSender::setProxy(const ProxyOptions& proxy) {
+  logger_->log_debug("Setting AWS client proxy host [%s] port [%lu]", proxy.host, proxy.port);
+  client_config_.proxyHost = proxy.host;
+  client_config_.proxyPort = proxy.port;
+  client_config_.proxyUserName = proxy.username;
+  client_config_.proxyPassword = proxy.password;
+}
+
+}  // namespace s3
+}  // namespace aws
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/aws/s3/S3RequestSender.h b/extensions/aws/s3/S3RequestSender.h
new file mode 100644
index 0000000..d55be76
--- /dev/null
+++ b/extensions/aws/s3/S3RequestSender.h
@@ -0,0 +1,89 @@
+/**
+ * @file S3RequestSender.h
+ * S3RequestSender class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "aws/core/auth/AWSCredentials.h"
+#include "aws/core/client/ClientConfiguration.h"
+#include "aws/core/utils/xml/XmlSerializer.h"
+#include "aws/s3/model/PutObjectRequest.h"
+#include "aws/s3/model/PutObjectResult.h"
+#include "aws/s3/model/DeleteObjectRequest.h"
+#include "aws/s3/model/GetObjectRequest.h"
+#include "aws/s3/model/GetObjectResult.h"
+#include "aws/s3/model/ListObjectsV2Request.h"
+#include "aws/s3/model/ListObjectsV2Result.h"
+#include "aws/s3/model/ListObjectVersionsRequest.h"
+#include "aws/s3/model/ListObjectVersionsResult.h"
+#include "aws/s3/model/GetObjectTaggingRequest.h"
+#include "aws/s3/model/GetObjectTaggingResult.h"
+#include "aws/s3/model/HeadObjectRequest.h"
+#include "aws/s3/model/HeadObjectResult.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/OptionalUtils.h"
+#include "utils/AWSInitializer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace aws {
+namespace s3 {
+
+struct ProxyOptions {
+  std::string host;
+  uint32_t port = 0;
+  std::string username;
+  std::string password;
+};
+
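+// Abstraction over the AWS S3 client calls; S3ClientRequestSender implements it against the real SDK, while tests can substitute their own sender.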
+class S3RequestSender {
+ public:
+  virtual minifi::utils::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) = 0;
+  virtual bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) = 0;
+  virtual minifi::utils::optional<Aws::S3::Model::GetObjectResult> sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) = 0;
+  virtual minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> sendListObjectsRequest(const Aws::S3::Model::ListObjectsV2Request& request) = 0;
+  virtual minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> sendListVersionsRequest(const Aws::S3::Model::ListObjectVersionsRequest& request) = 0;
+  virtual minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> sendGetObjectTaggingRequest(const Aws::S3::Model::GetObjectTaggingRequest& request) = 0;
+  virtual minifi::utils::optional<Aws::S3::Model::HeadObjectResult> sendHeadObjectRequest(const Aws::S3::Model::HeadObjectRequest& request) = 0;
+  virtual ~S3RequestSender() = default;
+
+  void setCredentials(const Aws::Auth::AWSCredentials& cred);
+  void setRegion(const Aws::String& region);
+  void setTimeout(uint64_t timeout);
+  void setEndpointOverrideUrl(const Aws::String& url);
+  void setProxy(const ProxyOptions& proxy);
+
+ protected:
+  const utils::AWSInitializer& AWS_INITIALIZER = utils::AWSInitializer::get();
+  Aws::Client::ClientConfiguration client_config_;
+  Aws::Auth::AWSCredentials credentials_;
+  std::shared_ptr<minifi::core::logging::Logger> logger_{minifi::core::logging::LoggerFactory<S3RequestSender>::getLogger()};
+};
+
+}  // namespace s3
+}  // namespace aws
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp
index aa7af8f..efd6d38 100644
--- a/extensions/aws/s3/S3Wrapper.cpp
+++ b/extensions/aws/s3/S3Wrapper.cpp
@@ -19,9 +19,16 @@
  */
 #include "S3Wrapper.h"
 
-#include <aws/s3/S3Client.h>
-#include <aws/s3/model/Bucket.h>
-#include <aws/s3/model/StorageClass.h>
+#include <memory>
+#include <regex>
+#include <utility>
+#include <vector>
+
+#include "utils/GeneralUtils.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "utils/RegexUtils.h"
+#include "S3ClientRequestSender.h"
 
 namespace org {
 namespace apache {
@@ -30,46 +37,291 @@ namespace minifi {
 namespace aws {
 namespace s3 {
 
-minifi::utils::optional<Aws::S3::Model::PutObjectResult> S3Wrapper::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  auto outcome = s3_client.PutObject(request);
+void HeadObjectResult::setFilePaths(const std::string& key) {
+  absolute_path = key;
+  std::tie(path, filename) = minifi::utils::file::FileUtils::split_path(key, true /*force_posix*/);
+}
+
+S3Wrapper::S3Wrapper() : request_sender_(minifi::utils::make_unique<S3ClientRequestSender>()) {
+}
+
+S3Wrapper::S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender) : request_sender_(std::move(request_sender)) {
+}
+
+void S3Wrapper::setCredentials(const Aws::Auth::AWSCredentials& cred) {
+  request_sender_->setCredentials(cred);
+}
+
+void S3Wrapper::setRegion(const Aws::String& region) {
+  request_sender_->setRegion(region);
+}
+
+void S3Wrapper::setTimeout(uint64_t timeout) {
+  request_sender_->setTimeout(timeout);
+}
+
+void S3Wrapper::setEndpointOverrideUrl(const Aws::String& url) {
+  request_sender_->setEndpointOverrideUrl(url);
+}
+
+void S3Wrapper::setProxy(const ProxyOptions& proxy) {
+  request_sender_->setProxy(proxy);
+}
+
+void S3Wrapper::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const {
+  if (canned_acl.empty() || CANNED_ACL_MAP.find(canned_acl) == CANNED_ACL_MAP.end())
+    return;
 
-  if (outcome.IsSuccess()) {
-      logger_->log_info("Added S3 object '%s' to bucket '%s'", request.GetKey(), request.GetBucket());
-      return outcome.GetResultWithOwnership();
-  } else {
-    logger_->log_error("PutS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
+  logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+  request.SetACL(CANNED_ACL_MAP.at(canned_acl));
+}
+
+Expiration S3Wrapper::getExpiration(const std::string& expiration) {
+  minifi::utils::Regex expr("expiry-date=\"(.*)\", rule-id=\"(.*)\"");
+  const auto match = expr.match(expiration);
+  const auto& results = expr.getResult();
+  if (!match || results.size() < 3)
+    return Expiration{};
+  return Expiration{results[1], results[2]};
+}
+
+std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption) {
+  if (encryption == Aws::S3::Model::ServerSideEncryption::NOT_SET) {
+    return "";
+  }
+
+  auto it = std::find_if(SERVER_SIDE_ENCRYPTION_MAP.begin(), SERVER_SIDE_ENCRYPTION_MAP.end(),
+    [&](const std::pair<std::string, const Aws::S3::Model::ServerSideEncryption>& pair) {
+      return pair.second == encryption;
+    });
+  if (it != SERVER_SIDE_ENCRYPTION_MAP.end()) {
+    return it->first;
+  }
+  return "";
+}
+
+minifi::utils::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, std::shared_ptr<Aws::IOStream> data_stream) {
+  Aws::S3::Model::PutObjectRequest request;
+  request.SetBucket(put_object_params.bucket);
+  request.SetKey(put_object_params.object_key);
+  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
+  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
+  request.SetContentType(put_object_params.content_type);
+  request.SetMetadata(put_object_params.user_metadata_map);
+  request.SetBody(data_stream);
+  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
+  request.SetGrantRead(put_object_params.read_permission_user_list);
+  request.SetGrantReadACP(put_object_params.read_acl_user_list);
+  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
+  setCannedAcl(request, put_object_params.canned_acl);
+
+  auto aws_result = request_sender_->sendPutObjectRequest(request);
+  if (!aws_result) {
     return minifi::utils::nullopt;
   }
+
+  PutObjectResult result;
+  // Etags are returned by AWS in quoted form that should be removed
+  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
+  result.version = aws_result->GetVersionId();
+
+  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
+  // s3.expiration only needs the date member of this pair
+  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
+  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+  return result;
+}
+
+bool S3Wrapper::deleteObject(const std::string& bucket, const std::string& object_key, const std::string& version) {
+  Aws::S3::Model::DeleteObjectRequest request;
+  request.SetBucket(bucket);
+  request.SetKey(object_key);
+  if (!version.empty()) {
+    request.SetVersionId(version);
+  }
+  return request_sender_->sendDeleteObjectRequest(request);
 }
 
-bool S3Wrapper::sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  Aws::S3::Model::DeleteObjectOutcome outcome = s3_client.DeleteObject(request);
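+// Streams the fetched object body into the output stream in 4 KiB chunks, returning the number of bytes written or a negative value on error.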
+int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t data_size, io::BaseStream& output) {
+  static const int64_t BUFFER_SIZE = 4096;
+  std::vector<uint8_t> buffer;
+  buffer.resize(BUFFER_SIZE);
 
-  if (outcome.IsSuccess()) {
-    logger_->log_info("Deleted S3 object '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
-    return true;
-  } else if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) {
-    logger_->log_info("S3 object '%s' was not found in bucket '%s'", request.GetKey(), request.GetBucket());
-    return true;
-  } else {
-    logger_->log_error("DeleteS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
-    return false;
+  int64_t write_size = 0;
+  while (write_size < data_size) {
+    auto next_write_size = (std::min)(data_size - write_size, BUFFER_SIZE);
+    if (!source.read(reinterpret_cast<char*>(buffer.data()), next_write_size)) {
+      return -1;
+    }
+    auto ret = output.write(buffer.data(), next_write_size);
+    if (ret < 0) {
+      return ret;
+    }
+    write_size += next_write_size;
   }
+  return write_size;
 }
 
-minifi::utils::optional<Aws::S3::Model::GetObjectResult> S3Wrapper::sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  auto outcome = s3_client.GetObject(request);
+minifi::utils::optional<GetObjectResult> S3Wrapper::getObject(const GetObjectRequestParameters& get_object_params, io::BaseStream& out_body) {
+  auto request = createFetchObjectRequest<Aws::S3::Model::GetObjectRequest>(get_object_params);
+  auto aws_result = request_sender_->sendGetObjectRequest(request);
+  if (!aws_result) {
+    return minifi::utils::nullopt;
+  }
+  auto result = fillFetchObjectResult<Aws::S3::Model::GetObjectResult, GetObjectResult>(get_object_params, *aws_result);
+  result.write_size = writeFetchedBody(aws_result->GetBody(), aws_result->GetContentLength(), out_body);
+  return result;
+}
+
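+// Converts the listed object versions into ListedObjectAttributes, skipping entries newer than the minimum object age cutoff.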
+void S3Wrapper::addListResults(const Aws::Vector<Aws::S3::Model::ObjectVersion>& content, const uint64_t min_object_age, std::vector<ListedObjectAttributes>& listed_objects) {
+  for (const auto& version : content) {
+    if (last_bucket_list_timestamp_ - min_object_age < version.GetLastModified().Millis()) {
+      logger_->log_debug("Object version '%s' of key '%s' skipped due to minimum object age filter", version.GetVersionId(), version.GetKey());
+      continue;
+    }
+
+    ListedObjectAttributes attributes;
+    attributes.etag = minifi::utils::StringUtils::removeFramingCharacters(version.GetETag(), '"');
+    attributes.filename = version.GetKey();
+    attributes.is_latest = version.GetIsLatest();
+    attributes.last_modified = version.GetLastModified().Millis();
+    attributes.length = version.GetSize();
+    attributes.store_class = VERSION_STORAGE_CLASS_MAP.at(version.GetStorageClass());
+    attributes.version = version.GetVersionId();
+    listed_objects.push_back(attributes);
+  }
+}
+
+void S3Wrapper::addListResults(const Aws::Vector<Aws::S3::Model::Object>& content, const uint64_t min_object_age, std::vector<ListedObjectAttributes>& listed_objects) {
+  for (const auto& object : content) {
+    if (last_bucket_list_timestamp_ - min_object_age < object.GetLastModified().Millis()) {
+      logger_->log_debug("Object with key '%s' skipped due to minimum object age filter", object.GetKey());
+      continue;
+    }
+
+    ListedObjectAttributes attributes;
+    attributes.etag = minifi::utils::StringUtils::removeFramingCharacters(object.GetETag(), '"');
+    attributes.filename = object.GetKey();
+    attributes.is_latest = true;
+    attributes.last_modified = object.GetLastModified().Millis();
+    attributes.length = object.GetSize();
+    attributes.store_class = OBJECT_STORAGE_CLASS_MAP.at(object.GetStorageClass());
+    listed_objects.push_back(attributes);
+  }
+}
 
-  if (outcome.IsSuccess()) {
-    logger_->log_info("Fetched S3 object %s from bucket %s", request.GetKey(), request.GetBucket());
-    return outcome.GetResultWithOwnership();
-  } else {
-    logger_->log_error("FetchS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
+minifi::utils::optional<std::vector<ListedObjectAttributes>> S3Wrapper::listVersions(const ListRequestParameters& params) {
+  auto request = createListRequest<Aws::S3::Model::ListObjectVersionsRequest>(params);
+  std::vector<ListedObjectAttributes> attribute_list;
+  minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> aws_result;
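+  // The result may be truncated; keep paging with the returned key and version id markers until the full listing is retrieved.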
+  do {
+    aws_result = request_sender_->sendListVersionsRequest(request);
+    if (!aws_result) {
+      return minifi::utils::nullopt;
+    }
+    const auto& versions = aws_result->GetVersions();
+    logger_->log_debug("AWS S3 List operation returned %zu versions. This result is%s truncated.", versions.size(), aws_result->GetIsTruncated() ? "" : " not");
+    addListResults(versions, params.min_object_age, attribute_list);
+    if (aws_result->GetIsTruncated()) {
+      request.SetKeyMarker(aws_result->GetNextKeyMarker());
+      request.SetVersionIdMarker(aws_result->GetNextVersionIdMarker());
+    }
+  } while (aws_result->GetIsTruncated());
+
+  return attribute_list;
+}
+
+minifi::utils::optional<std::vector<ListedObjectAttributes>> S3Wrapper::listObjects(const ListRequestParameters& params) {
+  auto request = createListRequest<Aws::S3::Model::ListObjectsV2Request>(params);
+  std::vector<ListedObjectAttributes> attribute_list;
+  minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> aws_result;
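+  // The result may be truncated; keep paging with the returned continuation token until the full listing is retrieved.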
+  do {
+    aws_result = request_sender_->sendListObjectsRequest(request);
+    if (!aws_result) {
+      return minifi::utils::nullopt;
+    }
+    const auto& objects = aws_result->GetContents();
+    logger_->log_debug("AWS S3 List operation returned %zu objects. This result is%s truncated.", objects.size(), aws_result->GetIsTruncated() ? "" : "not");
+    addListResults(objects, params.min_object_age, attribute_list);
+    if (aws_result->GetIsTruncated()) {
+      request.SetContinuationToken(aws_result->GetNextContinuationToken());
+    }
+  } while (aws_result->GetIsTruncated());
+
+  return attribute_list;
+}
+
+minifi::utils::optional<std::vector<ListedObjectAttributes>> S3Wrapper::listBucket(const ListRequestParameters& params) {
+  last_bucket_list_timestamp_ = Aws::Utils::DateTime::CurrentTimeMillis();
+  if (params.use_versions) {
+    return listVersions(params);
+  }
+  return listObjects(params);
+}
+
+minifi::utils::optional<std::map<std::string, std::string>> S3Wrapper::getObjectTags(const std::string& bucket, const std::string& object_key, const std::string& version) {
+  Aws::S3::Model::GetObjectTaggingRequest request;
+  request.SetBucket(bucket);
+  request.SetKey(object_key);
+  if (!version.empty()) {
+    request.SetVersionId(version);
+  }
+  auto aws_result = request_sender_->sendGetObjectTaggingRequest(request);
+  if (!aws_result) {
+    return minifi::utils::nullopt;
+  }
+  std::map<std::string, std::string> tags;
+  for (const auto& tag : aws_result->GetTagSet()) {
+    tags.emplace(tag.GetKey(), tag.GetValue());
+  }
+  return tags;
+}
+
+minifi::utils::optional<HeadObjectResult> S3Wrapper::headObject(const HeadObjectRequestParameters& head_object_params) {
+  auto request = createFetchObjectRequest<Aws::S3::Model::HeadObjectRequest>(head_object_params);
+  auto aws_result = request_sender_->sendHeadObjectRequest(request);
+  if (!aws_result) {
     return minifi::utils::nullopt;
   }
+  return fillFetchObjectResult<Aws::S3::Model::HeadObjectResult, HeadObjectResult>(head_object_params, aws_result.value());
+}
+
+template<typename ListRequest>
+ListRequest S3Wrapper::createListRequest(const ListRequestParameters& params) {
+  ListRequest request;
+  request.SetBucket(params.bucket);
+  request.SetDelimiter(params.delimiter);
+  request.SetPrefix(params.prefix);
+  return request;
+}
+
+template<typename FetchObjectRequest>
+FetchObjectRequest S3Wrapper::createFetchObjectRequest(const GetObjectRequestParameters& get_object_params) {
+  FetchObjectRequest request;
+  request.SetBucket(get_object_params.bucket);
+  request.SetKey(get_object_params.object_key);
+  if (!get_object_params.version.empty()) {
+    request.SetVersionId(get_object_params.version);
+  }
+  if (get_object_params.requester_pays) {
+    request.SetRequestPayer(Aws::S3::Model::RequestPayer::requester);
+  }
+  return request;
+}
+
+template<typename AwsResult, typename FetchObjectResult>
+FetchObjectResult S3Wrapper::fillFetchObjectResult(const GetObjectRequestParameters& get_object_params, const AwsResult& fetch_object_result) {
+  FetchObjectResult result;
+  result.setFilePaths(get_object_params.object_key);
+  result.mime_type = fetch_object_result.GetContentType();
+  result.etag = minifi::utils::StringUtils::removeFramingCharacters(fetch_object_result.GetETag(), '"');
+  result.expiration = getExpiration(fetch_object_result.GetExpiration());
+  result.ssealgorithm = getEncryptionString(fetch_object_result.GetServerSideEncryption());
+  result.version = fetch_object_result.GetVersionId();
+  for (const auto& metadata : fetch_object_result.GetMetadata()) {
+    result.user_metadata_map.emplace(metadata.first, metadata.second);
+  }
+  return result;
 }
 
 }  // namespace s3
diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3Wrapper.h
index cdcbe9b..239b3b2 100644
--- a/extensions/aws/s3/S3Wrapper.h
+++ b/extensions/aws/s3/S3Wrapper.h
@@ -17,12 +17,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 #pragma once
 
+#include <string>
+#include <map>
+#include <unordered_map>
 #include <memory>
+#include <sstream>
+#include <utility>
+#include <vector>
+
+#include "aws/s3/model/StorageClass.h"
+#include "aws/s3/model/ServerSideEncryption.h"
+#include "aws/s3/model/ObjectCannedACL.h"
 
-#include "aws/s3/model/PutObjectResult.h"
-#include "S3WrapperBase.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/AWSInitializer.h"
+#include "utils/OptionalUtils.h"
+#include "utils/StringUtils.h"
+#include "io/BaseStream.h"
+#include "S3RequestSender.h"
 
 namespace org {
 namespace apache {
@@ -31,11 +47,162 @@ namespace minifi {
 namespace aws {
 namespace s3 {
 
-class S3Wrapper : public S3WrapperBase {
- protected:
-  minifi::utils::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) override;
-  bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) override;
-  minifi::utils::optional<Aws::S3::Model::GetObjectResult> sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) override;
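+// Maps between the processor property values (storage class, server side encryption, canned ACL) and the corresponding AWS SDK enums.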
+static const std::unordered_map<std::string, Aws::S3::Model::StorageClass> STORAGE_CLASS_MAP {
+  {"Standard", Aws::S3::Model::StorageClass::STANDARD},
+  {"ReducedRedundancy", Aws::S3::Model::StorageClass::REDUCED_REDUNDANCY},
+  {"StandardIA", Aws::S3::Model::StorageClass::STANDARD_IA},
+  {"OnezoneIA", Aws::S3::Model::StorageClass::ONEZONE_IA},
+  {"IntelligentTiering", Aws::S3::Model::StorageClass::INTELLIGENT_TIERING},
+  {"Glacier", Aws::S3::Model::StorageClass::GLACIER},
+  {"DeepArchive", Aws::S3::Model::StorageClass::DEEP_ARCHIVE}
+};
+
+static const std::map<Aws::S3::Model::ObjectStorageClass, std::string> OBJECT_STORAGE_CLASS_MAP {
+  {Aws::S3::Model::ObjectStorageClass::STANDARD, "Standard"},
+  {Aws::S3::Model::ObjectStorageClass::REDUCED_REDUNDANCY, "ReducedRedundancy"},
+  {Aws::S3::Model::ObjectStorageClass::STANDARD_IA, "StandardIA"},
+  {Aws::S3::Model::ObjectStorageClass::ONEZONE_IA, "OnezoneIA"},
+  {Aws::S3::Model::ObjectStorageClass::INTELLIGENT_TIERING, "IntelligentTiering"},
+  {Aws::S3::Model::ObjectStorageClass::GLACIER, "Glacier"},
+  {Aws::S3::Model::ObjectStorageClass::DEEP_ARCHIVE, "DeepArchive"}
+};
+
+static const std::map<Aws::S3::Model::ObjectVersionStorageClass, std::string> VERSION_STORAGE_CLASS_MAP {
+  {Aws::S3::Model::ObjectVersionStorageClass::STANDARD, "Standard"}
+};
+
+static const std::unordered_map<std::string, Aws::S3::Model::ServerSideEncryption> SERVER_SIDE_ENCRYPTION_MAP {
+  {"None", Aws::S3::Model::ServerSideEncryption::NOT_SET},
+  {"AES256", Aws::S3::Model::ServerSideEncryption::AES256},
+  {"aws_kms", Aws::S3::Model::ServerSideEncryption::aws_kms},
+};
+
+static const std::unordered_map<std::string, Aws::S3::Model::ObjectCannedACL> CANNED_ACL_MAP {
+  {"BucketOwnerFullControl", Aws::S3::Model::ObjectCannedACL::bucket_owner_full_control},
+  {"BucketOwnerRead", Aws::S3::Model::ObjectCannedACL::bucket_owner_read},
+  {"AuthenticatedRead", Aws::S3::Model::ObjectCannedACL::authenticated_read},
+  {"PublicReadWrite", Aws::S3::Model::ObjectCannedACL::public_read_write},
+  {"PublicRead", Aws::S3::Model::ObjectCannedACL::public_read},
+  {"Private", Aws::S3::Model::ObjectCannedACL::private_},
+  {"AwsExecRead", Aws::S3::Model::ObjectCannedACL::aws_exec_read},
+};
+
+struct Expiration {
+  std::string expiration_time;
+  std::string expiration_time_rule_id;
+};
+
+struct PutObjectResult {
+  std::string version;
+  std::string etag;
+  std::string expiration;
+  std::string ssealgorithm;
+};
+
+struct PutObjectRequestParameters {
+  std::string bucket;
+  std::string object_key;
+  std::string storage_class;
+  std::string server_side_encryption;
+  std::string content_type;
+  std::map<std::string, std::string> user_metadata_map;
+  std::string fullcontrol_user_list;
+  std::string read_permission_user_list;
+  std::string read_acl_user_list;
+  std::string write_acl_user_list;
+  std::string canned_acl;
+};
+
+struct GetObjectRequestParameters {
+  std::string bucket;
+  std::string object_key;
+  std::string version;
+  bool requester_pays = false;
+};
+
+struct HeadObjectResult {
+  std::string path;
+  std::string absolute_path;
+  std::string filename;
+  std::string mime_type;
+  std::string etag;
+  Expiration expiration;
+  std::string ssealgorithm;
+  std::string version;
+  std::map<std::string, std::string> user_metadata_map;
+
+  void setFilePaths(const std::string& key);
+};
+
+struct GetObjectResult : public HeadObjectResult {
+  int64_t write_size = 0;
+};
+
+struct ListRequestParameters {
+  std::string bucket;
+  std::string delimiter;
+  std::string prefix;
+  bool use_versions = false;
+  uint64_t min_object_age = 0;
+};
+
+struct ListedObjectAttributes {
+  std::string filename;
+  std::string etag;
+  bool is_latest = false;
+  int64_t last_modified = 0;
+  int64_t length = 0;
+  std::string store_class;
+  std::string version;
+};
+
+using HeadObjectRequestParameters = GetObjectRequestParameters;
+
+class S3Wrapper {
+ public:
+  S3Wrapper();
+  explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
+
+  void setCredentials(const Aws::Auth::AWSCredentials& cred);
+  void setRegion(const Aws::String& region);
+  void setTimeout(uint64_t timeout);
+  void setEndpointOverrideUrl(const Aws::String& url);
+  void setProxy(const ProxyOptions& proxy);
+
+  minifi::utils::optional<PutObjectResult> putObject(const PutObjectRequestParameters& options, std::shared_ptr<Aws::IOStream> data_stream);
+  bool deleteObject(const std::string& bucket, const std::string& object_key, const std::string& version = "");
+  minifi::utils::optional<GetObjectResult> getObject(const GetObjectRequestParameters& get_object_params, io::BaseStream& fetched_body);
+  minifi::utils::optional<std::vector<ListedObjectAttributes>> listBucket(const ListRequestParameters& params);
+  minifi::utils::optional<std::map<std::string, std::string>> getObjectTags(const std::string& bucket, const std::string& object_key, const std::string& version = "");
+  minifi::utils::optional<HeadObjectResult> headObject(const HeadObjectRequestParameters& head_object_params);
+
+  virtual ~S3Wrapper() = default;
+
+ private:
+  static Expiration getExpiration(const std::string& expiration);
+
+  void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const;
+  static int64_t writeFetchedBody(Aws::IOStream& source, const int64_t data_size, io::BaseStream& output);
+  static std::string getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption);
+
+  minifi::utils::optional<std::vector<ListedObjectAttributes>> listVersions(const ListRequestParameters& params);
+  minifi::utils::optional<std::vector<ListedObjectAttributes>> listObjects(const ListRequestParameters& params);
+  void addListResults(const Aws::Vector<Aws::S3::Model::ObjectVersion>& content, uint64_t min_object_age, std::vector<ListedObjectAttributes>& listed_objects);
+  void addListResults(const Aws::Vector<Aws::S3::Model::Object>& content, uint64_t min_object_age, std::vector<ListedObjectAttributes>& listed_objects);
+
+  template<typename ListRequest>
+  ListRequest createListRequest(const ListRequestParameters& params);
+
+  template<typename FetchObjectRequest>
+  FetchObjectRequest createFetchObjectRequest(const GetObjectRequestParameters& get_object_params);
+
+  template<typename AwsResult, typename FetchObjectResult>
+  FetchObjectResult fillFetchObjectResult(const GetObjectRequestParameters& get_object_params, const AwsResult& fetch_object_result);
+
+  const utils::AWSInitializer& AWS_INITIALIZER = utils::AWSInitializer::get();
+  std::shared_ptr<minifi::core::logging::Logger> logger_{minifi::core::logging::LoggerFactory<S3Wrapper>::getLogger()};
+  std::unique_ptr<S3RequestSender> request_sender_;
+  uint64_t last_bucket_list_timestamp_ = 0;
 };
 
 }  // namespace s3
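
  [Editor's note, not part of the commit] For orientation, a minimal sketch of how a caller such as the new ListS3 processor might drive the wrapper declared above. The include path, bucket name and prefix are placeholder assumptions; only the S3Wrapper/ListRequestParameters/ListedObjectAttributes API shown in this header is relied on.

    #include "s3/S3Wrapper.h"  // assumed include path

    using namespace org::apache::nifi::minifi::aws;

    void listExample() {
      s3::S3Wrapper wrapper;              // default-constructed request sender talks to the real AWS SDK
      wrapper.setRegion("us-west-2");     // example region
      s3::ListRequestParameters params;
      params.bucket = "my-bucket";        // hypothetical bucket
      params.prefix = "logs/";
      params.use_versions = false;
      params.min_object_age = 0;          // no age filtering
      auto listing = wrapper.listBucket(params);
      if (listing) {
        for (const auto& attributes : listing.value()) {
          // attributes.filename, attributes.etag, attributes.last_modified,
          // attributes.length and attributes.store_class are filled per listed object
        }
      }
    }
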
diff --git a/extensions/aws/s3/S3WrapperBase.cpp b/extensions/aws/s3/S3WrapperBase.cpp
deleted file mode 100644
index 3f7c718..0000000
--- a/extensions/aws/s3/S3WrapperBase.cpp
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * @file S3Wrapper.cpp
- * S3Wrapper class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "S3WrapperBase.h"
-
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "utils/StringUtils.h"
-#include "utils/file/FileUtils.h"
-#include "utils/RegexUtils.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace aws {
-namespace s3 {
-
-void GetObjectResult::setFilePaths(const std::string& key) {
-  absolute_path = key;
-  std::tie(path, filename) = minifi::utils::file::FileUtils::split_path(key, true /*force_posix*/);
-}
-
-void S3WrapperBase::setCredentials(const Aws::Auth::AWSCredentials& cred) {
-  logger_->log_debug("Setting new AWS credentials");
-  credentials_ = cred;
-}
-
-void S3WrapperBase::setRegion(const Aws::String& region) {
-  logger_->log_debug("Setting new AWS region [%s]", region);
-  client_config_.region = region;
-}
-
-void S3WrapperBase::setTimeout(uint64_t timeout) {
-  logger_->log_debug("Setting AWS client connection timeout [%d]", timeout);
-  client_config_.connectTimeoutMs = timeout;
-}
-
-void S3WrapperBase::setEndpointOverrideUrl(const Aws::String& url) {
-  logger_->log_debug("Setting AWS endpoint url [%s]", url);
-  client_config_.endpointOverride = url;
-}
-
-void S3WrapperBase::setProxy(const ProxyOptions& proxy) {
-  logger_->log_debug("Setting AWS client proxy host [%s] port [%d]", proxy.host, proxy.port);
-  client_config_.proxyHost = proxy.host;
-  client_config_.proxyPort = proxy.port;
-  client_config_.proxyUserName = proxy.username;
-  client_config_.proxyPassword = proxy.password;
-}
-
-void S3WrapperBase::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const {
-  if (canned_acl.empty() || CANNED_ACL_MAP.find(canned_acl) == CANNED_ACL_MAP.end())
-    return;
-
-  logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
-  request.SetACL(CANNED_ACL_MAP.at(canned_acl));
-}
-
-Expiration S3WrapperBase::getExpiration(const std::string& expiration) {
-  minifi::utils::Regex expr("expiry-date=\"(.*)\", rule-id=\"(.*)\"");
-  const auto match = expr.match(expiration);
-  const auto& results = expr.getResult();
-  if (!match || results.size() < 3)
-    return Expiration{};
-  return Expiration{results[1], results[2]};
-}
-
-std::string S3WrapperBase::getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption) {
-  if (encryption == Aws::S3::Model::ServerSideEncryption::NOT_SET) {
-    return "";
-  }
-
-  auto it = std::find_if(SERVER_SIDE_ENCRYPTION_MAP.begin(), SERVER_SIDE_ENCRYPTION_MAP.end(),
-    [&](const std::pair<std::string, const Aws::S3::Model::ServerSideEncryption&> pair) {
-      return pair.second == encryption;
-    });
-  if (it != SERVER_SIDE_ENCRYPTION_MAP.end()) {
-    return it->first;
-  }
-  return "";
-}
-
-minifi::utils::optional<PutObjectResult> S3WrapperBase::putObject(const PutObjectRequestParameters& put_object_params, std::shared_ptr<Aws::IOStream> data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
-  request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
-
-  auto aws_result = sendPutObjectRequest(request);
-  if (!aws_result) {
-    return minifi::utils::nullopt;
-  }
-
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result.value().GetETag(), '"');
-  result.version = aws_result.value().GetVersionId();
-
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration_time = getExpiration(aws_result.value().GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result.value().GetServerSideEncryption());
-  return result;
-}
-
-bool S3WrapperBase::deleteObject(const std::string& bucket, const std::string& object_key, const std::string& version) {
-  Aws::S3::Model::DeleteObjectRequest request;
-  request.SetBucket(bucket);
-  request.SetKey(object_key);
-  if (!version.empty()) {
-    request.SetVersionId(version);
-  }
-  return sendDeleteObjectRequest(request);
-}
-
-int64_t S3WrapperBase::writeFetchedBody(Aws::IOStream& source, const int64_t data_size, const std::shared_ptr<io::BaseStream>& output) {
-  static const uint64_t BUFFER_SIZE = 4096;
-  std::vector<uint8_t> buffer;
-  buffer.reserve(BUFFER_SIZE);
-
-  int64_t write_size = 0;
-  while (write_size < data_size) {
-    auto next_write_size = data_size - write_size < BUFFER_SIZE ? data_size - write_size : BUFFER_SIZE;
-    if (!source.read(reinterpret_cast<char*>(buffer.data()), next_write_size)) {
-      return -1;
-    }
-    auto ret = output->write(buffer.data(), next_write_size);
-    if (ret < 0) {
-      return ret;
-    }
-    write_size += next_write_size;
-  }
-  return write_size;
-}
-
-minifi::utils::optional<GetObjectResult> S3WrapperBase::getObject(const GetObjectRequestParameters& get_object_params, const std::shared_ptr<io::BaseStream>& out_body) {
-  Aws::S3::Model::GetObjectRequest request;
-  request.SetBucket(get_object_params.bucket);
-  request.SetKey(get_object_params.object_key);
-  if (!get_object_params.version.empty()) {
-    request.SetVersionId(get_object_params.version);
-  }
-  if (get_object_params.requester_pays) {
-    request.SetRequestPayer(Aws::S3::Model::RequestPayer::requester);
-  }
-  auto aws_result = sendGetObjectRequest(request);
-  if (!aws_result) {
-    return minifi::utils::nullopt;
-  }
-
-  GetObjectResult result;
-  result.setFilePaths(get_object_params.object_key);
-  result.mime_type = aws_result->GetContentType();
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.expiration = getExpiration(aws_result.value().GetExpiration());
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
-  result.version = aws_result->GetVersionId();
-  result.write_size = writeFetchedBody(aws_result->GetBody(), aws_result->GetContentLength(), out_body);
-
-  return result;
-}
-
-}  // namespace s3
-}  // namespace aws
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
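
  [Editor's note, not part of the commit] The getExpiration() helper deleted above is carried over to the new S3Wrapper.cpp unchanged in spirit: it splits the single expiration header string into a date and a rule id. Using the sample value defined later in MockS3RequestSender.h:

    // Input:
    //   expiry-date="Wed, 28 Oct 2020 00:00:00 GMT", rule-id="my_expiration_rule"
    // Parsed Expiration:
    //   expiration_time         == "Wed, 28 Oct 2020 00:00:00 GMT"
    //   expiration_time_rule_id == "my_expiration_rule"
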
diff --git a/extensions/aws/s3/S3WrapperBase.h b/extensions/aws/s3/S3WrapperBase.h
deleted file mode 100644
index 7a91669..0000000
--- a/extensions/aws/s3/S3WrapperBase.h
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * @file AbstractS3Client.h
- * AbstractS3Client class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <string>
-#include <map>
-#include <unordered_map>
-#include <memory>
-#include <sstream>
-#include <utility>
-
-#include "aws/core/auth/AWSCredentialsProvider.h"
-#include "aws/s3/S3Client.h"
-#include "aws/s3/model/PutObjectRequest.h"
-#include "aws/s3/model/PutObjectResult.h"
-#include "aws/s3/model/DeleteObjectRequest.h"
-#include "aws/s3/model/GetObjectRequest.h"
-#include "aws/s3/model/GetObjectResult.h"
-#include "aws/s3/model/StorageClass.h"
-#include "aws/s3/model/ServerSideEncryption.h"
-#include "aws/s3/model/ObjectCannedACL.h"
-
-#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "utils/AWSInitializer.h"
-#include "utils/OptionalUtils.h"
-#include "io/BaseStream.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace aws {
-namespace s3 {
-
-static const std::unordered_map<std::string, Aws::S3::Model::StorageClass> STORAGE_CLASS_MAP {
-  {"Standard", Aws::S3::Model::StorageClass::STANDARD},
-  {"ReducedRedundancy", Aws::S3::Model::StorageClass::REDUCED_REDUNDANCY},
-  {"StandardIA", Aws::S3::Model::StorageClass::STANDARD_IA},
-  {"OnezoneIA", Aws::S3::Model::StorageClass::ONEZONE_IA},
-  {"IntelligentTiering", Aws::S3::Model::StorageClass::INTELLIGENT_TIERING},
-  {"Glacier", Aws::S3::Model::StorageClass::GLACIER},
-  {"DeepArchive", Aws::S3::Model::StorageClass::DEEP_ARCHIVE}
-};
-
-static const std::unordered_map<std::string, Aws::S3::Model::ServerSideEncryption> SERVER_SIDE_ENCRYPTION_MAP {
-  {"None", Aws::S3::Model::ServerSideEncryption::NOT_SET},
-  {"AES256", Aws::S3::Model::ServerSideEncryption::AES256},
-  {"aws_kms", Aws::S3::Model::ServerSideEncryption::aws_kms},
-};
-
-static const std::unordered_map<std::string, Aws::S3::Model::ObjectCannedACL> CANNED_ACL_MAP {
-  {"BucketOwnerFullControl", Aws::S3::Model::ObjectCannedACL::bucket_owner_full_control},
-  {"BucketOwnerRead", Aws::S3::Model::ObjectCannedACL::bucket_owner_read},
-  {"AuthenticatedRead", Aws::S3::Model::ObjectCannedACL::authenticated_read},
-  {"PublicReadWrite", Aws::S3::Model::ObjectCannedACL::public_read_write},
-  {"PublicRead", Aws::S3::Model::ObjectCannedACL::public_read},
-  {"Private", Aws::S3::Model::ObjectCannedACL::private_},
-  {"AwsExecRead", Aws::S3::Model::ObjectCannedACL::aws_exec_read},
-};
-
-struct Expiration {
-  std::string expiration_time;
-  std::string expiration_time_rule_id;
-};
-
-struct PutObjectResult {
-  std::string version;
-  std::string etag;
-  std::string expiration_time;
-  std::string ssealgorithm;
-};
-
-struct PutObjectRequestParameters {
-  std::string bucket;
-  std::string object_key;
-  std::string storage_class;
-  std::string server_side_encryption;
-  std::string content_type;
-  std::map<std::string, std::string> user_metadata_map;
-  std::string fullcontrol_user_list;
-  std::string read_permission_user_list;
-  std::string read_acl_user_list;
-  std::string write_acl_user_list;
-  std::string canned_acl;
-};
-
-struct GetObjectRequestParameters {
-  std::string bucket;
-  std::string object_key;
-  std::string version;
-  bool requester_pays = false;
-};
-
-struct GetObjectResult {
- public:
-  std::string path;
-  std::string absolute_path;
-  std::string filename;
-  std::string mime_type;
-  std::string etag;
-  Expiration expiration;
-  std::string ssealgorithm;
-  std::string version;
-  int64_t write_size = 0;
-
-  void setFilePaths(const std::string& key);
-};
-
-struct ProxyOptions {
-  std::string host;
-  uint32_t port = 0;
-  std::string username;
-  std::string password;
-};
-
-class S3WrapperBase {
- public:
-  void setCredentials(const Aws::Auth::AWSCredentials& cred);
-  void setRegion(const Aws::String& region);
-  void setTimeout(uint64_t timeout);
-  void setEndpointOverrideUrl(const Aws::String& url);
-  void setProxy(const ProxyOptions& proxy);
-
-  minifi::utils::optional<PutObjectResult> putObject(const PutObjectRequestParameters& options, std::shared_ptr<Aws::IOStream> data_stream);
-  bool deleteObject(const std::string& bucket, const std::string& object_key, const std::string& version = "");
-  minifi::utils::optional<GetObjectResult> getObject(const GetObjectRequestParameters& get_object_params, const std::shared_ptr<io::BaseStream>& fetched_body);
-
-  virtual ~S3WrapperBase() = default;
-
- protected:
-  virtual minifi::utils::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) = 0;
-  virtual bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) = 0;
-  virtual minifi::utils::optional<Aws::S3::Model::GetObjectResult> sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) = 0;
-  void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const;
-  int64_t writeFetchedBody(Aws::IOStream& source, const int64_t data_size, const std::shared_ptr<io::BaseStream>& output);
-  static Expiration getExpiration(const std::string& expiration);
-  static std::string getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption);
-
-  const utils::AWSInitializer& AWS_INITIALIZER = utils::AWSInitializer::get();
-  Aws::Client::ClientConfiguration client_config_;
-  Aws::Auth::AWSCredentials credentials_;
-  std::shared_ptr<minifi::core::logging::Logger> logger_{minifi::core::logging::LoggerFactory<S3WrapperBase>::getLogger()};
-};
-
-} /* namespace s3 */
-} /* namespace aws */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
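
  [Editor's note, not part of the commit] The removal of S3WrapperBase summarizes the refactor: the old design had tests subclass the wrapper and override the virtual send*Request() hooks, while the new S3Wrapper owns an S3RequestSender injected through its constructor, so tests pass in a MockS3RequestSender instead. A minimal wiring sketch, assuming the test fixture keeps the raw pointer for assertions:

    auto mock_sender = std::make_unique<MockS3RequestSender>();
    auto* mock_sender_ptr = mock_sender.get();                    // kept for later assertions
    minifi::aws::s3::S3Wrapper wrapper(std::move(mock_sender));   // sender injected via constructor
    // ... exercise the wrapper (or a processor built on top of it), then inspect
    // mock_sender_ptr->list_object_request, mock_sender_ptr->getClientConfig(), etc.
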
diff --git a/libminifi/test/aws-tests/DeleteS3ObjectTests.cpp b/libminifi/test/aws-tests/DeleteS3ObjectTests.cpp
index 094dec0..aa73a66 100644
--- a/libminifi/test/aws-tests/DeleteS3ObjectTests.cpp
+++ b/libminifi/test/aws-tests/DeleteS3ObjectTests.cpp
@@ -22,7 +22,7 @@
 
 namespace {
 
-using DeleteS3ObjectTestsFixture = S3TestsFixture<minifi::aws::processors::DeleteS3Object>;
+using DeleteS3ObjectTestsFixture = FlowProcessorS3TestsFixture<minifi::aws::processors::DeleteS3Object>;
 using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
 
 TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test AWS credential setting", "[awsCredentials]") {
@@ -56,8 +56,8 @@ TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test AWS credential setting", "[aw
   }
 
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->getCredentials().GetAWSAccessKeyId() == "key");
-  REQUIRE(mock_s3_wrapper_ptr->getCredentials().GetAWSSecretKey() == "secret");
+  REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSAccessKeyId() == "key");
+  REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSSecretKey() == "secret");
 }
 
 TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test required property not set", "[awsS3Config]") {
@@ -91,9 +91,9 @@ TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test proxy setting", "[awsS3Proxy]
 TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test success case with default values", "[awsS3DeleteSuccess]") {
   setRequiredProperties();
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetBucket() == "testBucket");
-  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetKey() == INPUT_FILENAME);
-  REQUIRE(!mock_s3_wrapper_ptr->delete_object_request.VersionIdHasBeenSet());
+  REQUIRE(mock_s3_request_sender_ptr->delete_object_request.GetBucket() == "testBucket");
+  REQUIRE(mock_s3_request_sender_ptr->delete_object_request.GetKey() == INPUT_FILENAME);
+  REQUIRE(!mock_s3_request_sender_ptr->delete_object_request.VersionIdHasBeenSet());
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "Successfully deleted S3 object"));
 }
 
@@ -102,8 +102,8 @@ TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test version setting", "[awsS3Dele
   plan->setProperty(update_attribute, "s3.version", "v1", true);
   plan->setProperty(s3_processor, "Version", "${s3.version}");
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetVersionId() == "v1");
-  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.VersionIdHasBeenSet());
+  REQUIRE(mock_s3_request_sender_ptr->delete_object_request.GetVersionId() == "v1");
+  REQUIRE(mock_s3_request_sender_ptr->delete_object_request.VersionIdHasBeenSet());
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "Successfully deleted S3 object"));
 }
 
@@ -114,9 +114,9 @@ TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test optional client configuration
   plan->setProperty(update_attribute, "test.endpoint", "http://localhost:1234", true);
   plan->setProperty(s3_processor, "Endpoint Override URL", "${test.endpoint}");
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().connectTimeoutMs == 10000);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 10000);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
 }
 
 TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test failure case", "[awsS3DeleteFailure]") {
@@ -128,11 +128,11 @@ TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test failure case", "[awsS3DeleteF
   setRequiredProperties();
   plan->setProperty(s3_processor, "Version", "v1");
   log_failure->setAutoTerminatedRelationships({{core::Relationship("success", "d")}});
-  mock_s3_wrapper_ptr->setDeleteObjectResult(false);
+  mock_s3_request_sender_ptr->setDeleteObjectResult(false);
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetBucket() == "testBucket");
-  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetKey() == INPUT_FILENAME);
-  REQUIRE(mock_s3_wrapper_ptr->delete_object_request.GetVersionId() == "v1");
+  REQUIRE(mock_s3_request_sender_ptr->delete_object_request.GetBucket() == "testBucket");
+  REQUIRE(mock_s3_request_sender_ptr->delete_object_request.GetKey() == INPUT_FILENAME);
+  REQUIRE(mock_s3_request_sender_ptr->delete_object_request.GetVersionId() == "v1");
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "Failed to delete S3 object"));
 }
 
diff --git a/libminifi/test/aws-tests/FetchS3ObjectTests.cpp b/libminifi/test/aws-tests/FetchS3ObjectTests.cpp
index 3771ea7..dadb85d 100644
--- a/libminifi/test/aws-tests/FetchS3ObjectTests.cpp
+++ b/libminifi/test/aws-tests/FetchS3ObjectTests.cpp
@@ -29,7 +29,7 @@ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
 using org::apache::nifi::minifi::utils::file::get_file_content;
 using org::apache::nifi::minifi::utils::file::get_separator;
 
-class FetchS3ObjectTestsFixture : public S3TestsFixture<minifi::aws::processors::FetchS3Object> {
+class FetchS3ObjectTestsFixture : public FlowProcessorS3TestsFixture<minifi::aws::processors::FetchS3Object> {
  public:
   FetchS3ObjectTestsFixture() {
     auto putfile = plan->addProcessor(
@@ -75,8 +75,8 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test AWS credential setting", "[aws
   }
 
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->getCredentials().GetAWSAccessKeyId() == "key");
-  REQUIRE(mock_s3_wrapper_ptr->getCredentials().GetAWSSecretKey() == "secret");
+  REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSAccessKeyId() == "key");
+  REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSSecretKey() == "secret");
 }
 
 TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test required property not set", "[awsS3Config]") {
@@ -119,16 +119,16 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test default properties", "[awsS3Co
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.expirationTime value:" + S3_EXPIRATION_DATE));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.expirationTimeRuleId value:" + S3_EXPIRATION_TIME_RULE_ID));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.sseAlgorithm value:" + S3_SSEALGORITHM_STR));
-  REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.version value:" + S3_VERSION));
+  REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.version value:" + S3_VERSION_1));
   REQUIRE(get_file_content(output_dir + get_separator() + INPUT_FILENAME) == S3_CONTENT);
-  REQUIRE(mock_s3_wrapper_ptr->get_object_request.GetVersionId().empty());
-  REQUIRE(!mock_s3_wrapper_ptr->get_object_request.VersionIdHasBeenSet());
-  REQUIRE(mock_s3_wrapper_ptr->get_object_request.GetRequestPayer() == Aws::S3::Model::RequestPayer::NOT_SET);
+  REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetVersionId().empty());
+  REQUIRE(!mock_s3_request_sender_ptr->get_object_request.VersionIdHasBeenSet());
+  REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetRequestPayer() == Aws::S3::Model::RequestPayer::NOT_SET);
 }
 
 TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test empty optional S3 results", "[awsS3Config]") {
   setRequiredProperties();
-  mock_s3_wrapper_ptr->returnEmptyS3Result();
+  mock_s3_request_sender_ptr->returnEmptyS3Result();
   test_controller.runSession(plan, true);
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.bucket value:" + S3_BUCKET));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:filename value:" + INPUT_FILENAME));
@@ -155,11 +155,11 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test subdirectories on AWS", "[awsS
 
 TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test optional values are set in request", "[awsS3Config]") {
   setRequiredProperties();
-  plan->setProperty(s3_processor, "Version", S3_VERSION);
+  plan->setProperty(s3_processor, "Version", S3_VERSION_1);
   plan->setProperty(s3_processor, "Requester Pays", "true");
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->get_object_request.GetVersionId() == S3_VERSION);
-  REQUIRE(mock_s3_wrapper_ptr->get_object_request.GetRequestPayer() == Aws::S3::Model::RequestPayer::requester);
+  REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetVersionId() == S3_VERSION_1);
+  REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetRequestPayer() == Aws::S3::Model::RequestPayer::requester);
 }
 
 TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test non-default client configuration values", "[awsS3Config]") {
@@ -169,9 +169,9 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test non-default client configurati
   plan->setProperty(update_attribute, "test.endpoint", "http://localhost:1234", true);
   plan->setProperty(s3_processor, "Endpoint Override URL", "${test.endpoint}");
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().connectTimeoutMs == 10000);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 10000);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
 }
 
 }  // namespace
diff --git a/libminifi/test/aws-tests/ListS3Tests.cpp b/libminifi/test/aws-tests/ListS3Tests.cpp
new file mode 100644
index 0000000..08e7f3c
--- /dev/null
+++ b/libminifi/test/aws-tests/ListS3Tests.cpp
@@ -0,0 +1,231 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "S3TestsFixture.h"
+#include "processors/ListS3.h"
+
+using ListS3TestsFixture = FlowProducerS3TestsFixture<minifi::aws::processors::ListS3>;
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test AWS credential setting", "[awsCredentials]") {
+  setBucket();
+
+  SECTION("Test property credentials") {
+    setAccesKeyCredentialsInProcessor();
+  }
+
+  SECTION("Test credentials setting from AWS Credentials service") {
+    setAccessKeyCredentialsInController();
+    setCredentialsService();
+  }
+
+  SECTION("Test credentials file setting") {
+    setCredentialFile(s3_processor);
+  }
+
+  SECTION("Test credentials file setting from AWS Credentials service") {
+    setCredentialFile(aws_credentials_service);
+    setCredentialsService();
+  }
+
+  SECTION("Test credentials setting using default credential chain") {
+    setUseDefaultCredentialsChain(s3_processor);
+  }
+
+  SECTION("Test credentials setting from AWS Credentials service using default credential chain") {
+    setUseDefaultCredentialsChain(aws_credentials_service);
+    setCredentialsService();
+  }
+
+  test_controller.runSession(plan, true);
+  REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSAccessKeyId() == "key");
+  REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSSecretKey() == "secret");
+}
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test required property not set", "[awsS3Errors]") {
+  SECTION("Test credentials not set") {
+  }
+
+  SECTION("Test no bucket is set") {
+    setAccesKeyCredentialsInProcessor();
+  }
+
+  SECTION("Test region is empty") {
+    setRequiredProperties();
+    plan->setProperty(s3_processor, "Region", "");
+  }
+
+  REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
+}
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test proxy setting", "[awsS3Proxy]") {
+  setRequiredProperties();
+  setProxy();
+  test_controller.runSession(plan, true);
+  checkProxySettings();
+}
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test listing without versioning", "[awsS3ListObjects]") {
+  setRequiredProperties();
+  test_controller.runSession(plan, true);
+
+  for (auto i = 0; i < S3_OBJECT_COUNT; ++i) {
+    REQUIRE(LogTestController::getInstance().contains("key:filename value:" + S3_KEY_PREFIX + std::to_string(i)));
+    REQUIRE(LogTestController::getInstance().contains("key:s3.etag value:" + S3_ETAG_PREFIX + std::to_string(i)));
+  }
+
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.bucket value:" + S3_BUCKET) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest value:true") == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:") == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS)) == S3_OBJECT_COUNT / 2);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version") == 0);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") == 0);
+  REQUIRE(!mock_s3_request_sender_ptr->list_object_request.ContinuationTokenHasBeenSet());
+}
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test listing with versioning", "[awsS3ListVersions]") {
+  setRequiredProperties();
+  plan->setProperty(s3_processor, "Use Versions", "true");
+  test_controller.runSession(plan, true);
+
+  for (auto i = 0; i < S3_OBJECT_COUNT; ++i) {
+    // 2 versions of every object
+    REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:" + S3_KEY_PREFIX + std::to_string(i)) == 2);
+    REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.etag value:" + S3_ETAG_PREFIX + std::to_string(i)) == 2);
+  }
+
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version value:" + S3_VERSION_1) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version value:" + S3_VERSION_2) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.bucket value:" + S3_BUCKET) == S3_OBJECT_COUNT * 2);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest value:true") == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest value:false") == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:") == S3_OBJECT_COUNT * 2);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS)) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT * 2);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT * 2);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") == 0);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.user.metadata") == 0);
+  REQUIRE(!mock_s3_request_sender_ptr->list_version_request.KeyMarkerHasBeenSet());
+  REQUIRE(!mock_s3_request_sender_ptr->list_version_request.VersionIdMarkerHasBeenSet());
+}
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test if optional request values are set without versioning", "[awsS3ListOptionalValues]") {
+  setRequiredProperties();
+  plan->setProperty(s3_processor, "Delimiter", "/");
+  plan->setProperty(s3_processor, "Prefix", "test/");
+  test_controller.runSession(plan, true);
+  REQUIRE(mock_s3_request_sender_ptr->list_object_request.GetDelimiter() == "/");
+  REQUIRE(mock_s3_request_sender_ptr->list_object_request.GetPrefix() == "test/");
+}
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test if optional request values are set with versioning", "[awsS3ListOptionalValues]") {
+  setRequiredProperties();
+  plan->setProperty(s3_processor, "Delimiter", "/");
+  plan->setProperty(s3_processor, "Prefix", "test/");
+  plan->setProperty(s3_processor, "Use Versions", "true");
+  test_controller.runSession(plan, true);
+  REQUIRE(mock_s3_request_sender_ptr->list_version_request.GetDelimiter() == "/");
+  REQUIRE(mock_s3_request_sender_ptr->list_version_request.GetPrefix() == "test/");
+}
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test minimum age property handling with non-versioned objects", "[awsS3ListMinAge]") {
+  setRequiredProperties();
+  plan->setProperty(s3_processor, "Minimum Object Age", "120 days");
+  test_controller.runSession(plan, true);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:") == S3_OBJECT_COUNT / 2);
+}
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test minimum age property handling with versioned objects", "[awsS3ListMinAge]") {
+  setRequiredProperties();
+  plan->setProperty(s3_processor, "Minimum Object Age", "120 days");
+  plan->setProperty(s3_processor, "Use Versions", "true");
+  test_controller.runSession(plan, true);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:") == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version value:" + S3_VERSION_1) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version value:" + S3_VERSION_2) == 0);
+}
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test write object tags", "[awsS3ListTags]") {
+  setRequiredProperties();
+  plan->setProperty(s3_processor, "Write Object Tags", "true");
+  test_controller.runSession(plan, true);
+  for (const auto& tag : S3_OBJECT_TAGS) {
+    REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag." + tag.first + " value:" + tag.second) == S3_OBJECT_COUNT);
+  }
+}
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test write user metadata", "[awsS3ListMetadata]") {
+  setRequiredProperties();
+  plan->setProperty(s3_processor, "Write User Metadata", "true");
+  plan->setProperty(s3_processor, "Requester Pays", "true");
+  test_controller.runSession(plan, true);
+  for (const auto& metadata : S3_OBJECT_USER_METADATA) {
+    REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.user.metadata." + metadata.first + " value:" + metadata.second) == S3_OBJECT_COUNT);
+  }
+  REQUIRE(mock_s3_request_sender_ptr->head_object_request.GetRequestPayer() == Aws::S3::Model::RequestPayer::requester);
+}
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test truncated listing without versioning", "[awsS3ListObjects]") {
+  setRequiredProperties();
+  mock_s3_request_sender_ptr->setListingTruncated(true);
+  test_controller.runSession(plan, true);
+  for (auto i = 0; i < S3_OBJECT_COUNT; ++i) {
+    REQUIRE(LogTestController::getInstance().contains("key:filename value:" + S3_KEY_PREFIX + std::to_string(i)));
+    REQUIRE(LogTestController::getInstance().contains("key:s3.etag value:" + S3_ETAG_PREFIX + std::to_string(i)));
+  }
+
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.bucket value:" + S3_BUCKET) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest value:true") == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:") == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS)) == S3_OBJECT_COUNT / 2);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version") == 0);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") == 0);
+  REQUIRE(mock_s3_request_sender_ptr->list_object_request.ContinuationTokenHasBeenSet());
+  REQUIRE(mock_s3_request_sender_ptr->list_object_request.GetContinuationToken() == S3_CONTINUATION_TOKEN);
+}
+
+TEST_CASE_METHOD(ListS3TestsFixture, "Test truncated listing with versioning", "[awsS3ListVersions]") {
+  setRequiredProperties();
+  plan->setProperty(s3_processor, "Use Versions", "true");
+  mock_s3_request_sender_ptr->setListingTruncated(true);
+  test_controller.runSession(plan, true);
+  for (auto i = 0; i < S3_OBJECT_COUNT; ++i) {
+    // 2 versions of every object
+    REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:" + S3_KEY_PREFIX + std::to_string(i)) == 2);
+    REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.etag value:" + S3_ETAG_PREFIX + std::to_string(i)) == 2);
+  }
+
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version value:" + S3_VERSION_1) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version value:" + S3_VERSION_2) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.bucket value:" + S3_BUCKET) == S3_OBJECT_COUNT * 2);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest value:true") == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest value:false") == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:") == S3_OBJECT_COUNT * 2);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS)) == S3_OBJECT_COUNT);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT * 2);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT * 2);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") == 0);
+  REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.user.metadata") == 0);
+  REQUIRE(mock_s3_request_sender_ptr->list_version_request.KeyMarkerHasBeenSet());
+  REQUIRE(mock_s3_request_sender_ptr->list_version_request.GetKeyMarker() == S3_KEY_MARKER);
+  REQUIRE(mock_s3_request_sender_ptr->list_version_request.VersionIdMarkerHasBeenSet());
+  REQUIRE(mock_s3_request_sender_ptr->list_version_request.GetVersionIdMarker() == S3_VERSION_ID_MARKER);
+}
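
  [Editor's note, not part of the commit] The two truncated-listing cases above exercise pagination: when a response reports IsTruncated, the wrapper is expected to issue a follow-up request carrying the continuation token (object listing) or the key/version-id markers (version listing). One plausible shape of that loop, inferred from the mock behaviour rather than copied from S3Wrapper.cpp; member names follow the S3Wrapper declaration earlier in this diff:

    // Sketch of S3Wrapper::listObjects-style pagination over ListObjectsV2
    std::vector<s3::ListedObjectAttributes> listed_objects;
    Aws::S3::Model::ListObjectsV2Request request;
    request.SetBucket(params.bucket);
    bool truncated = false;
    do {
      auto result = request_sender_->sendListObjectsRequest(request);
      if (!result) {
        return minifi::utils::nullopt;
      }
      addListResults(result->GetContents(), params.min_object_age, listed_objects);
      truncated = result->GetIsTruncated();
      if (truncated) {
        request.SetContinuationToken(result->GetNextContinuationToken());
      }
    } while (truncated);
    return listed_objects;
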
diff --git a/libminifi/test/aws-tests/MockS3RequestSender.h b/libminifi/test/aws-tests/MockS3RequestSender.h
new file mode 100644
index 0000000..418cf8b
--- /dev/null
+++ b/libminifi/test/aws-tests/MockS3RequestSender.h
@@ -0,0 +1,255 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <sstream>
+#include <vector>
+#include <map>
+
+#include "s3/S3RequestSender.h"
+#include "aws/core/utils/DateTime.h"
+
+const std::string S3_VERSION_1 = "1.2.3";
+const std::string S3_VERSION_2 = "1.2.4";
+const std::string S3_ETAG = "\"tag-123\"";
+const std::string S3_ETAG_UNQUOTED = "tag-123";
+const std::string S3_EXPIRATION = "expiry-date=\"Wed, 28 Oct 2020 00:00:00 GMT\", rule-id=\"my_expiration_rule\"";
+const std::string S3_EXPIRATION_DATE = "Wed, 28 Oct 2020 00:00:00 GMT";
+const std::string S3_EXPIRATION_TIME_RULE_ID = "my_expiration_rule";
+const Aws::S3::Model::ServerSideEncryption S3_SSEALGORITHM = Aws::S3::Model::ServerSideEncryption::aws_kms;
+const std::string S3_SSEALGORITHM_STR = "aws_kms";
+const std::string S3_CONTENT_TYPE = "application/octet-stream";
+const std::string S3_CONTENT = "INPUT_DATA";
+const std::string S3_KEY_PREFIX = "KEY_";
+const std::string S3_ETAG_PREFIX = "ETAG_";
+const std::size_t S3_OBJECT_COUNT = 10;
+const int64_t S3_OBJECT_SIZE = 1024;
+const int64_t S3_OBJECT_OLD_AGE_MILLISECONDS = 652924800;
+const std::string S3_STORAGE_CLASS_STR = "Standard";
+const std::map<std::string, std::string> S3_OBJECT_TAGS {
+  std::make_pair("tag1", "value1"),
+  std::make_pair("tag2", "value2")
+};
+const std::map<std::string, std::string> S3_OBJECT_USER_METADATA {
+  std::make_pair("metadata_key_1", "metadata_value_1"),
+  std::make_pair("metadata_key_2", "metadata_value_2")
+};
+const std::string S3_KEY_MARKER = "continue_key";
+const std::string S3_VERSION_ID_MARKER = "continue_version";
+const std::string S3_CONTINUATION_TOKEN = "continue";
+
+class MockS3RequestSender : public minifi::aws::s3::S3RequestSender {
+ public:
+  MockS3RequestSender() {
+    for(auto i = 0; i < S3_OBJECT_COUNT; ++i) {
+      Aws::S3::Model::ObjectVersion version;
+      version.SetKey(S3_KEY_PREFIX + std::to_string(i));
+      version.SetETag(S3_ETAG_PREFIX + std::to_string(i));
+      version.SetIsLatest(false);
+      version.SetStorageClass(Aws::S3::Model::ObjectVersionStorageClass::STANDARD);
+      version.SetVersionId(S3_VERSION_1);
+      version.SetSize(S3_OBJECT_SIZE);
+      version.SetLastModified(Aws::Utils::DateTime(S3_OBJECT_OLD_AGE_MILLISECONDS));
+      listed_versions_.push_back(version);
+      version.SetVersionId(S3_VERSION_2);
+      version.SetIsLatest(true);
+      version.SetLastModified(Aws::Utils::DateTime::CurrentTimeMillis());
+      listed_versions_.push_back(version);
+    }
+
+    for(auto i = 0; i < S3_OBJECT_COUNT; ++i) {
+      Aws::S3::Model::Object object;
+      object.SetKey(S3_KEY_PREFIX + std::to_string(i));
+      object.SetETag(S3_ETAG_PREFIX + std::to_string(i));
+      object.SetStorageClass(Aws::S3::Model::ObjectStorageClass::STANDARD);
+      object.SetSize(S3_OBJECT_SIZE);
+      if (i % 2 == 0) {
+        object.SetLastModified(Aws::Utils::DateTime(S3_OBJECT_OLD_AGE_MILLISECONDS));
+      } else {
+        object.SetLastModified(Aws::Utils::DateTime::CurrentTimeMillis());
+      }
+      listed_objects_.push_back(object);
+    }
+  }
+
+  minifi::utils::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) override {
+    put_object_request = request;
+
+    Aws::S3::Model::PutObjectResult put_s3_result;
+    if (!return_empty_result_) {
+      put_s3_result.SetVersionId(S3_VERSION_1);
+      put_s3_result.SetETag(S3_ETAG);
+      put_s3_result.SetExpiration(S3_EXPIRATION);
+      put_s3_result.SetServerSideEncryption(S3_SSEALGORITHM);
+    }
+    return put_s3_result;
+  }
+
+  bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) override {
+    delete_object_request = request;
+    return delete_object_result_;
+  }
+
+  minifi::utils::optional<Aws::S3::Model::GetObjectResult> sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) override {
+    get_object_request = request;
+
+    Aws::S3::Model::GetObjectResult get_s3_result;
+    if (!return_empty_result_) {
+      get_s3_result.SetVersionId(S3_VERSION_1);
+      get_s3_result.SetETag(S3_ETAG);
+      get_s3_result.SetExpiration(S3_EXPIRATION);
+      get_s3_result.SetServerSideEncryption(S3_SSEALGORITHM);
+      get_s3_result.SetContentType(S3_CONTENT_TYPE);
+      get_s3_result.ReplaceBody(new std::stringstream(S3_CONTENT));
+      get_s3_result.SetContentLength(S3_CONTENT.size());
+      get_s3_result.SetMetadata(S3_OBJECT_USER_METADATA);
+    }
+    return minifi::utils::make_optional(std::move(get_s3_result));
+  }
+
+  minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> sendListObjectsRequest(const Aws::S3::Model::ListObjectsV2Request& request) override {
+    list_object_request = request;
+
+    Aws::S3::Model::ListObjectsV2Result list_object_result;
+    if (!is_listing_truncated_) {
+      for (auto i = 0; i < listed_objects_.size(); ++i) {
+        list_object_result.AddContents(listed_objects_[i]);
+      }
+      return list_object_result;
+    }
+
+    if (request.GetContinuationToken().empty()) {
+      list_object_result.SetNextContinuationToken(S3_CONTINUATION_TOKEN);
+      list_object_result.SetIsTruncated(true);
+      for (auto i = 0; i < listed_objects_.size() / 2; ++i) {
+        list_object_result.AddContents(listed_objects_[i]);
+      }
+    } else {
+      list_object_result.SetIsTruncated(false);
+      for (auto i = listed_objects_.size() / 2; i < listed_objects_.size(); ++i) {
+        list_object_result.AddContents(listed_objects_[i]);
+      }
+    }
+    return list_object_result;
+  }
+
+  minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> sendListVersionsRequest(const Aws::S3::Model::ListObjectVersionsRequest& request) override {
+    list_version_request = request;
+
+    Aws::S3::Model::ListObjectVersionsResult list_version_result;
+    if (!is_listing_truncated_) {
+      for (auto i = 0; i < listed_versions_.size(); ++i) {
+        list_version_result.AddVersions(listed_versions_[i]);
+      }
+      return list_version_result;
+    }
+
+    if (request.GetKeyMarker().empty() && request.GetVersionIdMarker().empty()) {
+      list_version_result.SetNextKeyMarker(S3_KEY_MARKER);
+      list_version_result.SetNextVersionIdMarker(S3_VERSION_ID_MARKER);
+      list_version_result.SetIsTruncated(true);
+      for (auto i = 0; i < listed_versions_.size() / 2; ++i) {
+        list_version_result.AddVersions(listed_versions_[i]);
+      }
+    } else {
+      list_version_result.SetIsTruncated(false);
+      for (auto i = listed_versions_.size() / 2; i < listed_versions_.size(); ++i) {
+        list_version_result.AddVersions(listed_versions_[i]);
+      }
+    }
+    return list_version_result;
+  }
+
+  minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> sendGetObjectTaggingRequest(const Aws::S3::Model::GetObjectTaggingRequest& request) override {
+    get_object_tagging_request = request;
+    Aws::S3::Model::GetObjectTaggingResult result;
+    for (const auto& tag_pair : S3_OBJECT_TAGS) {
+      Aws::S3::Model::Tag tag;
+      tag.SetKey(tag_pair.first);
+      tag.SetValue(tag_pair.second);
+      result.AddTagSet(tag);
+    }
+    return result;
+  }
+
+  minifi::utils::optional<Aws::S3::Model::HeadObjectResult> sendHeadObjectRequest(const Aws::S3::Model::HeadObjectRequest& request) override {
+    head_object_request = request;
+
+    Aws::S3::Model::HeadObjectResult head_s3_result;
+    if (!return_empty_result_) {
+      head_s3_result.SetVersionId(S3_VERSION_1);
+      head_s3_result.SetETag(S3_ETAG);
+      head_s3_result.SetExpiration(S3_EXPIRATION);
+      head_s3_result.SetServerSideEncryption(S3_SSEALGORITHM);
+      head_s3_result.SetContentType(S3_CONTENT_TYPE);
+      head_s3_result.SetContentLength(S3_CONTENT.size());
+      head_s3_result.SetMetadata(S3_OBJECT_USER_METADATA);
+    }
+    return minifi::utils::make_optional(std::move(head_s3_result));
+  }
+
+  Aws::Auth::AWSCredentials getCredentials() const {
+    return credentials_;
+  }
+
+  Aws::Client::ClientConfiguration getClientConfig() const {
+    return client_config_;
+  }
+
+  std::string getPutObjectRequestBody() const {
+    std::istreambuf_iterator<char> buf_it;
+    return std::string(std::istreambuf_iterator<char>(*put_object_request.GetBody()), buf_it);
+  }
+
+  void returnEmptyS3Result(bool return_empty_result = true) {
+    return_empty_result_ = return_empty_result;
+  }
+
+  void setDeleteObjectResult(bool delete_object_result) {
+    delete_object_result_ = delete_object_result;
+  }
+
+  std::vector<Aws::S3::Model::ObjectVersion> getListedVersion() const {
+    return listed_versions_;
+  }
+
+  std::vector<Aws::S3::Model::Object> getListedObjects() const {
+    return listed_objects_;
+  }
+
+  void setListingTruncated(bool is_listing_truncated) {
+    is_listing_truncated_ = is_listing_truncated;
+  }
+
+  Aws::S3::Model::PutObjectRequest put_object_request;
+  Aws::S3::Model::DeleteObjectRequest delete_object_request;
+  Aws::S3::Model::GetObjectRequest get_object_request;
+  Aws::S3::Model::ListObjectsV2Request list_object_request;
+  Aws::S3::Model::ListObjectVersionsRequest list_version_request;
+  Aws::S3::Model::GetObjectTaggingRequest get_object_tagging_request;
+  Aws::S3::Model::HeadObjectRequest head_object_request;
+
+ private:
+  std::vector<Aws::S3::Model::ObjectVersion> listed_versions_;
+  std::vector<Aws::S3::Model::Object> listed_objects_;
+  bool delete_object_result_ = true;
+  bool return_empty_result_ = false;
+  bool is_listing_truncated_ = false;
+};
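
  [Editor's note, not part of the commit] The knobs on the mock map directly onto the test scenarios above; a quick usage sketch of the toggles a fixture might flip:

    MockS3RequestSender sender;
    sender.setListingTruncated(true);     // first list response is truncated, the retry completes it
    sender.returnEmptyS3Result();         // Put/Get/Head results come back without optional fields
    sender.setDeleteObjectResult(false);  // deleteObject() reports failure
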
diff --git a/libminifi/test/aws-tests/MockS3Wrapper.h b/libminifi/test/aws-tests/MockS3Wrapper.h
deleted file mode 100644
index 2e25b68..0000000
--- a/libminifi/test/aws-tests/MockS3Wrapper.h
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <string>
-#include <sstream>
-
-#include "s3/S3WrapperBase.h"
-
-const std::string S3_VERSION = "1.2.3";
-const std::string S3_ETAG = "\"tag-123\"";
-const std::string S3_ETAG_UNQUOTED = "tag-123";
-const std::string S3_EXPIRATION = "expiry-date=\"Wed, 28 Oct 2020 00:00:00 GMT\", rule-id=\"my_expiration_rule\"";
-const std::string S3_EXPIRATION_DATE = "Wed, 28 Oct 2020 00:00:00 GMT";
-const std::string S3_EXPIRATION_TIME_RULE_ID = "my_expiration_rule";
-const Aws::S3::Model::ServerSideEncryption S3_SSEALGORITHM = Aws::S3::Model::ServerSideEncryption::aws_kms;
-const std::string S3_SSEALGORITHM_STR = "aws_kms";
-const std::string S3_CONTENT_TYPE = "application/octet-stream";
-const std::string S3_CONTENT = "INPUT_DATA";
-
-class MockS3Wrapper : public minifi::aws::s3::S3WrapperBase {
- public:
-  minifi::utils::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) override {
-    put_object_request = request;
-
-    Aws::S3::Model::PutObjectResult put_s3_result;
-    if (!return_empty_result_) {
-      put_s3_result.SetVersionId(S3_VERSION);
-      put_s3_result.SetETag(S3_ETAG);
-      put_s3_result.SetExpiration(S3_EXPIRATION);
-      put_s3_result.SetServerSideEncryption(S3_SSEALGORITHM);
-    }
-    return put_s3_result;
-  }
-
-  bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) override {
-    delete_object_request = request;
-    return delete_object_result_;
-  }
-
-  minifi::utils::optional<Aws::S3::Model::GetObjectResult> sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) override {
-    get_object_request = request;
-
-    Aws::S3::Model::GetObjectResult get_s3_result;
-    if (!return_empty_result_) {
-      get_s3_result.SetVersionId(S3_VERSION);
-      get_s3_result.SetETag(S3_ETAG);
-      get_s3_result.SetExpiration(S3_EXPIRATION);
-      get_s3_result.SetServerSideEncryption(S3_SSEALGORITHM);
-      get_s3_result.SetContentType(S3_CONTENT_TYPE);
-      get_s3_result.ReplaceBody(new std::stringstream(S3_CONTENT));
-      get_s3_result.SetContentLength(S3_CONTENT.size());
-    }
-    return minifi::utils::make_optional(std::move(get_s3_result));
-  }
-
-  Aws::Auth::AWSCredentials getCredentials() const {
-    return credentials_;
-  }
-
-  Aws::Client::ClientConfiguration getClientConfig() const {
-    return client_config_;
-  }
-
-  std::string getPutObjectRequestBody() const {
-    std::istreambuf_iterator<char> buf_it;
-    return std::string(std::istreambuf_iterator<char>(*put_object_request.GetBody()), buf_it);
-  }
-
-  void returnEmptyS3Result(bool return_empty_result = true) {
-    return_empty_result_ = return_empty_result;
-  }
-
-  void setDeleteObjectResult(bool delete_object_result) {
-    delete_object_result_ = delete_object_result;
-  }
-
-  Aws::S3::Model::PutObjectRequest put_object_request;
-  Aws::S3::Model::DeleteObjectRequest delete_object_request;
-  Aws::S3::Model::GetObjectRequest get_object_request;
-
- private:
-  bool delete_object_result_ = true;
-  bool return_empty_result_ = false;
-};
diff --git a/libminifi/test/aws-tests/PutS3ObjectTests.cpp b/libminifi/test/aws-tests/PutS3ObjectTests.cpp
index b72ce33..547c4ca 100644
--- a/libminifi/test/aws-tests/PutS3ObjectTests.cpp
+++ b/libminifi/test/aws-tests/PutS3ObjectTests.cpp
@@ -24,10 +24,10 @@ namespace {
 
 using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
 
-class PutS3ObjectTestsFixture : public S3TestsFixture<minifi::aws::processors::PutS3Object> {
+class PutS3ObjectTestsFixture : public FlowProcessorS3TestsFixture<minifi::aws::processors::PutS3Object> {
  public:
   void checkPutObjectResults() {
-    REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.version value:" + S3_VERSION));
+    REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.version value:" + S3_VERSION_1));
     REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.etag value:" + S3_ETAG_UNQUOTED));
     REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.expiration value:" + S3_EXPIRATION_DATE));
     REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.sseAlgorithm value:" + S3_SSEALGORITHM_STR));
@@ -72,8 +72,8 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test AWS credential setting", "[awsCr
   }
 
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->getCredentials().GetAWSAccessKeyId() == "key");
-  REQUIRE(mock_s3_wrapper_ptr->getCredentials().GetAWSSecretKey() == "secret");
+  REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSAccessKeyId() == "key");
+  REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSSecretKey() == "secret");
 }
 
 TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test required property not set", "[awsS3Config]") {
@@ -114,22 +114,22 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Check default client configuration",
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.key value:" + INPUT_FILENAME));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.contenttype value:application/octet-stream"));
   checkPutObjectResults();
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetContentType() == "application/octet-stream");
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetStorageClass() == Aws::S3::Model::StorageClass::STANDARD);
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetServerSideEncryption() == Aws::S3::Model::ServerSideEncryption::NOT_SET);
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetACL() == Aws::S3::Model::ObjectCannedACL::NOT_SET);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().region == minifi::aws::processors::region::US_WEST_2);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().connectTimeoutMs == 30000);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().endpointOverride.empty());
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyHost.empty());
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyUserName.empty());
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPassword.empty());
-  REQUIRE(mock_s3_wrapper_ptr->getPutObjectRequestBody() == INPUT_DATA);
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetContentType() == "application/octet-stream");
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetStorageClass() == Aws::S3::Model::StorageClass::STANDARD);
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetServerSideEncryption() == Aws::S3::Model::ServerSideEncryption::NOT_SET);
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetACL() == Aws::S3::Model::ObjectCannedACL::NOT_SET);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_WEST_2);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 30000);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().endpointOverride.empty());
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().proxyHost.empty());
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().proxyUserName.empty());
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().proxyPassword.empty());
+  REQUIRE(mock_s3_request_sender_ptr->getPutObjectRequestBody() == INPUT_DATA);
 }
 
 TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Check default client configuration with empty result", "[awsS3ClientConfig]") {
   setRequiredProperties();
-  mock_s3_wrapper_ptr->returnEmptyS3Result();
+  mock_s3_request_sender_ptr->returnEmptyS3Result();
   test_controller.runSession(plan, true);
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.bucket value:testBucket"));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.key value:input_data.log"));
@@ -153,20 +153,20 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Set non-default client configuration"
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.bucket value:testBucket"));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.key value:custom_key"));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.contenttype value:application/tar"));
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetContentType() == "application/tar");
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetStorageClass() == Aws::S3::Model::StorageClass::REDUCED_REDUNDANCY);
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetServerSideEncryption() == Aws::S3::Model::ServerSideEncryption::AES256);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().connectTimeoutMs == 10000);
-  REQUIRE(mock_s3_wrapper_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
-  REQUIRE(mock_s3_wrapper_ptr->getPutObjectRequestBody() == INPUT_DATA);
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetContentType() == "application/tar");
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetStorageClass() == Aws::S3::Model::StorageClass::REDUCED_REDUNDANCY);
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetServerSideEncryption() == Aws::S3::Model::ServerSideEncryption::AES256);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 10000);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
+  REQUIRE(mock_s3_request_sender_ptr->getPutObjectRequestBody() == INPUT_DATA);
 }
 
 TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test single user metadata", "[awsS3MetaData]") {
   setRequiredProperties();
   plan->setProperty(s3_processor, "meta_key", "meta_value", true);
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetMetadata().at("meta_key") == "meta_value");
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetMetadata().at("meta_key") == "meta_value");
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.usermetadata value:meta_key=meta_value"));
 }
 
@@ -175,8 +175,8 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test multiple user metadata", "[awsS3
   plan->setProperty(s3_processor, "meta_key1", "meta_value1", true);
   plan->setProperty(s3_processor, "meta_key2", "meta_value2", true);
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetMetadata().at("meta_key1") == "meta_value1");
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetMetadata().at("meta_key2") == "meta_value2");
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetMetadata().at("meta_key1") == "meta_value1");
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetMetadata().at("meta_key2") == "meta_value2");
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.usermetadata value:meta_key1=meta_value1,meta_key2=meta_value2"));
 }
 
@@ -200,11 +200,11 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test access control setting", "[awsS3
   plan->setProperty(update_attribute, "s3.permissions.cannedacl", "PublicReadWrite", true);
   plan->setProperty(s3_processor, "Canned ACL", "${s3.permissions.cannedacl}");
   test_controller.runSession(plan, true);
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetGrantFullControl() == "id=myuserid123, emailAddress=\"myuser@example.com\"");
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetGrantRead() == "id=myuserid456, emailAddress=\"myuser2@example.com\"");
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetGrantReadACP() == "id=myuserid789, id=otheruser");
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetGrantWriteACP() == "emailAddress=\"myuser3@example.com\"");
-  REQUIRE(mock_s3_wrapper_ptr->put_object_request.GetACL() == Aws::S3::Model::ObjectCannedACL::public_read_write);
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetGrantFullControl() == "id=myuserid123, emailAddress=\"myuser@example.com\"");
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetGrantRead() == "id=myuserid456, emailAddress=\"myuser2@example.com\"");
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetGrantReadACP() == "id=myuserid789, id=otheruser");
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetGrantWriteACP() == "emailAddress=\"myuser3@example.com\"");
+  REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetACL() == Aws::S3::Model::ObjectCannedACL::public_read_write);
 }
 
 }  // namespace
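
A minimal sketch of a PutS3Object test written against the renamed mock, assuming the Catch2 fixture above and that the content-type property is named "Content Type" (the property name is an assumption, not taken from this diff):

    TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Sketch: content type reaches the request", "[awsS3Sketch]") {
      setRequiredProperties();  // bucket and access key are routed through UpdateAttribute by the fixture
      plan->setProperty(s3_processor, "Content Type", "application/json");  // assumed property name
      test_controller.runSession(plan, true);
      // Assertions now inspect the request captured by the mock request sender
      // instead of the old mock wrapper pointer.
      REQUIRE(mock_s3_request_sender_ptr->put_object_request.GetContentType() == "application/json");
    }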
diff --git a/libminifi/test/aws-tests/S3TestsFixture.h b/libminifi/test/aws-tests/S3TestsFixture.h
index af2d248..262e0e5 100644
--- a/libminifi/test/aws-tests/S3TestsFixture.h
+++ b/libminifi/test/aws-tests/S3TestsFixture.h
@@ -27,7 +27,7 @@
 #include "processors/LogAttribute.h"
 #include "processors/UpdateAttribute.h"
 #include "utils/file/FileUtils.h"
-#include "MockS3Wrapper.h"
+#include "MockS3RequestSender.h"
 #include "utils/TestUtils.h"
 
 using org::apache::nifi::minifi::utils::createTempDir;
@@ -50,49 +50,17 @@ class S3TestsFixture {
     LogTestController::getInstance().setDebug<TestPlan>();
     LogTestController::getInstance().setDebug<minifi::core::Processor>();
     LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
-    LogTestController::getInstance().setTrace<processors::GetFile>();
-    LogTestController::getInstance().setDebug<processors::UpdateAttribute>();
     LogTestController::getInstance().setDebug<processors::LogAttribute>();
     LogTestController::getInstance().setTrace<T>();
 
     // Build MiNiFi processing graph
     plan = test_controller.createPlan();
-    mock_s3_wrapper_ptr = new MockS3Wrapper();
-    std::unique_ptr<minifi::aws::s3::S3WrapperBase> mock_s3_wrapper(mock_s3_wrapper_ptr);
-    s3_processor = std::shared_ptr<T>(new T("S3Processor", utils::Identifier(), std::move(mock_s3_wrapper)));
-
-    auto input_dir = createTempDir(&test_controller);
-    std::ofstream input_file_stream(input_dir + utils::file::FileUtils::get_separator() + INPUT_FILENAME);
-    input_file_stream << INPUT_DATA;
-    input_file_stream.close();
-    get_file = plan->addProcessor("GetFile", "GetFile");
-    plan->setProperty(get_file, processors::GetFile::Directory.getName(), input_dir);
-    plan->setProperty(get_file, processors::GetFile::KeepSourceFile.getName(), "false");
-    update_attribute = plan->addProcessor(
-      "UpdateAttribute",
-      "UpdateAttribute",
-      core::Relationship("success", "d"),
-      true);
-    plan->addProcessor(
-      s3_processor,
-      "S3Processor",
-      core::Relationship("success", "d"),
-      true);
-    plan->addProcessor(
-      "LogAttribute",
-      "LogAttribute",
-      core::Relationship("success", "d"),
-      true);
+    mock_s3_request_sender_ptr = new MockS3RequestSender();
+    std::unique_ptr<minifi::aws::s3::S3RequestSender> mock_s3_request_sender(mock_s3_request_sender_ptr);
+    s3_processor = std::shared_ptr<T>(new T("S3Processor", utils::Identifier(), std::move(mock_s3_request_sender)));
     aws_credentials_service = plan->addController("AWSCredentialsService", "AWSCredentialsService");
   }
 
-  void setAccesKeyCredentialsInProcessor() {
-    plan->setProperty(update_attribute, "s3.accessKey", "key", true);
-    plan->setProperty(s3_processor, "Access Key", "${s3.accessKey}");
-    plan->setProperty(update_attribute, "s3.secretKey", "secret", true);
-    plan->setProperty(s3_processor, "Secret Key", "${s3.secretKey}");
-  }
-
   void setAccessKeyCredentialsInController() {
     plan->setProperty(aws_credentials_service, "Access Key", "key");
     plan->setProperty(aws_credentials_service, "Secret Key", "secret");
@@ -100,8 +68,7 @@ class S3TestsFixture {
 
   template<typename Component>
   void setCredentialFile(const Component &component) {
-    char in_dir[] = "/tmp/gt.XXXXXX";
-    auto temp_path = test_controller.createTempDirectory(in_dir);
+    auto temp_path = createTempDir(&test_controller);
     REQUIRE(!temp_path.empty());
     std::string aws_credentials_file(temp_path + utils::file::FileUtils::get_separator() + "aws_creds.conf");
     std::ofstream aws_credentials_file_stream(aws_credentials_file);
@@ -127,32 +94,20 @@ class S3TestsFixture {
     plan->setProperty(s3_processor, "AWS Credentials Provider service", "AWSCredentialsService");
   }
 
-  void setBucket() {
-    plan->setProperty(update_attribute, "test.bucket", S3_BUCKET, true);
-    plan->setProperty(s3_processor, "Bucket", "${test.bucket}");
-  }
+  virtual void setAccesKeyCredentialsInProcessor() = 0;
+  virtual void setBucket() = 0;
+  virtual void setProxy() = 0;
 
   void setRequiredProperties() {
     setAccesKeyCredentialsInProcessor();
     setBucket();
   }
 
-  void setProxy() {
-    plan->setProperty(update_attribute, "test.proxyHost", "host", true);
-    plan->setProperty(s3_processor, "Proxy Host", "${test.proxyHost}");
-    plan->setProperty(update_attribute, "test.proxyPort", "1234", true);
-    plan->setProperty(s3_processor, "Proxy Port", "${test.proxyPort}");
-    plan->setProperty(update_attribute, "test.proxyUsername", "username", true);
-    plan->setProperty(s3_processor, "Proxy Username", "${test.proxyUsername}");
-    plan->setProperty(update_attribute, "test.proxyPassword", "password", true);
-    plan->setProperty(s3_processor, "Proxy Password", "${test.proxyPassword}");
-  }
-
   void checkProxySettings() {
-    REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyHost == "host");
-    REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPort == 1234);
-    REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyUserName == "username");
-    REQUIRE(mock_s3_wrapper_ptr->getClientConfig().proxyPassword == "password");
+    REQUIRE(mock_s3_request_sender_ptr->getClientConfig().proxyHost == "host");
+    REQUIRE(mock_s3_request_sender_ptr->getClientConfig().proxyPort == 1234);
+    REQUIRE(mock_s3_request_sender_ptr->getClientConfig().proxyUserName == "username");
+    REQUIRE(mock_s3_request_sender_ptr->getClientConfig().proxyPassword == "password");
   }
 
   virtual ~S3TestsFixture() {
@@ -162,9 +117,102 @@ class S3TestsFixture {
  protected:
   TestController test_controller;
   std::shared_ptr<TestPlan> plan;
-  MockS3Wrapper* mock_s3_wrapper_ptr;
+  MockS3RequestSender* mock_s3_request_sender_ptr;
   std::shared_ptr<core::Processor> s3_processor;
-  std::shared_ptr<core::Processor> get_file;
   std::shared_ptr<core::Processor> update_attribute;
   std::shared_ptr<core::controller::ControllerServiceNode> aws_credentials_service;
 };
+
+template<typename T>
+class FlowProcessorS3TestsFixture : public S3TestsFixture<T> {
+ public:
+  const std::string INPUT_FILENAME = "input_data.log";
+  const std::string INPUT_DATA = "input_data";
+
+  FlowProcessorS3TestsFixture() {
+    LogTestController::getInstance().setTrace<processors::GetFile>();
+    LogTestController::getInstance().setDebug<processors::UpdateAttribute>();
+
+    auto input_dir = createTempDir(&this->test_controller);
+    std::ofstream input_file_stream(input_dir + utils::file::FileUtils::get_separator() + INPUT_FILENAME);
+    input_file_stream << INPUT_DATA;
+    input_file_stream.close();
+    auto get_file = this->plan->addProcessor("GetFile", "GetFile");
+    this->plan->setProperty(get_file, processors::GetFile::Directory.getName(), input_dir);
+    this->plan->setProperty(get_file, processors::GetFile::KeepSourceFile.getName(), "false");
+    update_attribute = this->plan->addProcessor(
+      "UpdateAttribute",
+      "UpdateAttribute",
+      core::Relationship("success", "d"),
+      true);
+    this->plan->addProcessor(
+      this->s3_processor,
+      "S3Processor",
+      core::Relationship("success", "d"),
+      true);
+    auto log_attribute = this->plan->addProcessor(
+      "LogAttribute",
+      "LogAttribute",
+      core::Relationship("success", "d"),
+      true);
+    this->plan->setProperty(log_attribute, processors::LogAttribute::FlowFilesToLog.getName(), "0");
+  }
+
+  void setAccesKeyCredentialsInProcessor() override {
+    this->plan->setProperty(update_attribute, "s3.accessKey", "key", true);
+    this->plan->setProperty(this->s3_processor, "Access Key", "${s3.accessKey}");
+    this->plan->setProperty(update_attribute, "s3.secretKey", "secret", true);
+    this->plan->setProperty(this->s3_processor, "Secret Key", "${s3.secretKey}");
+  }
+
+  void setBucket() override {
+    this->plan->setProperty(update_attribute, "test.bucket", this->S3_BUCKET, true);
+    this->plan->setProperty(this->s3_processor, "Bucket", "${test.bucket}");
+  }
+
+  void setProxy() override {
+    this->plan->setProperty(update_attribute, "test.proxyHost", "host", true);
+    this->plan->setProperty(this->s3_processor, "Proxy Host", "${test.proxyHost}");
+    this->plan->setProperty(update_attribute, "test.proxyPort", "1234", true);
+    this->plan->setProperty(this->s3_processor, "Proxy Port", "${test.proxyPort}");
+    this->plan->setProperty(update_attribute, "test.proxyUsername", "username", true);
+    this->plan->setProperty(this->s3_processor, "Proxy Username", "${test.proxyUsername}");
+    this->plan->setProperty(update_attribute, "test.proxyPassword", "password", true);
+    this->plan->setProperty(this->s3_processor, "Proxy Password", "${test.proxyPassword}");
+  }
+
+ protected:
+  std::shared_ptr<core::Processor> update_attribute;
+};
+
+template<typename T>
+class FlowProducerS3TestsFixture : public S3TestsFixture<T> {
+ public:
+  FlowProducerS3TestsFixture() {
+    this->plan->addProcessor(
+      this->s3_processor,
+      "S3Processor");
+    auto log_attribute = this->plan->addProcessor(
+      "LogAttribute",
+      "LogAttribute",
+      core::Relationship("success", "d"),
+      true);
+    this->plan->setProperty(log_attribute, processors::LogAttribute::FlowFilesToLog.getName(), "0");
+  }
+
+  void setAccesKeyCredentialsInProcessor() override {
+    this->plan->setProperty(this->s3_processor, "Access Key", "key");
+    this->plan->setProperty(this->s3_processor, "Secret Key", "secret");
+  }
+
+  void setBucket() override {
+    this->plan->setProperty(this->s3_processor, "Bucket", this->S3_BUCKET);
+  }
+
+  void setProxy() override {
+    this->plan->setProperty(this->s3_processor, "Proxy Host", "host");
+    this->plan->setProperty(this->s3_processor, "Proxy Port", "1234");
+    this->plan->setProperty(this->s3_processor, "Proxy Username", "username");
+    this->plan->setProperty(this->s3_processor, "Proxy Password", "password");
+  }
+};
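
The new fixtures split the old setup in two: FlowProcessorS3TestsFixture builds a GetFile -> UpdateAttribute -> S3 processor -> LogAttribute graph and feeds properties through flow file attributes, while FlowProducerS3TestsFixture wires the S3 processor straight to LogAttribute and sets properties directly, for processors that produce flow files without upstream input. A minimal sketch of a test built on the producer-style fixture, with SourceS3Processor standing in as a placeholder for the concrete processor under test:

    class SourceS3TestsFixture : public FlowProducerS3TestsFixture<minifi::aws::processors::SourceS3Processor> {};

    TEST_CASE_METHOD(SourceS3TestsFixture, "Sketch: default region reaches the client config", "[awsS3Sketch]") {
      setRequiredProperties();  // access key, secret key and bucket are set directly on the processor
      test_controller.runSession(plan, true);
      // The mock request sender records the client configuration used for the call.
      REQUIRE(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_WEST_2);
    }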