You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/10/12 15:22:24 UTC

[nifi-minifi-cpp] 01/02: MINIFICPP-1938 Enable parallel onTrigger calls for Azure and AWS processors

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 0014de517014b46393ec4aca8a01022302c977b0
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Oct 12 16:46:52 2022 +0200

    MINIFICPP-1938 Enable parallel onTrigger calls for Azure and AWS processors
    
    Closes #1422
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 extensions/aws/processors/DeleteS3Object.h         |  2 +-
 extensions/aws/processors/FetchS3Object.h          |  2 +-
 extensions/aws/processors/PutS3Object.h            |  2 +-
 .../azure/processors/DeleteAzureBlobStorage.h      |  2 +-
 .../azure/processors/DeleteAzureDataLakeStorage.h  |  2 +-
 .../azure/processors/FetchAzureDataLakeStorage.h   |  2 +-
 .../azure/processors/PutAzureDataLakeStorage.h     |  2 +-
 .../azure/storage/AzureBlobStorageClient.cpp       | 40 ++++++++--------------
 extensions/azure/storage/AzureBlobStorageClient.h  |  5 +--
 .../azure/storage/AzureDataLakeStorageClient.cpp   | 26 +++++---------
 .../azure/storage/AzureDataLakeStorageClient.h     | 13 +++----
 11 files changed, 36 insertions(+), 62 deletions(-)

diff --git a/extensions/aws/processors/DeleteS3Object.h b/extensions/aws/processors/DeleteS3Object.h
index 1566eb86a..8b3214cc2 100644
--- a/extensions/aws/processors/DeleteS3Object.h
+++ b/extensions/aws/processors/DeleteS3Object.h
@@ -52,7 +52,7 @@ class DeleteS3Object : public S3Processor {
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h
index 5a5b73344..49377ee77 100644
--- a/extensions/aws/processors/FetchS3Object.h
+++ b/extensions/aws/processors/FetchS3Object.h
@@ -56,7 +56,7 @@ class FetchS3Object : public S3Processor {
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index 13f0cbd60..f04f89671 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -76,7 +76,7 @@ class PutS3Object : public S3Processor {
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/azure/processors/DeleteAzureBlobStorage.h b/extensions/azure/processors/DeleteAzureBlobStorage.h
index f185094e0..22a8e51a3 100644
--- a/extensions/azure/processors/DeleteAzureBlobStorage.h
+++ b/extensions/azure/processors/DeleteAzureBlobStorage.h
@@ -52,7 +52,7 @@ class DeleteAzureBlobStorage final : public AzureBlobStorageSingleBlobProcessorB
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.h b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
index 89f0e682c..ea5f6277b 100644
--- a/extensions/azure/processors/DeleteAzureDataLakeStorage.h
+++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
@@ -44,7 +44,7 @@ class DeleteAzureDataLakeStorage final : public AzureDataLakeStorageFileProcesso
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.h b/extensions/azure/processors/FetchAzureDataLakeStorage.h
index 7e9cd295a..f4d14265d 100644
--- a/extensions/azure/processors/FetchAzureDataLakeStorage.h
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.h
@@ -52,7 +52,7 @@ class FetchAzureDataLakeStorage final : public AzureDataLakeStorageFileProcessor
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h b/extensions/azure/processors/PutAzureDataLakeStorage.h
index 420175a27..ac674dbcf 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -51,7 +51,7 @@ class PutAzureDataLakeStorage final : public AzureDataLakeStorageFileProcessorBa
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/azure/storage/AzureBlobStorageClient.cpp b/extensions/azure/storage/AzureBlobStorageClient.cpp
index 6b7ee3388..f83302ded 100644
--- a/extensions/azure/storage/AzureBlobStorageClient.cpp
+++ b/extensions/azure/storage/AzureBlobStorageClient.cpp
@@ -54,59 +54,49 @@ AzureBlobStorageClient::AzureBlobStorageClient() {
   utils::AzureSdkLogger::initialize();
 }
 
-void AzureBlobStorageClient::resetClientIfNeeded(const AzureStorageCredentials &credentials, const std::string &container_name) {
-  if (container_client_ && credentials == credentials_ && container_name == container_name_) {
-    logger_->log_debug("Azure Blob Storage client credentials have not changed, no need to reset client");
-    return;
-  }
-
+std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> AzureBlobStorageClient::createClient(const AzureStorageCredentials &credentials, const std::string &container_name) {
   if (credentials.getUseManagedIdentityCredentials()) {
     auto storage_client = Azure::Storage::Blobs::BlobServiceClient(
       "https://" + credentials.getStorageAccountName() + ".blob." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>());
 
-    container_client_ = std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(storage_client.GetBlobContainerClient(container_name));
-    logger_->log_debug("Azure Blob Storage client has been reset with new managed identity credentials.");
+    return std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(storage_client.GetBlobContainerClient(container_name));
   } else {
-    container_client_ = std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(
+    return std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(
       Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(credentials.buildConnectionString(), container_name));
-    logger_->log_debug("Azure Blob Storage client has been reset with new connection string credentials.");
   }
-
-  credentials_ = credentials;
-  container_name_ = container_name;
 }
 
 bool AzureBlobStorageClient::createContainerIfNotExists(const PutAzureBlobStorageParameters& params) {
-  resetClientIfNeeded(params.credentials, params.container_name);
-  return container_client_->CreateIfNotExists().Value.Created;
+  auto container_client = createClient(params.credentials, params.container_name);
+  return container_client->CreateIfNotExists().Value.Created;
 }
 
 Azure::Storage::Blobs::Models::UploadBlockBlobResult AzureBlobStorageClient::uploadBlob(const PutAzureBlobStorageParameters& params, gsl::span<const std::byte> buffer) {
-  resetClientIfNeeded(params.credentials, params.container_name);
-  auto blob_client = container_client_->GetBlockBlobClient(params.blob_name);
+  auto container_client = createClient(params.credentials, params.container_name);
+  auto blob_client = container_client->GetBlockBlobClient(params.blob_name);
   return blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size()).Value;
 }
 
 std::string AzureBlobStorageClient::getUrl(const AzureBlobStorageParameters& params) {
-  resetClientIfNeeded(params.credentials, params.container_name);
-  return container_client_->GetUrl();
+  auto container_client = createClient(params.credentials, params.container_name);
+  return container_client->GetUrl();
 }
 
 bool AzureBlobStorageClient::deleteBlob(const DeleteAzureBlobStorageParameters& params) {
-  resetClientIfNeeded(params.credentials, params.container_name);
+  auto container_client = createClient(params.credentials, params.container_name);
   Azure::Storage::Blobs::DeleteBlobOptions delete_options;
   if (params.optional_deletion == OptionalDeletion::INCLUDE_SNAPSHOTS) {
     delete_options.DeleteSnapshots = Azure::Storage::Blobs::Models::DeleteSnapshotsOption::IncludeSnapshots;
   } else if (params.optional_deletion == OptionalDeletion::DELETE_SNAPSHOTS_ONLY) {
     delete_options.DeleteSnapshots = Azure::Storage::Blobs::Models::DeleteSnapshotsOption::OnlySnapshots;
   }
-  auto response = container_client_->DeleteBlob(params.blob_name, delete_options);
+  auto response = container_client->DeleteBlob(params.blob_name, delete_options);
   return response.Value.Deleted;
 }
 
 std::unique_ptr<io::InputStream> AzureBlobStorageClient::fetchBlob(const FetchAzureBlobStorageParameters& params) {
-  resetClientIfNeeded(params.credentials, params.container_name);
-  auto blob_client = container_client_->GetBlobClient(params.blob_name);
+  auto container_client = createClient(params.credentials, params.container_name);
+  auto blob_client = container_client->GetBlobClient(params.blob_name);
   Azure::Storage::Blobs::DownloadBlobOptions options;
   if (params.range_start || params.range_length) {
     Azure::Core::Http::HttpRange range;
@@ -125,10 +115,10 @@ std::unique_ptr<io::InputStream> AzureBlobStorageClient::fetchBlob(const FetchAz
 
 std::vector<Azure::Storage::Blobs::Models::BlobItem> AzureBlobStorageClient::listContainer(const ListAzureBlobStorageParameters& params) {
   std::vector<Azure::Storage::Blobs::Models::BlobItem> result;
-  resetClientIfNeeded(params.credentials, params.container_name);
+  auto container_client = createClient(params.credentials, params.container_name);
   Azure::Storage::Blobs::ListBlobsOptions options;
   options.Prefix = params.prefix;
-  for (auto page_result = container_client_->ListBlobs(options); page_result.HasPage(); page_result.MoveToNextPage()) {
+  for (auto page_result = container_client->ListBlobs(options); page_result.HasPage(); page_result.MoveToNextPage()) {
     result.insert(result.end(), page_result.Blobs.begin(), page_result.Blobs.end());
   }
   return result;
diff --git a/extensions/azure/storage/AzureBlobStorageClient.h b/extensions/azure/storage/AzureBlobStorageClient.h
index 3f9378b21..325f4ef24 100644
--- a/extensions/azure/storage/AzureBlobStorageClient.h
+++ b/extensions/azure/storage/AzureBlobStorageClient.h
@@ -43,11 +43,8 @@ class AzureBlobStorageClient : public BlobStorageClient {
   std::vector<Azure::Storage::Blobs::Models::BlobItem> listContainer(const ListAzureBlobStorageParameters& params) override;
 
  private:
-  void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string &container_name);
+  static std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> createClient(const AzureStorageCredentials& credentials, const std::string &container_name);
 
-  AzureStorageCredentials credentials_;
-  std::string container_name_;
-  std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> container_client_;
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureBlobStorageClient>::getLogger()};
 };
 
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
index e26cc73e3..d64ba296b 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
@@ -34,12 +34,8 @@ AzureDataLakeStorageClient::AzureDataLakeStorageClient() {
   utils::AzureSdkLogger::initialize();
 }
 
-void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries) {
-  if (client_ && credentials_ == credentials && file_system_name_ == file_system_name && number_of_retries_ == number_of_retries) {
-    logger_->log_debug("Azure Data Lake Storge client credentials have not changed, no need to reset client");
-    return;
-  }
-
+std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> AzureDataLakeStorageClient::createClient(
+    const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries) {
   Azure::Storage::Files::DataLake::DataLakeClientOptions options;
   if (number_of_retries) {
     options.Retry.MaxRetries = *number_of_retries;
@@ -48,22 +44,16 @@ void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentia
   if (credentials.getUseManagedIdentityCredentials()) {
     auto datalake_service_client = Azure::Storage::Files::DataLake::DataLakeServiceClient(
         "https://" + credentials.getStorageAccountName() + ".dfs." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>(), options);
-    client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(datalake_service_client.GetFileSystemClient(file_system_name));
-    logger_->log_debug("Azure Data Lake Storge client has been reset with new managed identity credentials.");
+    return std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(datalake_service_client.GetFileSystemClient(file_system_name));
   } else {
-    client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(
+    return std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(
         Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(credentials.buildConnectionString(), file_system_name, options));
-    logger_->log_debug("Azure Data Lake Storge client has been reset with new connection string credentials.");
   }
-
-  file_system_name_ = file_system_name;
-  credentials_ = credentials;
-  number_of_retries_ = number_of_retries;
 }
 
 Azure::Storage::Files::DataLake::DataLakeDirectoryClient AzureDataLakeStorageClient::getDirectoryClient(const AzureDataLakeStorageParameters& params) {
-  resetClientIfNeeded(params.credentials, params.file_system_name, params.number_of_retries);
-  return client_->GetDirectoryClient(params.directory_name);
+  auto client = createClient(params.credentials, params.file_system_name, params.number_of_retries);
+  return client->GetDirectoryClient(params.directory_name);
 }
 
 Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const AzureDataLakeStorageFileOperationParameters& params) {
@@ -113,8 +103,8 @@ std::unique_ptr<io::InputStream> AzureDataLakeStorageClient::fetchFile(const Fet
 std::vector<Azure::Storage::Files::DataLake::Models::PathItem> AzureDataLakeStorageClient::listDirectory(const ListAzureDataLakeStorageParameters& params) {
   std::vector<Azure::Storage::Files::DataLake::Models::PathItem> result;
   if (params.directory_name.empty()) {
-    resetClientIfNeeded(params.credentials, params.file_system_name, params.number_of_retries);
-    for (auto page_result = client_->ListPaths(params.recurse_subdirectories); page_result.HasPage(); page_result.MoveToNextPage()) {
+    auto client = createClient(params.credentials, params.file_system_name, params.number_of_retries);
+    for (auto page_result = client->ListPaths(params.recurse_subdirectories); page_result.HasPage(); page_result.MoveToNextPage()) {
       result.insert(result.end(), page_result.Paths.begin(), page_result.Paths.end());
     }
   } else {
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h b/extensions/azure/storage/AzureDataLakeStorageClient.h
index 308c118cc..2e9fd98ac 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.h
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.h
@@ -91,14 +91,11 @@ class AzureDataLakeStorageClient : public DataLakeStorageClient {
     Azure::Storage::Files::DataLake::Models::DownloadFileResult result_;
   };
 
-  void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries);
-  Azure::Storage::Files::DataLake::DataLakeDirectoryClient getDirectoryClient(const AzureDataLakeStorageParameters& params);
-  Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const AzureDataLakeStorageFileOperationParameters& params);
-
-  AzureStorageCredentials credentials_;
-  std::string file_system_name_;
-  std::optional<uint64_t> number_of_retries_;
-  std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> client_;
+  static std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> createClient(
+    const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries);
+  static Azure::Storage::Files::DataLake::DataLakeDirectoryClient getDirectoryClient(const AzureDataLakeStorageParameters& params);
+  static Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const AzureDataLakeStorageFileOperationParameters& params);
+
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureDataLakeStorageClient>::getLogger()};
 };