You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2021/04/20 07:02:32 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1476 Improve parallel S3 request handling in S3 processors

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 79b071d  MINIFICPP-1476 Improve parallel S3 request handling in S3 processors
79b071d is described below

commit 79b071d2ae7f7becc9573b4a5347b3e18aac3cfc
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Fri Mar 12 12:13:31 2021 +0100

    MINIFICPP-1476 Improve parallel S3 request handling in S3 processors
    
    Remove instantiations of S3 client config objects.
    Instantiating a client config object initiates a curl call for EC2
    metadata retrieval. We need to minimize these calls to optimize
    performance. Creating a single client config per S3 processor minimizes
    the curl calls, and these configs are copied and mutated with the proper
    parameters for every S3 call.
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    
    This closes #1031
---
 extensions/aws/processors/DeleteS3Object.cpp   | 44 ++++++++++--------
 extensions/aws/processors/DeleteS3Object.h     |  5 ++
 extensions/aws/processors/FetchS3Object.cpp    | 48 ++++++++++++--------
 extensions/aws/processors/FetchS3Object.h      |  5 ++
 extensions/aws/processors/ListS3.cpp           | 48 +++++++++++---------
 extensions/aws/processors/ListS3.h             |  3 +-
 extensions/aws/processors/PutS3Object.cpp      | 24 ++++------
 extensions/aws/processors/PutS3Object.h        |  8 ++--
 extensions/aws/processors/S3Processor.cpp      | 17 ++-----
 extensions/aws/processors/S3Processor.h        |  3 +-
 extensions/aws/s3/S3ClientRequestSender.cpp    | 49 ++++++++++++++------
 extensions/aws/s3/S3ClientRequestSender.h      | 35 +++++++++++---
 extensions/aws/s3/S3RequestSender.cpp          | 63 --------------------------
 extensions/aws/s3/S3RequestSender.h            | 43 ++++++++++++------
 extensions/aws/s3/S3Wrapper.cpp                | 54 +++++++---------------
 extensions/aws/s3/S3Wrapper.h                  | 47 ++++++++++++++-----
 libminifi/test/aws-tests/ListS3Tests.cpp       | 22 +++++++++
 libminifi/test/aws-tests/MockS3RequestSender.h | 51 ++++++++++++++++++---
 18 files changed, 318 insertions(+), 251 deletions(-)

diff --git a/extensions/aws/processors/DeleteS3Object.cpp b/extensions/aws/processors/DeleteS3Object.cpp
index fbe9338..c20d57e 100644
--- a/extensions/aws/processors/DeleteS3Object.cpp
+++ b/extensions/aws/processors/DeleteS3Object.cpp
@@ -51,6 +51,26 @@ void DeleteS3Object::initialize() {
   setSupportedRelationships({Failure, Success});
 }
 
+minifi::utils::optional<aws::s3::DeleteObjectRequestParameters> DeleteS3Object::buildDeleteS3RequestParams(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file,
+    const CommonProperties &common_properties) const {
+  aws::s3::DeleteObjectRequestParameters params(common_properties.credentials, client_config_);
+  context->getProperty(ObjectKey, params.object_key, flow_file);
+  if (params.object_key.empty() && (!flow_file->getAttribute("filename", params.object_key) || params.object_key.empty())) {
+    logger_->log_error("No Object Key is set and default object key 'filename' attribute could not be found!");
+    return minifi::utils::nullopt;
+  }
+  logger_->log_debug("DeleteS3Object: Object Key [%s]", params.object_key);
+
+  context->getProperty(Version, params.version, flow_file);
+  logger_->log_debug("DeleteS3Object: Version [%s]", params.version);
+
+  params.bucket = common_properties.bucket;
+  params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+  return params;
+}
+
 void DeleteS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   logger_->log_debug("DeleteS3Object onTrigger");
   std::shared_ptr<core::FlowFile> flow_file = session->get();
@@ -65,31 +85,17 @@ void DeleteS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
     return;
   }
 
-  std::string object_key;
-  context->getProperty(ObjectKey, object_key, flow_file);
-  if (object_key.empty() && (!flow_file->getAttribute("filename", object_key) || object_key.empty())) {
-    logger_->log_error("No Object Key is set and default object key 'filename' attribute could not be found!");
+  auto params = buildDeleteS3RequestParams(context, flow_file, *common_properties);
+  if (!params) {
     session->transfer(flow_file, Failure);
     return;
   }
-  logger_->log_debug("DeleteS3Object: Object Key [%s]", object_key);
-
-  std::string version;
-  context->getProperty(Version, version, flow_file);
-  logger_->log_debug("DeleteS3Object: Version [%s]", version);
-
-  bool delete_succeeded = false;
-  {
-    std::lock_guard<std::mutex> lock(s3_wrapper_mutex_);
-    configureS3Wrapper(common_properties.value());
-    delete_succeeded = s3_wrapper_.deleteObject(common_properties->bucket, object_key, version);
-  }
 
-  if (delete_succeeded) {
-    logger_->log_debug("Successfully deleted S3 object '%s' from bucket '%s'", object_key, common_properties->bucket);
+  if (s3_wrapper_.deleteObject(*params)) {
+    logger_->log_debug("Successfully deleted S3 object '%s' from bucket '%s'", params->object_key, common_properties->bucket);
     session->transfer(flow_file, Success);
   } else {
-    logger_->log_error("Failed to delete S3 object '%s' from bucket '%s'", object_key, common_properties->bucket);
+    logger_->log_error("Failed to delete S3 object '%s' from bucket '%s'", params->object_key, common_properties->bucket);
     session->transfer(flow_file, Failure);
   }
 }
diff --git a/extensions/aws/processors/DeleteS3Object.h b/extensions/aws/processors/DeleteS3Object.h
index d1ddaa5..6c459d6 100644
--- a/extensions/aws/processors/DeleteS3Object.h
+++ b/extensions/aws/processors/DeleteS3Object.h
@@ -65,6 +65,11 @@ class DeleteS3Object : public S3Processor {
   explicit DeleteS3Object(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
     : S3Processor(name, uuid, logging::LoggerFactory<DeleteS3Object>::getLogger(), std::move(s3_request_sender)) {
   }
+
+  minifi::utils::optional<aws::s3::DeleteObjectRequestParameters> buildDeleteS3RequestParams(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file,
+    const CommonProperties &common_properties) const;
 };
 
 REGISTER_RESOURCE(DeleteS3Object, "This Processor deletes FlowFiles on an Amazon S3 Bucket.");
diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp
index 8c77d53..0ee6ab4 100644
--- a/extensions/aws/processors/FetchS3Object.cpp
+++ b/extensions/aws/processors/FetchS3Object.cpp
@@ -65,6 +65,27 @@ void FetchS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &cont
   logger_->log_debug("FetchS3Object: RequesterPays [%s]", requester_pays_ ? "true" : "false");
 }
 
+minifi::utils::optional<aws::s3::GetObjectRequestParameters> FetchS3Object::buildFetchS3RequestParams(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file,
+    const CommonProperties &common_properties) const {
+  minifi::aws::s3::GetObjectRequestParameters get_object_params(common_properties.credentials, client_config_);
+  get_object_params.bucket = common_properties.bucket;
+  get_object_params.requester_pays = requester_pays_;
+
+  context->getProperty(ObjectKey, get_object_params.object_key, flow_file);
+  if (get_object_params.object_key.empty() && (!flow_file->getAttribute("filename", get_object_params.object_key) || get_object_params.object_key.empty())) {
+    logger_->log_error("No Object Key is set and default object key 'filename' attribute could not be found!");
+    return minifi::utils::nullopt;
+  }
+  logger_->log_debug("FetchS3Object: Object Key [%s]", get_object_params.object_key);
+
+  context->getProperty(Version, get_object_params.version, flow_file);
+  logger_->log_debug("FetchS3Object: Version [%s]", get_object_params.version);
+  get_object_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+  return get_object_params;
+}
+
 void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   logger_->log_debug("FetchS3Object onTrigger");
   std::shared_ptr<core::FlowFile> flow_file = session->get();
@@ -79,27 +100,14 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
     return;
   }
 
-  minifi::aws::s3::GetObjectRequestParameters get_object_params;
-  get_object_params.bucket = common_properties->bucket;
-  get_object_params.requester_pays = requester_pays_;
-
-  context->getProperty(ObjectKey, get_object_params.object_key, flow_file);
-  if (get_object_params.object_key.empty() && (!flow_file->getAttribute("filename", get_object_params.object_key) || get_object_params.object_key.empty())) {
-    logger_->log_error("No Object Key is set and default object key 'filename' attribute could not be found!");
+  auto get_object_params = buildFetchS3RequestParams(context, flow_file, *common_properties);
+  if (!get_object_params) {
     session->transfer(flow_file, Failure);
     return;
   }
-  logger_->log_debug("FetchS3Object: Object Key [%s]", get_object_params.object_key);
 
-  context->getProperty(Version, get_object_params.version, flow_file);
-  logger_->log_debug("FetchS3Object: Version [%s]", get_object_params.version);
-
-  WriteCallback callback(flow_file->getSize(), get_object_params, s3_wrapper_);
-  {
-    std::lock_guard<std::mutex> lock(s3_wrapper_mutex_);
-    configureS3Wrapper(common_properties.value());
-    session->write(flow_file, &callback);
-  }
+  WriteCallback callback(flow_file->getSize(), *get_object_params, s3_wrapper_);
+  session->write(flow_file, &callback);
 
   if (callback.result_) {
     auto putAttributeIfNotEmpty = [&](const std::string& attribute, const std::string& value) {
@@ -108,8 +116,8 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
       }
     };
 
-    logger_->log_debug("Successfully fetched S3 object %s from bucket %s", get_object_params.object_key, get_object_params.bucket);
-    session->putAttribute(flow_file, "s3.bucket", get_object_params.bucket);
+    logger_->log_debug("Successfully fetched S3 object %s from bucket %s", get_object_params->object_key, get_object_params->bucket);
+    session->putAttribute(flow_file, "s3.bucket", get_object_params->bucket);
     session->putAttribute(flow_file, core::SpecialFlowAttribute::PATH, callback.result_->path);
     session->putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, callback.result_->absolute_path);
     session->putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, callback.result_->filename);
@@ -121,7 +129,7 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
     putAttributeIfNotEmpty("s3.version", callback.result_->version);
     session->transfer(flow_file, Success);
   } else {
-    logger_->log_error("Failed to fetch S3 object %s from bucket %s", get_object_params.object_key, get_object_params.bucket);
+    logger_->log_error("Failed to fetch S3 object %s from bucket %s", get_object_params->object_key, get_object_params->bucket);
     session->transfer(flow_file, Failure);
   }
 }
diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h
index 974be8c..ade81a0 100644
--- a/extensions/aws/processors/FetchS3Object.h
+++ b/extensions/aws/processors/FetchS3Object.h
@@ -95,6 +95,11 @@ class FetchS3Object : public S3Processor {
     : S3Processor(name, uuid, logging::LoggerFactory<FetchS3Object>::getLogger(), std::move(s3_request_sender)) {
   }
 
+  minifi::utils::optional<aws::s3::GetObjectRequestParameters> buildFetchS3RequestParams(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file,
+    const CommonProperties &common_properties) const;
+
   bool requester_pays_ = false;
 };
 
diff --git a/extensions/aws/processors/ListS3.cpp b/extensions/aws/processors/ListS3.cpp
index ecae373..974358a 100644
--- a/extensions/aws/processors/ListS3.cpp
+++ b/extensions/aws/processors/ListS3.cpp
@@ -99,23 +99,24 @@ void ListS3::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
   if (!common_properties) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required property is not set or invalid");
   }
-  configureS3Wrapper(common_properties.value());
-  list_request_params_.bucket = common_properties->bucket;
+  list_request_params_ = minifi::utils::make_unique<aws::s3::ListRequestParameters>(common_properties->credentials, client_config_);
+  list_request_params_->setClientConfig(common_properties->proxy, common_properties->endpoint_override_url);
+  list_request_params_->bucket = common_properties->bucket;
 
-  context->getProperty(Delimiter.getName(), list_request_params_.delimiter);
-  logger_->log_debug("ListS3: Delimiter [%s]", list_request_params_.delimiter);
+  context->getProperty(Delimiter.getName(), list_request_params_->delimiter);
+  logger_->log_debug("ListS3: Delimiter [%s]", list_request_params_->delimiter);
 
-  context->getProperty(Prefix.getName(), list_request_params_.prefix);
-  logger_->log_debug("ListS3: Prefix [%s]", list_request_params_.prefix);
+  context->getProperty(Prefix.getName(), list_request_params_->prefix);
+  logger_->log_debug("ListS3: Prefix [%s]", list_request_params_->prefix);
 
-  context->getProperty(UseVersions.getName(), list_request_params_.use_versions);
-  logger_->log_debug("ListS3: UseVersions [%s]", list_request_params_.use_versions ? "true" : "false");
+  context->getProperty(UseVersions.getName(), list_request_params_->use_versions);
+  logger_->log_debug("ListS3: UseVersions [%s]", list_request_params_->use_versions ? "true" : "false");
 
   std::string min_obj_age_str;
-  if (!context->getProperty(MinimumObjectAge.getName(), min_obj_age_str) || min_obj_age_str.empty() || !core::Property::getTimeMSFromString(min_obj_age_str, list_request_params_.min_object_age)) {
+  if (!context->getProperty(MinimumObjectAge.getName(), min_obj_age_str) || min_obj_age_str.empty() || !core::Property::getTimeMSFromString(min_obj_age_str, list_request_params_->min_object_age)) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Minimum Object Age missing or invalid");
   }
-  logger_->log_debug("S3Processor: Minimum Object Age [%llud]", min_obj_age_str, list_request_params_.min_object_age);
+  logger_->log_debug("S3Processor: Minimum Object Age [%llud]", min_obj_age_str, list_request_params_->min_object_age);
 
   context->getProperty(WriteObjectTags.getName(), write_object_tags_);
   logger_->log_debug("ListS3: WriteObjectTags [%s]", write_object_tags_ ? "true" : "false");
@@ -128,7 +129,6 @@ void ListS3::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
 }
 
 void ListS3::writeObjectTags(
-    const std::string &bucket,
     const aws::s3::ListedObjectAttributes &object_attributes,
     core::ProcessSession &session,
     const std::shared_ptr<core::FlowFile> &flow_file) {
@@ -136,13 +136,17 @@ void ListS3::writeObjectTags(
     return;
   }
 
-  auto get_object_tags_result = s3_wrapper_.getObjectTags(bucket, object_attributes.filename, object_attributes.version);
+  aws::s3::GetObjectTagsParameters params(list_request_params_->credentials, list_request_params_->client_config);
+  params.bucket = list_request_params_->bucket;
+  params.object_key = object_attributes.filename;
+  params.version = object_attributes.version;
+  auto get_object_tags_result = s3_wrapper_.getObjectTags(params);
   if (get_object_tags_result) {
-    for (const auto& tag : get_object_tags_result.value()) {
+    for (const auto& tag : *get_object_tags_result) {
       session.putAttribute(flow_file, "s3.tag." + tag.first, tag.second);
     }
   } else {
-    logger_->log_warn("Failed to get object tags for object %s in bucket %s", object_attributes.filename, bucket);
+    logger_->log_warn("Failed to get object tags for object %s in bucket %s", object_attributes.filename, params.bucket);
   }
 }
 
@@ -154,8 +158,8 @@ void ListS3::writeUserMetadata(
     return;
   }
 
-  aws::s3::HeadObjectRequestParameters params;
-  params.bucket = list_request_params_.bucket;
+  aws::s3::HeadObjectRequestParameters params(list_request_params_->credentials, list_request_params_->client_config);
+  params.bucket = list_request_params_->bucket;
   params.object_key = object_attributes.filename;
   params.version = object_attributes.version;
   params.requester_pays = requester_pays_;
@@ -221,7 +225,7 @@ void ListS3::createNewFlowFile(
     core::ProcessSession &session,
     const aws::s3::ListedObjectAttributes &object_attributes) {
   auto flow_file = session.create();
-  session.putAttribute(flow_file, "s3.bucket", list_request_params_.bucket);
+  session.putAttribute(flow_file, "s3.bucket", list_request_params_->bucket);
   session.putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, object_attributes.filename);
   session.putAttribute(flow_file, "s3.etag", object_attributes.etag);
   session.putAttribute(flow_file, "s3.isLatest", object_attributes.is_latest ? "true" : "false");
@@ -231,7 +235,7 @@ void ListS3::createNewFlowFile(
   if (!object_attributes.version.empty()) {
     session.putAttribute(flow_file, "s3.version", object_attributes.version);
   }
-  writeObjectTags(list_request_params_.bucket, object_attributes, session, flow_file);
+  writeObjectTags(object_attributes, session, flow_file);
   writeUserMetadata(object_attributes, session, flow_file);
 
   session.transfer(flow_file, Success);
@@ -240,9 +244,9 @@ void ListS3::createNewFlowFile(
 void ListS3::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   logger_->log_debug("ListS3 onTrigger");
 
-  auto aws_results = s3_wrapper_.listBucket(list_request_params_);
+  auto aws_results = s3_wrapper_.listBucket(*list_request_params_);
   if (!aws_results) {
-    logger_->log_error("Failed to list S3 bucket %s", list_request_params_.bucket);
+    logger_->log_error("Failed to list S3 bucket %s", list_request_params_->bucket);
     context->yield();
     return;
   }
@@ -251,7 +255,7 @@ void ListS3::onTrigger(const std::shared_ptr<core::ProcessContext> &context, con
   auto latest_listing_state = stored_listing_state;
   std::size_t files_transferred = 0;
 
-  for (const auto& object_attributes : aws_results.value()) {
+  for (const auto& object_attributes : *aws_results) {
     if (stored_listing_state.wasObjectListedAlready(object_attributes)) {
       continue;
     }
@@ -265,7 +269,7 @@ void ListS3::onTrigger(const std::shared_ptr<core::ProcessContext> &context, con
   storeState(latest_listing_state);
 
   if (files_transferred == 0) {
-    logger_->log_debug("No new S3 objects were found in bucket %s to list", list_request_params_.bucket);
+    logger_->log_debug("No new S3 objects were found in bucket %s to list", list_request_params_->bucket);
     context->yield();
     return;
   }
diff --git a/extensions/aws/processors/ListS3.h b/extensions/aws/processors/ListS3.h
index 699b3e3..f48baef 100644
--- a/extensions/aws/processors/ListS3.h
+++ b/extensions/aws/processors/ListS3.h
@@ -81,7 +81,6 @@ class ListS3 : public S3Processor {
   static uint64_t getLatestListedKeyTimestamp(const std::unordered_map<std::string, std::string> &state);
 
   void writeObjectTags(
-    const std::string &bucket,
     const aws::s3::ListedObjectAttributes &object_attributes,
     core::ProcessSession &session,
     const std::shared_ptr<core::FlowFile> &flow_file);
@@ -95,7 +94,7 @@ class ListS3 : public S3Processor {
     core::ProcessSession &session,
     const aws::s3::ListedObjectAttributes &object_attributes);
 
-  aws::s3::ListRequestParameters list_request_params_;
+  std::unique_ptr<aws::s3::ListRequestParameters> list_request_params_;
   bool write_object_tags_ = false;
   bool write_user_metadata_ = false;
   bool requester_pays_ = false;
diff --git a/extensions/aws/processors/PutS3Object.cpp b/extensions/aws/processors/PutS3Object.cpp
index b05f987..80a7483 100644
--- a/extensions/aws/processors/PutS3Object.cpp
+++ b/extensions/aws/processors/PutS3Object.cpp
@@ -167,7 +167,7 @@ std::string PutS3Object::parseAccessControlList(const std::string &comma_separat
 bool PutS3Object::setCannedAcl(
     const std::shared_ptr<core::ProcessContext> &context,
     const std::shared_ptr<core::FlowFile> &flow_file,
-    aws::s3::PutObjectRequestParameters &put_s3_request_params) {
+    aws::s3::PutObjectRequestParameters &put_s3_request_params) const {
   context->getProperty(CannedACL, put_s3_request_params.canned_acl, flow_file);
   if (!put_s3_request_params.canned_acl.empty() && CANNED_ACLS.find(put_s3_request_params.canned_acl) == CANNED_ACLS.end()) {
     logger_->log_error("Canned ACL is invalid!");
@@ -180,7 +180,7 @@ bool PutS3Object::setCannedAcl(
 bool PutS3Object::setAccessControl(
       const std::shared_ptr<core::ProcessContext> &context,
       const std::shared_ptr<core::FlowFile> &flow_file,
-      aws::s3::PutObjectRequestParameters &put_s3_request_params) {
+      aws::s3::PutObjectRequestParameters &put_s3_request_params) const {
   std::string value;
   if (context->getProperty(FullControlUserList, value, flow_file) && !value.empty()) {
     put_s3_request_params.fullcontrol_user_list = parseAccessControlList(value);
@@ -205,8 +205,9 @@ bool PutS3Object::setAccessControl(
 minifi::utils::optional<aws::s3::PutObjectRequestParameters> PutS3Object::buildPutS3RequestParams(
     const std::shared_ptr<core::ProcessContext> &context,
     const std::shared_ptr<core::FlowFile> &flow_file,
-    const CommonProperties &common_properties) {
-  aws::s3::PutObjectRequestParameters params;
+    const CommonProperties &common_properties) const {
+  aws::s3::PutObjectRequestParameters params(common_properties.credentials, client_config_);
+  params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
   params.bucket = common_properties.bucket;
   params.user_metadata_map = user_metadata_map_;
   params.server_side_encryption = server_side_encryption_;
@@ -232,7 +233,7 @@ void PutS3Object::setAttributes(
     const std::shared_ptr<core::ProcessSession> &session,
     const std::shared_ptr<core::FlowFile> &flow_file,
     const aws::s3::PutObjectRequestParameters &put_s3_request_params,
-    const minifi::aws::s3::PutObjectResult &put_object_result) {
+    const minifi::aws::s3::PutObjectResult &put_object_result) const {
   session->putAttribute(flow_file, "s3.bucket", put_s3_request_params.bucket);
   session->putAttribute(flow_file, "s3.key", put_s3_request_params.object_key);
   session->putAttribute(flow_file, "s3.contenttype", put_s3_request_params.content_type);
@@ -268,24 +269,19 @@ void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context
     return;
   }
 
-  auto put_s3_request_params = buildPutS3RequestParams(context, flow_file, common_properties.value());
+  auto put_s3_request_params = buildPutS3RequestParams(context, flow_file, *common_properties);
   if (!put_s3_request_params) {
     session->transfer(flow_file, Failure);
     return;
   }
 
-  PutS3Object::ReadCallback callback(flow_file->getSize(), put_s3_request_params.value(), s3_wrapper_);
-  {
-    std::lock_guard<std::mutex> lock(s3_wrapper_mutex_);
-    configureS3Wrapper(common_properties.value());
-    session->read(flow_file, &callback);
-  }
-
+  PutS3Object::ReadCallback callback(flow_file->getSize(), *put_s3_request_params, s3_wrapper_);
+  session->read(flow_file, &callback);
   if (callback.result_ == minifi::utils::nullopt) {
     logger_->log_error("Failed to upload S3 object to bucket '%s'", put_s3_request_params->bucket);
     session->transfer(flow_file, Failure);
   } else {
-    setAttributes(session, flow_file, put_s3_request_params.value(), callback.result_.value());
+    setAttributes(session, flow_file, *put_s3_request_params, *callback.result_);
     logger_->log_debug("Successfully uploaded S3 object '%s' to bucket '%s'", put_s3_request_params->object_key, put_s3_request_params->bucket);
     session->transfer(flow_file, Success);
   }
diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index e9cf600..ad3072a 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -126,17 +126,17 @@ class PutS3Object : public S3Processor {
 
   void fillUserMetadata(const std::shared_ptr<core::ProcessContext> &context);
   std::string parseAccessControlList(const std::string &comma_separated_list) const;
-  bool setCannedAcl(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params);
-  bool setAccessControl(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params);
+  bool setCannedAcl(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const;
+  bool setAccessControl(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file, aws::s3::PutObjectRequestParameters &put_s3_request_params) const;
   void setAttributes(
     const std::shared_ptr<core::ProcessSession> &session,
     const std::shared_ptr<core::FlowFile> &flow_file,
     const aws::s3::PutObjectRequestParameters &put_s3_request_params,
-    const minifi::aws::s3::PutObjectResult &put_object_result);
+    const minifi::aws::s3::PutObjectResult &put_object_result) const;
   minifi::utils::optional<aws::s3::PutObjectRequestParameters> buildPutS3RequestParams(
     const std::shared_ptr<core::ProcessContext> &context,
     const std::shared_ptr<core::FlowFile> &flow_file,
-    const CommonProperties &common_properties);
+    const CommonProperties &common_properties) const;
 
   std::string user_metadata_;
   std::map<std::string, std::string> user_metadata_map_;
diff --git a/extensions/aws/processors/S3Processor.cpp b/extensions/aws/processors/S3Processor.cpp
index 617dc2b..52bdc2f 100644
--- a/extensions/aws/processors/S3Processor.cpp
+++ b/extensions/aws/processors/S3Processor.cpp
@@ -198,16 +198,15 @@ void S3Processor::onSchedule(const std::shared_ptr<core::ProcessContext>& contex
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or invalid");
   }
 
-  if (!context->getProperty(Region.getName(), value) || value.empty() || REGIONS.count(value) == 0) {
+  if (!context->getProperty(Region.getName(), client_config_.region) || client_config_.region.empty() || REGIONS.count(client_config_.region) == 0) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Region property missing or invalid");
   }
-  s3_wrapper_.setRegion(value);
-  logger_->log_debug("S3Processor: Region [%s]", value);
+  logger_->log_debug("S3Processor: Region [%s]", client_config_.region);
 
   uint64_t timeout_val;
   if (context->getProperty(CommunicationsTimeout.getName(), value) && !value.empty() && core::Property::getTimeMSFromString(value, timeout_val)) {
-    s3_wrapper_.setTimeout(timeout_val);
     logger_->log_debug("S3Processor: Communications Timeout [%llu]", timeout_val);
+    client_config_.connectTimeoutMs = gsl::narrow<int64_t>(timeout_val);
   } else {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout missing or invalid");
   }
@@ -244,16 +243,6 @@ minifi::utils::optional<CommonProperties> S3Processor::getCommonELSupportedPrope
   return properties;
 }
 
-void S3Processor::configureS3Wrapper(const CommonProperties &common_properties) {
-  s3_wrapper_.setCredentials(common_properties.credentials);
-  if (!common_properties.proxy.host.empty()) {
-    s3_wrapper_.setProxy(common_properties.proxy);
-  }
-  if (!common_properties.endpoint_override_url.empty()) {
-    s3_wrapper_.setEndpointOverrideUrl(common_properties.endpoint_override_url);
-  }
-}
-
 }  // namespace processors
 }  // namespace aws
 }  // namespace minifi
diff --git a/extensions/aws/processors/S3Processor.h b/extensions/aws/processors/S3Processor.h
index 0b76078..55f26db 100644
--- a/extensions/aws/processors/S3Processor.h
+++ b/extensions/aws/processors/S3Processor.h
@@ -111,11 +111,10 @@ class S3Processor : public core::Processor {
   minifi::utils::optional<Aws::Auth::AWSCredentials> getAWSCredentials(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file);
   minifi::utils::optional<aws::s3::ProxyOptions> getProxy(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file);
   minifi::utils::optional<CommonProperties> getCommonELSupportedProperties(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file);
-  void configureS3Wrapper(const CommonProperties &common_properties);
 
   std::shared_ptr<logging::Logger> logger_;
   aws::s3::S3Wrapper s3_wrapper_;
-  std::mutex s3_wrapper_mutex_;
+  Aws::Client::ClientConfiguration client_config_;
 };
 
 }  // namespace processors
diff --git a/extensions/aws/s3/S3ClientRequestSender.cpp b/extensions/aws/s3/S3ClientRequestSender.cpp
index 62cbf50..e8c0084 100644
--- a/extensions/aws/s3/S3ClientRequestSender.cpp
+++ b/extensions/aws/s3/S3ClientRequestSender.cpp
@@ -28,8 +28,11 @@ namespace minifi {
 namespace aws {
 namespace s3 {
 
-minifi::utils::optional<Aws::S3::Model::PutObjectResult> S3ClientRequestSender::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
+minifi::utils::optional<Aws::S3::Model::PutObjectResult> S3ClientRequestSender::sendPutObjectRequest(
+    const Aws::S3::Model::PutObjectRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) {
+  Aws::S3::S3Client s3_client(credentials, client_config);
   auto outcome = s3_client.PutObject(request);
 
   if (outcome.IsSuccess()) {
@@ -41,8 +44,11 @@ minifi::utils::optional<Aws::S3::Model::PutObjectResult> S3ClientRequestSender::
   }
 }
 
-bool S3ClientRequestSender::sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
+bool S3ClientRequestSender::sendDeleteObjectRequest(
+    const Aws::S3::Model::DeleteObjectRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) {
+  Aws::S3::S3Client s3_client(credentials, client_config);
   Aws::S3::Model::DeleteObjectOutcome outcome = s3_client.DeleteObject(request);
 
   if (outcome.IsSuccess()) {
@@ -57,8 +63,11 @@ bool S3ClientRequestSender::sendDeleteObjectRequest(const Aws::S3::Model::Delete
   }
 }
 
-minifi::utils::optional<Aws::S3::Model::GetObjectResult> S3ClientRequestSender::sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
+minifi::utils::optional<Aws::S3::Model::GetObjectResult> S3ClientRequestSender::sendGetObjectRequest(
+    const Aws::S3::Model::GetObjectRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) {
+  Aws::S3::S3Client s3_client(credentials, client_config);
   auto outcome = s3_client.GetObject(request);
 
   if (outcome.IsSuccess()) {
@@ -70,8 +79,11 @@ minifi::utils::optional<Aws::S3::Model::GetObjectResult> S3ClientRequestSender::
   }
 }
 
-minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> S3ClientRequestSender::sendListObjectsRequest(const Aws::S3::Model::ListObjectsV2Request& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
+minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> S3ClientRequestSender::sendListObjectsRequest(
+    const Aws::S3::Model::ListObjectsV2Request& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) {
+  Aws::S3::S3Client s3_client(credentials, client_config);
   auto outcome = s3_client.ListObjectsV2(request);
 
   if (outcome.IsSuccess()) {
@@ -83,8 +95,11 @@ minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> S3ClientRequestSend
   }
 }
 
-minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> S3ClientRequestSender::sendListVersionsRequest(const Aws::S3::Model::ListObjectVersionsRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
+minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> S3ClientRequestSender::sendListVersionsRequest(
+    const Aws::S3::Model::ListObjectVersionsRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) {
+  Aws::S3::S3Client s3_client(credentials, client_config);
   auto outcome = s3_client.ListObjectVersions(request);
 
   if (outcome.IsSuccess()) {
@@ -96,8 +111,11 @@ minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> S3ClientReques
   }
 }
 
-minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> S3ClientRequestSender::sendGetObjectTaggingRequest(const Aws::S3::Model::GetObjectTaggingRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
+minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> S3ClientRequestSender::sendGetObjectTaggingRequest(
+    const Aws::S3::Model::GetObjectTaggingRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) {
+  Aws::S3::S3Client s3_client(credentials, client_config);
   auto outcome = s3_client.GetObjectTagging(request);
 
   if (outcome.IsSuccess()) {
@@ -109,8 +127,11 @@ minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> S3ClientRequestS
   }
 }
 
-minifi::utils::optional<Aws::S3::Model::HeadObjectResult> S3ClientRequestSender::sendHeadObjectRequest(const Aws::S3::Model::HeadObjectRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
+minifi::utils::optional<Aws::S3::Model::HeadObjectResult> S3ClientRequestSender::sendHeadObjectRequest(
+    const Aws::S3::Model::HeadObjectRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) {
+  Aws::S3::S3Client s3_client(credentials, client_config);
   auto outcome = s3_client.HeadObject(request);
 
   if (outcome.IsSuccess()) {
diff --git a/extensions/aws/s3/S3ClientRequestSender.h b/extensions/aws/s3/S3ClientRequestSender.h
index be9dd0c..f16293f 100644
--- a/extensions/aws/s3/S3ClientRequestSender.h
+++ b/extensions/aws/s3/S3ClientRequestSender.h
@@ -30,13 +30,34 @@ namespace s3 {
 
 class S3ClientRequestSender : public S3RequestSender {
  public:
-  minifi::utils::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) override;
-  bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) override;
-  minifi::utils::optional<Aws::S3::Model::GetObjectResult> sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) override;
-  minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> sendListObjectsRequest(const Aws::S3::Model::ListObjectsV2Request& request) override;
-  minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> sendListVersionsRequest(const Aws::S3::Model::ListObjectVersionsRequest& request) override;
-  minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> sendGetObjectTaggingRequest(const Aws::S3::Model::GetObjectTaggingRequest& request) override;
-  minifi::utils::optional<Aws::S3::Model::HeadObjectResult> sendHeadObjectRequest(const Aws::S3::Model::HeadObjectRequest& request) override;
+  minifi::utils::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(  // builds an Aws::S3::S3Client from the given credentials/config and calls PutObject
+    const Aws::S3::Model::PutObjectRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) override;
+  bool sendDeleteObjectRequest(  // builds an Aws::S3::S3Client from the given credentials/config and calls DeleteObject
+    const Aws::S3::Model::DeleteObjectRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) override;
+  minifi::utils::optional<Aws::S3::Model::GetObjectResult> sendGetObjectRequest(  // builds an Aws::S3::S3Client from the given credentials/config and calls GetObject
+    const Aws::S3::Model::GetObjectRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) override;
+  minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> sendListObjectsRequest(  // builds an Aws::S3::S3Client from the given credentials/config and calls ListObjectsV2
+    const Aws::S3::Model::ListObjectsV2Request& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) override;
+  minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> sendListVersionsRequest(  // builds an Aws::S3::S3Client from the given credentials/config and calls ListObjectVersions
+    const Aws::S3::Model::ListObjectVersionsRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) override;
+  minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> sendGetObjectTaggingRequest(  // builds an Aws::S3::S3Client from the given credentials/config and calls GetObjectTagging
+    const Aws::S3::Model::GetObjectTaggingRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) override;
+  minifi::utils::optional<Aws::S3::Model::HeadObjectResult> sendHeadObjectRequest(  // builds an Aws::S3::S3Client from the given credentials/config and calls HeadObject
+    const Aws::S3::Model::HeadObjectRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) override;
 };
 
 }  // namespace s3
diff --git a/extensions/aws/s3/S3RequestSender.cpp b/extensions/aws/s3/S3RequestSender.cpp
deleted file mode 100644
index 859a317..0000000
--- a/extensions/aws/s3/S3RequestSender.cpp
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * @file S3RequestSender.cpp
- * S3RequestSender class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "S3RequestSender.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace aws {
-namespace s3 {
-
-
-void S3RequestSender::setCredentials(const Aws::Auth::AWSCredentials& cred) {
-  logger_->log_debug("Setting new AWS credentials");
-  credentials_ = cred;
-}
-
-void S3RequestSender::setRegion(const Aws::String& region) {
-  logger_->log_debug("Setting new AWS region [%s]", region);
-  client_config_.region = region;
-}
-
-void S3RequestSender::setTimeout(uint64_t timeout) {
-  logger_->log_debug("Setting AWS client connection timeout [%llu]", timeout);
-  client_config_.connectTimeoutMs = timeout;
-}
-
-void S3RequestSender::setEndpointOverrideUrl(const Aws::String& url) {
-  logger_->log_debug("Setting AWS endpoint url [%s]", url);
-  client_config_.endpointOverride = url;
-}
-
-void S3RequestSender::setProxy(const ProxyOptions& proxy) {
-  logger_->log_debug("Setting AWS client proxy host [%s] port [%lu]", proxy.host, proxy.port);
-  client_config_.proxyHost = proxy.host;
-  client_config_.proxyPort = proxy.port;
-  client_config_.proxyUserName = proxy.username;
-  client_config_.proxyPassword = proxy.password;
-}
-
-}  // namespace s3
-}  // namespace aws
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
diff --git a/extensions/aws/s3/S3RequestSender.h b/extensions/aws/s3/S3RequestSender.h
index d55be76..09a528d 100644
--- a/extensions/aws/s3/S3RequestSender.h
+++ b/extensions/aws/s3/S3RequestSender.h
@@ -59,25 +59,38 @@ struct ProxyOptions {
 
 class S3RequestSender {
  public:
-  virtual minifi::utils::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) = 0;
-  virtual bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) = 0;
-  virtual minifi::utils::optional<Aws::S3::Model::GetObjectResult> sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) = 0;
-  virtual minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> sendListObjectsRequest(const Aws::S3::Model::ListObjectsV2Request& request) = 0;
-  virtual minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> sendListVersionsRequest(const Aws::S3::Model::ListObjectVersionsRequest& request) = 0;
-  virtual minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> sendGetObjectTaggingRequest(const Aws::S3::Model::GetObjectTaggingRequest& request) = 0;
-  virtual minifi::utils::optional<Aws::S3::Model::HeadObjectResult> sendHeadObjectRequest(const Aws::S3::Model::HeadObjectRequest& request) = 0;
+  virtual minifi::utils::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(  // every send* method receives the credentials and client configuration as arguments of the call
+    const Aws::S3::Model::PutObjectRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) = 0;
+  virtual bool sendDeleteObjectRequest(  // the only send* method returning a plain bool instead of an optional result
+    const Aws::S3::Model::DeleteObjectRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) = 0;
+  virtual minifi::utils::optional<Aws::S3::Model::GetObjectResult> sendGetObjectRequest(
+    const Aws::S3::Model::GetObjectRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) = 0;
+  virtual minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> sendListObjectsRequest(  // listing of objects (ListObjectsV2 request type)
+    const Aws::S3::Model::ListObjectsV2Request& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) = 0;
+  virtual minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> sendListVersionsRequest(  // listing of object versions (ListObjectVersions request type)
+    const Aws::S3::Model::ListObjectVersionsRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) = 0;
+  virtual minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> sendGetObjectTaggingRequest(
+    const Aws::S3::Model::GetObjectTaggingRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) = 0;
+  virtual minifi::utils::optional<Aws::S3::Model::HeadObjectResult> sendHeadObjectRequest(
+    const Aws::S3::Model::HeadObjectRequest& request,
+    const Aws::Auth::AWSCredentials& credentials,
+    const Aws::Client::ClientConfiguration& client_config) = 0;
   virtual ~S3RequestSender() = default;
 
-  void setCredentials(const Aws::Auth::AWSCredentials& cred);
-  void setRegion(const Aws::String& region);
-  void setTimeout(uint64_t timeout);
-  void setEndpointOverrideUrl(const Aws::String& url);
-  void setProxy(const ProxyOptions& proxy);
-
  protected:
   const utils::AWSInitializer& AWS_INITIALIZER = utils::AWSInitializer::get();
-  Aws::Client::ClientConfiguration client_config_;
-  Aws::Auth::AWSCredentials credentials_;
   std::shared_ptr<minifi::core::logging::Logger> logger_{minifi::core::logging::LoggerFactory<S3RequestSender>::getLogger()};
 };
 
diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp
index d248cfb..a7c375a 100644
--- a/extensions/aws/s3/S3Wrapper.cpp
+++ b/extensions/aws/s3/S3Wrapper.cpp
@@ -48,26 +48,6 @@ S3Wrapper::S3Wrapper() : request_sender_(minifi::utils::make_unique<S3ClientRequ
 S3Wrapper::S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender) : request_sender_(std::move(request_sender)) {
 }
 
-void S3Wrapper::setCredentials(const Aws::Auth::AWSCredentials& cred) {
-  request_sender_->setCredentials(cred);
-}
-
-void S3Wrapper::setRegion(const Aws::String& region) {
-  request_sender_->setRegion(region);
-}
-
-void S3Wrapper::setTimeout(uint64_t timeout) {
-  request_sender_->setTimeout(timeout);
-}
-
-void S3Wrapper::setEndpointOverrideUrl(const Aws::String& url) {
-  request_sender_->setEndpointOverrideUrl(url);
-}
-
-void S3Wrapper::setProxy(const ProxyOptions& proxy) {
-  request_sender_->setProxy(proxy);
-}
-
 void S3Wrapper::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const {
   if (canned_acl.empty() || CANNED_ACL_MAP.find(canned_acl) == CANNED_ACL_MAP.end())
     return;
@@ -115,7 +95,7 @@ minifi::utils::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectReq
   request.SetGrantWriteACP(put_object_params.write_acl_user_list);
   setCannedAcl(request, put_object_params.canned_acl);
 
-  auto aws_result = request_sender_->sendPutObjectRequest(request);
+  auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config);
   if (!aws_result) {
     return minifi::utils::nullopt;
   }
@@ -132,14 +112,14 @@ minifi::utils::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectReq
   return result;
 }
 
-bool S3Wrapper::deleteObject(const std::string& bucket, const std::string& object_key, const std::string& version) {
+bool S3Wrapper::deleteObject(const DeleteObjectRequestParameters& params) {  // builds a DeleteObjectRequest from params and returns the request sender's result
   Aws::S3::Model::DeleteObjectRequest request;
-  request.SetBucket(bucket);
-  request.SetKey(object_key);
-  if (!version.empty()) {
-    request.SetVersionId(version);
+  request.SetBucket(params.bucket);
+  request.SetKey(params.object_key);
+  if (!params.version.empty()) {  // an empty version string means no version id is set on the request
+    request.SetVersionId(params.version);
   }
-  return request_sender_->sendDeleteObjectRequest(request);
+  return request_sender_->sendDeleteObjectRequest(request, params.credentials, params.client_config);  // credentials and client config are taken from the parameter object
 }
 
 int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t data_size, io::BaseStream& output) {
@@ -161,7 +141,7 @@ int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t data_si
 
 minifi::utils::optional<GetObjectResult> S3Wrapper::getObject(const GetObjectRequestParameters& get_object_params, io::BaseStream& out_body) {
   auto request = createFetchObjectRequest<Aws::S3::Model::GetObjectRequest>(get_object_params);
-  auto aws_result = request_sender_->sendGetObjectRequest(request);
+  auto aws_result = request_sender_->sendGetObjectRequest(request, get_object_params.credentials, get_object_params.client_config);
   if (!aws_result) {
     return minifi::utils::nullopt;
   }
@@ -212,7 +192,7 @@ minifi::utils::optional<std::vector<ListedObjectAttributes>> S3Wrapper::listVers
   std::vector<ListedObjectAttributes> attribute_list;
   nonstd::optional_lite::optional<Aws::S3::Model::ListObjectVersionsResult> aws_result;
   do {
-    aws_result = request_sender_->sendListVersionsRequest(request);
+    aws_result = request_sender_->sendListVersionsRequest(request, params.credentials, params.client_config);
     if (!aws_result) {
       return minifi::utils::nullopt;
     }
@@ -233,7 +213,7 @@ minifi::utils::optional<std::vector<ListedObjectAttributes>> S3Wrapper::listObje
   std::vector<ListedObjectAttributes> attribute_list;
   nonstd::optional_lite::optional<Aws::S3::Model::ListObjectsV2Result> aws_result;
   do {
-    aws_result = request_sender_->sendListObjectsRequest(request);
+    aws_result = request_sender_->sendListObjectsRequest(request, params.credentials, params.client_config);
     if (!aws_result) {
       return minifi::utils::nullopt;
     }
@@ -256,14 +236,14 @@ minifi::utils::optional<std::vector<ListedObjectAttributes>> S3Wrapper::listBuck
   return listObjects(params);
 }
 
-minifi::utils::optional<std::map<std::string, std::string>> S3Wrapper::getObjectTags(const std::string& bucket, const std::string& object_key, const std::string& version) {
+minifi::utils::optional<std::map<std::string, std::string>> S3Wrapper::getObjectTags(const GetObjectTagsParameters& params) {
   Aws::S3::Model::GetObjectTaggingRequest request;
-  request.SetBucket(bucket);
-  request.SetKey(object_key);
-  if (!version.empty()) {
-    request.SetVersionId(version);
+  request.SetBucket(params.bucket);
+  request.SetKey(params.object_key);
+  if (!params.version.empty()) {
+    request.SetVersionId(params.version);
   }
-  auto aws_result = request_sender_->sendGetObjectTaggingRequest(request);
+  auto aws_result = request_sender_->sendGetObjectTaggingRequest(request, params.credentials, params.client_config);
   if (!aws_result) {
     return minifi::utils::nullopt;
   }
@@ -276,7 +256,7 @@ minifi::utils::optional<std::map<std::string, std::string>> S3Wrapper::getObject
 
 minifi::utils::optional<HeadObjectResult> S3Wrapper::headObject(const HeadObjectRequestParameters& head_object_params) {
   auto request = createFetchObjectRequest<Aws::S3::Model::HeadObjectRequest>(head_object_params);
-  auto aws_result = request_sender_->sendHeadObjectRequest(request);
+  auto aws_result = request_sender_->sendHeadObjectRequest(request, head_object_params.credentials, head_object_params.client_config);
   if (!aws_result) {
     return minifi::utils::nullopt;
   }
diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3Wrapper.h
index 239b3b2..b59f5d2 100644
--- a/extensions/aws/s3/S3Wrapper.h
+++ b/extensions/aws/s3/S3Wrapper.h
@@ -99,7 +99,25 @@ struct PutObjectResult {
   std::string ssealgorithm;
 };
 
-struct PutObjectRequestParameters {
+struct RequestParameters {  // base of the request parameter structs: holds the credentials and client configuration of one request
+  RequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : credentials(creds)  // both constructor arguments are copied into the members below
+    , client_config(config) {}
+  Aws::Auth::AWSCredentials credentials;
+  Aws::Client::ClientConfiguration client_config;
+
+  void setClientConfig(const aws::s3::ProxyOptions& proxy, const std::string& endpoint_override_url) {  // overwrites the proxy fields and the endpoint override of client_config; no other field is touched
+    client_config.proxyHost = proxy.host;
+    client_config.proxyPort = proxy.port;
+    client_config.proxyUserName = proxy.username;
+    client_config.proxyPassword = proxy.password;
+    client_config.endpointOverride = endpoint_override_url;
+  }
+};
+
+struct PutObjectRequestParameters : public RequestParameters {
+  PutObjectRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
   std::string bucket;
   std::string object_key;
   std::string storage_class;
@@ -113,7 +131,17 @@ struct PutObjectRequestParameters {
   std::string canned_acl;
 };
 
-struct GetObjectRequestParameters {
+struct DeleteObjectRequestParameters : public RequestParameters {  // identifies a single object (bucket, key, version) on top of the inherited credentials/config
+  DeleteObjectRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}  // only forwards to the base; the string members start out empty
+  std::string bucket;
+  std::string object_key;
+  std::string version;  // S3Wrapper::deleteObject sets a version id on the request only when this is non-empty
+};
+
+struct GetObjectRequestParameters : public RequestParameters {
+  GetObjectRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
   std::string bucket;
   std::string object_key;
   std::string version;
@@ -138,7 +166,9 @@ struct GetObjectResult : public HeadObjectResult {
   int64_t write_size = 0;
 };
 
-struct ListRequestParameters {
+struct ListRequestParameters : public RequestParameters {
+  ListRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
   std::string bucket;
   std::string delimiter;
   std::string prefix;
@@ -157,23 +187,18 @@ struct ListedObjectAttributes {
 };
 
 using HeadObjectRequestParameters = GetObjectRequestParameters;
+using GetObjectTagsParameters = DeleteObjectRequestParameters;
 
 class S3Wrapper {
  public:
   S3Wrapper();
   explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
 
-  void setCredentials(const Aws::Auth::AWSCredentials& cred);
-  void setRegion(const Aws::String& region);
-  void setTimeout(uint64_t timeout);
-  void setEndpointOverrideUrl(const Aws::String& url);
-  void setProxy(const ProxyOptions& proxy);
-
   minifi::utils::optional<PutObjectResult> putObject(const PutObjectRequestParameters& options, std::shared_ptr<Aws::IOStream> data_stream);
-  bool deleteObject(const std::string& bucket, const std::string& object_key, const std::string& version = "");
+  bool deleteObject(const DeleteObjectRequestParameters& options);
   minifi::utils::optional<GetObjectResult> getObject(const GetObjectRequestParameters& get_object_params, io::BaseStream& fetched_body);
   minifi::utils::optional<std::vector<ListedObjectAttributes>> listBucket(const ListRequestParameters& params);
-  minifi::utils::optional<std::map<std::string, std::string>> getObjectTags(const std::string& bucket, const std::string& object_key, const std::string& version = "");
+  minifi::utils::optional<std::map<std::string, std::string>> getObjectTags(const GetObjectTagsParameters& params);
   minifi::utils::optional<HeadObjectResult> headObject(const HeadObjectRequestParameters& head_object_params);
 
   virtual ~S3Wrapper() = default;
diff --git a/libminifi/test/aws-tests/ListS3Tests.cpp b/libminifi/test/aws-tests/ListS3Tests.cpp
index 56d29c4..583a0b8 100644
--- a/libminifi/test/aws-tests/ListS3Tests.cpp
+++ b/libminifi/test/aws-tests/ListS3Tests.cpp
@@ -81,6 +81,9 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Test proxy setting", "[awsS3Proxy]") {
 
 TEST_CASE_METHOD(ListS3TestsFixture, "Test listing without versioning", "[awsS3ListObjects]") {
   setRequiredProperties();
+  plan->setProperty(s3_processor, "Region", minifi::aws::processors::region::US_EAST_1);
+  plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
+  plan->setProperty(s3_processor, "Endpoint Override URL", "http://localhost:1234");
   test_controller.runSession(plan, true);
 
   for (std::size_t i = 0; i < S3_OBJECT_COUNT; ++i) {
@@ -97,6 +100,9 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Test listing without versioning", "[awsS3L
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") == 0);
   REQUIRE(!mock_s3_request_sender_ptr->list_object_request.ContinuationTokenHasBeenSet());
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 10000);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
 }
 
 TEST_CASE_METHOD(ListS3TestsFixture, "Test listing with versioning", "[awsS3ListVersions]") {
@@ -163,15 +169,26 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Test minimum age property handling with ve
 
 TEST_CASE_METHOD(ListS3TestsFixture, "Test write object tags", "[awsS3ListTags]") {
   setRequiredProperties();
+  plan->setProperty(s3_processor, "Region", minifi::aws::processors::region::US_EAST_1);  // region, timeout and endpoint are set so the REQUIREs at the end can check them on the mock
+  plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
+  plan->setProperty(s3_processor, "Endpoint Override URL", "http://localhost:1234");
   plan->setProperty(s3_processor, "Write Object Tags", "true");
   test_controller.runSession(plan, true);
   for (const auto& tag : S3_OBJECT_TAGS) {
     REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag." + tag.first + " value:" + tag.second) == S3_OBJECT_COUNT);
   }
+  REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSAccessKeyId() == "key");  // NOTE(review): "key"/"secret" are presumably configured by setRequiredProperties() - confirm in the fixture
+  REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSSecretKey() == "secret");
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);  // getClientConfig() presumably returns the config the mock stored from a request - confirm in MockS3RequestSender
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 10000);  // the "10 Sec" property is expected to arrive as 10000 in connectTimeoutMs
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
 }
 
 TEST_CASE_METHOD(ListS3TestsFixture, "Test write user metadata", "[awsS3ListMetadata]") {
   setRequiredProperties();
+  plan->setProperty(s3_processor, "Region", minifi::aws::processors::region::US_EAST_1);
+  plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
+  plan->setProperty(s3_processor, "Endpoint Override URL", "http://localhost:1234");
   plan->setProperty(s3_processor, "Write User Metadata", "true");
   plan->setProperty(s3_processor, "Requester Pays", "true");
   test_controller.runSession(plan, true);
@@ -179,6 +196,11 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Test write user metadata", "[awsS3ListMeta
     REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.user.metadata." + metadata.first + " value:" + metadata.second) == S3_OBJECT_COUNT);
   }
   REQUIRE(mock_s3_request_sender_ptr->head_object_request.GetRequestPayer() == Aws::S3::Model::RequestPayer::requester);
+  REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSAccessKeyId() == "key");
+  REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSSecretKey() == "secret");
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 10000);
+  REQUIRE(mock_s3_request_sender_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
 }
 
 TEST_CASE_METHOD(ListS3TestsFixture, "Test truncated listing without versioning", "[awsS3ListObjects]") {
diff --git a/libminifi/test/aws-tests/MockS3RequestSender.h b/libminifi/test/aws-tests/MockS3RequestSender.h
index 6c64ae5..01c6db7 100644
--- a/libminifi/test/aws-tests/MockS3RequestSender.h
+++ b/libminifi/test/aws-tests/MockS3RequestSender.h
@@ -89,8 +89,13 @@ class MockS3RequestSender : public minifi::aws::s3::S3RequestSender {
     }
   }
 
-  minifi::utils::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) override {
+  minifi::utils::optional<Aws::S3::Model::PutObjectResult> sendPutObjectRequest(
+      const Aws::S3::Model::PutObjectRequest& request,
+      const Aws::Auth::AWSCredentials& credentials,
+      const Aws::Client::ClientConfiguration& client_config) override {
     put_object_request = request;
+    credentials_ = credentials;
+    client_config_ = client_config;
 
     Aws::S3::Model::PutObjectResult put_s3_result;
     if (!return_empty_result_) {
@@ -102,13 +107,23 @@ class MockS3RequestSender : public minifi::aws::s3::S3RequestSender {
     return put_s3_result;
   }
 
-  bool sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) override {
+  bool sendDeleteObjectRequest(  // mock: stores its three arguments and returns delete_object_result_
+      const Aws::S3::Model::DeleteObjectRequest& request,
+      const Aws::Auth::AWSCredentials& credentials,
+      const Aws::Client::ClientConfiguration& client_config) override {
     delete_object_request = request;
+    credentials_ = credentials;  // overwrites whatever credentials were stored before
+    client_config_ = client_config;  // overwrites whatever client configuration was stored before
     return delete_object_result_;
   }
 
-  minifi::utils::optional<Aws::S3::Model::GetObjectResult> sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) override {
+  minifi::utils::optional<Aws::S3::Model::GetObjectResult> sendGetObjectRequest(
+      const Aws::S3::Model::GetObjectRequest& request,
+      const Aws::Auth::AWSCredentials& credentials,
+      const Aws::Client::ClientConfiguration& client_config) override {
     get_object_request = request;
+    credentials_ = credentials;
+    client_config_ = client_config;
 
     Aws::S3::Model::GetObjectResult get_s3_result;
     if (!return_empty_result_) {
@@ -124,8 +139,13 @@ class MockS3RequestSender : public minifi::aws::s3::S3RequestSender {
     return minifi::utils::make_optional(std::move(get_s3_result));
   }
 
-  minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> sendListObjectsRequest(const Aws::S3::Model::ListObjectsV2Request& request) override {
+  minifi::utils::optional<Aws::S3::Model::ListObjectsV2Result> sendListObjectsRequest(
+      const Aws::S3::Model::ListObjectsV2Request& request,
+      const Aws::Auth::AWSCredentials& credentials,
+      const Aws::Client::ClientConfiguration& client_config) override {
     list_object_request = request;
+    credentials_ = credentials;
+    client_config_ = client_config;
 
     Aws::S3::Model::ListObjectsV2Result list_object_result;
     if (!is_listing_truncated_) {
@@ -150,8 +170,13 @@ class MockS3RequestSender : public minifi::aws::s3::S3RequestSender {
     return list_object_result;
   }
 
-  minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> sendListVersionsRequest(const Aws::S3::Model::ListObjectVersionsRequest& request) override {
+  minifi::utils::optional<Aws::S3::Model::ListObjectVersionsResult> sendListVersionsRequest(
+      const Aws::S3::Model::ListObjectVersionsRequest& request,
+      const Aws::Auth::AWSCredentials& credentials,
+      const Aws::Client::ClientConfiguration& client_config) override {
     list_version_request = request;
+    credentials_ = credentials;
+    client_config_ = client_config;
 
     Aws::S3::Model::ListObjectVersionsResult list_version_result;
     if (!is_listing_truncated_) {
@@ -177,8 +202,13 @@ class MockS3RequestSender : public minifi::aws::s3::S3RequestSender {
     return list_version_result;
   }
 
-  minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> sendGetObjectTaggingRequest(const Aws::S3::Model::GetObjectTaggingRequest& request) override {
+  minifi::utils::optional<Aws::S3::Model::GetObjectTaggingResult> sendGetObjectTaggingRequest(
+      const Aws::S3::Model::GetObjectTaggingRequest& request,
+      const Aws::Auth::AWSCredentials& credentials,
+      const Aws::Client::ClientConfiguration& client_config) override {
     get_object_tagging_request = request;
+    credentials_ = credentials;
+    client_config_ = client_config;
     Aws::S3::Model::GetObjectTaggingResult result;
     for (const auto& tag_pair : S3_OBJECT_TAGS) {
       Aws::S3::Model::Tag tag;
@@ -189,8 +219,13 @@ class MockS3RequestSender : public minifi::aws::s3::S3RequestSender {
     return result;
   }
 
-  minifi::utils::optional<Aws::S3::Model::HeadObjectResult> sendHeadObjectRequest(const Aws::S3::Model::HeadObjectRequest& request) override {
+  minifi::utils::optional<Aws::S3::Model::HeadObjectResult> sendHeadObjectRequest(
+      const Aws::S3::Model::HeadObjectRequest& request,
+      const Aws::Auth::AWSCredentials& credentials,
+      const Aws::Client::ClientConfiguration& client_config) override {
     head_object_request = request;
+    credentials_ = credentials;
+    client_config_ = client_config;
 
     Aws::S3::Model::HeadObjectResult head_s3_result;
     if (!return_empty_result_) {
@@ -252,4 +287,6 @@ class MockS3RequestSender : public minifi::aws::s3::S3RequestSender {
   bool delete_object_result_ = true;
   bool return_empty_result_ = false;
   bool is_listing_truncated_ = false;
+  Aws::Auth::AWSCredentials credentials_;
+  Aws::Client::ClientConfiguration client_config_;
 };