You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2022/01/17 11:22:51 UTC
[nifi-minifi-cpp] 01/04: MINIFICPP-1630 Create FetchAzureDataLakeStorage processor
This is an automated email from the ASF dual-hosted git repository.
fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit a9a0ade72d2c278d457f2b952fb67d7d4eaa981a
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Thu Aug 19 11:35:13 2021 +0200
MINIFICPP-1630 Create FetchAzureDataLakeStorage processor
Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
This closes #1221
---
PROCESSORS.md | 28 +++++
README.md | 2 +-
extensions/aws/processors/FetchS3Object.cpp | 2 +-
extensions/aws/processors/FetchS3Object.h | 11 +-
.../azure/processors/FetchAzureDataLakeStorage.cpp | 126 +++++++++++++++++++
.../azure/processors/FetchAzureDataLakeStorage.h | 100 ++++++++++++++++
extensions/azure/storage/AzureDataLakeStorage.cpp | 13 +-
extensions/azure/storage/AzureDataLakeStorage.h | 2 +
.../azure/storage/AzureDataLakeStorageClient.cpp | 39 +++++-
.../azure/storage/AzureDataLakeStorageClient.h | 29 ++++-
extensions/azure/storage/DataLakeStorageClient.h | 9 ++
.../azure-tests/AzureDataLakeStorageTestsFixture.h | 33 +++--
.../azure-tests/FetchAzureDataLakeStorageTests.cpp | 133 +++++++++++++++++++++
.../test/azure-tests/MockDataLakeStorageClient.h | 37 ++++++
14 files changed, 537 insertions(+), 27 deletions(-)
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 5b393a0..6a2dc8e 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -21,6 +21,7 @@
- [ExecuteSQL](#executesql)
- [ExecuteScript](#executescript)
- [ExtractText](#extracttext)
+- [FetchAzureDataLakeStorage](#fetchazuredatalakestorage)
- [FetchOPCProcessor](#fetchopcprocessor)
- [FetchS3Object](#fetchs3object)
- [FetchSFTP](#fetchsftp)
@@ -504,6 +505,33 @@ In the list below, the names of required properties appear in bold. Any other pr
|success|success operational on the flow record|
+## FetchAzureDataLakeStorage
+
+### Description
+
+Fetch the provided file from Azure Data Lake Storage Gen 2
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Azure Storage Credentials Service**|||Name of the Azure Storage Credentials Service used to retrieve the connection string from.|
+|File Name|||The filename in Azure Storage. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**|
+|**Filesystem Name**|||Name of the Azure Storage File System. It is assumed to be already existing.<br/>**Supports Expression Language: true**|
+|Directory Name|||Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. If left empty it designates the root directory. The directory will be created if not already existing.<br/>**Supports Expression Language: true**|
+|Range Start|||The byte position at which to start reading from the object. An empty value or a value of zero will start reading at the beginning of the object.<br/>**Supports Expression Language: true**|
+|Range Length|||The number of bytes to download from the object, starting from the Range Start. An empty value or a value that extends beyond the end of the object will read to the end of the object.<br/>**Supports Expression Language: true**|
+|Number of Retries|0||The number of automatic retries to perform if the download fails.<br/>**Supports Expression Language: true**|
+
+### Relationships
+
+| Name | Description |
+| - | - |
+|failure|In case of fetch failure flowfiles are transferred to this relationship|
+|success|Files that have been successfully fetched from Azure storage are transferred to this relationship|
+
+
## FetchOPCProcessor
### Description
diff --git a/README.md b/README.md
index 934478b..7dac13e 100644
--- a/README.md
+++ b/README.md
@@ -77,7 +77,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension
| ------------- |:-------------| :-----|
| Archive Extensions | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry) | -DBUILD_LIBARCHIVE=ON |
| AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) | -DENABLE_AWS=ON |
-| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage) | -DENABLE_AZURE=ON |
+| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[PutAzureDataLakeStorage](PROCESSORS.md#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](PROCESSORS.md#deleteazuredatalakestorage)<br/>[FetchAzureDataLakeStorage](PROCESSORS.md#fetchazuredatalakestorage) | -DENABLE_AZURE=ON |
| CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) | -DDISABLE_CIVET=ON |
| CURL | [InvokeHTTP](PROCESSORS.md#invokehttp) | -DDISABLE_CURL=ON |
| GPS | GetGPS | -DENABLE_GPS=ON |
diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp
index c7de905..d0099b4 100644
--- a/extensions/aws/processors/FetchS3Object.cpp
+++ b/extensions/aws/processors/FetchS3Object.cpp
@@ -109,7 +109,7 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
return;
}
- WriteCallback callback(flow_file->getSize(), *get_object_params, s3_wrapper_);
+ WriteCallback callback(*get_object_params, s3_wrapper_);
session->write(flow_file, &callback);
if (callback.result_) {
diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h
index 7e0d79c..23f2dca 100644
--- a/extensions/aws/processors/FetchS3Object.h
+++ b/extensions/aws/processors/FetchS3Object.h
@@ -65,14 +65,12 @@ class FetchS3Object : public S3Processor {
class WriteCallback : public OutputStreamCallback {
public:
- WriteCallback(uint64_t flow_size, const minifi::aws::s3::GetObjectRequestParameters& get_object_params, aws::s3::S3Wrapper& s3_wrapper)
- : flow_size_(flow_size)
- , get_object_params_(get_object_params)
- , s3_wrapper_(s3_wrapper) {
+ WriteCallback(const minifi::aws::s3::GetObjectRequestParameters& get_object_params, aws::s3::S3Wrapper& s3_wrapper)
+ : get_object_params_(get_object_params),
+ s3_wrapper_(s3_wrapper) {
}
int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
- std::vector<uint8_t> buffer;
result_ = s3_wrapper_.getObject(get_object_params_, *stream);
if (!result_) {
return 0;
@@ -81,11 +79,8 @@ class FetchS3Object : public S3Processor {
return result_->write_size;
}
- uint64_t flow_size_;
-
const minifi::aws::s3::GetObjectRequestParameters& get_object_params_;
aws::s3::S3Wrapper& s3_wrapper_;
- uint64_t write_size_ = 0;
std::optional<minifi::aws::s3::GetObjectResult> result_;
};
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
new file mode 100644
index 0000000..4885190
--- /dev/null
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
@@ -0,0 +1,126 @@
+/**
+ * @file FetchAzureDataLakeStorage.cpp
+ * FetchAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchAzureDataLakeStorage.h"
+
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+// Fetch-specific properties. The common Azure Data Lake Storage properties registered in
+// initialize() (credentials service, filesystem, directory, file name) are not declared in
+// this class; they are inherited from the processor base class.
+
+// Optional byte offset to start downloading from; evaluated per flow file in buildFetchParameters().
+const core::Property FetchAzureDataLakeStorage::RangeStart(
+  core::PropertyBuilder::createProperty("Range Start")
+    ->withDescription("The byte position at which to start reading from the object. An empty value or a value of zero will start reading at the beginning of the object.")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+// Optional number of bytes to download, counted from Range Start; evaluated per flow file in buildFetchParameters().
+const core::Property FetchAzureDataLakeStorage::RangeLength(
+  core::PropertyBuilder::createProperty("Range Length")
+    ->withDescription("The number of bytes to download from the object, starting from the Range Start. "
+                      "An empty value or a value that extends beyond the end of the object will read to the end of the object.")
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+// Retry count stored in the fetch parameters' number_of_retries field; defaults to 0.
+const core::Property FetchAzureDataLakeStorage::NumberOfRetries(
+  core::PropertyBuilder::createProperty("Number of Retries")
+    ->withDescription("The number of automatic retries to perform if the download fails.")
+    ->withDefaultValue<uint64_t>(0)
+    ->supportsExpressionLanguage(true)
+    ->build());
+
+// Success receives the newly created flow file holding the fetched content;
+// Failure receives the incoming flow file when parameters cannot be built or the fetch fails.
+const core::Relationship FetchAzureDataLakeStorage::Success("success", "Files that have been successfully fetched from Azure storage are transferred to this relationship");
+const core::Relationship FetchAzureDataLakeStorage::Failure("failure", "In case of fetch failure flowfiles are transferred to this relationship");
+
+/**
+ * Registers the supported properties (common Azure Data Lake Storage ones first, then the
+ * fetch-specific range/retry properties) and the success/failure relationships.
+ */
+void FetchAzureDataLakeStorage::initialize() {
+  setSupportedProperties({AzureStorageCredentialsService, FilesystemName, DirectoryName, FileName,
+                          RangeStart, RangeLength, NumberOfRetries});
+  setSupportedRelationships({Success, Failure});
+}
+
+namespace {
+
+/**
+ * Parses the evaluated value of an optional, non-negative integer property.
+ *
+ * Surrounding whitespace is ignored. An empty or whitespace-only value leaves @p target
+ * unset, so an "empty value" behaves like "property not set", as the property
+ * descriptions promise.
+ *
+ * @param value the evaluated property value
+ * @param target receives the parsed number on success
+ * @return false if the value is not a plain decimal number that fits into unsigned long long
+ */
+bool parseOptionalUnsignedProperty(const std::string& value, std::optional<uint64_t>& target) {
+  constexpr const char* kWhitespace = " \t\r\n";
+  const auto first = value.find_first_not_of(kWhitespace);
+  if (first == std::string::npos) {
+    return true;
+  }
+  const auto last = value.find_last_not_of(kWhitespace);
+  const std::string trimmed = value.substr(first, last - first + 1);
+  // std::stoull accepts a leading sign and wraps "-1" around to the maximum value, so insist on a digit
+  if (trimmed.front() < '0' || trimmed.front() > '9') {
+    return false;
+  }
+  try {
+    std::size_t parsed_length = 0;
+    const auto parsed = std::stoull(trimmed, &parsed_length);
+    if (parsed_length != trimmed.size()) {
+      return false;  // trailing characters, e.g. "12abc"
+    }
+    target = parsed;
+    return true;
+  } catch (const std::exception&) {
+    return false;  // value does not fit into unsigned long long
+  }
+}
+
+}  // namespace
+
+/**
+ * Builds the fetch parameters for a flow file.
+ * @return std::nullopt if the common parameters cannot be set or a numeric property has an
+ *         invalid value; the caller routes the flow file to Failure in that case.
+ */
+std::optional<storage::FetchAzureDataLakeStorageParameters> FetchAzureDataLakeStorage::buildFetchParameters(
+    core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+  storage::FetchAzureDataLakeStorageParameters params;
+  if (!setCommonParameters(params, context, flow_file)) {
+    return std::nullopt;
+  }
+
+  // Invalid numeric values are reported and rejected instead of letting std::stoull throw out of onTrigger
+  std::string value;
+  if (context.getProperty(RangeStart, value, flow_file)) {
+    if (!parseOptionalUnsignedProperty(value, params.range_start)) {
+      logger_->log_error("Invalid Range Start property value: '%s'", value);
+      return std::nullopt;
+    }
+    if (params.range_start) {
+      logger_->log_debug("Range Start property set to %llu", *params.range_start);
+    }
+  }
+
+  if (context.getProperty(RangeLength, value, flow_file)) {
+    if (!parseOptionalUnsignedProperty(value, params.range_length)) {
+      logger_->log_error("Invalid Range Length property value: '%s'", value);
+      return std::nullopt;
+    }
+    if (params.range_length) {
+      logger_->log_debug("Range Length property set to %llu", *params.range_length);
+    }
+  }
+
+  if (context.getProperty(NumberOfRetries, value, flow_file)) {
+    if (!parseOptionalUnsignedProperty(value, params.number_of_retries)) {
+      logger_->log_error("Invalid Number of Retries property value: '%s'", value);
+      return std::nullopt;
+    }
+    if (params.number_of_retries) {
+      logger_->log_debug("Number Of Retries property set to %llu", *params.number_of_retries);
+    }
+  }
+
+  return params;
+}
+
+/**
+ * Fetches the file described by the incoming flow file's parameters into a new child flow file.
+ * On success the child goes to Success and the incoming flow file is removed; on failure the
+ * incoming flow file goes to Failure and the child (if created) is removed.
+ */
+void FetchAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session);
+  logger_->log_debug("FetchAzureDataLakeStorage onTrigger");
+
+  const std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (flow_file == nullptr) {
+    context->yield();
+    return;
+  }
+
+  const auto fetch_params = buildFetchParameters(*context, flow_file);
+  if (!fetch_params.has_value()) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  // The downloaded content is written into a child of the incoming flow file
+  auto fetched_flow_file = session->create(flow_file);
+  WriteCallback write_callback(azure_data_lake_storage_, *fetch_params, logger_);
+  session->write(fetched_flow_file, &write_callback);
+
+  const bool fetched = write_callback.getResult().has_value();
+  if (fetched) {
+    logger_->log_debug("Successfully fetched file '%s' from filesystem '%s' on Azure Data Lake storage", fetch_params->filename, fetch_params->file_system_name);
+    session->transfer(fetched_flow_file, Success);
+    session->remove(flow_file);
+    return;
+  }
+
+  logger_->log_error("Failed to fetch file '%s' from Azure Data Lake storage", fetch_params->filename);
+  session->transfer(flow_file, Failure);
+  session->remove(fetched_flow_file);
+}
+
+REGISTER_RESOURCE(FetchAzureDataLakeStorage, "Fetch the provided file from Azure Data Lake Storage Gen 2");
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.h b/extensions/azure/processors/FetchAzureDataLakeStorage.h
new file mode 100644
index 0000000..4768e49
--- /dev/null
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.h
@@ -0,0 +1,100 @@
+/**
+ * @file FetchAzureDataLakeStorage.h
+ * FetchAzureDataLakeStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+template<typename AzureDataLakeStorageProcessor>
+class AzureDataLakeStorageTestsFixture;
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+/**
+ * Processor that downloads a file (optionally a byte range of it) from Azure Data Lake Storage Gen 2
+ * into a new flow file derived from the incoming one.
+ */
+class FetchAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase {
+ public:
+  // Fetch-specific properties; the common Azure Data Lake Storage properties are inherited
+  EXTENSIONAPI static const core::Property RangeStart;
+  EXTENSIONAPI static const core::Property RangeLength;
+  EXTENSIONAPI static const core::Property NumberOfRetries;
+
+  // Relationships
+  static const core::Relationship Failure;
+  static const core::Relationship Success;
+
+  explicit FetchAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+    : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger()) {
+  }
+
+  ~FetchAzureDataLakeStorage() override = default;
+
+  void initialize() override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
+  friend class ::AzureDataLakeStorageTestsFixture<FetchAzureDataLakeStorage>;
+
+  /**
+   * Output stream callback that downloads the file described by the parameters
+   * directly into the flow file's content stream.
+   */
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(storage::AzureDataLakeStorage& azure_data_lake_storage, const storage::FetchAzureDataLakeStorageParameters& params, std::shared_ptr<core::logging::Logger> logger)
+      : azure_data_lake_storage_(azure_data_lake_storage),
+        params_(params),
+        logger_(std::move(logger)) {
+    }
+
+    // Returns the number of bytes written, or 0 if the fetch failed (use getResult() to tell the two apart)
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+      result_size_ = azure_data_lake_storage_.fetchFile(params_, *stream);
+      return result_size_ ? gsl::narrow<int64_t>(*result_size_) : int64_t{0};
+    }
+
+    // Number of fetched bytes, or std::nullopt if the fetch failed or process() has not run
+    std::optional<uint64_t> getResult() const {
+      return result_size_;
+    }
+
+   private:
+    storage::AzureDataLakeStorage& azure_data_lake_storage_;
+    const storage::FetchAzureDataLakeStorageParameters& params_;
+    std::optional<uint64_t> result_size_ = std::nullopt;
+    std::shared_ptr<core::logging::Logger> logger_;
+  };
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_REQUIRED;
+  }
+
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
+  // Test-only constructor that injects a (mock) Data Lake storage client
+  explicit FetchAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+    : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger(), std::move(data_lake_storage_client)) {
+  }
+
+  std::optional<storage::FetchAzureDataLakeStorageParameters> buildFetchParameters(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file);
+};
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/storage/AzureDataLakeStorage.cpp b/extensions/azure/storage/AzureDataLakeStorage.cpp
index 2be5292..34e2d3b 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorage.cpp
@@ -21,6 +21,7 @@
#include "AzureDataLakeStorage.h"
#include "AzureDataLakeStorageClient.h"
+#include "io/StreamPipe.h"
namespace org::apache::nifi::minifi::azure::storage {
@@ -52,7 +53,7 @@ UploadDataLakeStorageResult AzureDataLakeStorage::uploadFile(const PutAzureDataL
}
}
-bool AzureDataLakeStorage::deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params) {
+bool AzureDataLakeStorage::deleteFile(const DeleteAzureDataLakeStorageParameters& params) {
try {
return data_lake_storage_client_->deleteFile(params);
} catch (const std::exception& ex) {
@@ -61,4 +62,14 @@ bool AzureDataLakeStorage::deleteFile(const storage::DeleteAzureDataLakeStorageP
}
}
+/**
+ * Downloads the file described by @p params and pipes its content into @p stream.
+ * @return the number of bytes transferred, or std::nullopt if the download or the transfer failed
+ *         (errors are logged, exceptions are not propagated)
+ */
+std::optional<uint64_t> AzureDataLakeStorage::fetchFile(const FetchAzureDataLakeStorageParameters& params, io::BaseStream& stream) {
+  try {
+    auto result = data_lake_storage_client_->fetchFile(params);
+    if (!result) {
+      logger_->log_error("No input stream was returned while fetching '%s/%s' of filesystem '%s'", params.directory_name, params.filename, params.file_system_name);
+      return std::nullopt;
+    }
+    // internal::pipe returns a signed byte count; a negative value signals a stream error (per io/StreamPipe.h — verify).
+    // Converting it directly to uint64_t would report a failed transfer as a huge successful one.
+    const auto transferred_bytes = internal::pipe(result.get(), &stream);
+    if (transferred_bytes < 0) {
+      logger_->log_error("Failed to transfer the content of '%s/%s' of filesystem '%s' to the flow file", params.directory_name, params.filename, params.file_system_name);
+      return std::nullopt;
+    }
+    return static_cast<uint64_t>(transferred_bytes);
+  } catch (const std::exception& ex) {
+    logger_->log_error("An exception occurred while fetching '%s/%s' of filesystem '%s': %s", params.directory_name, params.filename, params.file_system_name, ex.what());
+    return std::nullopt;
+  }
+}
+
} // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorage.h b/extensions/azure/storage/AzureDataLakeStorage.h
index 8dd9b8a..d1291a2 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.h
+++ b/extensions/azure/storage/AzureDataLakeStorage.h
@@ -28,6 +28,7 @@
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
#include "DataLakeStorageClient.h"
+#include "azure/core/io/body_stream.hpp"
namespace org::apache::nifi::minifi::azure::storage {
@@ -48,6 +49,7 @@ class AzureDataLakeStorage {
storage::UploadDataLakeStorageResult uploadFile(const storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer);
bool deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params);
+ std::optional<uint64_t> fetchFile(const FetchAzureDataLakeStorageParameters& params, io::BaseStream& stream);
private:
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureDataLakeStorage>::getLogger()};
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
index 7a54a69..e542432 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
@@ -18,7 +18,11 @@
* limitations under the License.
*/
+#include <utility>
+
#include "AzureDataLakeStorageClient.h"
+#include "azure/core/http/http.hpp"
+#include "azure/storage/files/datalake/datalake_options.hpp"
#include "azure/identity.hpp"
@@ -30,30 +34,35 @@ AzureDataLakeStorageClient::AzureDataLakeStorageClient() {
utils::AzureSdkLogger::initialize();
}
-void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name) {
- if (client_ && credentials_ == credentials && file_system_name_ == file_system_name) {
+void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries) {
+ if (client_ && credentials_ == credentials && file_system_name_ == file_system_name && number_of_retries_ == number_of_retries) {
logger_->log_debug("Azure Data Lake Storge client credentials have not changed, no need to reset client");
return;
}
+ Azure::Storage::Files::DataLake::DataLakeClientOptions options;
+ if (number_of_retries) {
+ options.Retry.MaxRetries = *number_of_retries;
+ }
+
if (credentials.getUseManagedIdentityCredentials()) {
auto datalake_service_client = Azure::Storage::Files::DataLake::DataLakeServiceClient(
- "https://" + credentials.getStorageAccountName() + ".dfs." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>());
-
+ "https://" + credentials.getStorageAccountName() + ".dfs." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>(), options);
client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(datalake_service_client.GetFileSystemClient(file_system_name));
logger_->log_debug("Azure Data Lake Storge client has been reset with new managed identity credentials.");
} else {
client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(
- Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(credentials.buildConnectionString(), file_system_name));
+ Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(credentials.buildConnectionString(), file_system_name, options));
logger_->log_debug("Azure Data Lake Storge client has been reset with new connection string credentials.");
}
file_system_name_ = file_system_name;
credentials_ = credentials;
+ number_of_retries_ = number_of_retries;
}
Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const AzureDataLakeStorageParameters& params) {
- resetClientIfNeeded(params.credentials, params.file_system_name);
+ resetClientIfNeeded(params.credentials, params.file_system_name, params.number_of_retries);
auto directory_client = client_->GetDirectoryClient(params.directory_name);
if (!params.directory_name.empty()) {
@@ -80,4 +89,22 @@ bool AzureDataLakeStorageClient::deleteFile(const DeleteAzureDataLakeStoragePara
return result.Value.Deleted;
}
+/**
+ * Starts the download of the file described by @p params (restricted to the configured byte
+ * range, if any) and wraps the download result into an io::InputStream.
+ */
+std::unique_ptr<io::InputStream> AzureDataLakeStorageClient::fetchFile(const FetchAzureDataLakeStorageParameters& params) {
+  auto file_client = getFileClient(params);
+
+  // A range is only attached to the request when at least one of its bounds was configured
+  Azure::Storage::Files::DataLake::DownloadFileOptions download_options;
+  if (params.range_start.has_value() || params.range_length.has_value()) {
+    Azure::Core::Http::HttpRange requested_range;
+    if (const auto& start = params.range_start) {
+      requested_range.Offset = *start;
+    }
+    if (const auto& length = params.range_length) {
+      requested_range.Length = *length;
+    }
+    download_options.Range = requested_range;
+  }
+
+  // The returned stream takes ownership of the download result, including its body stream
+  return std::make_unique<AzureDataLakeStorageInputStream>(std::move(file_client.Download(download_options).Value));
+}
+
} // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h b/extensions/azure/storage/AzureDataLakeStorageClient.h
index e7d933d..cd8d791 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.h
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.h
@@ -21,6 +21,7 @@
#include <string>
#include <memory>
+#include <utility>
#include <azure/storage/files/datalake.hpp>
@@ -56,12 +57,38 @@ class AzureDataLakeStorageClient : public DataLakeStorageClient {
*/
bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) override;
+ /**
+ * Fetches a file from the Azure Data Lake Storage
+ * @param params Parameters required for connecting and file access on Azure
+ * @return Download result of Azure Data Lake storage client
+ */
+ std::unique_ptr<io::InputStream> fetchFile(const FetchAzureDataLakeStorageParameters& params) override;
+
private:
- void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name);
+  /**
+   * io::InputStream adapter over the body stream of a Data Lake download result.
+   * Owns the download result, so the body stream stays alive for as long as this stream does.
+   */
+  class AzureDataLakeStorageInputStream : public io::InputStream {
+   public:
+    explicit AzureDataLakeStorageInputStream(Azure::Storage::Files::DataLake::Models::DownloadFileResult&& result)
+      : result_(std::move(result)) {}
+
+    // Length of the downloaded body as reported by the Azure SDK body stream
+    size_t size() const override { return result_.Body->Length(); }
+
+    // Reads up to len bytes from the body stream; exceptions thrown by the SDK are not caught here
+    size_t read(uint8_t *value, size_t len) override { return result_.Body->Read(value, len); }
+
+   private:
+    Azure::Storage::Files::DataLake::Models::DownloadFileResult result_;
+  };
+
+ void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries);
Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const AzureDataLakeStorageParameters& params);
AzureStorageCredentials credentials_;
std::string file_system_name_;
+ std::optional<uint64_t> number_of_retries_;
std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> client_;
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureDataLakeStorageClient>::getLogger()};
};
diff --git a/extensions/azure/storage/DataLakeStorageClient.h b/extensions/azure/storage/DataLakeStorageClient.h
index 4e97d72..eb57d62 100644
--- a/extensions/azure/storage/DataLakeStorageClient.h
+++ b/extensions/azure/storage/DataLakeStorageClient.h
@@ -21,10 +21,12 @@
#include <string>
#include <optional>
+#include <memory>
#include "AzureStorageCredentials.h"
#include "utils/gsl.h"
+#include "io/InputStream.h"
namespace org::apache::nifi::minifi::azure::storage {
@@ -33,6 +35,7 @@ struct AzureDataLakeStorageParameters {
std::string file_system_name;
std::string directory_name;
std::string filename;
+ std::optional<uint64_t> number_of_retries;
};
struct PutAzureDataLakeStorageParameters : public AzureDataLakeStorageParameters {
@@ -41,11 +44,17 @@ struct PutAzureDataLakeStorageParameters : public AzureDataLakeStorageParameters
using DeleteAzureDataLakeStorageParameters = AzureDataLakeStorageParameters;
+/**
+ * Parameters of a fetch (download) operation.
+ * When neither range_start nor range_length is set, no byte range is attached to the download request.
+ */
+struct FetchAzureDataLakeStorageParameters : public AzureDataLakeStorageParameters {
+  std::optional<uint64_t> range_start;   // byte offset of the first byte to download
+  std::optional<uint64_t> range_length;  // number of bytes to download, counted from range_start
+};
+
class DataLakeStorageClient {
 public:
virtual bool createFile(const PutAzureDataLakeStorageParameters& params) = 0;
virtual std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) = 0;
virtual bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) = 0;
+  // Starts downloading the file (or byte range) described by params and returns a stream over its content; the caller owns the stream.
+  virtual std::unique_ptr<io::InputStream> fetchFile(const FetchAzureDataLakeStorageParameters& params) = 0;
virtual ~DataLakeStorageClient() = default;
};
diff --git a/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h b/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h
index 6878e71..1ef56b3 100644
--- a/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h
+++ b/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h
@@ -71,19 +71,32 @@ class AzureDataLakeStorageTestsFixture {
update_attribute_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} }, true);
plan_->addProcessor(azure_data_lake_storage_, "AzureDataLakeStorageProcessor", { {"success", "d"}, {"failure", "d"} }, true);
auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true);
- logattribute->setAutoTerminatedRelationships({{"success", "d"}});
- putfile_ = plan_->addProcessor("PutFile", "PutFile", { {"success", "d"} }, false);
- plan_->addConnection(azure_data_lake_storage_, {"failure", "d"}, putfile_);
- putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
- output_dir_ = test_controller_.createTempDirectory();
- plan_->setProperty(putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), output_dir_);
+ success_putfile_ = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false);
+ plan_->addConnection(logattribute, {"success", "d"}, success_putfile_);
+ success_putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+ success_output_dir_ = test_controller_.createTempDirectory();
+ plan_->setProperty(success_putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), success_output_dir_);
+
+ failure_putfile_ = plan_->addProcessor("PutFile", "FailurePutFile", { {"success", "d"} }, false);
+ plan_->addConnection(azure_data_lake_storage_, {"failure", "d"}, failure_putfile_);
+ failure_putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+ failure_output_dir_ = test_controller_.createTempDirectory();
+ plan_->setProperty(failure_putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), failure_output_dir_);
azure_storage_cred_service_ = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
setDefaultProperties();
}
std::vector<std::string> getFailedFlowFileContents() {
+ return getFileContents(failure_output_dir_);
+ }
+
+ std::vector<std::string> getSuccessfulFlowFileContents() {
+ return getFileContents(success_output_dir_);
+ }
+
+ std::vector<std::string> getFileContents(const std::string& dir) {
std::vector<std::string> file_contents;
auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
@@ -92,7 +105,7 @@ class AzureDataLakeStorageTestsFixture {
return true;
};
- utils::file::FileUtils::list_dir(output_dir_, lambda, plan_->getLogger(), false);
+ utils::file::FileUtils::list_dir(dir, lambda, plan_->getLogger(), false);
return file_contents;
}
@@ -116,7 +129,9 @@ class AzureDataLakeStorageTestsFixture {
std::shared_ptr<core::Processor> azure_data_lake_storage_;
std::shared_ptr<core::Processor> get_file_;
std::shared_ptr<core::Processor> update_attribute_;
- std::shared_ptr<core::Processor> putfile_;
+ std::shared_ptr<core::Processor> success_putfile_;
+ std::shared_ptr<core::Processor> failure_putfile_;
std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_;
- std::string output_dir_;
+ std::string failure_output_dir_;
+ std::string success_output_dir_;
};
diff --git a/libminifi/test/azure-tests/FetchAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/FetchAzureDataLakeStorageTests.cpp
new file mode 100644
index 0000000..a055962
--- /dev/null
+++ b/libminifi/test/azure-tests/FetchAzureDataLakeStorageTests.cpp
@@ -0,0 +1,133 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AzureDataLakeStorageTestsFixture.h"
+#include "processors/FetchAzureDataLakeStorage.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+namespace {
+
+// Unit tests for FetchAzureDataLakeStorage, run against MockDataLakeStorageClient through the shared
+// AzureDataLakeStorageTestsFixture (which routes "success"/"failure" output to separate PutFile directories).
+
+using namespace std::chrono_literals;
+
+using FetchAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::FetchAzureDataLakeStorage>;
+// Short aliases for the classes whose static property descriptors are configured below.
+using FetchProcessor = minifi::azure::processors::FetchAzureDataLakeStorage;
+using CredentialsService = minifi::azure::controllers::AzureStorageCredentialsService;
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Azure storage credentials service is empty", "[azureDataLakeStorageParameters]") {
+  plan_->setProperty(azure_data_lake_storage_, FetchProcessor::AzureStorageCredentialsService.getName(), "");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
+  REQUIRE(getFailedFlowFileContents().empty());
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Test Azure credentials with account name and SAS token set", "[azureDataLakeStorageParameters]") {
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::SASToken.getName(), "token");
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::ConnectionString.getName(), "");
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
+  CHECK(passed_params.credentials.buildConnectionString() == "AccountName=TEST_ACCOUNT;SharedAccessSignature=token");
+  CHECK(getFailedFlowFileContents().empty());
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Test Azure credentials with connection string override", "[azureDataLakeStorageParameters]") {
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::ConnectionString.getName(), CONNECTION_STRING);
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::SASToken.getName(), "token");
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(getFailedFlowFileContents().empty());
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Test Azure credentials with managed identity use", "[azureDataLakeStorageParameters]") {
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::ConnectionString.getName(), "test");
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::UseManagedIdentityCredentials.getName(), "true");
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
+  CHECK(passed_params.credentials.buildConnectionString().empty());
+  CHECK(passed_params.credentials.getStorageAccountName() == "TEST_ACCOUNT");
+  CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net");
+  CHECK(getFailedFlowFileContents().empty());
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Filesystem name is not set", "[azureDataLakeStorageParameters]") {
+  plan_->setProperty(update_attribute_, "test.filesystemname", "", true);
+  test_controller_.runSession(plan_, true);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  CHECK(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid or empty!"));
+  auto failed_flowfiles = getFailedFlowFileContents();
+  REQUIRE(failed_flowfiles.size() == 1);
+  REQUIRE(failed_flowfiles[0] == TEST_DATA);
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Connection String is empty", "[azureDataLakeStorageParameters]") {
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::ConnectionString.getName(), "");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
+  REQUIRE(getFailedFlowFileContents().empty());
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Fetch full file succeeds", "[azureDataLakeStorageFetch]") {
+  test_controller_.runSession(plan_, true);
+  REQUIRE(getFailedFlowFileContents().empty());
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(passed_params.directory_name == DIRECTORY_NAME);
+  CHECK(passed_params.filename == GETFILE_FILE_NAME);
+  // No range properties configured, so neither bound should be forwarded to the client.
+  CHECK(passed_params.range_start == std::nullopt);
+  CHECK(passed_params.range_length == std::nullopt);
+  auto success_contents = getSuccessfulFlowFileContents();
+  REQUIRE(success_contents.size() == 1);
+  REQUIRE(success_contents[0] == mock_data_lake_storage_client_ptr_->FETCHED_DATA);
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Fetch a range of the file succeeds", "[azureDataLakeStorageFetch]") {
+  plan_->setProperty(azure_data_lake_storage_, FetchProcessor::RangeStart.getName(), "5");
+  plan_->setProperty(azure_data_lake_storage_, FetchProcessor::RangeLength.getName(), "10");
+  test_controller_.runSession(plan_, true);
+  REQUIRE(getFailedFlowFileContents().empty());
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(passed_params.directory_name == DIRECTORY_NAME);
+  CHECK(passed_params.filename == GETFILE_FILE_NAME);
+  // Require presence before dereferencing: a missing range must fail the test, not dereference an empty optional.
+  REQUIRE(passed_params.range_start.has_value());
+  CHECK(*passed_params.range_start == 5);
+  REQUIRE(passed_params.range_length.has_value());
+  CHECK(*passed_params.range_length == 10);
+  auto success_contents = getSuccessfulFlowFileContents();
+  REQUIRE(success_contents.size() == 1);
+  REQUIRE(success_contents[0] == mock_data_lake_storage_client_ptr_->FETCHED_DATA.substr(5, 10));
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Number of Retries is set", "[azureDataLakeStorageFetch]") {
+  plan_->setProperty(azure_data_lake_storage_, FetchProcessor::NumberOfRetries.getName(), "1");
+  test_controller_.runSession(plan_, true);
+  REQUIRE(mock_data_lake_storage_client_ptr_->getPassedFetchParams().number_of_retries == 1);
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Fetch full file fails", "[azureDataLakeStorageFetch]") {
+  mock_data_lake_storage_client_ptr_->setFetchFailure(true);
+  test_controller_.runSession(plan_, true);
+  REQUIRE(getSuccessfulFlowFileContents().empty());
+  // On failure the incoming flow file is routed to "failure" with its original content.
+  auto failed_contents = getFailedFlowFileContents();
+  REQUIRE(failed_contents.size() == 1);
+  REQUIRE(failed_contents[0] == TEST_DATA);
+}
+
+}  // namespace
diff --git a/libminifi/test/azure-tests/MockDataLakeStorageClient.h b/libminifi/test/azure-tests/MockDataLakeStorageClient.h
index eeac4dc..8969e1e 100644
--- a/libminifi/test/azure-tests/MockDataLakeStorageClient.h
+++ b/libminifi/test/azure-tests/MockDataLakeStorageClient.h
@@ -20,12 +20,17 @@
#include <string>
#include <stdexcept>
+#include <memory>
+#include <utility>
+#include <vector>
#include "storage/DataLakeStorageClient.h"
+#include "io/BufferStream.h"
class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::storage::DataLakeStorageClient {
public:
const std::string PRIMARY_URI = "http://test-uri/file";
+ const std::string FETCHED_DATA = "test azure data for stream";
bool createFile(const org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters& /*params*/) override {
if (file_creation_error_) {
@@ -55,6 +60,27 @@ class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::stora
return delete_result_;
}
+  std::unique_ptr<org::apache::nifi::minifi::io::InputStream> fetchFile(const org::apache::nifi::minifi::azure::storage::FetchAzureDataLakeStorageParameters& params) override {
+    if (fetch_fails_) {
+      throw std::runtime_error("error");
+    }
+
+    // Only calls that get past the failure switch are recorded.
+    fetch_params_ = params;
+
+    // Mimic a ranged download of FETCHED_DATA. The start defaults to 0 and the length defaults to
+    // "until the end of the data". Both are clamped to the data size, so a range reaching past the
+    // end yields the available tail instead of building iterators beyond FETCHED_DATA.end().
+    const uint64_t data_size = FETCHED_DATA.size();
+    uint64_t range_start = params.range_start ? *params.range_start : 0;
+    if (range_start > data_size) {
+      range_start = data_size;
+    }
+    uint64_t size = data_size - range_start;
+    if (params.range_length && *params.range_length < size) {
+      size = *params.range_length;
+    }
+
+    // assign() replaces any previous content; buffer_ outlives the returned stream, which views its bytes.
+    buffer_.assign(FETCHED_DATA.begin() + range_start, FETCHED_DATA.begin() + range_start + size);
+    return std::make_unique<org::apache::nifi::minifi::io::BufferStream>(buffer_.data(), buffer_.size());
+  }
+
   void setFileCreation(bool create_file) {
+    // Overrides create_file_ (declared below with a default of true).
     create_file_ = create_file;
   }
@@ -75,6 +101,10 @@ class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::stora
delete_result_ = delete_result;
}
+  // While set, fetchFile() throws std::runtime_error before recording params or producing data.
+  void setFetchFailure(bool fetch_fails) { fetch_fails_ = fetch_fails; }
+
   org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters getPassedPutParams() const {
+    // Returns a copy of put_params_; where it gets recorded is outside the lines shown in this diff.
     return put_params_;
   }
@@ -83,6 +113,10 @@ class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::stora
return delete_params_;
}
+  // Copy of the parameters captured by the latest fetchFile() call that did not throw
+  // (default-constructed if there was none).
+  org::apache::nifi::minifi::azure::storage::FetchAzureDataLakeStorageParameters getPassedFetchParams() const {
+    auto captured = fetch_params_;
+    return captured;
+  }
+
private:
const std::string RETURNED_PRIMARY_URI = "http://test-uri/file?secret-sas";
bool create_file_ = true;
@@ -90,7 +124,10 @@ class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::stora
bool upload_fails_ = false;
bool delete_fails_ = false;
bool delete_result_ = true;
+ bool fetch_fails_ = false;
std::string input_data_;
+ std::vector<uint8_t> buffer_;
org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters put_params_;
org::apache::nifi::minifi::azure::storage::DeleteAzureDataLakeStorageParameters delete_params_;
+ org::apache::nifi::minifi::azure::storage::FetchAzureDataLakeStorageParameters fetch_params_;
};