You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/10/12 15:22:24 UTC
[nifi-minifi-cpp] 01/02: MINIFICPP-1938 Enable parallel onTrigger calls for Azure and AWS processors
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 0014de517014b46393ec4aca8a01022302c977b0
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Oct 12 16:46:52 2022 +0200
MINIFICPP-1938 Enable parallel onTrigger calls for Azure and AWS processors
Closes #1422
Signed-off-by: Marton Szasz <sz...@apache.org>
---
extensions/aws/processors/DeleteS3Object.h | 2 +-
extensions/aws/processors/FetchS3Object.h | 2 +-
extensions/aws/processors/PutS3Object.h | 2 +-
.../azure/processors/DeleteAzureBlobStorage.h | 2 +-
.../azure/processors/DeleteAzureDataLakeStorage.h | 2 +-
.../azure/processors/FetchAzureDataLakeStorage.h | 2 +-
.../azure/processors/PutAzureDataLakeStorage.h | 2 +-
.../azure/storage/AzureBlobStorageClient.cpp | 40 ++++++++--------------
extensions/azure/storage/AzureBlobStorageClient.h | 5 +--
.../azure/storage/AzureDataLakeStorageClient.cpp | 26 +++++---------
.../azure/storage/AzureDataLakeStorageClient.h | 13 +++----
11 files changed, 36 insertions(+), 62 deletions(-)
diff --git a/extensions/aws/processors/DeleteS3Object.h b/extensions/aws/processors/DeleteS3Object.h
index 1566eb86a..8b3214cc2 100644
--- a/extensions/aws/processors/DeleteS3Object.h
+++ b/extensions/aws/processors/DeleteS3Object.h
@@ -52,7 +52,7 @@ class DeleteS3Object : public S3Processor {
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
- EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+ EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h
index 5a5b73344..49377ee77 100644
--- a/extensions/aws/processors/FetchS3Object.h
+++ b/extensions/aws/processors/FetchS3Object.h
@@ -56,7 +56,7 @@ class FetchS3Object : public S3Processor {
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
- EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+ EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index 13f0cbd60..f04f89671 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -76,7 +76,7 @@ class PutS3Object : public S3Processor {
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
- EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+ EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
diff --git a/extensions/azure/processors/DeleteAzureBlobStorage.h b/extensions/azure/processors/DeleteAzureBlobStorage.h
index f185094e0..22a8e51a3 100644
--- a/extensions/azure/processors/DeleteAzureBlobStorage.h
+++ b/extensions/azure/processors/DeleteAzureBlobStorage.h
@@ -52,7 +52,7 @@ class DeleteAzureBlobStorage final : public AzureBlobStorageSingleBlobProcessorB
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
- EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+ EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.h b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
index 89f0e682c..ea5f6277b 100644
--- a/extensions/azure/processors/DeleteAzureDataLakeStorage.h
+++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
@@ -44,7 +44,7 @@ class DeleteAzureDataLakeStorage final : public AzureDataLakeStorageFileProcesso
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
- EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+ EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.h b/extensions/azure/processors/FetchAzureDataLakeStorage.h
index 7e9cd295a..f4d14265d 100644
--- a/extensions/azure/processors/FetchAzureDataLakeStorage.h
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.h
@@ -52,7 +52,7 @@ class FetchAzureDataLakeStorage final : public AzureDataLakeStorageFileProcessor
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
- EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+ EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h b/extensions/azure/processors/PutAzureDataLakeStorage.h
index 420175a27..ac674dbcf 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -51,7 +51,7 @@ class PutAzureDataLakeStorage final : public AzureDataLakeStorageFileProcessorBa
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
- EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+ EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
diff --git a/extensions/azure/storage/AzureBlobStorageClient.cpp b/extensions/azure/storage/AzureBlobStorageClient.cpp
index 6b7ee3388..f83302ded 100644
--- a/extensions/azure/storage/AzureBlobStorageClient.cpp
+++ b/extensions/azure/storage/AzureBlobStorageClient.cpp
@@ -54,59 +54,49 @@ AzureBlobStorageClient::AzureBlobStorageClient() {
utils::AzureSdkLogger::initialize();
}
-void AzureBlobStorageClient::resetClientIfNeeded(const AzureStorageCredentials &credentials, const std::string &container_name) {
- if (container_client_ && credentials == credentials_ && container_name == container_name_) {
- logger_->log_debug("Azure Blob Storage client credentials have not changed, no need to reset client");
- return;
- }
-
+std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> AzureBlobStorageClient::createClient(const AzureStorageCredentials &credentials, const std::string &container_name) {
if (credentials.getUseManagedIdentityCredentials()) {
auto storage_client = Azure::Storage::Blobs::BlobServiceClient(
"https://" + credentials.getStorageAccountName() + ".blob." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>());
- container_client_ = std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(storage_client.GetBlobContainerClient(container_name));
- logger_->log_debug("Azure Blob Storage client has been reset with new managed identity credentials.");
+ return std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(storage_client.GetBlobContainerClient(container_name));
} else {
- container_client_ = std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(
+ return std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(
Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(credentials.buildConnectionString(), container_name));
- logger_->log_debug("Azure Blob Storage client has been reset with new connection string credentials.");
}
-
- credentials_ = credentials;
- container_name_ = container_name;
}
bool AzureBlobStorageClient::createContainerIfNotExists(const PutAzureBlobStorageParameters& params) {
- resetClientIfNeeded(params.credentials, params.container_name);
- return container_client_->CreateIfNotExists().Value.Created;
+ auto container_client = createClient(params.credentials, params.container_name);
+ return container_client->CreateIfNotExists().Value.Created;
}
Azure::Storage::Blobs::Models::UploadBlockBlobResult AzureBlobStorageClient::uploadBlob(const PutAzureBlobStorageParameters& params, gsl::span<const std::byte> buffer) {
- resetClientIfNeeded(params.credentials, params.container_name);
- auto blob_client = container_client_->GetBlockBlobClient(params.blob_name);
+ auto container_client = createClient(params.credentials, params.container_name);
+ auto blob_client = container_client->GetBlockBlobClient(params.blob_name);
return blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size()).Value;
}
std::string AzureBlobStorageClient::getUrl(const AzureBlobStorageParameters& params) {
- resetClientIfNeeded(params.credentials, params.container_name);
- return container_client_->GetUrl();
+ auto container_client = createClient(params.credentials, params.container_name);
+ return container_client->GetUrl();
}
bool AzureBlobStorageClient::deleteBlob(const DeleteAzureBlobStorageParameters& params) {
- resetClientIfNeeded(params.credentials, params.container_name);
+ auto container_client = createClient(params.credentials, params.container_name);
Azure::Storage::Blobs::DeleteBlobOptions delete_options;
if (params.optional_deletion == OptionalDeletion::INCLUDE_SNAPSHOTS) {
delete_options.DeleteSnapshots = Azure::Storage::Blobs::Models::DeleteSnapshotsOption::IncludeSnapshots;
} else if (params.optional_deletion == OptionalDeletion::DELETE_SNAPSHOTS_ONLY) {
delete_options.DeleteSnapshots = Azure::Storage::Blobs::Models::DeleteSnapshotsOption::OnlySnapshots;
}
- auto response = container_client_->DeleteBlob(params.blob_name, delete_options);
+ auto response = container_client->DeleteBlob(params.blob_name, delete_options);
return response.Value.Deleted;
}
std::unique_ptr<io::InputStream> AzureBlobStorageClient::fetchBlob(const FetchAzureBlobStorageParameters& params) {
- resetClientIfNeeded(params.credentials, params.container_name);
- auto blob_client = container_client_->GetBlobClient(params.blob_name);
+ auto container_client = createClient(params.credentials, params.container_name);
+ auto blob_client = container_client->GetBlobClient(params.blob_name);
Azure::Storage::Blobs::DownloadBlobOptions options;
if (params.range_start || params.range_length) {
Azure::Core::Http::HttpRange range;
@@ -125,10 +115,10 @@ std::unique_ptr<io::InputStream> AzureBlobStorageClient::fetchBlob(const FetchAz
std::vector<Azure::Storage::Blobs::Models::BlobItem> AzureBlobStorageClient::listContainer(const ListAzureBlobStorageParameters& params) {
std::vector<Azure::Storage::Blobs::Models::BlobItem> result;
- resetClientIfNeeded(params.credentials, params.container_name);
+ auto container_client = createClient(params.credentials, params.container_name);
Azure::Storage::Blobs::ListBlobsOptions options;
options.Prefix = params.prefix;
- for (auto page_result = container_client_->ListBlobs(options); page_result.HasPage(); page_result.MoveToNextPage()) {
+ for (auto page_result = container_client->ListBlobs(options); page_result.HasPage(); page_result.MoveToNextPage()) {
result.insert(result.end(), page_result.Blobs.begin(), page_result.Blobs.end());
}
return result;
diff --git a/extensions/azure/storage/AzureBlobStorageClient.h b/extensions/azure/storage/AzureBlobStorageClient.h
index 3f9378b21..325f4ef24 100644
--- a/extensions/azure/storage/AzureBlobStorageClient.h
+++ b/extensions/azure/storage/AzureBlobStorageClient.h
@@ -43,11 +43,8 @@ class AzureBlobStorageClient : public BlobStorageClient {
std::vector<Azure::Storage::Blobs::Models::BlobItem> listContainer(const ListAzureBlobStorageParameters& params) override;
private:
- void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string &container_name);
+ static std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> createClient(const AzureStorageCredentials& credentials, const std::string &container_name);
- AzureStorageCredentials credentials_;
- std::string container_name_;
- std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> container_client_;
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureBlobStorageClient>::getLogger()};
};
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
index e26cc73e3..d64ba296b 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
@@ -34,12 +34,8 @@ AzureDataLakeStorageClient::AzureDataLakeStorageClient() {
utils::AzureSdkLogger::initialize();
}
-void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries) {
- if (client_ && credentials_ == credentials && file_system_name_ == file_system_name && number_of_retries_ == number_of_retries) {
- logger_->log_debug("Azure Data Lake Storge client credentials have not changed, no need to reset client");
- return;
- }
-
+std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> AzureDataLakeStorageClient::createClient(
+ const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries) {
Azure::Storage::Files::DataLake::DataLakeClientOptions options;
if (number_of_retries) {
options.Retry.MaxRetries = *number_of_retries;
@@ -48,22 +44,16 @@ void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentia
if (credentials.getUseManagedIdentityCredentials()) {
auto datalake_service_client = Azure::Storage::Files::DataLake::DataLakeServiceClient(
"https://" + credentials.getStorageAccountName() + ".dfs." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>(), options);
- client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(datalake_service_client.GetFileSystemClient(file_system_name));
- logger_->log_debug("Azure Data Lake Storge client has been reset with new managed identity credentials.");
+ return std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(datalake_service_client.GetFileSystemClient(file_system_name));
} else {
- client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(
+ return std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(
Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(credentials.buildConnectionString(), file_system_name, options));
- logger_->log_debug("Azure Data Lake Storge client has been reset with new connection string credentials.");
}
-
- file_system_name_ = file_system_name;
- credentials_ = credentials;
- number_of_retries_ = number_of_retries;
}
Azure::Storage::Files::DataLake::DataLakeDirectoryClient AzureDataLakeStorageClient::getDirectoryClient(const AzureDataLakeStorageParameters& params) {
- resetClientIfNeeded(params.credentials, params.file_system_name, params.number_of_retries);
- return client_->GetDirectoryClient(params.directory_name);
+ auto client = createClient(params.credentials, params.file_system_name, params.number_of_retries);
+ return client->GetDirectoryClient(params.directory_name);
}
Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const AzureDataLakeStorageFileOperationParameters& params) {
@@ -113,8 +103,8 @@ std::unique_ptr<io::InputStream> AzureDataLakeStorageClient::fetchFile(const Fet
std::vector<Azure::Storage::Files::DataLake::Models::PathItem> AzureDataLakeStorageClient::listDirectory(const ListAzureDataLakeStorageParameters& params) {
std::vector<Azure::Storage::Files::DataLake::Models::PathItem> result;
if (params.directory_name.empty()) {
- resetClientIfNeeded(params.credentials, params.file_system_name, params.number_of_retries);
- for (auto page_result = client_->ListPaths(params.recurse_subdirectories); page_result.HasPage(); page_result.MoveToNextPage()) {
+ auto client = createClient(params.credentials, params.file_system_name, params.number_of_retries);
+ for (auto page_result = client->ListPaths(params.recurse_subdirectories); page_result.HasPage(); page_result.MoveToNextPage()) {
result.insert(result.end(), page_result.Paths.begin(), page_result.Paths.end());
}
} else {
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h b/extensions/azure/storage/AzureDataLakeStorageClient.h
index 308c118cc..2e9fd98ac 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.h
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.h
@@ -91,14 +91,11 @@ class AzureDataLakeStorageClient : public DataLakeStorageClient {
Azure::Storage::Files::DataLake::Models::DownloadFileResult result_;
};
- void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries);
- Azure::Storage::Files::DataLake::DataLakeDirectoryClient getDirectoryClient(const AzureDataLakeStorageParameters& params);
- Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const AzureDataLakeStorageFileOperationParameters& params);
-
- AzureStorageCredentials credentials_;
- std::string file_system_name_;
- std::optional<uint64_t> number_of_retries_;
- std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> client_;
+ static std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> createClient(
+ const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries);
+ static Azure::Storage::Files::DataLake::DataLakeDirectoryClient getDirectoryClient(const AzureDataLakeStorageParameters& params);
+ static Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const AzureDataLakeStorageFileOperationParameters& params);
+
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureDataLakeStorageClient>::getLogger()};
};