You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/12/01 13:27:06 UTC
[nifi-minifi-cpp] 02/03: MINIFICPP-1629 Add DeleteAzureDataLakeStorage processor
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 6074634dcdb18f86d599aa9cfaee4f09e05752c8
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Dec 1 14:07:22 2021 +0100
MINIFICPP-1629 Add DeleteAzureDataLakeStorage processor
Closes #1195
Signed-off-by: Marton Szasz <sz...@apache.org>
---
PROCESSORS.md | 29 +++-
README.md | 2 +-
.../AzureDataLakeStorageProcessorBase.cpp | 81 ++++++++++
.../processors/AzureDataLakeStorageProcessorBase.h | 64 ++++++++
.../azure/processors/AzureStorageProcessorBase.cpp | 6 +-
.../azure/processors/AzureStorageProcessorBase.h | 2 +-
.../processors/DeleteAzureDataLakeStorage.cpp | 85 +++++++++++
.../azure/processors/DeleteAzureDataLakeStorage.h | 67 ++++++++
.../azure/processors/PutAzureBlobStorage.cpp | 2 +-
.../azure/processors/PutAzureDataLakeStorage.cpp | 55 ++-----
.../azure/processors/PutAzureDataLakeStorage.h | 28 ++--
extensions/azure/storage/AzureDataLakeStorage.cpp | 9 ++
extensions/azure/storage/AzureDataLakeStorage.h | 1 +
.../azure/storage/AzureDataLakeStorageClient.cpp | 8 +-
.../azure/storage/AzureDataLakeStorageClient.h | 9 +-
extensions/azure/storage/DataLakeStorageClient.h | 8 +-
.../tests/unit/GenerateFlowFileTests.cpp | 9 +-
.../azure-tests/AzureDataLakeStorageTestsFixture.h | 122 +++++++++++++++
.../DeleteAzureDataLakeStorageTests.cpp | 128 ++++++++++++++++
.../test/azure-tests/MockDataLakeStorageClient.h | 96 ++++++++++++
.../test/azure-tests/PutAzureBlobStorageTests.cpp | 3 +-
.../azure-tests/PutAzureDataLakeStorageTests.cpp | 168 ++-------------------
22 files changed, 747 insertions(+), 235 deletions(-)
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 2a1bcbb..de7eadf 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -14,6 +14,7 @@
- [ConsumeKafka](#consumekafka)
- [ConsumeMQTT](#consumemqtt)
- [DefragmentText](#defragmenttext)
+- [DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)
- [DeleteS3Object](#deletes3object)
- [ExecuteProcess](#executeprocess)
- [ExecutePythonProcessor](#executepythonprocessor)
@@ -328,6 +329,30 @@ In the list below, the names of required properties appear in bold. Any other pr
|failure|Flowfiles that failed the defragmentation process|
+## DeleteAzureDataLakeStorage
+
+### Description
+
+Deletes the provided file from Azure Data Lake Storage Gen 2
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Azure Storage Credentials Service**|||Name of the Azure Storage Credentials Service used to retrieve the connection string from.|
+|File Name|||The filename in Azure Storage. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**|
+|**Filesystem Name**|||Name of the Azure Storage File System. It is assumed to be already existing.<br/>**Supports Expression Language: true**|
+|Directory Name|||Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. If left empty it designates the root directory. The directory will be created if not already existing.<br/>**Supports Expression Language: true**|
+
+### Relationships
+
+| Name | Description |
+| - | - |
+|failure|If file deletion from Azure Data Lake storage fails the flowfile is transferred to this relationship|
+|success|If file deletion from Azure Data Lake storage succeeds the flowfile is transferred to this relationship|
+
+
## DeleteS3Object
### Description
@@ -1287,10 +1312,10 @@ In the list below, the names of required properties appear in bold. Any other pr
| Name | Default Value | Allowable Values | Description |
| - | - | - | - |
|**Azure Storage Credentials Service**|||Name of the Azure Storage Credentials Service used to retrieve the connection string from.|
+|**Conflict Resolution Strategy**|fail|fail<br/>replace<br/>ignore|Indicates what should happen when a file with the same name already exists in the output directory.|
+|File Name|||The filename in Azure Storage. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**|
|**Filesystem Name**|||Name of the Azure Storage File System. It is assumed to be already existing.<br/>**Supports Expression Language: true**|
|Directory Name|||Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. If left empty it designates the root directory. The directory will be created if not already existing.<br/>**Supports Expression Language: true**|
-|File Name|||The filename to be uploaded. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**|
-|**Conflict Resolution Strategy**|fail|fail<br/>replace<br/>ignore|Indicates what should happen when a file with the same name already exists in the output directory.|
### Relationships
diff --git a/README.md b/README.md
index 3aa08be..0bd2328 100644
--- a/README.md
+++ b/README.md
@@ -77,7 +77,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension
| ------------- |:-------------| :-----|
| Archive Extensions | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry) | -DBUILD_LIBARCHIVE=ON |
| AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) | -DENABLE_AWS=ON |
-| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage) | -DENABLE_AZURE=ON |
+| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[PutAzureDataLakeStorage](PROCESSORS.md#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](PROCESSORS.md#deleteazuredatalakestorage) | -DENABLE_AZURE=ON |
| CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) | -DDISABLE_CIVET=ON |
| CURL | [InvokeHTTP](PROCESSORS.md#invokehttp) | -DDISABLE_CURL=ON |
| GPS | GetGPS | -DENABLE_GPS=ON |
diff --git a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp
new file mode 100644
index 0000000..04d7664
--- /dev/null
+++ b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp
@@ -0,0 +1,81 @@
+/**
+ * @file AzureDataLakeStorageProcessorBase.cpp
+ * AzureDataLakeStorageProcessorBase class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property AzureDataLakeStorageProcessorBase::FilesystemName(  // the only one of the three shared properties explicitly built with isRequired(true)
+ core::PropertyBuilder::createProperty("Filesystem Name")
+ ->withDescription("Name of the Azure Storage File System. It is assumed to be already existing.")
+ ->supportsExpressionLanguage(true)  // evaluated per flow file in setCommonParameters()
+ ->isRequired(true)
+ ->build());
+const core::Property AzureDataLakeStorageProcessorBase::DirectoryName(  // not marked required; registered by DeleteAzureDataLakeStorage::initialize() among others
+ core::PropertyBuilder::createProperty("Directory Name")
+ ->withDescription("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. "
+ "If left empty it designates the root directory. The directory will be created if not already existing.")  // NOTE(review): the last sentence describes upload behaviour but is also shown for the Delete processor - confirm the wording
+ ->supportsExpressionLanguage(true)
+ ->build());
+const core::Property AzureDataLakeStorageProcessorBase::FileName(  // not marked required; setCommonParameters() falls back to the flow file's 'filename' attribute when this resolves to empty
+ core::PropertyBuilder::createProperty("File Name")
+ ->withDescription("The filename in Azure Storage. If left empty the filename attribute will be used by default.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+void AzureDataLakeStorageProcessorBase::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {  // loads the credentials from the controller service, validates them and stores them in credentials_; throws PROCESS_SCHEDULE_EXCEPTION otherwise
+ gsl_Expects(context);  // context is dereferenced on the next statement
+ std::optional<storage::AzureStorageCredentials> credentials;
+ std::tie(std::ignore, credentials) = getCredentialsFromControllerService(*context);  // the result code is discarded; only the optional credentials are inspected
+ if (!credentials) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service property missing or invalid");
+ }
+
+ if (!credentials->isValid()) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Credentials set in the Azure Storage credentials service are invalid");
+ }
+
+ credentials_ = *credentials;  // copied into the request parameters by setCommonParameters()
+}
+
+bool AzureDataLakeStorageProcessorBase::setCommonParameters(  // fills params with credentials, filesystem, directory and file name; returns false (after logging) when the filesystem or file name is unavailable
+ storage::AzureDataLakeStorageParameters& params, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+ params.credentials = credentials_;  // value stored by onSchedule()
+
+ if (!context.getProperty(FilesystemName, params.file_system_name, flow_file) || params.file_system_name.empty()) {
+ logger_->log_error("Filesystem Name '%s' is invalid or empty!", params.file_system_name);
+ return false;
+ }
+
+ context.getProperty(DirectoryName, params.directory_name, flow_file);  // result not checked: a missing directory is not treated as an error here
+
+ context.getProperty(FileName, params.filename, flow_file);  // result not checked: emptiness is handled by the fallback below
+ if (params.filename.empty() && (!flow_file->getAttribute("filename", params.filename) || params.filename.empty())) {  // fall back to the 'filename' attribute; flow_file is dereferenced without a null check, so callers must pass a non-null flow file
+ logger_->log_error("No File Name is set and default object key 'filename' attribute could not be found!");
+ return false;
+ }
+
+ return true;
+}
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h
new file mode 100644
index 0000000..14f2a92
--- /dev/null
+++ b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h
@@ -0,0 +1,64 @@
+/**
+ * @file AzureDataLakeStorageProcessorBase.h
+ * AzureDataLakeStorageProcessorBase class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <utility>
+#include <string>
+#include <memory>
+#include <optional>
+
+#include "core/Property.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "storage/AzureDataLakeStorage.h"
+#include "AzureStorageProcessorBase.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class AzureDataLakeStorageProcessorBase : public AzureStorageProcessorBase {  // common base for the Azure Data Lake Storage processors: shared properties, credential loading and request-parameter extraction
+ public:
+ // Supported Properties
+ EXTENSIONAPI static const core::Property FilesystemName;
+ EXTENSIONAPI static const core::Property DirectoryName;
+ EXTENSIONAPI static const core::Property FileName;
+
+ explicit AzureDataLakeStorageProcessorBase(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<core::logging::Logger> &logger)
+ : AzureStorageProcessorBase(name, uuid, logger) {  // azure_data_lake_storage_ is default-constructed by this overload
+ }
+
+ ~AzureDataLakeStorageProcessorBase() override = default;
+
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;  // throws when the credentials service is missing or yields invalid credentials
+
+ protected:
+ explicit AzureDataLakeStorageProcessorBase(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<core::logging::Logger> &logger,
+ std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)  // overload that lets a derived class hand in the storage client to use
+ : AzureStorageProcessorBase(name, uuid, logger),
+ azure_data_lake_storage_(std::move(data_lake_storage_client)) {
+ }
+
+ bool setCommonParameters(storage::AzureDataLakeStorageParameters& params, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file);  // returns false when the filesystem name or file name cannot be resolved
+
+ storage::AzureStorageCredentials credentials_;  // assigned in onSchedule()
+ storage::AzureDataLakeStorage azure_data_lake_storage_;  // storage facade used by derived processors (e.g. deleteFile in DeleteAzureDataLakeStorage::onTrigger)
+};
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/AzureStorageProcessorBase.cpp b/extensions/azure/processors/AzureStorageProcessorBase.cpp
index 63b230e..a5a2426 100644
--- a/extensions/azure/processors/AzureStorageProcessorBase.cpp
+++ b/extensions/azure/processors/AzureStorageProcessorBase.cpp
@@ -33,13 +33,13 @@ const core::Property AzureStorageProcessorBase::AzureStorageCredentialsService(
->build());
std::tuple<AzureStorageProcessorBase::GetCredentialsFromControllerResult, std::optional<storage::AzureStorageCredentials>> AzureStorageProcessorBase::getCredentialsFromControllerService(
- const std::shared_ptr<core::ProcessContext> &context) const {
+ core::ProcessContext &context) const {
std::string service_name;
- if (!context->getProperty(AzureStorageCredentialsService.getName(), service_name) || service_name.empty()) {
+ if (!context.getProperty(AzureStorageCredentialsService.getName(), service_name) || service_name.empty()) {
return std::make_tuple(GetCredentialsFromControllerResult::CONTROLLER_NAME_EMPTY, std::nullopt);
}
- std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(service_name);
+ std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(service_name);
if (nullptr == service) {
logger_->log_error("Azure Storage credentials service with name: '%s' could not be found", service_name);
return std::make_tuple(GetCredentialsFromControllerResult::CONTROLLER_NAME_INVALID, std::nullopt);
diff --git a/extensions/azure/processors/AzureStorageProcessorBase.h b/extensions/azure/processors/AzureStorageProcessorBase.h
index f87e827..a85ae7b 100644
--- a/extensions/azure/processors/AzureStorageProcessorBase.h
+++ b/extensions/azure/processors/AzureStorageProcessorBase.h
@@ -49,7 +49,7 @@ class AzureStorageProcessorBase : public core::Processor {
CONTROLLER_NAME_INVALID
};
- std::tuple<GetCredentialsFromControllerResult, std::optional<storage::AzureStorageCredentials>> getCredentialsFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const;
+ std::tuple<GetCredentialsFromControllerResult, std::optional<storage::AzureStorageCredentials>> getCredentialsFromControllerService(core::ProcessContext &context) const;
std::shared_ptr<core::logging::Logger> logger_;
};
diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp b/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp
new file mode 100644
index 0000000..6146a31
--- /dev/null
+++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp
@@ -0,0 +1,85 @@
+/**
+ * @file DeleteAzureDataLakeStorage.cpp
+ * DeleteAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DeleteAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Relationship DeleteAzureDataLakeStorage::Success("success", "If file deletion from Azure Data Lake storage succeeds the flowfile is transferred to this relationship");  // wording kept in sync with the PROCESSORS.md entry
+const core::Relationship DeleteAzureDataLakeStorage::Failure("failure", "If file deletion from Azure Data Lake storage fails the flowfile is transferred to this relationship");  // also used when the request parameters cannot be built
+
+void DeleteAzureDataLakeStorage::initialize() {  // registers the properties and relationships this processor supports
+ // Set the supported properties
+ setSupportedProperties({
+ AzureStorageCredentialsService,
+ FilesystemName,
+ DirectoryName,
+ FileName
+ });
+
+ // Set the supported relationships
+ setSupportedRelationships({
+ Success,
+ Failure
+ });
+}
+
+std::optional<storage::DeleteAzureDataLakeStorageParameters> DeleteAzureDataLakeStorage::buildDeleteParameters(  // builds the delete request for one flow file; std::nullopt when the common parameters cannot be resolved
+ core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+ storage::DeleteAzureDataLakeStorageParameters params;
+ if (!setCommonParameters(params, context, flow_file)) {  // setCommonParameters() logs the reason itself
+ return std::nullopt;
+ }
+
+ return params;  // no delete-specific fields are set beyond the common ones
+}
+
+void DeleteAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {  // deletes the file named by one incoming flow file and routes it to Success or Failure
+ gsl_Expects(context && session);  // both are dereferenced below
+ logger_->log_trace("DeleteAzureDataLakeStorage onTrigger");
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
+ if (!flow_file) {  // nothing queued: yield instead of spinning
+ context->yield();
+ return;
+ }
+
+ const auto params = buildDeleteParameters(*context, flow_file);
+ if (!params) {  // the reason was already logged while building the parameters
+ session->transfer(flow_file, Failure);
+ return;
+ }
+
+ auto result = azure_data_lake_storage_.deleteFile(*params);  // false on a failed delete or when the client threw
+ if (!result) {
+ logger_->log_error("Failed to delete file '%s' from Azure Data Lake storage", params->filename);
+ session->transfer(flow_file, Failure);
+ } else {
+ logger_->log_debug("Successfully deleted file '%s' of filesystem '%s' on Azure Data Lake storage", params->filename, params->file_system_name);
+ session->transfer(flow_file, Success);
+ }
+}
+
+REGISTER_RESOURCE(DeleteAzureDataLakeStorage, "Deletes the provided file from Azure Data Lake Storage Gen 2");  // description kept in sync with the PROCESSORS.md entry
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.h b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
new file mode 100644
index 0000000..cb26a2f
--- /dev/null
+++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
@@ -0,0 +1,67 @@
+/**
+ * @file DeleteAzureDataLakeStorage.h
+ * DeleteAzureDataLakeStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+template<typename AzureDataLakeStorageProcessor>  // parameter name matches the same forward declaration in PutAzureDataLakeStorage.h and no longer reuses the base class's name
+class AzureDataLakeStorageTestsFixture;  // forward declaration only; befriended by the processor class below
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class DeleteAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase {  // processor that deletes the file identified by each incoming flow file from Azure Data Lake Storage
+ public:
+ // Supported Relationships
+ static const core::Relationship Failure;
+ static const core::Relationship Success;
+
+ explicit DeleteAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+ : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger()) {
+ }
+
+ ~DeleteAzureDataLakeStorage() override = default;
+
+ void initialize() override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
+ friend class ::AzureDataLakeStorageTestsFixture<DeleteAzureDataLakeStorage>;  // gives the fixture access to the private client-taking constructor below (presumably for injecting a test client - confirm against the fixture)
+
+ core::annotation::Input getInputRequirement() const override {
+ return core::annotation::Input::INPUT_REQUIRED;
+ }
+
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
+ explicit DeleteAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+ : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger(), std::move(data_lake_storage_client)) {  // forwards the supplied client to the base class
+ }
+
+ std::optional<storage::DeleteAzureDataLakeStorageParameters> buildDeleteParameters(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file);  // std::nullopt when the common parameters cannot be resolved
+};
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/PutAzureBlobStorage.cpp b/extensions/azure/processors/PutAzureBlobStorage.cpp
index e5f66fd..7913ca8 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.cpp
+++ b/extensions/azure/processors/PutAzureBlobStorage.cpp
@@ -201,7 +201,7 @@ std::optional<storage::PutAzureBlobStorageParameters> PutAzureBlobStorage::build
std::optional<storage::AzureStorageCredentials> PutAzureBlobStorage::getCredentials(
const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::FlowFile> &flow_file) const {
- auto [result, controller_service_creds] = getCredentialsFromControllerService(context);
+ auto [result, controller_service_creds] = getCredentialsFromControllerService(*context);
if (controller_service_creds) {
if (controller_service_creds->isValid()) {
logger_->log_debug("Azure credentials read from credentials controller service!");
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.cpp b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
index 5620737..d197d2f 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
@@ -20,30 +20,14 @@
#include "PutAzureDataLakeStorage.h"
+#include <vector>
+
#include "utils/ProcessorConfigUtils.h"
#include "utils/gsl.h"
-#include "controllerservices/AzureStorageCredentialsService.h"
#include "core/Resource.h"
namespace org::apache::nifi::minifi::azure::processors {
-const core::Property PutAzureDataLakeStorage::FilesystemName(
- core::PropertyBuilder::createProperty("Filesystem Name")
- ->withDescription("Name of the Azure Storage File System. It is assumed to be already existing.")
- ->supportsExpressionLanguage(true)
- ->isRequired(true)
- ->build());
-const core::Property PutAzureDataLakeStorage::DirectoryName(
- core::PropertyBuilder::createProperty("Directory Name")
- ->withDescription("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. "
- "If left empty it designates the root directory. The directory will be created if not already existing.")
- ->supportsExpressionLanguage(true)
- ->build());
-const core::Property PutAzureDataLakeStorage::FileName(
- core::PropertyBuilder::createProperty("File Name")
- ->withDescription("The filename to be uploaded. If left empty the filename attribute will be used by default.")
- ->supportsExpressionLanguage(true)
- ->build());
const core::Property PutAzureDataLakeStorage::ConflictResolutionStrategy(
core::PropertyBuilder::createProperty("Conflict Resolution Strategy")
->withDescription("Indicates what should happen when a file with the same name already exists in the output directory.")
@@ -71,46 +55,27 @@ void PutAzureDataLakeStorage::initialize() {
});
}
-void PutAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
- std::optional<storage::AzureStorageCredentials> credentials;
- std::tie(std::ignore, credentials) = getCredentialsFromControllerService(context);
- if (!credentials) {
- throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service property missing or invalid");
- }
-
- if (!credentials->isValid()) {
- throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service properties are not set or invalid");
- }
-
- credentials_ = *credentials;
+void PutAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ gsl_Expects(context && sessionFactory);
+ AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
conflict_resolution_strategy_ = FileExistsResolutionStrategy::parse(
utils::parsePropertyWithAllowableValuesOrThrow(*context, ConflictResolutionStrategy.getName(), FileExistsResolutionStrategy::values()).c_str());
}
std::optional<storage::PutAzureDataLakeStorageParameters> PutAzureDataLakeStorage::buildUploadParameters(
- const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+ core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
storage::PutAzureDataLakeStorageParameters params;
- params.credentials = credentials_;
- params.replace_file = conflict_resolution_strategy_ == FileExistsResolutionStrategy::REPLACE_FILE;
-
- if (!context->getProperty(FilesystemName, params.file_system_name, flow_file) || params.file_system_name.empty()) {
- logger_->log_error("Filesystem Name '%s' is invalid or empty!", params.file_system_name);
- return std::nullopt;
- }
-
- context->getProperty(DirectoryName, params.directory_name, flow_file);
-
- context->getProperty(FileName, params.filename, flow_file);
- if (params.filename.empty() && (!flow_file->getAttribute("filename", params.filename) || params.filename.empty())) {
- logger_->log_error("No File Name is set and default object key 'filename' attribute could not be found!");
+ if (!setCommonParameters(params, context, flow_file)) {
return std::nullopt;
}
+ params.replace_file = conflict_resolution_strategy_ == FileExistsResolutionStrategy::REPLACE_FILE;
return params;
}
void PutAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+ gsl_Expects(context && session);
logger_->log_trace("PutAzureDataLakeStorage onTrigger");
std::shared_ptr<core::FlowFile> flow_file = session->get();
if (!flow_file) {
@@ -118,7 +83,7 @@ void PutAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessConte
return;
}
- const auto params = buildUploadParameters(context, flow_file);
+ const auto params = buildUploadParameters(*context, flow_file);
if (!params) {
session->transfer(flow_file, Failure);
return;
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h b/extensions/azure/processors/PutAzureDataLakeStorage.h
index 67ae6c9..a268711 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -20,30 +20,23 @@
#pragma once
-#include <utility>
#include <string>
+#include <utility>
#include <memory>
-#include <optional>
-#include <vector>
-#include "core/Property.h"
-#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "storage/AzureDataLakeStorage.h"
+#include "AzureDataLakeStorageProcessorBase.h"
+
#include "utils/Enum.h"
#include "utils/Export.h"
-#include "AzureStorageProcessorBase.h"
-class PutAzureDataLakeStorageTestsFixture;
+template<typename AzureDataLakeStorageProcessor>
+class AzureDataLakeStorageTestsFixture;
namespace org::apache::nifi::minifi::azure::processors {
-class PutAzureDataLakeStorage final : public AzureStorageProcessorBase {
+class PutAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase {
public:
// Supported Properties
- EXTENSIONAPI static const core::Property FilesystemName;
- EXTENSIONAPI static const core::Property DirectoryName;
- EXTENSIONAPI static const core::Property FileName;
EXTENSIONAPI static const core::Property ConflictResolutionStrategy;
// Supported Relationships
@@ -65,7 +58,7 @@ class PutAzureDataLakeStorage final : public AzureStorageProcessorBase {
void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
private:
- friend class ::PutAzureDataLakeStorageTestsFixture;
+ friend class ::AzureDataLakeStorageTestsFixture<PutAzureDataLakeStorage>;
class ReadCallback : public InputStreamCallback {
public:
@@ -93,15 +86,12 @@ class PutAzureDataLakeStorage final : public AzureStorageProcessorBase {
}
explicit PutAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
- : AzureStorageProcessorBase(name, uuid, core::logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger()),
- azure_data_lake_storage_(std::move(data_lake_storage_client)) {
+ : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger(), std::move(data_lake_storage_client)) {
}
- std::optional<storage::PutAzureDataLakeStorageParameters> buildUploadParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file);
+ std::optional<storage::PutAzureDataLakeStorageParameters> buildUploadParameters(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file);
- storage::AzureStorageCredentials credentials_;
FileExistsResolutionStrategy conflict_resolution_strategy_;
- storage::AzureDataLakeStorage azure_data_lake_storage_;
};
} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/storage/AzureDataLakeStorage.cpp b/extensions/azure/storage/AzureDataLakeStorage.cpp
index 5562933..2be5292 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorage.cpp
@@ -52,4 +52,13 @@ UploadDataLakeStorageResult AzureDataLakeStorage::uploadFile(const PutAzureDataL
}
}
+// Asks the underlying storage client to delete the file described by `params`.
+// Returns the client's result; if the client throws a std::exception the error
+// is logged and false is returned instead.
+bool AzureDataLakeStorage::deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params) {
+  bool deleted = false;
+  try {
+    deleted = data_lake_storage_client_->deleteFile(params);
+  } catch (const std::exception& ex) {
+    logger_->log_error("An exception occurred while deleting '%s/%s' of filesystem '%s': %s", params.directory_name, params.filename, params.file_system_name, ex.what());
+  }
+  return deleted;
+}
+
} // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorage.h b/extensions/azure/storage/AzureDataLakeStorage.h
index a1ec8f6..8dd9b8a 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.h
+++ b/extensions/azure/storage/AzureDataLakeStorage.h
@@ -47,6 +47,7 @@ class AzureDataLakeStorage {
explicit AzureDataLakeStorage(std::unique_ptr<DataLakeStorageClient> data_lake_storage_client = nullptr);
storage::UploadDataLakeStorageResult uploadFile(const storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer);
+ bool deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params);
private:
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureDataLakeStorage>::getLogger()};
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
index e56b967..7a54a69 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
@@ -52,7 +52,7 @@ void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentia
credentials_ = credentials;
}
-Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const PutAzureDataLakeStorageParameters& params) {
+Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const AzureDataLakeStorageParameters& params) {
resetClientIfNeeded(params.credentials, params.file_system_name);
auto directory_client = client_->GetDirectoryClient(params.directory_name);
@@ -74,4 +74,10 @@ std::string AzureDataLakeStorageClient::uploadFile(const PutAzureDataLakeStorage
return file_client.GetUrl();
}
+// Deletes the file addressed by `params` and returns the `Deleted` flag carried
+// by the response of the file client's Delete() call.
+bool AzureDataLakeStorageClient::deleteFile(const DeleteAzureDataLakeStorageParameters& params) {
+  return getFileClient(params).Delete().Value.Deleted;
+}
+
} // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h b/extensions/azure/storage/AzureDataLakeStorageClient.h
index 3f13fe6..e7d933d 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.h
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.h
@@ -49,9 +49,16 @@ class AzureDataLakeStorageClient : public DataLakeStorageClient {
*/
std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) override;
+ /**
+ * Deletes a file on the Azure Data Lake Storage
+ * @param params Parameters required for connecting and file access on Azure
+ * @return True if file was deleted, false otherwise
+ */
+ bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) override;
+
private:
void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name);
- Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const PutAzureDataLakeStorageParameters& params);
+ Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const AzureDataLakeStorageParameters& params);
AzureStorageCredentials credentials_;
std::string file_system_name_;
diff --git a/extensions/azure/storage/DataLakeStorageClient.h b/extensions/azure/storage/DataLakeStorageClient.h
index b0f470d..4e97d72 100644
--- a/extensions/azure/storage/DataLakeStorageClient.h
+++ b/extensions/azure/storage/DataLakeStorageClient.h
@@ -28,18 +28,24 @@
namespace org::apache::nifi::minifi::azure::storage {
-struct PutAzureDataLakeStorageParameters {
+struct AzureDataLakeStorageParameters {
AzureStorageCredentials credentials;
std::string file_system_name;
std::string directory_name;
std::string filename;
+};
+
+struct PutAzureDataLakeStorageParameters : public AzureDataLakeStorageParameters {
bool replace_file = false;
};
+using DeleteAzureDataLakeStorageParameters = AzureDataLakeStorageParameters;
+
class DataLakeStorageClient {
public:
virtual bool createFile(const PutAzureDataLakeStorageParameters& params) = 0;
virtual std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) = 0;
+ virtual bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) = 0;
virtual ~DataLakeStorageClient() = default;
};
diff --git a/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp b/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp
index 5dd99c9..16e2b47 100644
--- a/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp
@@ -58,8 +58,7 @@ TEST_CASE("GenerateFlowFileTest", "[generateflowfiletest]") {
auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
- std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
- file_contents.push_back(file_content);
+ file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
return true;
};
@@ -188,8 +187,7 @@ TEST_CASE("GenerateFlowFileCustomTextTest", "[generateflowfiletest]") {
auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
- std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
- file_contents.push_back(file_content);
+ file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
return true;
};
@@ -226,8 +224,7 @@ TEST_CASE("GenerateFlowFileCustomTextEmptyTest", "[generateflowfiletest]") {
auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
- std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
- file_contents.push_back(file_content);
+ file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
return true;
};
diff --git a/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h b/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h
new file mode 100644
index 0000000..6878e71
--- /dev/null
+++ b/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h
@@ -0,0 +1,122 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <utility>
+#include <vector>
+#include <memory>
+#include <string>
+
+#include "MockDataLakeStorageClient.h"
+#include "../TestBase.h"
+#include "utils/TestUtils.h"
+#include "utils/IntegrationTestUtils.h"
+#include "core/Processor.h"
+#include "processors/GetFile.h"
+#include "processors/PutFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "utils/file/FileUtils.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+// Shared constants for the Azure Data Lake Storage processor tests that include this header.
+const std::string FILESYSTEM_NAME = "testfilesystem";
+const std::string DIRECTORY_NAME = "testdir";
+const std::string FILE_NAME = "testfile.txt";
+const std::string CONNECTION_STRING = "test-connectionstring";
+// Content and name of the input file the fixture places in GetFile's input directory.
+const std::string TEST_DATA = "data123";
+const std::string GETFILE_FILE_NAME = "input_data.log";
+
+/**
+ * Test fixture shared by the Azure Data Lake Storage processor tests.
+ *
+ * Builds a test plan containing GetFile, UpdateAttribute, the processor under test
+ * (AzureDataLakeStorageProcessor), LogAttribute and PutFile. The processor under test
+ * is constructed with a MockDataLakeStorageClient, so tests can configure the mock's
+ * behaviour and inspect the parameters it received.
+ *
+ * @tparam AzureDataLakeStorageProcessor processor type under test; it must be constructible
+ *         from (name, uuid, std::unique_ptr<DataLakeStorageClient>) and expose the
+ *         AzureStorageCredentialsService, FilesystemName and DirectoryName properties.
+ */
+template<typename AzureDataLakeStorageProcessor>
+class AzureDataLakeStorageTestsFixture {
+ public:
+ // Configures logging, builds the processing graph and applies the default properties.
+ AzureDataLakeStorageTestsFixture() {
+ LogTestController::getInstance().setDebug<TestPlan>();
+ LogTestController::getInstance().setDebug<minifi::core::Processor>();
+ LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+ LogTestController::getInstance().setTrace<minifi::processors::GetFile>();
+ LogTestController::getInstance().setTrace<minifi::processors::PutFile>();
+ LogTestController::getInstance().setDebug<minifi::processors::UpdateAttribute>();
+ LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+ LogTestController::getInstance().setTrace<AzureDataLakeStorageProcessor>();
+
+ // Build MiNiFi processing graph
+ plan_ = test_controller_.createPlan();
+ auto mock_data_lake_storage_client = std::make_unique<MockDataLakeStorageClient>();
+ // Keep a non-owning pointer to the mock; ownership is moved into the processor below.
+ mock_data_lake_storage_client_ptr_ = mock_data_lake_storage_client.get();
+ // NOTE(review): plain `new` instead of std::make_shared, presumably because the mock-injecting
+ // constructor is not public and is reached through the processor's friend declaration - confirm.
+ azure_data_lake_storage_ = std::shared_ptr<AzureDataLakeStorageProcessor>(
+ new AzureDataLakeStorageProcessor("AzureDataLakeStorageProcessor", utils::Identifier(), std::move(mock_data_lake_storage_client)));
+ // Input file that GetFile picks up.
+ auto input_dir = test_controller_.createTempDirectory();
+ utils::putFileToDir(input_dir, GETFILE_FILE_NAME, TEST_DATA);
+
+ get_file_ = plan_->addProcessor("GetFile", "GetFile");
+ plan_->setProperty(get_file_, minifi::processors::GetFile::Directory.getName(), input_dir);
+ plan_->setProperty(get_file_, minifi::processors::GetFile::KeepSourceFile.getName(), "false");
+
+ // NOTE(review): the trailing `true` presumably links each processor to the previously added one - confirm against TestPlan::addProcessor.
+ update_attribute_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} }, true);
+ plan_->addProcessor(azure_data_lake_storage_, "AzureDataLakeStorageProcessor", { {"success", "d"}, {"failure", "d"} }, true);
+ auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true);
+ logattribute->setAutoTerminatedRelationships({{"success", "d"}});
+
+ // PutFile receives the flow files the processor under test routes to "failure" and writes them to output_dir_.
+ putfile_ = plan_->addProcessor("PutFile", "PutFile", { {"success", "d"} }, false);
+ plan_->addConnection(azure_data_lake_storage_, {"failure", "d"}, putfile_);
+ putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+ output_dir_ = test_controller_.createTempDirectory();
+ plan_->setProperty(putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), output_dir_);
+
+ azure_storage_cred_service_ = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
+ setDefaultProperties();
+ }
+
+ // Returns the content of every file listed in output_dir_, i.e. the directory PutFile
+ // (connected to the "failure" relationship of the processor under test) writes to.
+ std::vector<std::string> getFailedFlowFileContents() {
+ std::vector<std::string> file_contents;
+
+ // Callback for list_dir: reads the whole file in binary mode and collects its content.
+ auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
+ std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
+ file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
+ return true;
+ };
+
+ utils::file::FileUtils::list_dir(output_dir_, lambda, plan_->getLogger(), false);
+ return file_contents;
+ }
+
+ // Applies the baseline configuration: the credentials service reference, filesystem and
+ // directory names (set as attributes on UpdateAttribute and referenced from the processor
+ // via ${...} expressions), and the connection string of the credentials service.
+ void setDefaultProperties() {
+ plan_->setProperty(azure_data_lake_storage_, AzureDataLakeStorageProcessor::AzureStorageCredentialsService.getName(), "AzureStorageCredentialsService");
+ plan_->setProperty(update_attribute_, "test.filesystemname", FILESYSTEM_NAME, true);
+ plan_->setProperty(azure_data_lake_storage_, AzureDataLakeStorageProcessor::FilesystemName.getName(), "${test.filesystemname}");
+ plan_->setProperty(update_attribute_, "test.directoryname", DIRECTORY_NAME, true);
+ plan_->setProperty(azure_data_lake_storage_, AzureDataLakeStorageProcessor::DirectoryName.getName(), "${test.directoryname}");
+ plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), CONNECTION_STRING);
+ }
+
+ // Resets the log test controller when the fixture is destroyed.
+ virtual ~AzureDataLakeStorageTestsFixture() {
+ LogTestController::getInstance().reset();
+ }
+
+ protected:
+ TestController test_controller_;
+ std::shared_ptr<TestPlan> plan_;
+ // Non-owning pointer to the mock owned by the processor under test.
+ MockDataLakeStorageClient* mock_data_lake_storage_client_ptr_;
+ std::shared_ptr<core::Processor> azure_data_lake_storage_;
+ std::shared_ptr<core::Processor> get_file_;
+ std::shared_ptr<core::Processor> update_attribute_;
+ std::shared_ptr<core::Processor> putfile_;
+ std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_;
+ // Directory PutFile writes to; read back by getFailedFlowFileContents().
+ std::string output_dir_;
+};
diff --git a/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp
new file mode 100644
index 0000000..ea485f5
--- /dev/null
+++ b/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp
@@ -0,0 +1,128 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AzureDataLakeStorageTestsFixture.h"
+#include "processors/DeleteAzureDataLakeStorage.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+namespace {
+
+using namespace std::chrono_literals;
+
+using DeleteAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::DeleteAzureDataLakeStorage>;
+
+// With the credentials service reference cleared, running the session is expected
+// to throw and no flow file may end up in the failure output directory.
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Azure storage credentials service is empty", "[azureDataLakeStorageParameters]") {
+  using Processor = minifi::azure::processors::DeleteAzureDataLakeStorage;
+  plan_->setProperty(azure_data_lake_storage_, Processor::AzureStorageCredentialsService.getName(), "");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
+  REQUIRE(getFailedFlowFileContents().empty());
+}
+
+// With an empty connection string, the credentials handed to the storage client
+// are expected to be built from the storage account name and the SAS token.
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Test Azure credentials with account name and SAS token set", "[azureDataLakeStorageParameters]") {
+  using CredentialsService = minifi::azure::controllers::AzureStorageCredentialsService;
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::SASToken.getName(), "token");
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::ConnectionString.getName(), "");
+  test_controller_.runSession(plan_, true);
+  auto delete_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams();
+  CHECK(delete_params.credentials.buildConnectionString() == "AccountName=TEST_ACCOUNT;SharedAccessSignature=token");
+  CHECK(getFailedFlowFileContents().empty());
+}
+
+// When a connection string is set alongside account name and SAS token, the
+// connection string is expected to be what reaches the storage client.
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Test Azure credentials with connection string override", "[azureDataLakeStorageParameters]") {
+  using CredentialsService = minifi::azure::controllers::AzureStorageCredentialsService;
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::ConnectionString.getName(), CONNECTION_STRING);
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::SASToken.getName(), "token");
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  test_controller_.runSession(plan_, true);
+  auto delete_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams();
+  CHECK(delete_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(getFailedFlowFileContents().empty());
+}
+
+// With managed identity credentials enabled, the built connection string is expected
+// to be empty while the account name and the endpoint suffix are still passed on.
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Test Azure credentials with managed identity use", "[azureDataLakeStorageParameters]") {
+  using CredentialsService = minifi::azure::controllers::AzureStorageCredentialsService;
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::ConnectionString.getName(), "test");
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::UseManagedIdentityCredentials.getName(), "true");
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  test_controller_.runSession(plan_, true);
+  auto delete_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams();
+  auto& credentials = delete_params.credentials;
+  CHECK(credentials.buildConnectionString().empty());
+  CHECK(credentials.getStorageAccountName() == "TEST_ACCOUNT");
+  CHECK(credentials.getEndpointSuffix() == "core.windows.net");
+  CHECK(getFailedFlowFileContents().empty());
+}
+
+// An empty filesystem name is expected to route the flow file to failure (where PutFile
+// writes it out unchanged) and to produce an error log line.
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Filesystem name is not set", "[azureDataLakeStorageParameters]") {
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  plan_->setProperty(update_attribute_, "test.filesystemname", "", true);
+  test_controller_.runSession(plan_, true);
+  auto failed_flowfiles = getFailedFlowFileContents();
+  // REQUIRE rather than CHECK: CHECK keeps going after a failure, and indexing
+  // failed_flowfiles[0] on an empty vector would be undefined behaviour.
+  REQUIRE(failed_flowfiles.size() == 1);
+  CHECK(failed_flowfiles[0] == TEST_DATA);
+  CHECK(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid or empty!"));
+}
+
+// With an empty connection string on the credentials service, running the session is
+// expected to throw and no flow file may end up in the failure output directory.
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Connection String is empty", "[azureDataLakeStorageParameters]") {
+  using CredentialsService = minifi::azure::controllers::AzureStorageCredentialsService;
+  plan_->setProperty(azure_storage_cred_service_, CredentialsService::ConnectionString.getName(), "");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
+  REQUIRE(getFailedFlowFileContents().empty());
+}
+
+// Happy path: nothing is routed to failure, the mock receives the configured
+// parameters, and the flow file's filename attribute shows up in the log.
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Delete file succeeds", "[azureDataLakeStorageDelete]") {
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  test_controller_.runSession(plan_, true);
+  REQUIRE(getFailedFlowFileContents().empty());
+  auto delete_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams();
+  CHECK(delete_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(delete_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(delete_params.directory_name == DIRECTORY_NAME);
+  CHECK(delete_params.filename == GETFILE_FILE_NAME);
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:filename value:" + GETFILE_FILE_NAME));
+}
+
+// The mock throws from deleteFile: the parameters must still have been passed,
+// no filename attribute line may be logged, and the flow file is routed to failure.
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Delete file fails", "[azureDataLakeStorageDelete]") {
+  mock_data_lake_storage_client_ptr_->setDeleteFailure(true);
+  test_controller_.runSession(plan_, true);
+  auto delete_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams();
+  CHECK(delete_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(delete_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(delete_params.directory_name == DIRECTORY_NAME);
+  CHECK(delete_params.filename == GETFILE_FILE_NAME);
+  CHECK_FALSE(LogTestController::getInstance().contains("key:filename value:", 0s, 0ms));
+  auto failure_contents = getFailedFlowFileContents();
+  REQUIRE(failure_contents.size() == 1);
+  REQUIRE(failure_contents[0] == TEST_DATA);
+}
+
+// The mock reports an unsuccessful delete without throwing: the parameters must have
+// been passed, no filename attribute line may be logged, and the flow file is routed to failure.
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Delete result is false", "[azureDataLakeStorageDelete]") {
+  mock_data_lake_storage_client_ptr_->setDeleteResult(false);
+  test_controller_.runSession(plan_, true);
+  auto delete_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams();
+  CHECK(delete_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(delete_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(delete_params.directory_name == DIRECTORY_NAME);
+  CHECK(delete_params.filename == GETFILE_FILE_NAME);
+  CHECK_FALSE(LogTestController::getInstance().contains("key:filename value:", 0s, 0ms));
+  auto failure_contents = getFailedFlowFileContents();
+  REQUIRE(failure_contents.size() == 1);
+  REQUIRE(failure_contents[0] == TEST_DATA);
+}
+
+} // namespace
diff --git a/libminifi/test/azure-tests/MockDataLakeStorageClient.h b/libminifi/test/azure-tests/MockDataLakeStorageClient.h
new file mode 100644
index 0000000..eeac4dc
--- /dev/null
+++ b/libminifi/test/azure-tests/MockDataLakeStorageClient.h
@@ -0,0 +1,96 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <stdexcept>
+
+#include "storage/DataLakeStorageClient.h"
+
+/**
+ * Test double for DataLakeStorageClient.
+ *
+ * Records the parameters passed to uploadFile/deleteFile and lets tests choose,
+ * through the setters, whether each operation returns normally or throws
+ * std::runtime_error("error").
+ */
+class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::storage::DataLakeStorageClient {
+  // Short names for the parameter structs used throughout the mock.
+  using PutParameters = org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters;
+  using DeleteParameters = org::apache::nifi::minifi::azure::storage::DeleteAzureDataLakeStorageParameters;
+
+ public:
+  const std::string PRIMARY_URI = "http://test-uri/file";
+
+  // Returns the configured creation result, or throws when a creation error is configured.
+  bool createFile(const PutParameters& /*params*/) override {
+    if (!file_creation_error_) {
+      return create_file_;
+    }
+    throw std::runtime_error("error");
+  }
+
+  // Records the uploaded bytes and the parameters before optionally failing,
+  // so they are stored even when the upload is configured to throw.
+  std::string uploadFile(const PutParameters& params, gsl::span<const uint8_t> buffer) override {
+    input_data_.assign(buffer.begin(), buffer.end());
+    put_params_ = params;
+    if (!upload_fails_) {
+      return RETURNED_PRIMARY_URI;
+    }
+    throw std::runtime_error("error");
+  }
+
+  // Records the parameters before optionally failing, then returns the configured delete result.
+  bool deleteFile(const DeleteParameters& params) override {
+    delete_params_ = params;
+    if (!delete_fails_) {
+      return delete_result_;
+    }
+    throw std::runtime_error("error");
+  }
+
+  // Value returned by createFile when no creation error is configured.
+  void setFileCreation(bool create_file) { create_file_ = create_file; }
+
+  // When true, createFile throws.
+  void setFileCreationError(bool file_creation_error) { file_creation_error_ = file_creation_error; }
+
+  // When true, uploadFile throws.
+  void setUploadFailure(bool upload_fails) { upload_fails_ = upload_fails; }
+
+  // When true, deleteFile throws.
+  void setDeleteFailure(bool delete_fails) { delete_fails_ = delete_fails; }
+
+  // Value returned by deleteFile when it does not throw.
+  void setDeleteResult(bool delete_result) { delete_result_ = delete_result; }
+
+  // Copy of the parameters stored by the most recent uploadFile call.
+  PutParameters getPassedPutParams() const { return put_params_; }
+
+  // Copy of the parameters stored by the most recent deleteFile call.
+  DeleteParameters getPassedDeleteParams() const { return delete_params_; }
+
+ private:
+  const std::string RETURNED_PRIMARY_URI = "http://test-uri/file?secret-sas";
+  bool create_file_ = true;
+  bool file_creation_error_ = false;
+  bool upload_fails_ = false;
+  bool delete_fails_ = false;
+  bool delete_result_ = true;
+  std::string input_data_;
+  PutParameters put_params_;
+  DeleteParameters delete_params_;
+};
diff --git a/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp b/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp
index 816de5d..9c2b771 100644
--- a/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp
+++ b/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp
@@ -145,8 +145,7 @@ class PutAzureBlobStorageTestsFixture {
auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
- std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
- file_contents.push_back(file_content);
+ file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
return true;
};
diff --git a/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp
index a0520d4..39463c4 100644
--- a/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp
+++ b/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp
@@ -16,156 +16,18 @@
* limitations under the License.
*/
-#include "../TestBase.h"
-#include "utils/IntegrationTestUtils.h"
-#include "utils/TestUtils.h"
-#include "core/Processor.h"
+#include "AzureDataLakeStorageTestsFixture.h"
#include "processors/PutAzureDataLakeStorage.h"
-#include "processors/GetFile.h"
-#include "processors/PutFile.h"
-#include "processors/LogAttribute.h"
-#include "processors/UpdateAttribute.h"
-#include "storage/DataLakeStorageClient.h"
-#include "utils/file/FileUtils.h"
#include "controllerservices/AzureStorageCredentialsService.h"
-using namespace std::chrono_literals;
-
-const std::string FILESYSTEM_NAME = "testfilesystem";
-const std::string DIRECTORY_NAME = "testdir";
-const std::string FILE_NAME = "testfile.txt";
-const std::string CONNECTION_STRING = "test-connectionstring";
-const std::string TEST_DATA = "data123";
-const std::string GETFILE_FILE_NAME = "input_data.log";
-
-class MockDataLakeStorageClient : public minifi::azure::storage::DataLakeStorageClient {
- public:
- const std::string PRIMARY_URI = "http://test-uri/file";
-
- bool createFile(const minifi::azure::storage::PutAzureDataLakeStorageParameters& /*params*/) override {
- if (file_creation_error_) {
- throw std::runtime_error("error");
- }
- return create_file_;
- }
-
- std::string uploadFile(const minifi::azure::storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) override {
- input_data_ = std::string(buffer.begin(), buffer.end());
- params_ = params;
-
- if (upload_fails_) {
- throw std::runtime_error("error");
- }
-
- return RETURNED_PRIMARY_URI;
- }
-
- void setFileCreation(bool create_file) {
- create_file_ = create_file;
- }
-
- void setFileCreationError(bool file_creation_error) {
- file_creation_error_ = file_creation_error;
- }
-
- void setUploadFailure(bool upload_fails) {
- upload_fails_ = upload_fails;
- }
-
- minifi::azure::storage::PutAzureDataLakeStorageParameters getPassedParams() const {
- return params_;
- }
-
- private:
- const std::string RETURNED_PRIMARY_URI = "http://test-uri/file?secret-sas";
- bool create_file_ = true;
- bool file_creation_error_ = false;
- bool upload_fails_ = false;
- std::string input_data_;
- minifi::azure::storage::PutAzureDataLakeStorageParameters params_;
-};
+namespace {
-class PutAzureDataLakeStorageTestsFixture {
- public:
- PutAzureDataLakeStorageTestsFixture() {
- LogTestController::getInstance().setDebug<TestPlan>();
- LogTestController::getInstance().setDebug<minifi::core::Processor>();
- LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
- LogTestController::getInstance().setTrace<minifi::processors::GetFile>();
- LogTestController::getInstance().setTrace<minifi::processors::PutFile>();
- LogTestController::getInstance().setDebug<minifi::processors::UpdateAttribute>();
- LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
- LogTestController::getInstance().setTrace<minifi::azure::processors::PutAzureDataLakeStorage>();
-
- // Build MiNiFi processing graph
- plan_ = test_controller_.createPlan();
- auto mock_data_lake_storage_client = std::make_unique<MockDataLakeStorageClient>();
- mock_data_lake_storage_client_ptr_ = mock_data_lake_storage_client.get();
- put_azure_data_lake_storage_ = std::shared_ptr<minifi::azure::processors::PutAzureDataLakeStorage>(
- new minifi::azure::processors::PutAzureDataLakeStorage("PutAzureDataLakeStorage", utils::Identifier(), std::move(mock_data_lake_storage_client)));
- auto input_dir = test_controller_.createTempDirectory();
- utils::putFileToDir(input_dir, GETFILE_FILE_NAME, TEST_DATA);
-
- get_file_ = plan_->addProcessor("GetFile", "GetFile");
- plan_->setProperty(get_file_, minifi::processors::GetFile::Directory.getName(), input_dir);
- plan_->setProperty(get_file_, minifi::processors::GetFile::KeepSourceFile.getName(), "false");
-
- update_attribute_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} }, true);
- plan_->addProcessor(put_azure_data_lake_storage_, "PutAzureDataLakeStorage", { {"success", "d"}, {"failure", "d"} }, true);
- auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true);
- logattribute->setAutoTerminatedRelationships({{"success", "d"}});
-
- putfile_ = plan_->addProcessor("PutFile", "PutFile", { {"success", "d"} }, false);
- plan_->addConnection(put_azure_data_lake_storage_, {"failure", "d"}, putfile_);
- putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
- output_dir_ = test_controller_.createTempDirectory();
- plan_->setProperty(putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), output_dir_);
-
- azure_storage_cred_service_ = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
- setDefaultProperties();
- }
-
- std::vector<std::string> getFailedFlowFileContents() {
- std::vector<std::string> file_contents;
-
- auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
- std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
- std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
- file_contents.push_back(file_content);
- return true;
- };
-
- utils::file::FileUtils::list_dir(output_dir_, lambda, plan_->getLogger(), false);
- return file_contents;
- }
-
- void setDefaultProperties() {
- plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::AzureStorageCredentialsService.getName(), "AzureStorageCredentialsService");
- plan_->setProperty(update_attribute_, "test.filesystemname", FILESYSTEM_NAME, true);
- plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::FilesystemName.getName(), "${test.filesystemname}");
- plan_->setProperty(update_attribute_, "test.directoryname", DIRECTORY_NAME, true);
- plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName.getName(), "${test.directoryname}");
- plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), CONNECTION_STRING);
- }
-
- virtual ~PutAzureDataLakeStorageTestsFixture() {
- LogTestController::getInstance().reset();
- }
+using namespace std::chrono_literals;
- protected:
- TestController test_controller_;
- std::shared_ptr<TestPlan> plan_;
- MockDataLakeStorageClient* mock_data_lake_storage_client_ptr_;
- std::shared_ptr<core::Processor> put_azure_data_lake_storage_;
- std::shared_ptr<core::Processor> get_file_;
- std::shared_ptr<core::Processor> update_attribute_;
- std::shared_ptr<core::Processor> putfile_;
- std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_;
- std::string output_dir_;
-};
+using PutAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::PutAzureDataLakeStorage>;
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Azure storage credentials service is empty", "[azureDataLakeStorageParameters]") {
- plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::AzureStorageCredentialsService.getName(), "");
+ plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::AzureStorageCredentialsService.getName(), "");
REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
REQUIRE(getFailedFlowFileContents().size() == 0);
}
@@ -176,7 +38,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials wi
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), "");
test_controller_.runSession(plan_, true);
- auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+ auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
CHECK(passed_params.credentials.buildConnectionString() == "AccountName=TEST_ACCOUNT;SharedAccessSignature=token");
REQUIRE(getFailedFlowFileContents().size() == 0);
}
@@ -187,7 +49,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials wi
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
test_controller_.runSession(plan_, true);
- auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+ auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
REQUIRE(getFailedFlowFileContents().size() == 0);
}
@@ -198,7 +60,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials wi
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::UseManagedIdentityCredentials.getName(), "true");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
test_controller_.runSession(plan_, true);
- auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+ auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
CHECK(passed_params.credentials.buildConnectionString().empty());
CHECK(passed_params.credentials.getStorageAccountName() == "TEST_ACCOUNT");
CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net");
@@ -223,7 +85,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Connection String is empt
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Upload to Azure Data Lake Storage with default parameters", "[azureDataLakeStorageUpload]") {
test_controller_.runSession(plan_, true);
- auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+ auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
CHECK(passed_params.directory_name == DIRECTORY_NAME);
@@ -263,7 +125,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to failure on 'f
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to success on 'ignore' resolution strategy if file exists", "[azureDataLakeStorageUpload]") {
- plan_->setProperty(put_azure_data_lake_storage_,
+ plan_->setProperty(azure_data_lake_storage_,
minifi::azure::processors::PutAzureDataLakeStorage::ConflictResolutionStrategy.getName(),
toString(minifi::azure::processors::PutAzureDataLakeStorage::FileExistsResolutionStrategy::IGNORE_REQUEST));
mock_data_lake_storage_client_ptr_->setFileCreation(false);
@@ -275,12 +137,12 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to success on 'i
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Replace old file on 'replace' resolution strategy if file exists", "[azureDataLakeStorageUpload]") {
- plan_->setProperty(put_azure_data_lake_storage_,
+ plan_->setProperty(azure_data_lake_storage_,
minifi::azure::processors::PutAzureDataLakeStorage::ConflictResolutionStrategy.getName(),
toString(minifi::azure::processors::PutAzureDataLakeStorage::FileExistsResolutionStrategy::REPLACE_FILE));
mock_data_lake_storage_client_ptr_->setFileCreation(false);
test_controller_.runSession(plan_, true);
- auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+ auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
CHECK(passed_params.directory_name == DIRECTORY_NAME);
@@ -296,11 +158,13 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Replace old file on 'repl
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Upload to Azure Data Lake Storage with empty directory is accepted", "[azureDataLakeStorageUpload]") {
- plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName.getName(), "");
+ plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName.getName(), "");
test_controller_.runSession(plan_, true);
- auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+ auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
CHECK(passed_params.directory_name == "");
REQUIRE(getFailedFlowFileContents().size() == 0);
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:\n"));
}
+
+} // namespace