You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2021/10/06 15:48:44 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1643 Add support for managed identity on Azure

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 5b72bdd  MINIFICPP-1643 Add support for managed identity on Azure
5b72bdd is described below

commit 5b72bddfd04da510cd4cfaa1734e021732dfe03e
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Sep 13 16:21:12 2021 +0200

    MINIFICPP-1643 Add support for managed identity on Azure
    
    - Add default filename value for blob
    - Add Managed Identities to Azure Data Lake Storage
    - Add endpoint suffix option to managed identity credentials
    - Fix credentials priorities and documentation
    - Refactor blob storage
    - Add result codes to getCredentialsFromControllerService
    - Move Azure credentials member to client implementation classes
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    
    This closes #1178
---
 CONTROLLERS.md                                     |  35 ++--
 PROCESSORS.md                                      |   7 +-
 .../controllerservices/AWSCredentialsService.cpp   |   2 -
 extensions/azure/CMakeLists.txt                    |   2 +-
 .../AzureStorageCredentialsService.cpp             |  51 +++--
 .../AzureStorageCredentialsService.h               |  19 +-
 .../azure/processors/AzureStorageProcessorBase.cpp |  11 +-
 .../azure/processors/AzureStorageProcessorBase.h   |  13 +-
 .../azure/processors/PutAzureBlobStorage.cpp       | 153 ++++++++++-----
 extensions/azure/processors/PutAzureBlobStorage.h  |  40 ++--
 .../azure/processors/PutAzureDataLakeStorage.cpp   |  15 +-
 .../azure/processors/PutAzureDataLakeStorage.h     |   2 +-
 extensions/azure/storage/AzureBlobStorage.cpp      |  60 ++----
 extensions/azure/storage/AzureBlobStorage.h        |  34 ++--
 .../azure/storage/AzureBlobStorageClient.cpp       |  65 ++++++
 ...AzureBlobStorage.h => AzureBlobStorageClient.h} |  35 ++--
 .../azure/storage/AzureDataLakeStorageClient.cpp   |  28 ++-
 .../azure/storage/AzureDataLakeStorageClient.h     |   5 +-
 .../azure/storage/AzureStorageCredentials.cpp      | 109 +++++++++++
 extensions/azure/storage/AzureStorageCredentials.h |  75 +++----
 extensions/azure/storage/BlobStorage.h             |  63 ------
 ...DataLakeStorageClient.h => BlobStorageClient.h} |  30 +--
 extensions/azure/storage/DataLakeStorageClient.h   |   9 +-
 .../pdh/tests/PerformanceDataCounterTests.cpp      |   2 +-
 extensions/windows-event-log/wel/JSONUtils.cpp     |   2 +-
 .../test/azure-tests/PutAzureBlobStorageTests.cpp  | 218 ++++++++++++++-------
 .../azure-tests/PutAzureDataLakeStorageTests.cpp   |  91 ++++++---
 27 files changed, 716 insertions(+), 460 deletions(-)

diff --git a/CONTROLLERS.md b/CONTROLLERS.md
index 0896f76..42ab33e 100644
--- a/CONTROLLERS.md
+++ b/CONTROLLERS.md
@@ -31,15 +31,14 @@ controller service so that AWS credentials can be managed and controlled in a ce
 ### Properties
 
 In the list below, the names of required properties appear in bold. Any other
-properties (not in bold) are considered optional. The table also indicates any
-default values, and whether a property supports the NiFi Expression Language.
+properties (not in bold) are considered optional.
 
-| Name | Default Value | Allowable Values | Expression Language Supported? | Description |
-| - | - | - | - | - |
-|**Use Default Credentials**|false||No|If true, uses the Default Credential chain, including EC2 instance profiles or roles, environment variables, default user credentials, etc.|
-|Access Key|||Yes|Specifies the AWS Access Key|
-|Secret Key|||Yes|Specifies the AWS Secret Key|
-|Credentials File|||No|Path to a file containing AWS access key and secret key in properties file format. Properties used: accessKey and secretKey|
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Use Default Credentials**|false||If true, uses the Default Credential chain, including EC2 instance profiles or roles, environment variables, default user credentials, etc.|
+|Access Key|||Specifies the AWS Access Key|
+|Secret Key|||Specifies the AWS Secret Key|
+|Credentials File|||Path to a file containing AWS access key and secret key in properties file format. Properties used: accessKey and secretKey|
 
 ## AzureStorageCredentialsService
 
@@ -51,13 +50,13 @@ controller service so that Azure storage credentials can be managed and controll
 ### Properties
 
 In the list below, the names of required properties appear in bold. Any other
-properties (not in bold) are considered optional. The table also indicates any
-default values, and whether a property supports the NiFi Expression Language.
-
-| Name | Default Value | Allowable Values | Expression Language Supported? | Description |
-| - | - | - | - | - |
-|Storage Account Name||||The storage account name.|
-|Storage Account Key||||The storage account key. This is an admin-like password providing access to every container in this account. It is recommended one uses Shared Access Signature (SAS) token instead for fine-grained control with policies.|
-|SAS Token||||Shared Access Signature token. Specify either SAS Token (recommended) or Account Key.|
-|Common Storage Account Endpoint Suffix||||Storage accounts in public Azure always use a common FQDN suffix. Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).|
-|Connection String||||Connection string used to connect to Azure Storage service. This overrides all other set credential properties.|
+properties (not in bold) are considered optional.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|Storage Account Name|||The storage account name.|
+|Storage Account Key|||The storage account key. This is an admin-like password providing access to every container in this account. It is recommended one uses Shared Access Signature (SAS) token instead for fine-grained control with policies.|
+|SAS Token|||Shared Access Signature token. Specify either SAS Token (recommended) or Storage Account Key together with Storage Account Name if Managed Identity is not used.|
+|Common Storage Account Endpoint Suffix|||Storage accounts in public Azure always use a common FQDN suffix. Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).|
+|Connection String|||Connection string used to connect to Azure Storage service. This overrides all other set credential properties if Managed Identity is not used.|
+|**Use Managed Identity Credentials**|false||If true Managed Identity credentials will be used together with the Storage Account Name for authentication.|
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 47436d0..ebb540c 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -1224,12 +1224,13 @@ In the list below, the names of required properties appear in bold. Any other pr
 |**Container Name**|||Name of the Azure storage container. In case of PutAzureBlobStorage processor, container can be created if it does not exist.<br/>**Supports Expression Language: true**|
 |Storage Account Name|||The storage account name.<br/>**Supports Expression Language: true**|
 |Storage Account Key|||The storage account key. This is an admin-like password providing access to every container in this account. It is recommended one uses Shared Access Signature (SAS) token instead for fine-grained control with policies.<br/>**Supports Expression Language: true**|
-|SAS Token|||Shared Access Signature token. Specify either SAS Token (recommended) or Account Key.<br/>**Supports Expression Language: true**|
+|SAS Token|||Shared Access Signature token. Specify either SAS Token (recommended) or Storage Account Key together with Storage Account Name if Managed Identity is not used.<br/>**Supports Expression Language: true**|
 |Common Storage Account Endpoint Suffix|||Storage accounts in public Azure always use a common FQDN suffix. Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).<br/>**Supports Expression Language: true**|
-|Connection String|||Connection string used to connect to Azure Storage service. This overrides all other set credential properties.<br/>**Supports Expression Language: true**|
+|Connection String|||Connection string used to connect to Azure Storage service. This overrides all other set credential properties if Managed Identity is not used.<br/>**Supports Expression Language: true**|
 |Azure Storage Credentials Service|||Name of the Azure Storage Credentials Service used to retrieve the connection string from.|
-|**Blob**|||The filename of the blob.<br/>**Supports Expression Language: true**|
+|Blob|||The filename of the blob. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**|
 |**Create Container**|false||Specifies whether to check if the container exists and to automatically create it if it does not. Permission to list containers is required. If false, this check is not made, but the Put operation will fail if the container does not exist.|
+|**Use Managed Identity Credentials**|false||If true Managed Identity credentials will be used together with the Storage Account Name for authentication.|
 
 ### Relationships
 
diff --git a/extensions/aws/controllerservices/AWSCredentialsService.cpp b/extensions/aws/controllerservices/AWSCredentialsService.cpp
index b44e59c..ec5369d 100644
--- a/extensions/aws/controllerservices/AWSCredentialsService.cpp
+++ b/extensions/aws/controllerservices/AWSCredentialsService.cpp
@@ -38,13 +38,11 @@ const core::Property AWSCredentialsService::UseDefaultCredentials(
 
 const core::Property AWSCredentialsService::AccessKey(
     core::PropertyBuilder::createProperty("Access Key")->withDescription("Specifies the AWS Access Key.")
-    ->supportsExpressionLanguage(true)
     ->build());
 
 const core::Property AWSCredentialsService::SecretKey(
     core::PropertyBuilder::createProperty("Secret Key")
     ->withDescription("Specifies the AWS Secret Key.")
-    ->supportsExpressionLanguage(true)
     ->build());
 
 const core::Property AWSCredentialsService::CredentialsFile(
diff --git a/extensions/azure/CMakeLists.txt b/extensions/azure/CMakeLists.txt
index 9304743..1833007 100644
--- a/extensions/azure/CMakeLists.txt
+++ b/extensions/azure/CMakeLists.txt
@@ -32,7 +32,7 @@ target_include_directories(minifi-azure BEFORE PRIVATE ${CMAKE_SOURCE_DIR}/exten
 
 target_link_libraries(minifi-azure ${LIBMINIFI} Threads::Threads)
 target_link_libraries(minifi-azure CURL::libcurl LibXml2::LibXml2)
-target_link_libraries(minifi-azure AZURE::azure-storage-files-datalake AZURE::azure-storage-blobs AZURE::azure-storage-common AZURE::azure-core)
+target_link_libraries(minifi-azure AZURE::azure-storage-files-datalake AZURE::azure-storage-blobs AZURE::azure-storage-common AZURE::azure-core AZURE::azure-identity)
 
 if (WIN32)
   target_link_libraries(minifi-azure crypt32.lib bcrypt.lib)
diff --git a/extensions/azure/controllerservices/AzureStorageCredentialsService.cpp b/extensions/azure/controllerservices/AzureStorageCredentialsService.cpp
index 2796951..0acebea 100644
--- a/extensions/azure/controllerservices/AzureStorageCredentialsService.cpp
+++ b/extensions/azure/controllerservices/AzureStorageCredentialsService.cpp
@@ -22,12 +22,7 @@
 
 #include "core/Resource.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace azure {
-namespace controllers {
+namespace org::apache::nifi::minifi::azure::controllers {
 
 const core::Property AzureStorageCredentialsService::StorageAccountName(
     core::PropertyBuilder::createProperty("Storage Account Name")
@@ -40,7 +35,7 @@ const core::Property AzureStorageCredentialsService::StorageAccountKey(
       ->build());
 const core::Property AzureStorageCredentialsService::SASToken(
     core::PropertyBuilder::createProperty("SAS Token")
-      ->withDescription("Shared Access Signature token. Specify either SAS Token (recommended) or Account Key.")
+      ->withDescription("Shared Access Signature token. Specify either SAS Token (recommended) or Storage Account Key together with Storage Account Name if Managed Identity is not used.")
       ->build());
 const core::Property AzureStorageCredentialsService::CommonStorageAccountEndpointSuffix(
     core::PropertyBuilder::createProperty("Common Storage Account Endpoint Suffix")
@@ -49,26 +44,42 @@ const core::Property AzureStorageCredentialsService::CommonStorageAccountEndpoin
       ->build());
 const core::Property AzureStorageCredentialsService::ConnectionString(
   core::PropertyBuilder::createProperty("Connection String")
-    ->withDescription("Connection string used to connect to Azure Storage service. This overrides all other set credential properties.")
+    ->withDescription("Connection string used to connect to Azure Storage service. This overrides all other set credential properties if Managed Identity is not used.")
+    ->build());
+const core::Property AzureStorageCredentialsService::UseManagedIdentityCredentials(
+  core::PropertyBuilder::createProperty("Use Managed Identity Credentials")
+    ->withDescription("If true Managed Identity credentials will be used together with the Storage Account Name for authentication.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(false)
     ->build());
 
 void AzureStorageCredentialsService::initialize() {
-  setSupportedProperties({StorageAccountName, StorageAccountKey, SASToken, CommonStorageAccountEndpointSuffix, ConnectionString});
+  setSupportedProperties({StorageAccountName, StorageAccountKey, SASToken, CommonStorageAccountEndpointSuffix, ConnectionString, UseManagedIdentityCredentials});
 }
 
 void AzureStorageCredentialsService::onEnable() {
-  getProperty(StorageAccountName.getName(), credentials_.storage_account_name);
-  getProperty(StorageAccountKey.getName(), credentials_.storage_account_key);
-  getProperty(SASToken.getName(), credentials_.sas_token);
-  getProperty(CommonStorageAccountEndpointSuffix.getName(), credentials_.endpoint_suffix);
-  getProperty(ConnectionString.getName(), credentials_.connection_string);
+  std::string value;
+  if (getProperty(StorageAccountName.getName(), value)) {
+    credentials_.setStorageAccountName(value);
+  }
+  if (getProperty(StorageAccountKey.getName(), value)) {
+    credentials_.setStorageAccountKey(value);
+  }
+  if (getProperty(SASToken.getName(), value)) {
+    credentials_.setSasToken(value);
+  }
+  if (getProperty(CommonStorageAccountEndpointSuffix.getName(), value)) {
+    credentials_.setEndpontSuffix(value);
+  }
+  if (getProperty(ConnectionString.getName(), value)) {
+    credentials_.setConnectionString(value);
+  }
+  bool use_managed_identity_credentials = false;
+  if (getProperty(UseManagedIdentityCredentials.getName(), use_managed_identity_credentials)) {
+    credentials_.setUseManagedIdentityCredentials(use_managed_identity_credentials);
+  }
 }
 
 REGISTER_RESOURCE(AzureStorageCredentialsService, "Azure Storage Credentials Management Service");
 
-}  // namespace controllers
-}  // namespace azure
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::azure::controllers
diff --git a/extensions/azure/controllerservices/AzureStorageCredentialsService.h b/extensions/azure/controllerservices/AzureStorageCredentialsService.h
index 8365c86..04887ae 100644
--- a/extensions/azure/controllerservices/AzureStorageCredentialsService.h
+++ b/extensions/azure/controllerservices/AzureStorageCredentialsService.h
@@ -26,12 +26,7 @@
 #include "storage/AzureStorageCredentials.h"
 #include "utils/Export.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace azure {
-namespace controllers {
+namespace org::apache::nifi::minifi::azure::controllers {
 
 class AzureStorageCredentialsService : public core::controller::ControllerService {
  public:
@@ -40,6 +35,7 @@ class AzureStorageCredentialsService : public core::controller::ControllerServic
   EXTENSIONAPI static const core::Property SASToken;
   EXTENSIONAPI static const core::Property CommonStorageAccountEndpointSuffix;
   EXTENSIONAPI static const core::Property ConnectionString;
+  EXTENSIONAPI static const core::Property UseManagedIdentityCredentials;
 
   explicit AzureStorageCredentialsService(const std::string& name, const minifi::utils::Identifier& uuid = {})
       : ControllerService(name, uuid),
@@ -66,8 +62,8 @@ class AzureStorageCredentialsService : public core::controller::ControllerServic
 
   void onEnable() override;
 
-  std::string getConnectionString() const {
-    return credentials_.getConnectionString();
+  storage::AzureStorageCredentials getCredentials() const {
+    return credentials_;
   }
 
  private:
@@ -75,9 +71,4 @@ class AzureStorageCredentialsService : public core::controller::ControllerServic
   std::shared_ptr<logging::Logger> logger_;
 };
 
-}  // namespace controllers
-}  // namespace azure
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::azure::controllers
diff --git a/extensions/azure/processors/AzureStorageProcessorBase.cpp b/extensions/azure/processors/AzureStorageProcessorBase.cpp
index bbbedba..63b230e 100644
--- a/extensions/azure/processors/AzureStorageProcessorBase.cpp
+++ b/extensions/azure/processors/AzureStorageProcessorBase.cpp
@@ -32,25 +32,26 @@ const core::Property AzureStorageProcessorBase::AzureStorageCredentialsService(
     ->withDescription("Name of the Azure Storage Credentials Service used to retrieve the connection string from.")
     ->build());
 
-std::string AzureStorageProcessorBase::getConnectionStringFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const {
+std::tuple<AzureStorageProcessorBase::GetCredentialsFromControllerResult, std::optional<storage::AzureStorageCredentials>> AzureStorageProcessorBase::getCredentialsFromControllerService(
+    const std::shared_ptr<core::ProcessContext> &context) const {
   std::string service_name;
   if (!context->getProperty(AzureStorageCredentialsService.getName(), service_name) || service_name.empty()) {
-    return "";
+    return std::make_tuple(GetCredentialsFromControllerResult::CONTROLLER_NAME_EMPTY, std::nullopt);
   }
 
   std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(service_name);
   if (nullptr == service) {
     logger_->log_error("Azure Storage credentials service with name: '%s' could not be found", service_name);
-    return "";
+    return std::make_tuple(GetCredentialsFromControllerResult::CONTROLLER_NAME_INVALID, std::nullopt);
   }
 
   auto azure_credentials_service = std::dynamic_pointer_cast<minifi::azure::controllers::AzureStorageCredentialsService>(service);
   if (!azure_credentials_service) {
     logger_->log_error("Controller service with name: '%s' is not an Azure Storage credentials service", service_name);
-    return "";
+    return std::make_tuple(GetCredentialsFromControllerResult::CONTROLLER_NAME_INVALID, std::nullopt);
   }
 
-  return azure_credentials_service->getConnectionString();
+  return std::make_tuple(GetCredentialsFromControllerResult::OK, azure_credentials_service->getCredentials());
 }
 
 }  // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/AzureStorageProcessorBase.h b/extensions/azure/processors/AzureStorageProcessorBase.h
index b2e2655..880f822 100644
--- a/extensions/azure/processors/AzureStorageProcessorBase.h
+++ b/extensions/azure/processors/AzureStorageProcessorBase.h
@@ -22,10 +22,13 @@
 
 #include <memory>
 #include <string>
+#include <optional>
+#include <tuple>
 
 #include "core/Property.h"
 #include "core/Processor.h"
 #include "core/logging/Logger.h"
+#include "storage/AzureStorageCredentials.h"
 
 namespace org::apache::nifi::minifi::azure::processors {
 
@@ -39,10 +42,14 @@ class AzureStorageProcessorBase : public core::Processor {
       logger_(logger) {
   }
 
-  ~AzureStorageProcessorBase() override = default;
-
  protected:
-  std::string getConnectionStringFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const;
+  enum class GetCredentialsFromControllerResult {
+    OK,
+    CONTROLLER_NAME_EMPTY,
+    CONTROLLER_NAME_INVALID
+  };
+
+  std::tuple<GetCredentialsFromControllerResult, std::optional<storage::AzureStorageCredentials>> getCredentialsFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const;
 
   std::mutex azure_storage_mutex_;
   std::shared_ptr<logging::Logger> logger_;
diff --git a/extensions/azure/processors/PutAzureBlobStorage.cpp b/extensions/azure/processors/PutAzureBlobStorage.cpp
index 243105b..194b76c 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.cpp
+++ b/extensions/azure/processors/PutAzureBlobStorage.cpp
@@ -48,7 +48,7 @@ const core::Property PutAzureBlobStorage::StorageAccountKey(
       ->build());
 const core::Property PutAzureBlobStorage::SASToken(
     core::PropertyBuilder::createProperty("SAS Token")
-      ->withDescription("Shared Access Signature token. Specify either SAS Token (recommended) or Account Key.")
+      ->withDescription("Shared Access Signature token. Specify either SAS Token (recommended) or Storage Account Key together with Storage Account Name if Managed Identity is not used.")
       ->supportsExpressionLanguage(true)
       ->build());
 const core::Property PutAzureBlobStorage::CommonStorageAccountEndpointSuffix(
@@ -59,14 +59,13 @@ const core::Property PutAzureBlobStorage::CommonStorageAccountEndpointSuffix(
       ->build());
 const core::Property PutAzureBlobStorage::ConnectionString(
   core::PropertyBuilder::createProperty("Connection String")
-    ->withDescription("Connection string used to connect to Azure Storage service. This overrides all other set credential properties.")
+    ->withDescription("Connection string used to connect to Azure Storage service. This overrides all other set credential properties if Managed Identity is not used.")
     ->supportsExpressionLanguage(true)
     ->build());
 const core::Property PutAzureBlobStorage::Blob(
   core::PropertyBuilder::createProperty("Blob")
-    ->withDescription("The filename of the blob.")
+    ->withDescription("The filename of the blob. If left empty the filename attribute will be used by default.")
     ->supportsExpressionLanguage(true)
-    ->isRequired(true)
     ->build());
 const core::Property PutAzureBlobStorage::CreateContainer(
   core::PropertyBuilder::createProperty("Create Container")
@@ -76,6 +75,12 @@ const core::Property PutAzureBlobStorage::CreateContainer(
     ->isRequired(true)
     ->withDefaultValue<bool>(false)
     ->build());
+const core::Property PutAzureBlobStorage::UseManagedIdentityCredentials(
+  core::PropertyBuilder::createProperty("Use Managed Identity Credentials")
+    ->withDescription("If true Managed Identity credentials will be used together with the Storage Account Name for authentication.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(false)
+    ->build());
 
 const core::Relationship PutAzureBlobStorage::Success("success", "All successfully processed FlowFiles are routed to this relationship");
 const core::Relationship PutAzureBlobStorage::Failure("failure", "Unsuccessful operations will be transferred to the failure relationship");
@@ -91,7 +96,8 @@ void PutAzureBlobStorage::initialize() {
     CommonStorageAccountEndpointSuffix,
     ConnectionString,
     Blob,
-    CreateContainer
+    CreateContainer,
+    UseManagedIdentityCredentials
   });
   // Set the supported relationships
   setSupportedRelationships({
@@ -108,15 +114,20 @@ void PutAzureBlobStorage::onSchedule(const std::shared_ptr<core::ProcessContext>
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Container Name property missing or invalid");
   }
 
-  if (!context->getProperty(Blob.getName(), value) || value.empty()) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Blob property missing or invalid");
-  }
-
   if (context->getProperty(AzureStorageCredentialsService.getName(), value) && !value.empty()) {
     logger_->log_info("Getting Azure Storage credentials from controller service with name: '%s'", value);
     return;
   }
 
+  if (!context->getProperty(UseManagedIdentityCredentials.getName(), use_managed_identity_credentials_)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Use Managed Identity Credentials is invalid.");
+  }
+
+  if (use_managed_identity_credentials_) {
+    logger_->log_info("Using Managed Identity for authentication");
+    return;
+  }
+
   if (context->getProperty(ConnectionString.getName(), value) && !value.empty()) {
     logger_->log_info("Using connectionstring directly for Azure Storage authentication");
     return;
@@ -138,38 +149,82 @@ void PutAzureBlobStorage::onSchedule(const std::shared_ptr<core::ProcessContext>
   logger_->log_info("Using storage account name and SAS token for authentication");
 }
 
-std::string PutAzureBlobStorage::getAzureConnectionStringFromProperties(
+storage::AzureStorageCredentials PutAzureBlobStorage::getAzureCredentialsFromProperties(
     const std::shared_ptr<core::ProcessContext> &context,
-    const std::shared_ptr<core::FlowFile> &flow_file) {
+    const std::shared_ptr<core::FlowFile> &flow_file) const {
   storage::AzureStorageCredentials credentials;
-  context->getProperty(StorageAccountName, credentials.storage_account_name, flow_file);
-  context->getProperty(StorageAccountKey, credentials.storage_account_key, flow_file);
-  context->getProperty(SASToken, credentials.sas_token, flow_file);
-  context->getProperty(CommonStorageAccountEndpointSuffix, credentials.endpoint_suffix, flow_file);
-  context->getProperty(ConnectionString, credentials.connection_string, flow_file);
-  return credentials.getConnectionString();
+  std::string value;
+  if (context->getProperty(StorageAccountName, value, flow_file)) {
+    credentials.setStorageAccountName(value);
+  }
+  if (context->getProperty(StorageAccountKey, value, flow_file)) {
+    credentials.setStorageAccountKey(value);
+  }
+  if (context->getProperty(SASToken, value, flow_file)) {
+    credentials.setSasToken(value);
+  }
+  if (context->getProperty(CommonStorageAccountEndpointSuffix, value, flow_file)) {
+    credentials.setEndpontSuffix(value);
+  }
+  if (context->getProperty(ConnectionString, value, flow_file)) {
+    credentials.setConnectionString(value);
+  }
+  credentials.setUseManagedIdentityCredentials(use_managed_identity_credentials_);
+  return credentials;
 }
 
-void PutAzureBlobStorage::createAzureStorageClient(const std::string &connection_string, const std::string &container_name) {
-  // When used in multithreaded environment make sure to use the azure_storage_mutex_ to lock the wrapper so the
-  // client is not reset with different configuration while another thread is using it.
-  if (blob_storage_wrapper_ == nullptr) {
-    blob_storage_wrapper_ = std::make_unique<storage::AzureBlobStorage>(connection_string, container_name);
-    return;
+std::optional<storage::PutAzureBlobStorageParameters> PutAzureBlobStorage::buildAzureBlobStorageParameters(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file) {
+  storage::PutAzureBlobStorageParameters params;
+  auto credentials = getCredentials(context, flow_file);
+  if (!credentials) {
+    return std::nullopt;
+  }
+
+  params.credentials = *credentials;
+
+  if (!context->getProperty(ContainerName, params.container_name, flow_file) || params.container_name.empty()) {
+    logger_->log_error("Container Name is invalid or empty!");
+    return std::nullopt;
   }
 
-  blob_storage_wrapper_->resetClientIfNeeded(connection_string, container_name);
+  context->getProperty(Blob, params.blob_name, flow_file);
+  if (params.blob_name.empty() && (!flow_file->getAttribute("filename", params.blob_name) || params.blob_name.empty())) {
+    logger_->log_error("Blob is not set and default 'filename' attribute could not be found!");
+    return std::nullopt;
+  }
+
+  return params;
 }
 
-std::string PutAzureBlobStorage::getConnectionString(
+std::optional<storage::AzureStorageCredentials> PutAzureBlobStorage::getCredentials(
     const std::shared_ptr<core::ProcessContext> &context,
     const std::shared_ptr<core::FlowFile> &flow_file) const {
-  auto connection_string = getAzureConnectionStringFromProperties(context, flow_file);
-  if (!connection_string.empty()) {
-    return connection_string;
+  auto [result, controller_service_creds] = getCredentialsFromControllerService(context);
+  if (controller_service_creds) {
+    if (controller_service_creds->isValid()) {
+      logger_->log_debug("Azure credentials read from credentials controller service!");
+      return controller_service_creds;
+    } else {
+      logger_->log_error("Azure credentials controller service is set with invalid credential parameters!");
+      return std::nullopt;
+    }
+  } else if (result == GetCredentialsFromControllerResult::CONTROLLER_NAME_INVALID) {
+    logger_->log_error("Azure credentials controller service name is invalid!");
+    return std::nullopt;
+  }
+
+  logger_->log_debug("No valid Azure credentials are set in credentials controller service, checking properties...");
+
+  auto property_creds = getAzureCredentialsFromProperties(context, flow_file);
+  if (property_creds.isValid()) {
+    logger_->log_debug("Azure credentials read from properties!");
+    return property_creds;
   }
 
-  return getConnectionStringFromControllerService(context);
+  logger_->log_error("No valid Azure credentials are set in credentials controller service nor in properties!");
+  return std::nullopt;
 }
 
 void PutAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
@@ -179,23 +234,8 @@ void PutAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext>
     return;
   }
 
-  auto connection_string = getConnectionString(context, flow_file);
-  if (connection_string.empty()) {
-    logger_->log_error("Connection string is empty!");
-    session->transfer(flow_file, Failure);
-    return;
-  }
-
-  std::string container_name;
-  if (!context->getProperty(ContainerName, container_name, flow_file) || container_name.empty()) {
-    logger_->log_error("Container Name is invalid or empty!");
-    session->transfer(flow_file, Failure);
-    return;
-  }
-
-  std::string blob_name;
-  if (!context->getProperty(Blob, blob_name, flow_file) || blob_name.empty()) {
-    logger_->log_error("Blob name is invalid or empty!");
+  auto params = buildAzureBlobStorageParameters(context, flow_file);
+  if (!params) {
     session->transfer(flow_file, Failure);
     return;
   }
@@ -203,29 +243,34 @@ void PutAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext>
   std::optional<storage::UploadBlobResult> upload_result;
   {
     // TODO(lordgamez): This can be removed after maximum allowed threads are implemented. See https://issues.apache.org/jira/browse/MINIFICPP-1566
+    // When used in a multithreaded environment, make sure to hold azure_storage_mutex_ while using the storage client,
+    // so the client is not reset with a different configuration while another thread is using it.
     std::lock_guard<std::mutex> lock(azure_storage_mutex_);
-    createAzureStorageClient(connection_string, container_name);
     if (create_container_) {
-      blob_storage_wrapper_->createContainer();
+      auto result = azure_blob_storage_.createContainerIfNotExists(*params);
+      if (!result) {
+        session->transfer(flow_file, Failure);
+        return;
+      }
     }
-    PutAzureBlobStorage::ReadCallback callback(flow_file->getSize(), *blob_storage_wrapper_, blob_name);
+    PutAzureBlobStorage::ReadCallback callback(flow_file->getSize(), azure_blob_storage_, *params);
     session->read(flow_file, &callback);
     upload_result = callback.getResult();
   }
 
   if (!upload_result) {
-    logger_->log_error("Failed to upload blob '%s' to Azure Storage container '%s'", blob_name, container_name);
+    logger_->log_error("Failed to upload blob '%s' to Azure Storage container '%s'", params->blob_name, params->container_name);
     session->transfer(flow_file, Failure);
     return;
   }
 
-  session->putAttribute(flow_file, "azure.container", container_name);
-  session->putAttribute(flow_file, "azure.blobname", blob_name);
+  session->putAttribute(flow_file, "azure.container", params->container_name);
+  session->putAttribute(flow_file, "azure.blobname", params->blob_name);
   session->putAttribute(flow_file, "azure.primaryUri", upload_result->primary_uri);
   session->putAttribute(flow_file, "azure.etag", upload_result->etag);
-  session->putAttribute(flow_file, "azure.length", std::to_string(upload_result->length));
+  session->putAttribute(flow_file, "azure.length", std::to_string(flow_file->getSize()));
   session->putAttribute(flow_file, "azure.timestamp", upload_result->timestamp);
-  logger_->log_debug("Successfully uploaded blob '%s' to Azure Storage container '%s'", blob_name, container_name);
+  logger_->log_debug("Successfully uploaded blob '%s' to Azure Storage container '%s'", params->blob_name, params->container_name);
   session->transfer(flow_file, Success);
 }
 
diff --git a/extensions/azure/processors/PutAzureBlobStorage.h b/extensions/azure/processors/PutAzureBlobStorage.h
index f779cff..bfd71fd 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.h
+++ b/extensions/azure/processors/PutAzureBlobStorage.h
@@ -29,8 +29,9 @@
 #include "core/Property.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "storage/BlobStorage.h"
+#include "storage/AzureBlobStorage.h"
 #include "AzureStorageProcessorBase.h"
+#include "storage/AzureStorageCredentials.h"
 
 class PutAzureBlobStorageTestsFixture;
 
@@ -47,6 +48,7 @@ class PutAzureBlobStorage final : public AzureStorageProcessorBase {
   static const core::Property ConnectionString;
   static const core::Property Blob;
   static const core::Property CreateContainer;
+  static const core::Property UseManagedIdentityCredentials;
 
   // Supported Relationships
   static const core::Relationship Failure;
@@ -56,18 +58,16 @@ class PutAzureBlobStorage final : public AzureStorageProcessorBase {
     : PutAzureBlobStorage(name, uuid, nullptr) {
   }
 
-  ~PutAzureBlobStorage() override = default;
-
   void initialize() override;
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(uint64_t flow_size, storage::BlobStorage& blob_storage_wrapper, const std::string &blob_name)
+    ReadCallback(uint64_t flow_size, storage::AzureBlobStorage& azure_blob_storage, const storage::PutAzureBlobStorageParameters& params)
       : flow_size_(flow_size)
-      , blob_storage_wrapper_(blob_storage_wrapper)
-      , blob_name_(blob_name) {
+      , azure_blob_storage_(azure_blob_storage)
+      , params_(params) {
     }
 
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
@@ -77,11 +77,8 @@ class PutAzureBlobStorage final : public AzureStorageProcessorBase {
         return -1;
       }
 
-      result_ = blob_storage_wrapper_.uploadBlob(blob_name_, buffer.data(), flow_size_);
-      if (!result_) {
-        return read_ret;
-      }
-      return result_->length;
+      result_ = azure_blob_storage_.uploadBlob(params_, gsl::make_span(buffer.data(), flow_size_));
+      return read_ret;
     }
 
     std::optional<storage::UploadBlobResult> getResult() const {
@@ -90,29 +87,32 @@ class PutAzureBlobStorage final : public AzureStorageProcessorBase {
 
    private:
     uint64_t flow_size_;
-    storage::BlobStorage &blob_storage_wrapper_;
-    std::string blob_name_;
+    storage::AzureBlobStorage &azure_blob_storage_;
+    const storage::PutAzureBlobStorageParameters& params_;
     std::optional<storage::UploadBlobResult> result_ = std::nullopt;
   };
 
  private:
   friend class ::PutAzureBlobStorageTestsFixture;
 
-  explicit PutAzureBlobStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::BlobStorage> blob_storage_wrapper)
+  explicit PutAzureBlobStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::BlobStorageClient> blob_storage_client)
     : AzureStorageProcessorBase(name, uuid, logging::LoggerFactory<PutAzureBlobStorage>::getLogger())
-    , blob_storage_wrapper_(std::move(blob_storage_wrapper)) {
+    , azure_blob_storage_(std::move(blob_storage_client)) {
   }
 
-  static std::string getAzureConnectionStringFromProperties(
+  storage::AzureStorageCredentials getAzureCredentialsFromProperties(
     const std::shared_ptr<core::ProcessContext> &context,
-    const std::shared_ptr<core::FlowFile> &flow_file);
-  std::string getConnectionString(
+    const std::shared_ptr<core::FlowFile> &flow_file) const;
+  std::optional<storage::AzureStorageCredentials> getCredentials(
     const std::shared_ptr<core::ProcessContext> &context,
     const std::shared_ptr<core::FlowFile> &flow_file) const;
-  void createAzureStorageClient(const std::string &connection_string, const std::string &container_name);
+  std::optional<storage::PutAzureBlobStorageParameters> buildAzureBlobStorageParameters(
+    const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::FlowFile> &flow_file);
 
-  std::unique_ptr<storage::BlobStorage> blob_storage_wrapper_;
+  storage::AzureBlobStorage azure_blob_storage_;
   bool create_container_ = false;
+  bool use_managed_identity_credentials_ = false;
 };
 
 }  // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.cpp b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
index caff11b..c07c3b6 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
@@ -72,11 +72,18 @@ void PutAzureDataLakeStorage::initialize() {
 }
 
 void PutAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {  // Resolves the credentials and the conflict resolution strategy; throws PROCESS_SCHEDULE_EXCEPTION when the credentials are unusable.
-  connection_string_ = getConnectionStringFromControllerService(context);
-  if (connection_string_.empty()) {
+  std::optional<storage::AzureStorageCredentials> credentials;
+  std::tie(std::ignore, credentials) = getCredentialsFromControllerService(context);  // the result code is discarded; only the optional credentials are inspected
+  if (!credentials) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service property missing or invalid");
   }
 
+  if (!credentials->isValid()) {  // credentials were returned, but their settings do not pass isValid()
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service properties are not set or invalid");
+  }
+
+  credentials_ = *credentials;  // stored as a member; buildUploadParameters copies it into each upload's parameters
+
   conflict_resolution_strategy_ = FileExistsResolutionStrategy::parse(
     utils::parsePropertyWithAllowableValuesOrThrow(*context, ConflictResolutionStrategy.getName(), FileExistsResolutionStrategy::values()).c_str());
 }
@@ -84,7 +91,7 @@ void PutAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessCont
 std::optional<storage::PutAzureDataLakeStorageParameters> PutAzureDataLakeStorage::buildUploadParameters(
     const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) {
   storage::PutAzureDataLakeStorageParameters params;
-  params.connection_string = connection_string_;
+  params.credentials = credentials_;
   params.replace_file = conflict_resolution_strategy_ == FileExistsResolutionStrategy::REPLACE_FILE;
 
   if (!context->getProperty(FilesystemName, params.file_system_name, flow_file) || params.file_system_name.empty()) {
@@ -168,7 +175,7 @@ int64_t PutAzureDataLakeStorage::ReadCallback::process(const std::shared_ptr<io:
     return -1;
   }
 
-  result_ = azure_data_lake_storage_.uploadFile(params_, gsl::span<const uint8_t>{buffer.data(), flow_size_});
+  result_ = azure_data_lake_storage_.uploadFile(params_, gsl::make_span(buffer.data(), flow_size_));
   return read_ret;
 }
 
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h b/extensions/azure/processors/PutAzureDataLakeStorage.h
index a8fb1f3..d842cdc 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -95,7 +95,7 @@ class PutAzureDataLakeStorage final : public AzureStorageProcessorBase {
 
   std::optional<storage::PutAzureDataLakeStorageParameters> buildUploadParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file);
 
-  std::string connection_string_;
+  storage::AzureStorageCredentials credentials_;
   FileExistsResolutionStrategy conflict_resolution_strategy_;
   storage::AzureDataLakeStorage azure_data_lake_storage_;
 };
diff --git a/extensions/azure/storage/AzureBlobStorage.cpp b/extensions/azure/storage/AzureBlobStorage.cpp
index 5d3fb46..b82b801 100644
--- a/extensions/azure/storage/AzureBlobStorage.cpp
+++ b/extensions/azure/storage/AzureBlobStorage.cpp
@@ -23,61 +23,39 @@
 #include <memory>
 #include <utility>
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace azure {
-namespace storage {
+#include "azure/identity.hpp"
+#include "AzureBlobStorageClient.h"
 
-AzureBlobStorage::AzureBlobStorage(std::string connection_string, std::string container_name)
-  : BlobStorage(std::move(connection_string), std::move(container_name))
-  , container_client_(std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(
-      Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(connection_string_, container_name_))) {
-}
+namespace org::apache::nifi::minifi::azure::storage {
 
-void AzureBlobStorage::resetClientIfNeeded(const std::string &connection_string, const std::string &container_name) {
-  if (connection_string == connection_string_ && container_name_ == container_name) {
-    logger_->log_debug("Client credentials have not changed, no need to reset client");
-    return;
-  }
-  connection_string_ = connection_string;
-  container_name_ = container_name;
-  logger_->log_debug("Client has been reset with new credentials");
-  container_client_ = std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(connection_string, container_name));
+AzureBlobStorage::AzureBlobStorage(std::unique_ptr<BlobStorageClient> blob_storage_client)  // Takes ownership of the given client implementation.
+  : blob_storage_client_(blob_storage_client ? std::move(blob_storage_client) : std::make_unique<AzureBlobStorageClient>()) {  // a null argument falls back to a default-constructed AzureBlobStorageClient
 }
 
-void AzureBlobStorage::createContainer() {
+std::optional<bool> AzureBlobStorage::createContainerIfNotExists(const PutAzureBlobStorageParameters& params) {  // Forwards to the client; returns its result, or std::nullopt when the client throws.
   try {
-    auto blob_client = container_client_->Create();
-    logger_->log_debug("Container created");
-  } catch (const std::runtime_error&) {
-    logger_->log_debug("Container creation failed, it already exists.");
+    return blob_storage_client_->createContainerIfNotExists(params);
+  } catch (const std::exception& ex) {  // any std::exception is logged and reported as nullopt instead of propagating
+    logger_->log_error("An exception occurred while creating container: %s", ex.what());
+    return std::nullopt;
   }
 }
 
-std::optional<UploadBlobResult> AzureBlobStorage::uploadBlob(const std::string &blob_name, const uint8_t* buffer, std::size_t buffer_size) {
+std::optional<UploadBlobResult> AzureBlobStorage::uploadBlob(const PutAzureBlobStorageParameters& params, gsl::span<const uint8_t> buffer) {  // Uploads buffer through the client and returns upload metadata, or std::nullopt when the client throws.
   try {
-    auto blob_client = container_client_->GetBlockBlobClient(blob_name);
-    auto response = blob_client.UploadFrom(buffer, buffer_size);
+    auto response = blob_storage_client_->uploadBlob(params, buffer);
 
     UploadBlobResult result;
-    result.length = buffer_size;
-    result.primary_uri = container_client_->GetUrl();
-    if (response.Value.ETag.HasValue()) {
-      result.etag = response.Value.ETag.ToString();
+    result.primary_uri = blob_storage_client_->getUrl(params);  // URL as reported by the client's getUrl() for these parameters
+    if (response.ETag.HasValue()) {  // etag stays empty when the response carries no ETag
+      result.etag = response.ETag.ToString();
     }
-    result.timestamp = response.Value.LastModified.ToString(Azure::DateTime::DateFormat::Rfc1123);
+    result.timestamp = response.LastModified.ToString(Azure::DateTime::DateFormat::Rfc1123);  // last-modified time rendered in RFC 1123 format
     return result;
-  } catch (const std::runtime_error& err) {
-    logger_->log_error("A runtime error occurred while uploading blob: %s", err.what());
+  } catch (const std::exception& ex) {  // any std::exception is logged and reported as nullopt instead of propagating
+    logger_->log_error("An exception occurred while uploading blob: %s", ex.what());
     return std::nullopt;
   }
 }
 
-}  // namespace storage
-}  // namespace azure
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureBlobStorage.h b/extensions/azure/storage/AzureBlobStorage.h
index 90c7c44..74d7ab5 100644
--- a/extensions/azure/storage/AzureBlobStorage.h
+++ b/extensions/azure/storage/AzureBlobStorage.h
@@ -24,33 +24,29 @@
 #include <string>
 #include <vector>
 
-#include "BlobStorage.h"
+#include "BlobStorageClient.h"
 #include "azure/storage/blobs.hpp"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "utils/gsl.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace azure {
-namespace storage {
+namespace org::apache::nifi::minifi::azure::storage {
 
-class AzureBlobStorage : public BlobStorage {
+struct UploadBlobResult {  // Metadata returned by AzureBlobStorage::uploadBlob after a successful upload.
+  std::string primary_uri;  // URL obtained from the storage client's getUrl()
+  std::string etag;  // ETag of the upload response; empty if the response had none
+  std::string timestamp;  // LastModified of the upload response, formatted as RFC 1123
+};
+
+class AzureBlobStorage {  // Wraps a BlobStorageClient and turns its exceptions into empty optionals.
  public:
-  AzureBlobStorage(std::string connection_string, std::string container_name);
-  void createContainer() override;
-  void resetClientIfNeeded(const std::string &connection_string, const std::string &container_name) override;
-  std::optional<UploadBlobResult> uploadBlob(const std::string &blob_name, const uint8_t* buffer, std::size_t buffer_size) override;
+  explicit AzureBlobStorage(std::unique_ptr<BlobStorageClient> blob_storage_client = nullptr);  // nullptr selects the default AzureBlobStorageClient
+  std::optional<bool> createContainerIfNotExists(const PutAzureBlobStorageParameters& params);  // std::nullopt when the client threw
+  std::optional<UploadBlobResult> uploadBlob(const PutAzureBlobStorageParameters& params, gsl::span<const uint8_t> buffer);  // std::nullopt when the client threw
 
  private:
-  std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> container_client_;
   std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<AzureBlobStorage>::getLogger()};
+  gsl::not_null<std::unique_ptr<BlobStorageClient>> blob_storage_client_;  // never null: the constructor substitutes a default client for nullptr
 };
 
-}  // namespace storage
-}  // namespace azure
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureBlobStorageClient.cpp b/extensions/azure/storage/AzureBlobStorageClient.cpp
new file mode 100644
index 0000000..f70b07f
--- /dev/null
+++ b/extensions/azure/storage/AzureBlobStorageClient.cpp
@@ -0,0 +1,65 @@
+/**
+ * @file AzureBlobStorageClient.cpp
+ * AzureBlobStorageClient class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AzureBlobStorageClient.h"
+
+#include "azure/identity.hpp"
+
+namespace org::apache::nifi::minifi::azure::storage {
+
+void AzureBlobStorageClient::resetClientIfNeeded(const AzureStorageCredentials &credentials, const std::string &container_name) {  // (Re)creates container_client_ unless it already exists for the same credentials and container.
+  if (container_client_ && credentials == credentials_ && container_name == container_name_) {  // existing client already matches the request: keep it
+    logger_->log_debug("Azure Blob Storage client credentials have not changed, no need to reset client");
+    return;
+  }
+
+  if (credentials.getUseManagedIdentityCredentials()) {  // managed identity: build the service URL from the account name and endpoint suffix
+    auto storage_client = Azure::Storage::Blobs::BlobServiceClient(
+      "https://" + credentials.getStorageAccountName() + ".blob." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>());
+
+    container_client_ = std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(storage_client.GetBlobContainerClient(container_name));
+    logger_->log_debug("Azure Blob Storage client has been reset with new managed identity credentials.");
+  } else {  // otherwise the client is created from the connection string built by the credentials
+    container_client_ = std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(
+      Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(credentials.buildConnectionString(), container_name));
+    logger_->log_debug("Azure Blob Storage client has been reset with new connection string credentials.");
+  }
+
+  credentials_ = credentials;  // remembered only after the client was created, for the comparison on the next call
+  container_name_ = container_name;
+}
+
+bool AzureBlobStorageClient::createContainerIfNotExists(const PutAzureBlobStorageParameters& params) {  // Returns the Created flag of the SDK's CreateIfNotExists() response.
+  resetClientIfNeeded(params.credentials, params.container_name);  // make sure container_client_ matches these parameters before it is used
+  return container_client_->CreateIfNotExists().Value.Created;
+}
+
+Azure::Storage::Blobs::Models::UploadBlockBlobResult AzureBlobStorageClient::uploadBlob(const PutAzureBlobStorageParameters& params, gsl::span<const uint8_t> buffer) {  // Uploads buffer as the block blob named params.blob_name and returns the SDK result value.
+  resetClientIfNeeded(params.credentials, params.container_name);  // make sure container_client_ matches these parameters before it is used
+  auto blob_client = container_client_->GetBlockBlobClient(params.blob_name);
+  return blob_client.UploadFrom(buffer.data(), buffer.size()).Value;
+}
+
+std::string AzureBlobStorageClient::getUrl(const PutAzureBlobStorageParameters& params) {  // Returns the URL of the container client; params.blob_name is not used here.
+  resetClientIfNeeded(params.credentials, params.container_name);  // make sure container_client_ matches these parameters before it is used
+  return container_client_->GetUrl();
+}
+
+}  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureBlobStorage.h b/extensions/azure/storage/AzureBlobStorageClient.h
similarity index 59%
copy from extensions/azure/storage/AzureBlobStorage.h
copy to extensions/azure/storage/AzureBlobStorageClient.h
index 90c7c44..7e6c261 100644
--- a/extensions/azure/storage/AzureBlobStorage.h
+++ b/extensions/azure/storage/AzureBlobStorageClient.h
@@ -1,6 +1,6 @@
 /**
- * @file AzureBlobStorage.h
- * AzureBlobStorage class declaration
+ * @file AzureBlobStorageClient.h
+ * AzureBlobStorageClient class declaration
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -24,33 +24,26 @@
 #include <string>
 #include <vector>
 
-#include "BlobStorage.h"
+#include "BlobStorageClient.h"
 #include "azure/storage/blobs.hpp"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace azure {
-namespace storage {
+namespace org::apache::nifi::minifi::azure::storage {
 
-class AzureBlobStorage : public BlobStorage {
+class AzureBlobStorageClient : public BlobStorageClient {  // BlobStorageClient implementation backed by the Azure SDK's BlobContainerClient.
  public:
-  AzureBlobStorage(std::string connection_string, std::string container_name);
-  void createContainer() override;
-  void resetClientIfNeeded(const std::string &connection_string, const std::string &container_name) override;
-  std::optional<UploadBlobResult> uploadBlob(const std::string &blob_name, const uint8_t* buffer, std::size_t buffer_size) override;
+  bool createContainerIfNotExists(const PutAzureBlobStorageParameters& params) override;
+  Azure::Storage::Blobs::Models::UploadBlockBlobResult uploadBlob(const PutAzureBlobStorageParameters& params, gsl::span<const uint8_t> buffer) override;
+  std::string getUrl(const PutAzureBlobStorageParameters& params) override;
 
  private:
+  void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string &container_name);  // called by every public method before container_client_ is used
+
+  AzureStorageCredentials credentials_;  // credentials the current container_client_ was built with
+  std::string container_name_;  // container the current container_client_ was built for
   std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> container_client_;
-  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<AzureBlobStorage>::getLogger()};
+  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<AzureBlobStorageClient>::getLogger()};
 };
 
-}  // namespace storage
-}  // namespace azure
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
index 8c9f2a2..a974e5b 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
@@ -20,19 +20,35 @@
 
 #include "AzureDataLakeStorageClient.h"
 
+#include "azure/identity.hpp"
+
 namespace org::apache::nifi::minifi::azure::storage {
 
-void AzureDataLakeStorageClient::resetClientIfNeeded(const std::string& connection_string, const std::string& file_system_name) {
-  if (client_ == nullptr || connection_string_ != connection_string || file_system_name_ != file_system_name) {
+void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name) {  // (Re)creates client_ unless it already exists for the same credentials and file system.
+  if (client_ && credentials_ == credentials && file_system_name_ == file_system_name) {  // existing client already matches the request: keep it
+    logger_->log_debug("Azure Data Lake Storage client credentials have not changed, no need to reset client");
+    return;
+  }
+
+  if (credentials.getUseManagedIdentityCredentials()) {  // managed identity: build the service URL from the account name and endpoint suffix
+    auto datalake_service_client = Azure::Storage::Files::DataLake::DataLakeServiceClient(
+      "https://" + credentials.getStorageAccountName() + ".dfs." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>());
+
+    client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(datalake_service_client.GetFileSystemClient(file_system_name));
+    logger_->log_debug("Azure Data Lake Storage client has been reset with new managed identity credentials.");
+  } else {
     client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(
-      Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(connection_string, file_system_name));
-    file_system_name_ = file_system_name;
-    connection_string_ = connection_string;
+      Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(credentials.buildConnectionString(), file_system_name));
+    logger_->log_debug("Azure Data Lake Storage client has been reset with new connection string credentials.");
   }
+
+  file_system_name_ = file_system_name;  // remembered only after the client was created, for the comparison on the next call
+  credentials_ = credentials;
 }
 
 Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const PutAzureDataLakeStorageParameters& params) {
-  resetClientIfNeeded(params.connection_string, params.file_system_name);
+  resetClientIfNeeded(params.credentials, params.file_system_name);
+
   auto directory_client = client_->GetDirectoryClient(params.directory_name);
   if (!params.directory_name.empty()) {
     directory_client.CreateIfNotExists();
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h b/extensions/azure/storage/AzureDataLakeStorageClient.h
index 02b9362..4f398a7 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.h
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.h
@@ -48,12 +48,13 @@ class AzureDataLakeStorageClient : public DataLakeStorageClient {
   std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) override;
 
  private:
-  void resetClientIfNeeded(const std::string& connection_string, const std::string& file_system_name);
+  void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name);
   Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const PutAzureDataLakeStorageParameters& params);
 
-  std::string connection_string_;
+  AzureStorageCredentials credentials_;
   std::string file_system_name_;
   std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> client_;
+  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<AzureDataLakeStorageClient>::getLogger()};
 };
 
 }  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureStorageCredentials.cpp b/extensions/azure/storage/AzureStorageCredentials.cpp
new file mode 100644
index 0000000..070dfe8
--- /dev/null
+++ b/extensions/azure/storage/AzureStorageCredentials.cpp
@@ -0,0 +1,109 @@
+/**
+ * @file AzureStorageCredentials.cpp
+ * AzureStorageCredentials class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AzureStorageCredentials.h"
+
+namespace org::apache::nifi::minifi::azure::storage {
+
+void AzureStorageCredentials::setStorageAccountName(const std::string& storage_account_name) {  // Sets the account name emitted as "AccountName=" by buildConnectionString() and required by isValid() for managed identity.
+  storage_account_name_ = storage_account_name;
+}
+
+void AzureStorageCredentials::setStorageAccountKey(const std::string& storage_account_key) {  // Sets the account key emitted as "AccountKey=" by buildConnectionString().
+  storage_account_key_ = storage_account_key;
+}
+
+void AzureStorageCredentials::setSasToken(const std::string& sas_token) {  // Sets the SAS token; buildConnectionString() drops a leading '?' when emitting it.
+  sas_token_ = sas_token;
+}
+
+void AzureStorageCredentials::setEndpontSuffix(const std::string& endpoint_suffix) {  // Sets the endpoint suffix. NOTE(review): the name misspells "Endpoint"; renaming it would require updating every caller.
+  endpoint_suffix_ = endpoint_suffix;
+}
+
+void AzureStorageCredentials::setConnectionString(const std::string& connection_string) {  // Sets an explicit connection string; when non-empty, buildConnectionString() returns it instead of assembling one.
+  connection_string_ = connection_string;
+}
+
+void AzureStorageCredentials::setUseManagedIdentityCredentials(bool use_managed_identity_credentials) {  // Turns managed identity on or off; while on, buildConnectionString() returns an empty string.
+  use_managed_identity_credentials_ = use_managed_identity_credentials;
+}
+
+std::string AzureStorageCredentials::getStorageAccountName() const {  // Returns a copy of the stored storage account name.
+  return storage_account_name_;
+}
+
+std::string AzureStorageCredentials::getEndpointSuffix() const {  // Returns the configured endpoint suffix, or "core.windows.net" when none was set.
+  return endpoint_suffix_.empty() ? "core.windows.net" : endpoint_suffix_;
+}
+
+bool AzureStorageCredentials::getUseManagedIdentityCredentials() const {  // Reports whether managed identity was selected instead of a connection string.
+  return use_managed_identity_credentials_;
+}
+
+std::string AzureStorageCredentials::buildConnectionString() const {  // Returns the connection string for these credentials, or "" when none can be produced.
+  if (use_managed_identity_credentials_) {  // managed identity does not use a connection string
+    return "";
+  }
+
+  if (!connection_string_.empty()) {  // an explicitly set connection string takes precedence over the individual fields
+    return connection_string_;
+  }
+
+  if (storage_account_name_.empty() || (storage_account_key_.empty() && sas_token_.empty())) {  // need an account name plus at least a key or a SAS token
+    return "";
+  }
+
+  std::string credentials;
+  credentials += "AccountName=" + storage_account_name_;
+
+  if (!storage_account_key_.empty()) {
+    credentials += ";AccountKey=" + storage_account_key_;
+  }
+
+  if (!sas_token_.empty()) {
+    credentials += ";SharedAccessSignature=" + (sas_token_[0] == '?' ? sas_token_.substr(1) : sas_token_);  // a leading '?' on the token is dropped
+  }
+
+  if (!endpoint_suffix_.empty()) {  // the suffix is only emitted when explicitly set; no default is written here
+    credentials += ";EndpointSuffix=" + endpoint_suffix_;
+  }
+
+  return credentials;
+}
+
+bool AzureStorageCredentials::isValid() const {  // True when the selected authentication mode has the data it needs.
+  return (getUseManagedIdentityCredentials() && !getStorageAccountName().empty()) ||  // managed identity requires a storage account name
+         (!getUseManagedIdentityCredentials() && !buildConnectionString().empty());  // otherwise a non-empty connection string must be producible
+}
+
+bool AzureStorageCredentials::operator==(const AzureStorageCredentials& other) const {  // Two credentials are equal when they would configure an identical storage client.
+  if (other.use_managed_identity_credentials_ != use_managed_identity_credentials_) {  // different authentication modes never compare equal
+    return false;
+  }
+
+  if (use_managed_identity_credentials_) {
+    return storage_account_name_ == other.storage_account_name_ && getEndpointSuffix() == other.getEndpointSuffix();  // compare effective suffixes: an unset suffix and the explicit default address the same endpoint
+  } else {
+    return buildConnectionString() == other.buildConnectionString();  // compares the effective connection string, not the individual fields
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureStorageCredentials.h b/extensions/azure/storage/AzureStorageCredentials.h
index b6b4b8d..ea14d2f 100644
--- a/extensions/azure/storage/AzureStorageCredentials.h
+++ b/extensions/azure/storage/AzureStorageCredentials.h
@@ -21,53 +21,32 @@
 
 #include <string>
 
-#include "utils/StringUtils.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace azure {
-namespace storage {
-
-struct AzureStorageCredentials {
-  std::string storage_account_name;
-  std::string storage_account_key;
-  std::string sas_token;
-  std::string endpoint_suffix;
-  std::string connection_string;
-
-  std::string getConnectionString() const {
-    if (!connection_string.empty()) {
-      return connection_string;
-    }
-
-    if (storage_account_name.empty() || (storage_account_key.empty() && sas_token.empty())) {
-      return "";
-    }
-
-    std::string credentials;
-    credentials += "AccountName=" + storage_account_name;
-
-    if (!storage_account_key.empty()) {
-      credentials += ";AccountKey=" + storage_account_key;
-    }
-
-    if (!sas_token.empty()) {
-      credentials += ";SharedAccessSignature=" + (sas_token[0] == '?' ? sas_token.substr(1) : sas_token);
-    }
-
-    if (!endpoint_suffix.empty()) {
-      credentials += ";EndpointSuffix=" + endpoint_suffix;
-    }
-
-    return credentials;
-  }
+namespace org::apache::nifi::minifi::azure::storage {
+
+class AzureStorageCredentials {  // Holds Azure Storage authentication settings: either managed identity or connection string data.
+ public:
+  void setStorageAccountName(const std::string& storage_account_name);
+  void setStorageAccountKey(const std::string& storage_account_key);
+  void setSasToken(const std::string& sas_token);
+  void setEndpontSuffix(const std::string& endpoint_suffix);  // NOTE(review): name misspells "Endpoint"; renaming would require updating every caller
+  void setConnectionString(const std::string& connection_string);
+  void setUseManagedIdentityCredentials(bool use_managed_identity_credentials);
+
+  std::string getStorageAccountName() const;
+  std::string getEndpointSuffix() const;  // falls back to "core.windows.net" when no suffix is set
+  bool getUseManagedIdentityCredentials() const;
+  std::string buildConnectionString() const;  // "" when managed identity is used or the fields are insufficient
+  bool isValid() const;  // whether the selected authentication mode has the data it needs
+
+  bool operator==(const AzureStorageCredentials& other) const;
+
+ private:
+  std::string storage_account_name_;
+  std::string storage_account_key_;
+  std::string sas_token_;
+  std::string endpoint_suffix_;
+  std::string connection_string_;  // when non-empty, returned by buildConnectionString() instead of an assembled string
+  bool use_managed_identity_credentials_ = false;  // connection string authentication is the default
 };
 
-}  // namespace storage
-}  // namespace azure
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/BlobStorage.h b/extensions/azure/storage/BlobStorage.h
deleted file mode 100644
index f232ae3..0000000
--- a/extensions/azure/storage/BlobStorage.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * @file BlobStorage.h
- * BlobStorage class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <optional>
-#include <string>
-#include <utility>
-#include <vector>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace azure {
-namespace storage {
-
-struct UploadBlobResult {
-  std::string primary_uri;
-  std::string etag;
-  std::size_t length;
-  std::string timestamp;
-};
-
-class BlobStorage {
- public:
-  BlobStorage(std::string connection_string, std::string container_name)
-    : connection_string_(std::move(connection_string))
-    , container_name_(std::move(container_name)) {
-  }
-
-  virtual void createContainer() = 0;
-  virtual void resetClientIfNeeded(const std::string &connection_string, const std::string &container_name) = 0;
-  virtual std::optional<UploadBlobResult> uploadBlob(const std::string &blob_name, const uint8_t* buffer, std::size_t buffer_size) = 0;
-  virtual ~BlobStorage() = default;
-
- protected:
-  std::string connection_string_;
-  std::string container_name_;
-};
-
-}  // namespace storage
-}  // namespace azure
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
diff --git a/extensions/azure/storage/DataLakeStorageClient.h b/extensions/azure/storage/BlobStorageClient.h
similarity index 55%
copy from extensions/azure/storage/DataLakeStorageClient.h
copy to extensions/azure/storage/BlobStorageClient.h
index f564426..298d80c 100644
--- a/extensions/azure/storage/DataLakeStorageClient.h
+++ b/extensions/azure/storage/BlobStorageClient.h
@@ -1,6 +1,6 @@
 /**
- * @file DataLakeStorageClient.h
- * DataLakeStorageClient class declaration
+ * @file BlobStorageClient.h
+ * BlobStorageClient class declaration
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,25 +19,29 @@
  */
 #pragma once
 
+#include <optional>
 #include <string>
+#include <utility>
+#include <vector>
 
-#include "gsl/gsl-lite.hpp"
+#include "azure/storage/blobs/protocol/blob_rest_client.hpp"
+#include "AzureStorageCredentials.h"
+#include "utils/gsl.h"
 
 namespace org::apache::nifi::minifi::azure::storage {
 
-struct PutAzureDataLakeStorageParameters {
-  std::string connection_string;
-  std::string file_system_name;
-  std::string directory_name;
-  std::string filename;
-  bool replace_file = false;
+struct PutAzureBlobStorageParameters {  // parameter bundle accepted by every BlobStorageClient method
+  AzureStorageCredentials credentials;  // a credentials object rather than a plain connection string
+  std::string container_name;
+  std::string blob_name;
 };
 
-class DataLakeStorageClient {
+class BlobStorageClient {  // pure-virtual interface; PutAzureBlobStorageTests.cpp substitutes a mock implementation
  public:
-  virtual bool createFile(const PutAzureDataLakeStorageParameters& params) = 0;
-  virtual std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) = 0;
-  virtual ~DataLakeStorageClient() {}
+  virtual bool createContainerIfNotExists(const PutAzureBlobStorageParameters& params) = 0;  // NOTE(review): meaning of the bool result is not documented here - TODO confirm
+  virtual Azure::Storage::Blobs::Models::UploadBlockBlobResult uploadBlob(const PutAzureBlobStorageParameters& params, gsl::span<const uint8_t> buffer) = 0;
+  virtual std::string getUrl(const PutAzureBlobStorageParameters& params) = 0;
+  virtual ~BlobStorageClient() = default;
 };
 
 }  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/DataLakeStorageClient.h b/extensions/azure/storage/DataLakeStorageClient.h
index f564426..b0f470d 100644
--- a/extensions/azure/storage/DataLakeStorageClient.h
+++ b/extensions/azure/storage/DataLakeStorageClient.h
@@ -20,13 +20,16 @@
 #pragma once
 
 #include <string>
+#include <optional>
 
-#include "gsl/gsl-lite.hpp"
+#include "AzureStorageCredentials.h"
+
+#include "utils/gsl.h"
 
 namespace org::apache::nifi::minifi::azure::storage {
 
 struct PutAzureDataLakeStorageParameters {
-  std::string connection_string;
+  AzureStorageCredentials credentials;
   std::string file_system_name;
   std::string directory_name;
   std::string filename;
@@ -37,7 +40,7 @@ class DataLakeStorageClient {
  public:
   virtual bool createFile(const PutAzureDataLakeStorageParameters& params) = 0;
   virtual std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) = 0;
-  virtual ~DataLakeStorageClient() {}
+  virtual ~DataLakeStorageClient() = default;
 };
 
 }  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/pdh/tests/PerformanceDataCounterTests.cpp b/extensions/pdh/tests/PerformanceDataCounterTests.cpp
index 621e7e5..07b1c55 100644
--- a/extensions/pdh/tests/PerformanceDataCounterTests.cpp
+++ b/extensions/pdh/tests/PerformanceDataCounterTests.cpp
@@ -19,7 +19,7 @@
 #include "TestBase.h"
 #include "PDHCounters.h"
 #include "MemoryConsumptionCounter.h"
-#include "gsl/gsl-lite.hpp"
+#include "utils/gsl.h"
 
 using org::apache::nifi::minifi::processors::SinglePDHCounter;
 using org::apache::nifi::minifi::processors::PDHCounterArray;
diff --git a/extensions/windows-event-log/wel/JSONUtils.cpp b/extensions/windows-event-log/wel/JSONUtils.cpp
index ff64117..da108d6 100644
--- a/extensions/windows-event-log/wel/JSONUtils.cpp
+++ b/extensions/windows-event-log/wel/JSONUtils.cpp
@@ -28,7 +28,7 @@
 #include "rapidjson/stringbuffer.h"
 #include "rapidjson/prettywriter.h"
 
-#include "gsl/gsl-lite.hpp";
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
diff --git a/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp b/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp
index b315d41..5e6d2de 100644
--- a/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp
+++ b/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp
@@ -27,7 +27,7 @@
 #include "processors/PutFile.h"
 #include "processors/LogAttribute.h"
 #include "processors/UpdateAttribute.h"
-#include "storage/BlobStorage.h"
+#include "storage/BlobStorageClient.h"
 #include "utils/file/FileUtils.h"
 
 const std::string CONTAINER_NAME = "test-container";
@@ -38,46 +38,41 @@ const std::string ENDPOINT_SUFFIX = "test.suffix.com";
 const std::string CONNECTION_STRING = "test-connectionstring";
 const std::string BLOB_NAME = "test-blob.txt";
 const std::string TEST_DATA = "data";
+const std::string GET_FILE_NAME = "input_data.log";
 
-class MockBlobStorage : public minifi::azure::storage::BlobStorage {
+class MockBlobStorage : public minifi::azure::storage::BlobStorageClient {
  public:
   const std::string ETAG = "test-etag";
   const std::string PRIMARY_URI = "test-uri";
-  const std::string TEST_TIMESTAMP = "test-timestamp";
+  const std::string TEST_TIMESTAMP = "Sun, 21 Oct 2018 12:16:24 GMT";
 
-  MockBlobStorage()
-    : BlobStorage("", "") {
-  }
-
-  void createContainer() override {
+  bool createContainerIfNotExists(const minifi::azure::storage::PutAzureBlobStorageParameters& params) override {
+    params_ = params;
     container_created_ = true;
+    return true;
   }
 
-  void resetClientIfNeeded(const std::string &connection_string, const std::string &container_name) override {
-    connection_string_ = connection_string;
-    container_name_ = container_name;
-  }
-
-  std::optional<minifi::azure::storage::UploadBlobResult> uploadBlob(const std::string& /*blob_name*/, const uint8_t* buffer, std::size_t buffer_size) override {
+  Azure::Storage::Blobs::Models::UploadBlockBlobResult uploadBlob(const minifi::azure::storage::PutAzureBlobStorageParameters& params, gsl::span<const uint8_t> buffer) override {
+    params_ = params;
     if (upload_fails_) {
-      return std::nullopt;
+      throw std::runtime_error("error");
     }
 
-    input_data = std::string(buffer, buffer + buffer_size);
-    minifi::azure::storage::UploadBlobResult result;
-    result.etag = ETAG;
-    result.length = buffer_size;
-    result.primary_uri = PRIMARY_URI;
-    result.timestamp = TEST_TIMESTAMP;
+    input_data_ = std::string(buffer.begin(), buffer.end());
+
+    Azure::Storage::Blobs::Models::UploadBlockBlobResult result;
+    result.ETag = Azure::ETag{ETAG};
+    result.LastModified = Azure::DateTime::Parse(TEST_TIMESTAMP, Azure::DateTime::DateFormat::Rfc1123);
     return result;
   }
 
-  std::string getConnectionString() const {
-    return connection_string_;
+  std::string getUrl(const minifi::azure::storage::PutAzureBlobStorageParameters& params) override {
+    params_ = params;
+    return PRIMARY_URI;
   }
 
-  std::string getContainerName() const {
-    return container_name_;
+  minifi::azure::storage::PutAzureBlobStorageParameters getPassedParams() const {
+    return params_;
   }
 
   bool getContainerCreated() const {
@@ -88,11 +83,15 @@ class MockBlobStorage : public minifi::azure::storage::BlobStorage {
     upload_fails_ = upload_fails;
   }
 
-  std::string input_data;
+  std::string getInputData() const {
+    return input_data_;
+  }
 
  private:
+  minifi::azure::storage::PutAzureBlobStorageParameters params_;
   bool container_created_ = false;
   bool upload_fails_ = false;
+  std::string input_data_;
 };
 
 class PutAzureBlobStorageTestsFixture {
@@ -114,7 +113,7 @@ class PutAzureBlobStorageTestsFixture {
     put_azure_blob_storage = std::shared_ptr<minifi::azure::processors::PutAzureBlobStorage>(
       new minifi::azure::processors::PutAzureBlobStorage("PutAzureBlobStorage", utils::Identifier(), std::move(mock_blob_storage)));
     auto input_dir = test_controller.createTempDirectory();
-    std::ofstream input_file_stream(input_dir + utils::file::FileUtils::get_separator() + "input_data.log");
+    std::ofstream input_file_stream(input_dir + utils::file::FileUtils::get_separator() + GET_FILE_NAME);
     input_file_stream << TEST_DATA;
     input_file_stream.close();
     get_file = plan->addProcessor("GetFile", "GetFile");
@@ -169,15 +168,7 @@ class PutAzureBlobStorageTestsFixture {
   std::string output_dir;
 };
 
-TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test required parameters", "[azureStorageParameters]") {
-  SECTION("Container name not set") {
-  }
-
-  SECTION("Blob name not set") {
-    plan->setProperty(update_attribute, "test.container", CONTAINER_NAME, true);
-    plan->setProperty(put_azure_blob_storage, "Container Name", "${test.container}");
-  }
-
+TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Container name not set", "[azureStorageParameters]") {  // the former "Blob name not set" section is dropped
   REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
 }
 
@@ -202,7 +193,8 @@ TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test credentials settings", "
     plan->setProperty(azure_storage_cred_service, "Storage Account Key", STORAGE_ACCOUNT_KEY);
     plan->setProperty(put_azure_blob_storage, "Azure Storage Credentials Service", "AzureStorageCredentialsService");
     test_controller.runSession(plan, true);
-    REQUIRE(mock_blob_storage_ptr->getConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY);
   }
 
   SECTION("Overriding credentials set in Azure Storage Credentials Service with connection string") {
@@ -212,7 +204,8 @@ TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test credentials settings", "
     plan->setProperty(azure_storage_cred_service, "Connection String", CONNECTION_STRING);
     plan->setProperty(put_azure_blob_storage, "Azure Storage Credentials Service", "AzureStorageCredentialsService");
     test_controller.runSession(plan, true);
-    REQUIRE(mock_blob_storage_ptr->getConnectionString() == CONNECTION_STRING);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    REQUIRE(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
   }
 
   SECTION("Account name and key set in properties") {
@@ -221,7 +214,8 @@ TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test credentials settings", "
     plan->setProperty(update_attribute, "test.account_key", STORAGE_ACCOUNT_KEY, true);
     plan->setProperty(put_azure_blob_storage, "Storage Account Key", "${test.account_key}");
     test_controller.runSession(plan, true);
-    REQUIRE(mock_blob_storage_ptr->getConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY);
   }
 
   SECTION("Account name and SAS token set in properties") {
@@ -230,7 +224,8 @@ TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test credentials settings", "
     plan->setProperty(update_attribute, "test.sas_token", SAS_TOKEN, true);
     plan->setProperty(put_azure_blob_storage, "SAS Token", "${test.sas_token}");
     test_controller.runSession(plan, true);
-    REQUIRE(mock_blob_storage_ptr->getConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";SharedAccessSignature=" + SAS_TOKEN);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";SharedAccessSignature=" + SAS_TOKEN);
   }
 
   SECTION("Account name and SAS token with question mark set in properties") {
@@ -239,7 +234,8 @@ TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test credentials settings", "
     plan->setProperty(update_attribute, "test.sas_token", "?" + SAS_TOKEN, true);
     plan->setProperty(put_azure_blob_storage, "SAS Token", "${test.sas_token}");
     test_controller.runSession(plan, true);
-    REQUIRE(mock_blob_storage_ptr->getConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";SharedAccessSignature=" + SAS_TOKEN);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";SharedAccessSignature=" + SAS_TOKEN);
   }
 
   SECTION("Endpoint suffix overriden") {
@@ -250,17 +246,67 @@ TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test credentials settings", "
     plan->setProperty(update_attribute, "test.endpoint_suffix", ENDPOINT_SUFFIX, true);
     plan->setProperty(put_azure_blob_storage, "Common Storage Account Endpoint Suffix", "${test.endpoint_suffix}");
     test_controller.runSession(plan, true);
-    REQUIRE(mock_blob_storage_ptr->getConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY + ";EndpointSuffix=" + ENDPOINT_SUFFIX);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY + ";EndpointSuffix=" + ENDPOINT_SUFFIX);
   }
 
   SECTION("Use connection string") {
     plan->setProperty(update_attribute, "test.connection_string", CONNECTION_STRING, true);
     plan->setProperty(put_azure_blob_storage, "Connection String", "${test.connection_string}");
     test_controller.runSession(plan, true);
-    REQUIRE(mock_blob_storage_ptr->getConnectionString() == CONNECTION_STRING);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    REQUIRE(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
   }
 
   SECTION("Overriding credentials with connection string") {
+    plan->setProperty(update_attribute, "test.account_name", STORAGE_ACCOUNT_NAME, true);
+    plan->setProperty(put_azure_blob_storage, "Storage Account Name", "${test.account_name}");
+    plan->setProperty(update_attribute, "test.account_key", STORAGE_ACCOUNT_KEY, true);
+    plan->setProperty(put_azure_blob_storage, "Storage Account Key", "${test.account_key}");
+    plan->setProperty(update_attribute, "test.connection_string", CONNECTION_STRING, true);
+    plan->setProperty(put_azure_blob_storage, "Connection String", "${test.connection_string}");
+    test_controller.runSession(plan, true);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    REQUIRE(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  }
+
+  SECTION("Connection string is empty after substituting it from expression language") {
+    plan->setProperty(update_attribute, "test.connection_string", "", true);
+    plan->setProperty(put_azure_blob_storage, "Connection String", "${test.connection_string}");
+    test_controller.runSession(plan, true);
+    auto failed_flowfiles = getFailedFlowFileContents();
+    REQUIRE(failed_flowfiles.size() == 1);
+    REQUIRE(failed_flowfiles[0] == TEST_DATA);
+  }
+
+  SECTION("Account name and managed identity are used in properties") {
+    plan->setProperty(put_azure_blob_storage, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+    plan->setProperty(put_azure_blob_storage, "Use Managed Identity Credentials", "true");
+    test_controller.runSession(plan, true);
+    CHECK(getFailedFlowFileContents().size() == 0);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    CHECK(passed_params.credentials.buildConnectionString().empty());
+    CHECK(passed_params.credentials.getStorageAccountName() == STORAGE_ACCOUNT_NAME);
+    CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net");
+    CHECK(passed_params.container_name == CONTAINER_NAME);
+  }
+
+  SECTION("Account name and managed identity are used from Azure Storage Credentials Service") {
+    auto azure_storage_cred_service = plan->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
+    plan->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+    plan->setProperty(azure_storage_cred_service, "Use Managed Identity Credentials", "true");
+    plan->setProperty(azure_storage_cred_service, "Common Storage Account Endpoint Suffix", "core.chinacloudapi.cn");
+    plan->setProperty(put_azure_blob_storage, "Azure Storage Credentials Service", "AzureStorageCredentialsService");
+    test_controller.runSession(plan, true);
+    CHECK(getFailedFlowFileContents().size() == 0);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    CHECK(passed_params.credentials.buildConnectionString().empty());
+    CHECK(passed_params.credentials.getStorageAccountName() == STORAGE_ACCOUNT_NAME);
+    CHECK(passed_params.credentials.getEndpointSuffix() == "core.chinacloudapi.cn");
+    CHECK(passed_params.container_name == CONTAINER_NAME);
+  }
+
+  SECTION("Azure Storage Credentials Service overrides properties") {
     auto azure_storage_cred_service = plan->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
     plan->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME);
     plan->setProperty(azure_storage_cred_service, "Storage Account Key", STORAGE_ACCOUNT_KEY);
@@ -272,36 +318,69 @@ TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test credentials settings", "
     plan->setProperty(update_attribute, "test.connection_string", CONNECTION_STRING, true);
     plan->setProperty(put_azure_blob_storage, "Connection String", "${test.connection_string}");
     test_controller.runSession(plan, true);
-    REQUIRE(mock_blob_storage_ptr->getConnectionString() == CONNECTION_STRING);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY);
   }
 
-  SECTION("Connection string is empty after substituting it from expression language") {
-    plan->setProperty(update_attribute, "test.connection_string", "", true);
-    plan->setProperty(put_azure_blob_storage, "Connection String", "${test.connection_string}");
+  SECTION("Azure Storage Credentials Service is set with invalid parameters") {
+    auto azure_storage_cred_service = plan->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
+    plan->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+    plan->setProperty(put_azure_blob_storage, "Azure Storage Credentials Service", "AzureStorageCredentialsService");
+    plan->setProperty(update_attribute, "test.account_name", STORAGE_ACCOUNT_NAME, true);
+    plan->setProperty(put_azure_blob_storage, "Storage Account Name", "${test.account_name}");
+    plan->setProperty(update_attribute, "test.account_key", STORAGE_ACCOUNT_KEY, true);
+    plan->setProperty(put_azure_blob_storage, "Storage Account Key", "${test.account_key}");
+    test_controller.runSession(plan, true);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    REQUIRE(passed_params.credentials.buildConnectionString().empty());
+    auto failed_flowfiles = getFailedFlowFileContents();
+    REQUIRE(failed_flowfiles.size() == 1);
+    REQUIRE(failed_flowfiles[0] == TEST_DATA);
+  }
+
+  SECTION("Azure Storage Credentials Service name is invalid") {
+    auto azure_storage_cred_service = plan->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
+    plan->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+    plan->setProperty(azure_storage_cred_service, "Storage Account Key", STORAGE_ACCOUNT_KEY);
+    plan->setProperty(put_azure_blob_storage, "Azure Storage Credentials Service", "invalid_name");
+    plan->setProperty(update_attribute, "test.account_name", STORAGE_ACCOUNT_NAME, true);
+    plan->setProperty(put_azure_blob_storage, "Storage Account Name", "${test.account_name}");
+    plan->setProperty(update_attribute, "test.account_key", STORAGE_ACCOUNT_KEY, true);
+    plan->setProperty(put_azure_blob_storage, "Storage Account Key", "${test.account_key}");
     test_controller.runSession(plan, true);
+    auto passed_params = mock_blob_storage_ptr->getPassedParams();
+    REQUIRE(passed_params.credentials.buildConnectionString().empty());
     auto failed_flowfiles = getFailedFlowFileContents();
     REQUIRE(failed_flowfiles.size() == 1);
     REQUIRE(failed_flowfiles[0] == TEST_DATA);
   }
 }
 
+TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test Azure blob upload failure in case Blob is not set and filename is empty", "[azureBlobStorageUpload]") {
+  plan->setProperty(update_attribute, "test.container", CONTAINER_NAME, true);
+  plan->setProperty(put_azure_blob_storage, "Container Name", "${test.container}");
+  plan->setProperty(update_attribute, "filename", "", true);  // blank out the filename attribute; the Blob property is never set in this test
+  setDefaultCredentials();
+  test_controller.runSession(plan, true);
+  REQUIRE(LogTestController::getInstance().contains("Blob is not set and default 'filename' attribute could not be found!"));  // the test passes only if this error is logged
+}
+
 TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test Azure blob upload", "[azureBlobStorageUpload]") {
   plan->setProperty(update_attribute, "test.container", CONTAINER_NAME, true);
   plan->setProperty(put_azure_blob_storage, "Container Name", "${test.container}");
-  plan->setProperty(update_attribute, "test.blob", BLOB_NAME, true);
-  plan->setProperty(put_azure_blob_storage, "Blob", "${test.blob}");
   setDefaultCredentials();
   test_controller.runSession(plan, true);
-  REQUIRE(LogTestController::getInstance().contains("key:azure.container value:" + CONTAINER_NAME));
-  REQUIRE(LogTestController::getInstance().contains("key:azure.blobname value:" + BLOB_NAME));
-  REQUIRE(LogTestController::getInstance().contains("key:azure.primaryUri value:" + mock_blob_storage_ptr->PRIMARY_URI));
-  REQUIRE(LogTestController::getInstance().contains("key:azure.etag value:" + mock_blob_storage_ptr->ETAG));
-  REQUIRE(LogTestController::getInstance().contains("key:azure.length value:" + std::to_string(TEST_DATA.size())));
-  REQUIRE(LogTestController::getInstance().contains("key:azure.timestamp value:" + mock_blob_storage_ptr->TEST_TIMESTAMP));
-  REQUIRE(mock_blob_storage_ptr->input_data == TEST_DATA);
-  REQUIRE(mock_blob_storage_ptr->getContainerCreated() == false);
-  REQUIRE(mock_blob_storage_ptr->getContainerName() == CONTAINER_NAME);
-  REQUIRE(getFailedFlowFileContents().size() == 0);
+  CHECK(LogTestController::getInstance().contains("key:azure.container value:" + CONTAINER_NAME));  // CHECK keeps going after a failure, so every mismatch gets reported
+  CHECK(LogTestController::getInstance().contains("key:azure.blobname value:" + GET_FILE_NAME));  // Blob is no longer set here, so the input file's name is expected instead
+  CHECK(LogTestController::getInstance().contains("key:azure.primaryUri value:" + mock_blob_storage_ptr->PRIMARY_URI));
+  CHECK(LogTestController::getInstance().contains("key:azure.etag value:" + mock_blob_storage_ptr->ETAG));
+  CHECK(LogTestController::getInstance().contains("key:azure.length value:" + std::to_string(TEST_DATA.size())));
+  CHECK(LogTestController::getInstance().contains("key:azure.timestamp value:" + mock_blob_storage_ptr->TEST_TIMESTAMP));
+  CHECK(mock_blob_storage_ptr->getInputData() == TEST_DATA);
+  CHECK(mock_blob_storage_ptr->getContainerCreated() == false);
+  auto passed_params = mock_blob_storage_ptr->getPassedParams();  // the mock records the parameters of the last call it received
+  CHECK(passed_params.container_name == CONTAINER_NAME);
+  CHECK(getFailedFlowFileContents().size() == 0);
 }
 
 TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test Azure blob upload with container creation", "[azureBlobStorageUpload]") {
@@ -312,16 +391,17 @@ TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test Azure blob upload with c
   plan->setProperty(put_azure_blob_storage, "Create Container", "true");
   setDefaultCredentials();
   test_controller.runSession(plan, true);
-  REQUIRE(LogTestController::getInstance().contains("key:azure.container value:" + CONTAINER_NAME));
-  REQUIRE(LogTestController::getInstance().contains("key:azure.blobname value:" + BLOB_NAME));
-  REQUIRE(LogTestController::getInstance().contains("key:azure.primaryUri value:" + mock_blob_storage_ptr->PRIMARY_URI));
-  REQUIRE(LogTestController::getInstance().contains("key:azure.etag value:" + mock_blob_storage_ptr->ETAG));
-  REQUIRE(LogTestController::getInstance().contains("key:azure.length value:" + std::to_string(TEST_DATA.size())));
-  REQUIRE(LogTestController::getInstance().contains("key:azure.timestamp value:" + mock_blob_storage_ptr->TEST_TIMESTAMP));
-  REQUIRE(mock_blob_storage_ptr->input_data == TEST_DATA);
-  REQUIRE(mock_blob_storage_ptr->getContainerCreated() == true);
-  REQUIRE(mock_blob_storage_ptr->getContainerName() == CONTAINER_NAME);
-  REQUIRE(getFailedFlowFileContents().size() == 0);
+  CHECK(LogTestController::getInstance().contains("key:azure.container value:" + CONTAINER_NAME));
+  CHECK(LogTestController::getInstance().contains("key:azure.blobname value:" + BLOB_NAME));
+  CHECK(LogTestController::getInstance().contains("key:azure.primaryUri value:" + mock_blob_storage_ptr->PRIMARY_URI));
+  CHECK(LogTestController::getInstance().contains("key:azure.etag value:" + mock_blob_storage_ptr->ETAG));
+  CHECK(LogTestController::getInstance().contains("key:azure.length value:" + std::to_string(TEST_DATA.size())));
+  CHECK(LogTestController::getInstance().contains("key:azure.timestamp value:" + mock_blob_storage_ptr->TEST_TIMESTAMP));
+  CHECK(mock_blob_storage_ptr->getInputData() == TEST_DATA);
+  CHECK(mock_blob_storage_ptr->getContainerCreated() == true);
+  auto passed_params = mock_blob_storage_ptr->getPassedParams();
+  CHECK(passed_params.container_name == CONTAINER_NAME);
+  CHECK(getFailedFlowFileContents().size() == 0);
 }
 
 TEST_CASE_METHOD(PutAzureBlobStorageTestsFixture, "Test Azure blob upload failure", "[azureBlobStorageUpload]") {
diff --git a/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp
index 2b56897..ab6cdc8 100644
--- a/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp
+++ b/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp
@@ -170,11 +170,46 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Azure storage credentials
   REQUIRE(getFailedFlowFileContents().size() == 0);
 }
 
+TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials with account name and SAS token set", "[azureDataLakeStorageParameters]") {
+  setDefaultProperties();  // NOTE(review): helper body is not visible here; presumably it configures a connection string, which is cleared again below
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), "");
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+  CHECK(passed_params.credentials.buildConnectionString() == "AccountName=TEST_ACCOUNT;SharedAccessSignature=token");  // built from account name + SAS token
+  REQUIRE(getFailedFlowFileContents().size() == 0);
+}
+
+TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials with connection string override", "[azureDataLakeStorageParameters]") {
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), CONNECTION_STRING);
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);  // the explicit connection string is expected to win over account name + SAS token
+  REQUIRE(getFailedFlowFileContents().size() == 0);
+}
+
+TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials with managed identity use", "[azureDataLakeStorageParameters]") {
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), "test");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::UseManagedIdentityCredentials.getName(), "true");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+  CHECK(passed_params.credentials.buildConnectionString().empty());  // expected empty even though a connection string ("test") is configured above
+  CHECK(passed_params.credentials.getStorageAccountName() == "TEST_ACCOUNT");
+  CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net");  // no suffix is set in the visible lines - TODO confirm where this default comes from
+  REQUIRE(getFailedFlowFileContents().size() == 0);
+}
+
 TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Filesystem name is not set", "[azureDataLakeStorageParameters]") {
   plan_->setProperty(update_attribute_, "test.filesystemname", "", true);
   test_controller_.runSession(plan_, true);
   using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid or empty!"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid or empty!"));
   auto failed_flowfiles = getFailedFlowFileContents();
   REQUIRE(failed_flowfiles.size() == 1);
   REQUIRE(failed_flowfiles[0] == TEST_DATA);
@@ -188,19 +223,19 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Connection String is empt
 
 TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Upload to Azure Data Lake Storage with default parameters", "[azureDataLakeStorageUpload]") {
   test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(passed_params.directory_name == DIRECTORY_NAME);
+  CHECK(passed_params.filename == GETFILE_FILE_NAME);
+  CHECK_FALSE(passed_params.replace_file);
   REQUIRE(getFailedFlowFileContents().size() == 0);
   using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" + DIRECTORY_NAME));
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.filename value:" + GETFILE_FILE_NAME));
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" + FILESYSTEM_NAME));
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:" + std::to_string(TEST_DATA.size())));
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.primaryUri value:" + mock_data_lake_storage_client_ptr_->PRIMARY_URI + "\n"));
-  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
-  REQUIRE(passed_params.connection_string == CONNECTION_STRING);
-  REQUIRE(passed_params.file_system_name == FILESYSTEM_NAME);
-  REQUIRE(passed_params.directory_name == DIRECTORY_NAME);
-  REQUIRE(passed_params.filename == GETFILE_FILE_NAME);
-  REQUIRE_FALSE(passed_params.replace_file);
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" + DIRECTORY_NAME));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename value:" + GETFILE_FILE_NAME));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" + FILESYSTEM_NAME));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:" + std::to_string(TEST_DATA.size())));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.primaryUri value:" + mock_data_lake_storage_client_ptr_->PRIMARY_URI + "\n"));
 }
 
 TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "File creation fails", "[azureDataLakeStorageUpload]") {
@@ -235,8 +270,8 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to success on 'i
   test_controller_.runSession(plan_, true);
   REQUIRE(getFailedFlowFileContents().size() == 0);
   using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:filename value:" + GETFILE_FILE_NAME));
-  REQUIRE_FALSE(LogTestController::getInstance().contains("key:azure", 0s, 0ms));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:filename value:" + GETFILE_FILE_NAME));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:azure", 0s, 0ms));
 }
 
 TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Replace old file on 'replace' resolution strategy if file exists", "[azureDataLakeStorageUpload]") {
@@ -245,27 +280,27 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Replace old file on 'repl
     toString(minifi::azure::processors::PutAzureDataLakeStorage::FileExistsResolutionStrategy::REPLACE_FILE));
   mock_data_lake_storage_client_ptr_->setFileCreation(false);
   test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(passed_params.directory_name == DIRECTORY_NAME);
+  CHECK(passed_params.filename == GETFILE_FILE_NAME);
+  CHECK(passed_params.replace_file);
   REQUIRE(getFailedFlowFileContents().size() == 0);
   using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" + DIRECTORY_NAME));
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.filename value:" + GETFILE_FILE_NAME));
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" + FILESYSTEM_NAME));
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:" + std::to_string(TEST_DATA.size())));
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.primaryUri value:" + mock_data_lake_storage_client_ptr_->PRIMARY_URI + "\n"));
-  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
-  REQUIRE(passed_params.connection_string == CONNECTION_STRING);
-  REQUIRE(passed_params.file_system_name == FILESYSTEM_NAME);
-  REQUIRE(passed_params.directory_name == DIRECTORY_NAME);
-  REQUIRE(passed_params.filename == GETFILE_FILE_NAME);
-  REQUIRE(passed_params.replace_file);
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" + DIRECTORY_NAME));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename value:" + GETFILE_FILE_NAME));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" + FILESYSTEM_NAME));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:" + std::to_string(TEST_DATA.size())));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.primaryUri value:" + mock_data_lake_storage_client_ptr_->PRIMARY_URI + "\n"));
 }
 
 TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Upload to Azure Data Lake Storage with empty directory is accepted", "[azureDataLakeStorageUpload]") {
   plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName.getName(), "");
   test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+  CHECK(passed_params.directory_name == "");
   REQUIRE(getFailedFlowFileContents().size() == 0);
   using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
-  REQUIRE(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:\n"));
-  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
-  REQUIRE(passed_params.directory_name == "");
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:\n"));
 }