You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/05/11 13:20:58 UTC
[nifi-minifi-cpp] 01/02: MINIFICPP-1665 Add ListAzureBlobStorage processor
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit aa450cece5de046beabd110ccb50dd1009b43f9e
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed May 11 15:08:45 2022 +0200
MINIFICPP-1665 Add ListAzureBlobStorage processor
Closes #1266
Signed-off-by: Marton Szasz <sz...@apache.org>
---
PROCESSORS.md | 30 +++
README.md | 2 +-
.../integration/MiNiFi_integration_test_driver.py | 8 +
.../integration/features/azure_storage.feature | 16 ++
.../minifi/processors/ListAzureBlobStorage.py | 12 +
docker/test/integration/steps/steps.py | 5 +
.../processors/AzureBlobStorageProcessorBase.cpp | 11 -
.../processors/AzureBlobStorageProcessorBase.h | 1 -
.../AzureBlobStorageSingleBlobProcessorBase.cpp | 51 ++++
.../AzureBlobStorageSingleBlobProcessorBase.h | 56 ++++
.../azure/processors/DeleteAzureBlobStorage.cpp | 2 +-
.../azure/processors/DeleteAzureBlobStorage.h | 6 +-
.../azure/processors/FetchAzureBlobStorage.cpp | 2 +-
.../azure/processors/FetchAzureBlobStorage.h | 6 +-
.../azure/processors/ListAzureBlobStorage.cpp | 150 +++++++++++
...teAzureBlobStorage.h => ListAzureBlobStorage.h} | 46 ++--
.../azure/processors/ListAzureDataLakeStorage.cpp | 8 +-
.../azure/processors/ListAzureDataLakeStorage.h | 7 +-
.../azure/processors/PutAzureBlobStorage.cpp | 2 +-
extensions/azure/processors/PutAzureBlobStorage.h | 6 +-
extensions/azure/storage/AzureBlobStorage.cpp | 24 ++
extensions/azure/storage/AzureBlobStorage.h | 23 ++
.../azure/storage/AzureBlobStorageClient.cpp | 13 +-
extensions/azure/storage/AzureBlobStorageClient.h | 3 +-
extensions/azure/storage/BlobStorageClient.h | 16 +-
extensions/azure/storage/DataLakeStorageClient.h | 5 -
.../test/azure-tests/ListAzureBlobStorageTests.cpp | 287 +++++++++++++++++++++
.../azure-tests/ListAzureDataLakeStorageTests.cpp | 6 +-
libminifi/test/azure-tests/MockBlobStorage.h | 37 ++-
29 files changed, 772 insertions(+), 69 deletions(-)
diff --git a/PROCESSORS.md b/PROCESSORS.md
index de725b8bc..d97f8752d 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -36,6 +36,7 @@
- [GetUSBCamera](#getusbcamera)
- [HashContent](#hashcontent)
- [InvokeHTTP](#invokehttp)
+- [ListAzureBlobStorage](#listazureblobstorage)
- [ListAzureDataLakeStorage](#listazuredatalakestorage)
- [ListenHTTP](#listenhttp)
- [ListenSyslog](#listensyslog)
@@ -946,6 +947,35 @@ In the list below, the names of required properties appear in bold. Any other pr
|failure|The original FlowFile will be routed on any type of connection failure, timeout or general exception. It will have new attributes detailing the request.|
+## ListAzureBlobStorage
+
+### Description
+
+Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|Azure Storage Credentials Service|||Name of the Azure Storage Credentials Service used to retrieve the connection string from.|
+|Common Storage Account Endpoint Suffix|||Storage accounts in public Azure always use a common FQDN suffix. Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).<br/>**Supports Expression Language: true**|
+|Connection String|||Connection string used to connect to Azure Storage service. This overrides all other set credential properties if Managed Identity is not used.<br/>**Supports Expression Language: true**|
+|**Container Name**|||Name of the Azure storage container. In case of PutAzureBlobStorage processor, container can be created if it does not exist.<br/>**Supports Expression Language: true**|
+|Listing Strategy|timestamps|none<br/>timestamps|Specify how to determine new/updated entities. If 'timestamps' is selected it tracks the latest timestamp of listed entity to determine new/updated entities. If 'none' is selected it lists an entity without any tracking, the same entity will be listed each time on executing this processor.|
+|Prefix|||Search prefix for listing<br/>**Supports Expression Language: true**|
+|SAS Token|||Shared Access Signature token. Specify either SAS Token (recommended) or Storage Account Key together with Storage Account Name if Managed Identity is not used.<br/>**Supports Expression Language: true**|
+|Storage Account Key|||The storage account key. This is an admin-like password providing access to every container in this account. It is recommended one uses Shared Access Signature (SAS) token instead for fine-grained control with policies.<br/>**Supports Expression Language: true**|
+|Storage Account Name|||The storage account name.<br/>**Supports Expression Language: true**|
+|**Use Managed Identity Credentials**|false||If true Managed Identity credentials will be used together with the Storage Account Name for authentication.|
+
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|All FlowFiles that are received are routed to success|
+
+
## ListAzureDataLakeStorage
### Description
diff --git a/README.md b/README.md
index d4fe1e3f0..26af00cad 100644
--- a/README.md
+++ b/README.md
@@ -76,7 +76,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension
|----------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
| Archive Extensions | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry) [...]
| AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) [...]
-| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage)<br/>[DeleteAzureBlobStorage](#deleteazureblobstorage)<br/>[FetchAzureBlobStorage](#fetchazureblobstorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)<br/>[FetchAzureDataLakeStorage](#fetchazuredatalakestorage)<br/>[ListAzureDataLakeStorage] [...]
+| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[DeleteAzureBlobStorage](PROCESSORS.md#deleteazureblobstorage)<br/>[FetchAzureBlobStorage](PROCESSORS.md#fetchazureblobstorage)<br/>[ListAzureBlobStorage](PROCESSORS.md#listazureblobstorage)<br/>[PutAzureDataLakeStorage](PROCESSORS.md#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](PROCESSORS.md#deleteazuredatalakestorage)<br/>[FetchAzureDataLakeStorage](#fetchaz [...]
| CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) [...]
| CURL | [InvokeHTTP](PROCESSORS.md#invokehttp) [...]
| GPS | GetGPS [...]
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index 91a39e89f..363ade89b 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -244,6 +244,14 @@ class MiNiFi_integration_test():
return
assert False
+ def check_minifi_log_does_not_contain(self, line, wait_time_seconds):
+ time.sleep(wait_time_seconds)
+ for container in self.cluster.containers.values():
+ if container.get_engine() == "minifi-cpp":
+ _, logs = self.cluster.get_app_log(container.get_name())
+ if logs is not None and 1 <= logs.decode("utf-8").count(line):
+ assert False
+
def check_query_results(self, postgresql_container_name, query, number_of_rows, timeout_seconds):
assert self.cluster.check_query_results(postgresql_container_name, query, number_of_rows, timeout_seconds)
diff --git a/docker/test/integration/features/azure_storage.feature b/docker/test/integration/features/azure_storage.feature
index d1190e6d2..efded934e 100644
--- a/docker/test/integration/features/azure_storage.feature
+++ b/docker/test/integration/features/azure_storage.feature
@@ -76,3 +76,19 @@ Feature: Sending data from MiNiFi-C++ to an Azure storage server
And test blob "test" with the content "#test_data$123$#" is created on Azure blob storage
Then a flowfile with the content "data$" is placed in the monitored directory in less than 60 seconds
+
+ Scenario: A MiNiFi instance can list a container on Azure blob storage
+ Given a ListAzureBlobStorage processor set up to communicate with an Azure blob storage
+ And the "Prefix" property of the ListAzureBlobStorage processor is set to "test"
+ And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
+ And the "success" relationship of the ListAzureBlobStorage processor is connected to the LogAttribute
+ And an Azure storage server is set up
+
+ When all instances start up
+ And test blob "test_1" with the content "data_1" is created on Azure blob storage
+ And test blob "test_2" with the content "data_2" is created on Azure blob storage
+ And test blob "other_test" with the content "data_3" is created on Azure blob storage
+
+ Then the Minifi logs contain the following message: "key:azure.blobname value:test_1" in less than 60 seconds
+ Then the Minifi logs contain the following message: "key:azure.blobname value:test_2" in less than 60 seconds
+ And the Minifi logs do not contain the following message: "key:azure.blobname value:other_test" after 0 seconds
diff --git a/docker/test/integration/minifi/processors/ListAzureBlobStorage.py b/docker/test/integration/minifi/processors/ListAzureBlobStorage.py
new file mode 100644
index 000000000..801241686
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ListAzureBlobStorage.py
@@ -0,0 +1,12 @@
+from ..core.Processor import Processor
+
+
+class ListAzureBlobStorage(Processor):
+ def __init__(self):
+ super(ListAzureBlobStorage, self).__init__(
+ 'ListAzureBlobStorage',
+ properties={
+ 'Container Name': 'test-container',
+ 'Connection String': 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azure-storage-server:10000/devstoreaccount1;QueueEndpoint=http://azure-storage-server:10001/devstoreaccount1;'
+ },
+ auto_terminate=['success'])
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 96cfcf1a4..8a64acadf 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -746,6 +746,11 @@ def step_impl(context, log_message, count, seconds):
context.test.check_minifi_log_contents(log_message, 1, count)
+@then("the Minifi logs do not contain the following message: \"{log_message}\" after {seconds:d} seconds")
+def step_impl(context, log_message, seconds):
+ context.test.check_minifi_log_does_not_contain(log_message, seconds)
+
+
@then("the Minifi logs match the following regex: \"{regex}\" in less than {duration}")
def step_impl(context, regex, duration):
context.test.check_minifi_log_matches_regex(regex, timeparse(duration))
diff --git a/extensions/azure/processors/AzureBlobStorageProcessorBase.cpp b/extensions/azure/processors/AzureBlobStorageProcessorBase.cpp
index bd266379f..28096e989 100644
--- a/extensions/azure/processors/AzureBlobStorageProcessorBase.cpp
+++ b/extensions/azure/processors/AzureBlobStorageProcessorBase.cpp
@@ -57,11 +57,6 @@ const core::Property AzureBlobStorageProcessorBase::ConnectionString(
->withDescription("Connection string used to connect to Azure Storage service. This overrides all other set credential properties if Managed Identity is not used.")
->supportsExpressionLanguage(true)
->build());
-const core::Property AzureBlobStorageProcessorBase::Blob(
- core::PropertyBuilder::createProperty("Blob")
- ->withDescription("The filename of the blob. If left empty the filename attribute will be used by default.")
- ->supportsExpressionLanguage(true)
- ->build());
const core::Property AzureBlobStorageProcessorBase::UseManagedIdentityCredentials(
core::PropertyBuilder::createProperty("Use Managed Identity Credentials")
->withDescription("If true Managed Identity credentials will be used together with the Storage Account Name for authentication.")
@@ -150,12 +145,6 @@ bool AzureBlobStorageProcessorBase::setCommonStorageParameters(
return false;
}
- context.getProperty(Blob, params.blob_name, flow_file);
- if (params.blob_name.empty() && (!flow_file->getAttribute("filename", params.blob_name) || params.blob_name.empty())) {
- logger_->log_error("Blob is not set and default 'filename' attribute could not be found!");
- return false;
- }
-
return true;
}
diff --git a/extensions/azure/processors/AzureBlobStorageProcessorBase.h b/extensions/azure/processors/AzureBlobStorageProcessorBase.h
index c953912a0..3aea33c3c 100644
--- a/extensions/azure/processors/AzureBlobStorageProcessorBase.h
+++ b/extensions/azure/processors/AzureBlobStorageProcessorBase.h
@@ -42,7 +42,6 @@ class AzureBlobStorageProcessorBase : public AzureStorageProcessorBase {
EXTENSIONAPI static const core::Property SASToken;
EXTENSIONAPI static const core::Property CommonStorageAccountEndpointSuffix;
EXTENSIONAPI static const core::Property ConnectionString;
- EXTENSIONAPI static const core::Property Blob;
EXTENSIONAPI static const core::Property UseManagedIdentityCredentials;
explicit AzureBlobStorageProcessorBase(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<core::logging::Logger>& logger)
diff --git a/extensions/azure/processors/AzureBlobStorageSingleBlobProcessorBase.cpp b/extensions/azure/processors/AzureBlobStorageSingleBlobProcessorBase.cpp
new file mode 100644
index 000000000..a7413ed53
--- /dev/null
+++ b/extensions/azure/processors/AzureBlobStorageSingleBlobProcessorBase.cpp
@@ -0,0 +1,51 @@
+/**
+ * @file AzureBlobStorageSingleBlobProcessorBase.cpp
+ * AzureBlobStorageSingleBlobProcessorBase class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AzureBlobStorageSingleBlobProcessorBase.h"
+
+#include "core/ProcessContext.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property AzureBlobStorageSingleBlobProcessorBase::Blob(  // "Blob" property: names the blob a derived processor operates on
+ core::PropertyBuilder::createProperty("Blob")
+ ->withDescription("The filename of the blob. If left empty the filename attribute will be used by default.")
+ ->supportsExpressionLanguage(true)  // setBlobOperationParameters reads it together with the flow file
+ ->build());
+
+
+// Fills `params` with the common storage settings and the name of the blob to operate on.
+// The blob name is taken from the Blob property; when that yields an empty value the
+// flow file's "filename" attribute is used instead.
+// Returns false if the common parameters cannot be set or no non-empty blob name is found.
+bool AzureBlobStorageSingleBlobProcessorBase::setBlobOperationParameters(
+    storage::AzureBlobStorageBlobOperationParameters& params,
+    core::ProcessContext &context,
+    const std::shared_ptr<core::FlowFile> &flow_file) {
+  const bool common_params_set = setCommonStorageParameters(params, context, flow_file);
+  if (!common_params_set) {
+    return false;
+  }
+
+  context.getProperty(Blob, params.blob_name, flow_file);
+  if (!params.blob_name.empty()) {
+    return true;
+  }
+
+  // Property was empty: fall back to the "filename" attribute of the flow file.
+  const bool has_filename = flow_file->getAttribute("filename", params.blob_name);
+  if (has_filename && !params.blob_name.empty()) {
+    return true;
+  }
+
+  logger_->log_error("Blob is not set and default 'filename' attribute could not be found!");
+  return false;
+}
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/AzureBlobStorageSingleBlobProcessorBase.h b/extensions/azure/processors/AzureBlobStorageSingleBlobProcessorBase.h
new file mode 100644
index 000000000..1b948e75f
--- /dev/null
+++ b/extensions/azure/processors/AzureBlobStorageSingleBlobProcessorBase.h
@@ -0,0 +1,56 @@
+/**
+ * @file AzureBlobStorageSingleBlobProcessorBase.h
+ * AzureBlobStorageSingleBlobProcessorBase class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+
+#include "AzureBlobStorageProcessorBase.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class AzureBlobStorageSingleBlobProcessorBase : public AzureBlobStorageProcessorBase {  // base for blob processors that address one blob via the Blob property
+ public:
+ // Supported Properties
+ EXTENSIONAPI static const core::Property Blob;  // name of the blob; defined in the matching .cpp file
+
+ explicit AzureBlobStorageSingleBlobProcessorBase(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<core::logging::Logger>& logger)
+ : AzureBlobStorageSingleBlobProcessorBase(name, uuid, logger, nullptr) {  // delegates to the protected constructor with a null storage client
+ }
+
+ protected:
+ explicit AzureBlobStorageSingleBlobProcessorBase(  // variant that accepts an explicit storage client
+ const std::string& name,
+ const minifi::utils::Identifier& uuid,
+ const std::shared_ptr<core::logging::Logger>& logger,
+ std::unique_ptr<storage::BlobStorageClient> blob_storage_client)
+ : AzureBlobStorageProcessorBase(name, uuid, logger, std::move(blob_storage_client)) {  // ownership of the client is passed on to the base class
+ }
+
+ bool setBlobOperationParameters(  // sets the common storage parameters plus params.blob_name; returns false on failure
+ storage::AzureBlobStorageBlobOperationParameters& params,
+ core::ProcessContext &context,
+ const std::shared_ptr<core::FlowFile> &flow_file);
+};
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/DeleteAzureBlobStorage.cpp b/extensions/azure/processors/DeleteAzureBlobStorage.cpp
index 590f93066..47453b84b 100644
--- a/extensions/azure/processors/DeleteAzureBlobStorage.cpp
+++ b/extensions/azure/processors/DeleteAzureBlobStorage.cpp
@@ -68,7 +68,7 @@ void DeleteAzureBlobStorage::onSchedule(const std::shared_ptr<core::ProcessConte
std::optional<storage::DeleteAzureBlobStorageParameters> DeleteAzureBlobStorage::buildDeleteAzureBlobStorageParameters(
core::ProcessContext &context, const std::shared_ptr<core::FlowFile> &flow_file) {
storage::DeleteAzureBlobStorageParameters params;
- if (!setCommonStorageParameters(params, context, flow_file)) {
+ if (!setBlobOperationParameters(params, context, flow_file)) {
return std::nullopt;
}
params.optional_deletion = optional_deletion_;
diff --git a/extensions/azure/processors/DeleteAzureBlobStorage.h b/extensions/azure/processors/DeleteAzureBlobStorage.h
index f2f62bcc8..526fd11fe 100644
--- a/extensions/azure/processors/DeleteAzureBlobStorage.h
+++ b/extensions/azure/processors/DeleteAzureBlobStorage.h
@@ -27,7 +27,7 @@
#include <vector>
#include "core/Property.h"
-#include "AzureBlobStorageProcessorBase.h"
+#include "AzureBlobStorageSingleBlobProcessorBase.h"
#include "core/logging/LoggerConfiguration.h"
template<typename T>
@@ -35,7 +35,7 @@ class AzureBlobStorageTestsFixture;
namespace org::apache::nifi::minifi::azure::processors {
-class DeleteAzureBlobStorage final : public AzureBlobStorageProcessorBase {
+class DeleteAzureBlobStorage final : public AzureBlobStorageSingleBlobProcessorBase {
public:
// Supported Properties
static const core::Property DeleteSnapshotsOption;
@@ -64,7 +64,7 @@ class DeleteAzureBlobStorage final : public AzureBlobStorageProcessorBase {
}
explicit DeleteAzureBlobStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::BlobStorageClient> blob_storage_client)
- : AzureBlobStorageProcessorBase(name, uuid, core::logging::LoggerFactory<DeleteAzureBlobStorage>::getLogger(), std::move(blob_storage_client)) {
+ : AzureBlobStorageSingleBlobProcessorBase(name, uuid, core::logging::LoggerFactory<DeleteAzureBlobStorage>::getLogger(), std::move(blob_storage_client)) {
}
std::optional<storage::DeleteAzureBlobStorageParameters> buildDeleteAzureBlobStorageParameters(
diff --git a/extensions/azure/processors/FetchAzureBlobStorage.cpp b/extensions/azure/processors/FetchAzureBlobStorage.cpp
index 5359ce599..c0aded55e 100644
--- a/extensions/azure/processors/FetchAzureBlobStorage.cpp
+++ b/extensions/azure/processors/FetchAzureBlobStorage.cpp
@@ -65,7 +65,7 @@ void FetchAzureBlobStorage::initialize() {
std::optional<storage::FetchAzureBlobStorageParameters> FetchAzureBlobStorage::buildFetchAzureBlobStorageParameters(
core::ProcessContext &context, const std::shared_ptr<core::FlowFile> &flow_file) {
storage::FetchAzureBlobStorageParameters params;
- if (!setCommonStorageParameters(params, context, flow_file)) {
+ if (!setBlobOperationParameters(params, context, flow_file)) {
return std::nullopt;
}
diff --git a/extensions/azure/processors/FetchAzureBlobStorage.h b/extensions/azure/processors/FetchAzureBlobStorage.h
index 9c8515a6c..db295e771 100644
--- a/extensions/azure/processors/FetchAzureBlobStorage.h
+++ b/extensions/azure/processors/FetchAzureBlobStorage.h
@@ -27,7 +27,7 @@
#include <vector>
#include "core/Property.h"
-#include "AzureBlobStorageProcessorBase.h"
+#include "AzureBlobStorageSingleBlobProcessorBase.h"
#include "core/logging/LoggerConfiguration.h"
template<typename T>
@@ -35,7 +35,7 @@ class AzureBlobStorageTestsFixture;
namespace org::apache::nifi::minifi::azure::processors {
-class FetchAzureBlobStorage final : public AzureBlobStorageProcessorBase {
+class FetchAzureBlobStorage final : public AzureBlobStorageSingleBlobProcessorBase {
public:
EXTENSIONAPI static const core::Property RangeStart;
EXTENSIONAPI static const core::Property RangeLength;
@@ -58,7 +58,7 @@ class FetchAzureBlobStorage final : public AzureBlobStorageProcessorBase {
}
explicit FetchAzureBlobStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::BlobStorageClient> blob_storage_client)
- : AzureBlobStorageProcessorBase(name, uuid, core::logging::LoggerFactory<FetchAzureBlobStorage>::getLogger(), std::move(blob_storage_client)) {
+ : AzureBlobStorageSingleBlobProcessorBase(name, uuid, core::logging::LoggerFactory<FetchAzureBlobStorage>::getLogger(), std::move(blob_storage_client)) {
}
std::optional<storage::FetchAzureBlobStorageParameters> buildFetchAzureBlobStorageParameters(
diff --git a/extensions/azure/processors/ListAzureBlobStorage.cpp b/extensions/azure/processors/ListAzureBlobStorage.cpp
new file mode 100644
index 000000000..dd9940525
--- /dev/null
+++ b/extensions/azure/processors/ListAzureBlobStorage.cpp
@@ -0,0 +1,150 @@
+/**
+ * @file ListAzureBlobStorage.cpp
+ * ListAzureBlobStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureBlobStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+#include "core/ProcessSession.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureBlobStorage::ListingStrategy(  // "Listing Strategy": selects the EntityTracking mode used by onTrigger
+ core::PropertyBuilder::createProperty("Listing Strategy")
+ ->withDescription("Specify how to determine new/updated entities. If 'timestamps' is selected it tracks the latest timestamp of listed entity to determine new/updated entities. "
+ "If 'none' is selected it lists an entity without any tracking, the same entity will be listed each time on executing this processor.")
+ ->isRequired(true)
+ ->withDefaultValue<std::string>(toString(EntityTracking::TIMESTAMPS))  // timestamp tracking is the default
+ ->withAllowableValues<std::string>(EntityTracking::values())  // allowable values come from the EntityTracking enum
+ ->build());
+
+const core::Property ListAzureBlobStorage::Prefix(  // "Prefix": copied into the listing parameters by buildListAzureBlobStorageParameters
+ core::PropertyBuilder::createProperty("Prefix")
+ ->withDescription("Search prefix for listing")
+ ->supportsExpressionLanguage(true)  // read without a flow file (nullptr is passed when the property is fetched)
+ ->build());
+
+const core::Relationship ListAzureBlobStorage::Success("success", "All FlowFiles that are received are routed to success");  // the only relationship; createNewFlowFile transfers each new flow file here
+
+void ListAzureBlobStorage::initialize() {  // declares the properties and relationships of this processor
+ // Set the supported properties: the connection/credential properties plus the two listing-specific ones
+ setSupportedProperties({
+ AzureStorageCredentialsService,
+ ContainerName,
+ StorageAccountName,
+ StorageAccountKey,
+ SASToken,
+ CommonStorageAccountEndpointSuffix,
+ ConnectionString,
+ UseManagedIdentityCredentials,
+ ListingStrategy,  // defined in this file
+ Prefix  // defined in this file
+ });
+ // Set the supported relationships: Success is the only one
+ setSupportedRelationships({
+ Success
+ });
+}
+
+// Prepares the processor for listing: runs the base-class scheduling, wraps the context's
+// state manager in a ListingStateManager, parses the Listing Strategy property and
+// caches the listing parameters.
+// Throws if no state manager is available, if the Listing Strategy value is not allowed,
+// or if the listing parameters cannot be built.
+void ListAzureBlobStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+  AzureBlobStorageProcessorBase::onSchedule(context, session_factory);
+
+  auto core_state_manager = context->getStateManager();
+  if (!core_state_manager) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+  state_manager_ = std::make_unique<minifi::utils::ListingStateManager>(core_state_manager);
+
+  const auto strategy_name = utils::parsePropertyWithAllowableValuesOrThrow(*context, ListingStrategy.getName(), EntityTracking::values());
+  tracking_strategy_ = EntityTracking::parse(strategy_name.c_str());
+
+  auto built_parameters = buildListAzureBlobStorageParameters(*context);
+  if (!built_parameters.has_value()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required parameters for ListAzureBlobStorage processor are missing or invalid");
+  }
+  list_parameters_ = *built_parameters;
+}
+
+// Builds the parameters of a container listing from the processor properties.
+// No flow file is involved, so properties are read with a null flow file.
+// Returns std::nullopt when the common storage parameters cannot be set.
+std::optional<storage::ListAzureBlobStorageParameters> ListAzureBlobStorage::buildListAzureBlobStorageParameters(core::ProcessContext &context) {
+  storage::ListAzureBlobStorageParameters list_params;
+  const bool common_params_set = setCommonStorageParameters(list_params, context, nullptr);
+  if (common_params_set) {
+    context.getProperty(Prefix, list_params.prefix, nullptr);
+    return list_params;
+  }
+  return std::nullopt;
+}
+
+// Creates a flow file describing one listed blob: the listing details are stored as
+// attributes, the flow file is transferred to Success and then returned to the caller.
+std::shared_ptr<core::FlowFile> ListAzureBlobStorage::createNewFlowFile(core::ProcessSession &session, const storage::ListContainerResultElement &element) {
+  auto listed_blob = session.create();
+
+  // Last modification time expressed as a count of milliseconds since the clock's epoch.
+  const auto last_modified_ms = element.last_modified.time_since_epoch() / std::chrono::milliseconds(1);
+
+  session.putAttribute(listed_blob, "azure.container", list_parameters_.container_name);
+  session.putAttribute(listed_blob, "azure.blobname", element.blob_name);
+  session.putAttribute(listed_blob, "azure.primaryUri", element.primary_uri);
+  session.putAttribute(listed_blob, "azure.etag", element.etag);
+  session.putAttribute(listed_blob, "azure.length", std::to_string(element.length));
+  session.putAttribute(listed_blob, "azure.timestamp", std::to_string(last_modified_ms));
+  session.putAttribute(listed_blob, core::SpecialFlowAttribute::MIME_TYPE, element.mime_type);
+  session.putAttribute(listed_blob, "lang", element.language);
+  session.putAttribute(listed_blob, "azure.blobtype", element.blob_type);
+
+  session.transfer(listed_blob, Success);
+  return listed_blob;
+}
+
+// Lists the configured container and emits one flow file per blob.
+// With the TIMESTAMPS tracking strategy, blobs that the stored listing state reports as
+// already listed are skipped. The updated listing state is stored after every non-empty
+// listing. The processor yields when the listing fails, is empty, or produces no new flow file.
+void ListAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session);
+  logger_->log_trace("ListAzureBlobStorage onTrigger");
+
+  auto list_result = azure_blob_storage_.listContainer(list_parameters_);
+  if (!list_result || list_result->empty()) {
+    context->yield();
+    return;
+  }
+
+  auto stored_listing_state = state_manager_->getCurrentState();
+  // Filtering uses the stored state; updates are collected in a copy so that elements of
+  // this listing do not affect the filtering of later elements of the same listing.
+  auto latest_listing_state = stored_listing_state;
+  std::size_t files_transferred = 0;
+
+  for (const auto& element : *list_result) {
+    if (tracking_strategy_ == EntityTracking::TIMESTAMPS && stored_listing_state.wasObjectListedAlready(element)) {
+      continue;
+    }
+
+    // createNewFlowFile() already transfers the new flow file to Success, so no second transfer is issued here.
+    createNewFlowFile(*session, element);
+    ++files_transferred;
+    latest_listing_state.updateState(element);
+  }
+
+  state_manager_->storeState(latest_listing_state);
+
+  logger_->log_debug("ListAzureBlobStorage transferred %zu flow files", files_transferred);
+
+  if (files_transferred == 0) {
+    logger_->log_debug("No new Azure Storage blobs were found in container '%s'", list_parameters_.container_name);
+    context->yield();
+  }
+}
+
+REGISTER_RESOURCE(ListAzureBlobStorage, "Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage.");
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/DeleteAzureBlobStorage.h b/extensions/azure/processors/ListAzureBlobStorage.h
similarity index 57%
copy from extensions/azure/processors/DeleteAzureBlobStorage.h
copy to extensions/azure/processors/ListAzureBlobStorage.h
index f2f62bcc8..ab1b07e3e 100644
--- a/extensions/azure/processors/DeleteAzureBlobStorage.h
+++ b/extensions/azure/processors/ListAzureBlobStorage.h
@@ -1,6 +1,6 @@
/**
- * @file DeleteAzureBlobStorage.h
- * DeleteAzureBlobStorage class declaration
+ * @file ListAzureBlobStorage.h
+ * ListAzureBlobStorage class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -30,22 +30,28 @@
#include "AzureBlobStorageProcessorBase.h"
#include "core/logging/LoggerConfiguration.h"
-template<typename T>
-class AzureBlobStorageTestsFixture;
-
namespace org::apache::nifi::minifi::azure::processors {
-class DeleteAzureBlobStorage final : public AzureBlobStorageProcessorBase {
+class ListAzureBlobStorage final : public AzureBlobStorageProcessorBase {
public:
+ SMART_ENUM(EntityTracking,
+ (NONE, "none"),
+ (TIMESTAMPS, "timestamps")
+ )
+
// Supported Properties
- static const core::Property DeleteSnapshotsOption;
+ EXTENSIONAPI static const core::Property ListingStrategy;
+ EXTENSIONAPI static const core::Property Prefix;
// Supported Relationships
- static const core::Relationship Failure;
static const core::Relationship Success;
- explicit DeleteAzureBlobStorage(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
- : DeleteAzureBlobStorage(name, uuid, nullptr) {
+ explicit ListAzureBlobStorage(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+ : ListAzureBlobStorage(name, nullptr, uuid) {
+ }
+
+ explicit ListAzureBlobStorage(const std::string& name, std::unique_ptr<storage::BlobStorageClient> blob_storage_client, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+ : AzureBlobStorageProcessorBase(name, uuid, core::logging::LoggerFactory<ListAzureBlobStorage>::getLogger(), std::move(blob_storage_client)) {
}
void initialize() override;
@@ -53,24 +59,16 @@ class DeleteAzureBlobStorage final : public AzureBlobStorageProcessorBase {
void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
private:
- friend class ::AzureBlobStorageTestsFixture<DeleteAzureBlobStorage>;
-
core::annotation::Input getInputRequirement() const override {
- return core::annotation::Input::INPUT_REQUIRED;
- }
-
- bool isSingleThreaded() const override {
- return true;
- }
-
- explicit DeleteAzureBlobStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::BlobStorageClient> blob_storage_client)
- : AzureBlobStorageProcessorBase(name, uuid, core::logging::LoggerFactory<DeleteAzureBlobStorage>::getLogger(), std::move(blob_storage_client)) {
+ return core::annotation::Input::INPUT_FORBIDDEN;
}
- std::optional<storage::DeleteAzureBlobStorageParameters> buildDeleteAzureBlobStorageParameters(
- core::ProcessContext &context, const std::shared_ptr<core::FlowFile> &flow_file);
+ std::optional<storage::ListAzureBlobStorageParameters> buildListAzureBlobStorageParameters(core::ProcessContext &context);
+ std::shared_ptr<core::FlowFile> createNewFlowFile(core::ProcessSession &session, const storage::ListContainerResultElement &element);
- storage::OptionalDeletion optional_deletion_;
+ storage::ListAzureBlobStorageParameters list_parameters_;
+ EntityTracking tracking_strategy_ = EntityTracking::TIMESTAMPS;
+ std::unique_ptr<minifi::utils::ListingStateManager> state_manager_;
};
} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/ListAzureDataLakeStorage.cpp b/extensions/azure/processors/ListAzureDataLakeStorage.cpp
index 685a93f63..e284e9ca0 100644
--- a/extensions/azure/processors/ListAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/ListAzureDataLakeStorage.cpp
@@ -47,8 +47,8 @@ const core::Property ListAzureDataLakeStorage::ListingStrategy(
core::PropertyBuilder::createProperty("Listing Strategy")
->withDescription("Specify how to determine new/updated entities. If 'timestamps' is selected it tracks the latest timestamp of listed entity to "
"determine new/updated entities. If 'none' is selected it lists an entity without any tracking, the same entity will be listed each time on executing this processor.")
- ->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
- ->withAllowableValues<std::string>(storage::EntityTracking::values())
+ ->withDefaultValue<std::string>(toString(EntityTracking::TIMESTAMPS))
+ ->withAllowableValues<std::string>(EntityTracking::values())
->build());
const core::Relationship ListAzureDataLakeStorage::Success("success", "All FlowFiles that are received are routed to success");
@@ -98,7 +98,7 @@ void ListAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessCon
}
list_parameters_ = *std::move(params);
- tracking_strategy_ = utils::parseEnumProperty<storage::EntityTracking>(*context, ListingStrategy);
+ tracking_strategy_ = utils::parseEnumProperty<EntityTracking>(*context, ListingStrategy);
}
std::optional<storage::ListAzureDataLakeStorageParameters> ListAzureDataLakeStorage::buildListParameters(core::ProcessContext& context) {
@@ -147,7 +147,7 @@ void ListAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessCont
std::size_t files_transferred = 0;
for (const auto& element : *list_result) {
- if (tracking_strategy_ == storage::EntityTracking::TIMESTAMPS && stored_listing_state.wasObjectListedAlready(element)) {
+ if (tracking_strategy_ == EntityTracking::TIMESTAMPS && stored_listing_state.wasObjectListedAlready(element)) {
continue;
}
diff --git a/extensions/azure/processors/ListAzureDataLakeStorage.h b/extensions/azure/processors/ListAzureDataLakeStorage.h
index c08bf952e..51bd17c78 100644
--- a/extensions/azure/processors/ListAzureDataLakeStorage.h
+++ b/extensions/azure/processors/ListAzureDataLakeStorage.h
@@ -32,6 +32,11 @@ namespace org::apache::nifi::minifi::azure::processors {
class ListAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase {
public:
+ SMART_ENUM(EntityTracking,
+ (NONE, "none"),
+ (TIMESTAMPS, "timestamps")
+ )
+
EXTENSIONAPI static const core::Property RecurseSubdirectories;
EXTENSIONAPI static const core::Property FileFilter;
EXTENSIONAPI static const core::Property PathFilter;
@@ -62,7 +67,7 @@ class ListAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase
std::optional<storage::ListAzureDataLakeStorageParameters> buildListParameters(core::ProcessContext& context);
- storage::EntityTracking tracking_strategy_ = storage::EntityTracking::TIMESTAMPS;
+ EntityTracking tracking_strategy_ = EntityTracking::TIMESTAMPS;
storage::ListAzureDataLakeStorageParameters list_parameters_;
std::unique_ptr<minifi::utils::ListingStateManager> state_manager_;
};
diff --git a/extensions/azure/processors/PutAzureBlobStorage.cpp b/extensions/azure/processors/PutAzureBlobStorage.cpp
index 5cb70767b..493fa93e3 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.cpp
+++ b/extensions/azure/processors/PutAzureBlobStorage.cpp
@@ -70,7 +70,7 @@ std::optional<storage::PutAzureBlobStorageParameters> PutAzureBlobStorage::build
core::ProcessContext &context,
const std::shared_ptr<core::FlowFile> &flow_file) {
storage::PutAzureBlobStorageParameters params;
- if (!setCommonStorageParameters(params, context, flow_file)) {
+ if (!setBlobOperationParameters(params, context, flow_file)) {
return std::nullopt;
}
diff --git a/extensions/azure/processors/PutAzureBlobStorage.h b/extensions/azure/processors/PutAzureBlobStorage.h
index 1a25fbf7a..ccb731af3 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.h
+++ b/extensions/azure/processors/PutAzureBlobStorage.h
@@ -28,7 +28,7 @@
#include "core/Property.h"
#include "core/logging/LoggerConfiguration.h"
-#include "AzureBlobStorageProcessorBase.h"
+#include "AzureBlobStorageSingleBlobProcessorBase.h"
#include "io/StreamPipe.h"
template<typename T>
@@ -36,7 +36,7 @@ class AzureBlobStorageTestsFixture;
namespace org::apache::nifi::minifi::azure::processors {
-class PutAzureBlobStorage final : public AzureBlobStorageProcessorBase {
+class PutAzureBlobStorage final : public AzureBlobStorageSingleBlobProcessorBase {
public:
// Supported Properties
static const core::Property CreateContainer;
@@ -88,7 +88,7 @@ class PutAzureBlobStorage final : public AzureBlobStorageProcessorBase {
friend class ::AzureBlobStorageTestsFixture<PutAzureBlobStorage>;
explicit PutAzureBlobStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::BlobStorageClient> blob_storage_client)
- : AzureBlobStorageProcessorBase(name, uuid, core::logging::LoggerFactory<PutAzureBlobStorage>::getLogger(), std::move(blob_storage_client)) {
+ : AzureBlobStorageSingleBlobProcessorBase(name, uuid, core::logging::LoggerFactory<PutAzureBlobStorage>::getLogger(), std::move(blob_storage_client)) {
}
std::optional<storage::PutAzureBlobStorageParameters> buildPutAzureBlobStorageParameters(core::ProcessContext &context, const std::shared_ptr<core::FlowFile> &flow_file);
diff --git a/extensions/azure/storage/AzureBlobStorage.cpp b/extensions/azure/storage/AzureBlobStorage.cpp
index 14e26b959..d881ecdb1 100644
--- a/extensions/azure/storage/AzureBlobStorage.cpp
+++ b/extensions/azure/storage/AzureBlobStorage.cpp
@@ -85,4 +85,28 @@ std::optional<uint64_t> AzureBlobStorage::fetchBlob(const FetchAzureBlobStorageP
}
}
+std::optional<ListContainerResult> AzureBlobStorage::listContainer(const ListAzureBlobStorageParameters& params) {  // Lists the container's blobs; std::nullopt if the client throws.
+  try {
+    ListContainerResult result;
+    auto blobs = blob_storage_client_->listContainer(params);
+    auto primary_uri = blob_storage_client_->getUrl(params);  // URL reported by the client; shared by every element
+    result.reserve(blobs.size());  // single allocation for the whole listing
+    for (const auto& blob : blobs) {
+      auto& element = result.emplace_back();  // fill in place instead of copying a filled temporary
+      element.blob_name = blob.Name;
+      element.primary_uri = primary_uri;
+      element.etag = blob.Details.ETag.ToString();
+      element.length = blob.BlobSize;
+      element.last_modified = static_cast<std::chrono::system_clock::time_point>(blob.Details.LastModified);
+      element.mime_type = blob.Details.HttpHeaders.ContentType;
+      element.language = blob.Details.HttpHeaders.ContentLanguage;
+      element.blob_type = blob.BlobType.ToString();
+    }
+    return result;
+  } catch (const std::exception& ex) {
+    logger_->log_error("An exception occurred while listing container: %s", ex.what());
+    return std::nullopt;
+  }
+}
+
} // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureBlobStorage.h b/extensions/azure/storage/AzureBlobStorage.h
index 0750b822c..5b1acb804 100644
--- a/extensions/azure/storage/AzureBlobStorage.h
+++ b/extensions/azure/storage/AzureBlobStorage.h
@@ -29,6 +29,7 @@
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/gsl.h"
+#include "utils/ListingStateManager.h"
namespace org::apache::nifi::minifi::azure::storage {
@@ -38,6 +39,27 @@ struct UploadBlobResult {
std::string timestamp;
};
+struct ListContainerResultElement : public minifi::utils::ListedObject {  // One listed blob, as filled in by AzureBlobStorage::listContainer.
+ std::string blob_name;  // blob.Name; also returned by getKey()
+ std::string primary_uri;  // value of BlobStorageClient::getUrl() for the listing parameters
+ std::string etag;  // blob.Details.ETag as string
+ int64_t length = 0;  // blob.BlobSize (presumably bytes - TODO confirm)
+ std::chrono::time_point<std::chrono::system_clock> last_modified;  // blob.Details.LastModified
+ std::string mime_type;  // blob.Details.HttpHeaders.ContentType
+ std::string language;  // blob.Details.HttpHeaders.ContentLanguage
+ std::string blob_type;  // blob.BlobType as string
+
+ std::chrono::time_point<std::chrono::system_clock> getLastModified() const override {  // NOTE(review): presumably used by the listing state to detect new/updated blobs - confirm
+ return last_modified;
+ }
+
+ std::string getKey() const override {  // the blob name identifies the listed object
+ return blob_name;
+ }
+};
+
+using ListContainerResult = std::vector<ListContainerResultElement>;
+
class AzureBlobStorage {
public:
explicit AzureBlobStorage(std::unique_ptr<BlobStorageClient> blob_storage_client = nullptr);
@@ -45,6 +67,7 @@ class AzureBlobStorage {
std::optional<UploadBlobResult> uploadBlob(const PutAzureBlobStorageParameters& params, gsl::span<const std::byte> buffer);
bool deleteBlob(const DeleteAzureBlobStorageParameters& params);
std::optional<uint64_t> fetchBlob(const FetchAzureBlobStorageParameters& params, io::BaseStream& stream);
+ std::optional<ListContainerResult> listContainer(const ListAzureBlobStorageParameters& params);
private:
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureBlobStorage>::getLogger()};
diff --git a/extensions/azure/storage/AzureBlobStorageClient.cpp b/extensions/azure/storage/AzureBlobStorageClient.cpp
index 026e09c82..99ade0805 100644
--- a/extensions/azure/storage/AzureBlobStorageClient.cpp
+++ b/extensions/azure/storage/AzureBlobStorageClient.cpp
@@ -87,7 +87,7 @@ Azure::Storage::Blobs::Models::UploadBlockBlobResult AzureBlobStorageClient::upl
return blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size()).Value;
}
-std::string AzureBlobStorageClient::getUrl(const PutAzureBlobStorageParameters& params) {
+std::string AzureBlobStorageClient::getUrl(const AzureBlobStorageParameters& params) {
resetClientIfNeeded(params.credentials, params.container_name);
return container_client_->GetUrl();
}
@@ -123,4 +123,15 @@ std::unique_ptr<io::InputStream> AzureBlobStorageClient::fetchBlob(const FetchAz
return std::make_unique<AzureBlobStorageInputStream>(std::move(result.Value));
}
+std::vector<Azure::Storage::Blobs::Models::BlobItem> AzureBlobStorageClient::listContainer(const ListAzureBlobStorageParameters& params) {  // Gathers the blobs of all result pages.
+  std::vector<Azure::Storage::Blobs::Models::BlobItem> result;
+  resetClientIfNeeded(params.credentials, params.container_name);
+  Azure::Storage::Blobs::ListBlobsOptions options;
+  options.Prefix = params.prefix;  // only blobs whose name starts with the prefix are requested
+  for (auto page_result = container_client_->ListBlobs(options); page_result.HasPage(); page_result.MoveToNextPage()) {
+    for (auto& blob : page_result.Blobs) { result.push_back(std::move(blob)); }  // page_result.Blobs is not read again, so move instead of copying
+  }
+  return result;
+}
+
} // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureBlobStorageClient.h b/extensions/azure/storage/AzureBlobStorageClient.h
index bde1ea3eb..3f9378b21 100644
--- a/extensions/azure/storage/AzureBlobStorageClient.h
+++ b/extensions/azure/storage/AzureBlobStorageClient.h
@@ -37,9 +37,10 @@ class AzureBlobStorageClient : public BlobStorageClient {
AzureBlobStorageClient();
bool createContainerIfNotExists(const PutAzureBlobStorageParameters& params) override;
Azure::Storage::Blobs::Models::UploadBlockBlobResult uploadBlob(const PutAzureBlobStorageParameters& params, gsl::span<const std::byte> buffer) override;
- std::string getUrl(const PutAzureBlobStorageParameters& params) override;
+ std::string getUrl(const AzureBlobStorageParameters& params) override;
bool deleteBlob(const DeleteAzureBlobStorageParameters& params) override;
std::unique_ptr<io::InputStream> fetchBlob(const FetchAzureBlobStorageParameters& params) override;
+ std::vector<Azure::Storage::Blobs::Models::BlobItem> listContainer(const ListAzureBlobStorageParameters& params) override;
private:
void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string &container_name);
diff --git a/extensions/azure/storage/BlobStorageClient.h b/extensions/azure/storage/BlobStorageClient.h
index 72872c57a..efa2f8dfc 100644
--- a/extensions/azure/storage/BlobStorageClient.h
+++ b/extensions/azure/storage/BlobStorageClient.h
@@ -42,27 +42,35 @@ SMART_ENUM(OptionalDeletion,
struct AzureBlobStorageParameters {
AzureStorageCredentials credentials;
std::string container_name;
+};
+
+struct AzureBlobStorageBlobOperationParameters : public AzureBlobStorageParameters {
std::string blob_name;
};
-using PutAzureBlobStorageParameters = AzureBlobStorageParameters;
+using PutAzureBlobStorageParameters = AzureBlobStorageBlobOperationParameters;
-struct DeleteAzureBlobStorageParameters : public AzureBlobStorageParameters {
+struct DeleteAzureBlobStorageParameters : public AzureBlobStorageBlobOperationParameters {
OptionalDeletion optional_deletion;
};
-struct FetchAzureBlobStorageParameters : public AzureBlobStorageParameters {
+struct FetchAzureBlobStorageParameters : public AzureBlobStorageBlobOperationParameters {
std::optional<uint64_t> range_start;
std::optional<uint64_t> range_length;
};
+struct ListAzureBlobStorageParameters : public AzureBlobStorageParameters {  // Parameters of a container listing: credentials and container name plus a name prefix.
+ std::string prefix;  // assigned to ListBlobsOptions::Prefix by AzureBlobStorageClient::listContainer
+};
+
class BlobStorageClient {
public:
virtual bool createContainerIfNotExists(const PutAzureBlobStorageParameters& params) = 0;
virtual Azure::Storage::Blobs::Models::UploadBlockBlobResult uploadBlob(const PutAzureBlobStorageParameters& params, gsl::span<const std::byte> buffer) = 0;
- virtual std::string getUrl(const PutAzureBlobStorageParameters& params) = 0;
+ virtual std::string getUrl(const AzureBlobStorageParameters& params) = 0;
virtual bool deleteBlob(const DeleteAzureBlobStorageParameters& params) = 0;
virtual std::unique_ptr<io::InputStream> fetchBlob(const FetchAzureBlobStorageParameters& params) = 0;
+ virtual std::vector<Azure::Storage::Blobs::Models::BlobItem> listContainer(const ListAzureBlobStorageParameters& params) = 0;
virtual ~BlobStorageClient() = default;
};
diff --git a/extensions/azure/storage/DataLakeStorageClient.h b/extensions/azure/storage/DataLakeStorageClient.h
index 38a0a07f7..17d43c0bb 100644
--- a/extensions/azure/storage/DataLakeStorageClient.h
+++ b/extensions/azure/storage/DataLakeStorageClient.h
@@ -34,11 +34,6 @@
namespace org::apache::nifi::minifi::azure::storage {
-SMART_ENUM(EntityTracking,
- (NONE, "none"),
- (TIMESTAMPS, "timestamps")
-)
-
struct AzureDataLakeStorageParameters {
AzureStorageCredentials credentials;
std::string file_system_name;
diff --git a/libminifi/test/azure-tests/ListAzureBlobStorageTests.cpp b/libminifi/test/azure-tests/ListAzureBlobStorageTests.cpp
new file mode 100644
index 000000000..9fec6d41e
--- /dev/null
+++ b/libminifi/test/azure-tests/ListAzureBlobStorageTests.cpp
@@ -0,0 +1,287 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "../Catch.h"
+#include "MockBlobStorage.h"
+#include "utils/IntegrationTestUtils.h"
+#include "processors/LogAttribute.h"
+#include "processors/ListAzureBlobStorage.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+const std::string CONTAINER_NAME = "test-container";  // Property values shared by the test cases in this file.
+const std::string STORAGE_ACCOUNT_NAME = "test-account";
+const std::string STORAGE_ACCOUNT_KEY = "test-key";
+const std::string SAS_TOKEN = "test-sas-token";
+const std::string ENDPOINT_SUFFIX = "test.suffix.com";
+const std::string CONNECTION_STRING = "test-connectionstring";
+const std::string PREFIX = "test_prefix";
+
+class ListAzureBlobStorageTestsFixture {  // Fixture: a TestPlan with ListAzureBlobStorage (backed by MockBlobStorage) followed by LogAttribute.
+ public:
+ ListAzureBlobStorageTestsFixture() {
+ LogTestController::getInstance().setDebug<TestPlan>();
+ LogTestController::getInstance().setDebug<minifi::core::Processor>();
+ LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+ LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+ LogTestController::getInstance().setTrace<minifi::azure::processors::ListAzureBlobStorage>();
+
+ // Build MiNiFi processing graph
+ plan_ = test_controller_.createPlan();
+ auto mock_blob_storage = std::make_unique<MockBlobStorage>();
+ mock_blob_storage_ptr_ = mock_blob_storage.get();  // keep a raw pointer; ownership is handed to the processor on the next line
+ list_azure_blob_storage_ = std::make_shared<minifi::azure::processors::ListAzureBlobStorage>("ListAzureBlobStorage", std::move(mock_blob_storage));
+
+ plan_->addProcessor(list_azure_blob_storage_, "ListAzureBlobStorage", { {"success", "d"} });
+ auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true);
+ plan_->setProperty(logattribute, minifi::processors::LogAttribute::FlowFilesToLog.getName(), "0");  // NOTE(review): presumably "0" means "log all flow files" - confirm
+
+ azure_storage_cred_service_ = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
+ }
+
+ void setDefaultCredentials() {  // sets account name and key directly on the processor
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY);
+ }
+
+ virtual ~ListAzureBlobStorageTestsFixture() {
+ LogTestController::getInstance().reset();
+ }
+
+ protected:
+ TestController test_controller_;
+ std::shared_ptr<TestPlan> plan_;
+ MockBlobStorage* mock_blob_storage_ptr_;  // non-owning; the mock was moved into list_azure_blob_storage_
+ std::shared_ptr<core::Processor> list_azure_blob_storage_;
+ std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_;
+};
+
+namespace {
+
+using namespace std::chrono_literals;
+
+TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "Test credentials settings", "[azureStorageCredentials]") {  // Each SECTION sets one credential combination and checks what reaches the mock client, or that the run throws.
+ plan_->setProperty(list_azure_blob_storage_, "Container Name", CONTAINER_NAME);
+
+ SECTION("No credentials are set") {
+ REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
+ }
+
+ SECTION("No account key or SAS is set") {
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
+ }
+
+ SECTION("Credentials set in Azure Storage Credentials Service") {
+ auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
+ plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(azure_storage_cred_service, "Storage Account Key", STORAGE_ACCOUNT_KEY);
+ plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService");
+ test_controller_.runSession(plan_, true);
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY);
+ }
+
+ SECTION("Overriding credentials set in Azure Storage Credentials Service with connection string") {
+ auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
+ plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(azure_storage_cred_service, "Storage Account Key", STORAGE_ACCOUNT_KEY);
+ plan_->setProperty(azure_storage_cred_service, "Connection String", CONNECTION_STRING);  // expected to win over the name/key set on the service
+ plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService");
+ test_controller_.runSession(plan_, true);
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ REQUIRE(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+ }
+
+ SECTION("Account name and key set in properties") {
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY);
+ test_controller_.runSession(plan_, true);
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY);
+ }
+
+ SECTION("Account name and SAS token set in properties") {
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(list_azure_blob_storage_, "SAS Token", SAS_TOKEN);
+ test_controller_.runSession(plan_, true);
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";SharedAccessSignature=" + SAS_TOKEN);
+ }
+
+ SECTION("Account name and SAS token with question mark set in properties") {
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(list_azure_blob_storage_, "SAS Token", "?" + SAS_TOKEN);  // the leading '?' must not appear in the connection string
+ test_controller_.runSession(plan_, true);
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";SharedAccessSignature=" + SAS_TOKEN);
+ }
+
+ SECTION("Endpoint suffix overriden") {
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY);
+ plan_->setProperty(list_azure_blob_storage_, "Common Storage Account Endpoint Suffix", ENDPOINT_SUFFIX);
+ test_controller_.runSession(plan_, true);
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY + ";EndpointSuffix=" + ENDPOINT_SUFFIX);
+ }
+
+ SECTION("Use connection string") {
+ plan_->setProperty(list_azure_blob_storage_, "Connection String", CONNECTION_STRING);
+ test_controller_.runSession(plan_, true);
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ REQUIRE(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+ }
+
+ SECTION("Overriding credentials with connection string") {
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY);
+ plan_->setProperty(list_azure_blob_storage_, "Connection String", CONNECTION_STRING);
+ test_controller_.runSession(plan_, true);
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ REQUIRE(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+ }
+
+ SECTION("Account name and managed identity are used in properties") {
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(list_azure_blob_storage_, "Use Managed Identity Credentials", "true");
+ test_controller_.runSession(plan_, true);
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ CHECK(passed_params.credentials.buildConnectionString().empty());  // no connection string is built for managed identity
+ CHECK(passed_params.credentials.getStorageAccountName() == STORAGE_ACCOUNT_NAME);
+ CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net");  // suffix seen when none is configured
+ CHECK(passed_params.container_name == CONTAINER_NAME);
+ }
+
+ SECTION("Account name and managed identity are used from Azure Storage Credentials Service") {
+ auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
+ plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(azure_storage_cred_service, "Use Managed Identity Credentials", "true");
+ plan_->setProperty(azure_storage_cred_service, "Common Storage Account Endpoint Suffix", "core.chinacloudapi.cn");
+ plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService");
+ test_controller_.runSession(plan_, true);
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ CHECK(passed_params.credentials.buildConnectionString().empty());
+ CHECK(passed_params.credentials.getStorageAccountName() == STORAGE_ACCOUNT_NAME);
+ CHECK(passed_params.credentials.getEndpointSuffix() == "core.chinacloudapi.cn");
+ CHECK(passed_params.container_name == CONTAINER_NAME);
+ }
+
+ SECTION("Azure Storage Credentials Service overrides properties") {
+ auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
+ plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(azure_storage_cred_service, "Storage Account Key", STORAGE_ACCOUNT_KEY);
+ plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService");
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY);
+ plan_->setProperty(list_azure_blob_storage_, "Connection String", CONNECTION_STRING);  // must be ignored: the service's credentials are expected below
+ test_controller_.runSession(plan_, true);
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY);
+ }
+
+ SECTION("Azure Storage Credentials Service is set with invalid parameters") {
+ auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
+ plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME);  // only the account name: no key, SAS token or connection string on the service
+ plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService");
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY);
+ REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
+ }
+
+ SECTION("Azure Storage Credentials Service name is invalid") {
+ auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
+ plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(azure_storage_cred_service, "Storage Account Key", STORAGE_ACCOUNT_KEY);
+ plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "invalid_name");  // refers to no registered controller service
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME);
+ plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY);
+ REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
+ }
+}
+
+TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "List all files every time", "[ListAzureBlobStorage]") {  // With listing strategy 'none', two consecutive runs must both list both mock blobs.
+ setDefaultCredentials();
+ plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::ContainerName.getName(), CONTAINER_NAME);
+ plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::Prefix.getName(), PREFIX);
+ plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::ListingStrategy.getName(),
+ toString(minifi::azure::processors::ListAzureBlobStorage::EntityTracking::NONE));
+ test_controller_.runSession(plan_, true);  // first run
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ auto run_assertions = [this]() {  // checks the parameters passed to the mock and the attributes logged for both blobs
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ CHECK(passed_params.container_name == CONTAINER_NAME);
+ CHECK(passed_params.prefix == PREFIX);
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.container value:" + CONTAINER_NAME));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.blobname value:testdir/item1.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.blobname value:testdir/item2.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.primaryUri value:" + mock_blob_storage_ptr_->PRIMARY_URI));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:128"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:256"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag1"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag2"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.timestamp value:" + mock_blob_storage_ptr_->ITEM1_LAST_MODIFIED));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.timestamp value:" + mock_blob_storage_ptr_->ITEM2_LAST_MODIFIED));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.blobtype value:PageBlob"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.blobtype value:BlockBlob"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:mime.type value:application/zip"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:mime.type value:text/html"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:lang value:en-US"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:lang value:de-DE"));
+ };
+ run_assertions();
+ plan_->reset();
+ LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);  // clear the log so the second run is checked on its own output
+ test_controller_.runSession(plan_, true);  // second run
+ run_assertions();
+}
+
+TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "Do not list same files the second time when timestamps are tracked", "[ListAzureBlobStorage]") {  // With 'timestamps', the second run must emit nothing.
+ setDefaultCredentials();
+ plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::ContainerName.getName(), CONTAINER_NAME);
+ plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::Prefix.getName(), PREFIX);
+ plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::ListingStrategy.getName(),
+ toString(minifi::azure::processors::ListAzureBlobStorage::EntityTracking::TIMESTAMPS));
+ test_controller_.runSession(plan_, true);  // first run: both mock blobs are new
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ auto passed_params = mock_blob_storage_ptr_->getPassedListParams();
+ CHECK(passed_params.container_name == CONTAINER_NAME);
+ CHECK(passed_params.prefix == PREFIX);
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.container value:" + CONTAINER_NAME));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.blobname value:testdir/item1.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.blobname value:testdir/item2.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.primaryUri value:" + mock_blob_storage_ptr_->PRIMARY_URI));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:128"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:256"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag1"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag2"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.timestamp value:" + mock_blob_storage_ptr_->ITEM1_LAST_MODIFIED));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.timestamp value:" + mock_blob_storage_ptr_->ITEM2_LAST_MODIFIED));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.blobtype value:PageBlob"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.blobtype value:BlockBlob"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:mime.type value:application/zip"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:mime.type value:text/html"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:lang value:en-US"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:lang value:de-DE"));
+ plan_->reset();
+ LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);  // clear the log before the second run
+ test_controller_.runSession(plan_, true);  // second run: same blobs, same timestamps
+ REQUIRE_FALSE(LogTestController::getInstance().contains("key:azure", 0s, 0ms));  // no azure.* attribute may have been logged
+}
+
+} // namespace
diff --git a/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp
index 47f27148b..d91771ccd 100644
--- a/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp
+++ b/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp
@@ -94,7 +94,8 @@ TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Connection String is emp
}
TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "List all files every time", "[listAzureDataLakeStorage]") {
- plan_->setProperty(list_azure_data_lake_storage_, minifi::azure::processors::ListAzureDataLakeStorage::ListingStrategy.getName(), toString(minifi::azure::storage::EntityTracking::NONE));
+ plan_->setProperty(list_azure_data_lake_storage_, minifi::azure::processors::ListAzureDataLakeStorage::ListingStrategy.getName(),
+ toString(minifi::azure::processors::ListAzureDataLakeStorage::EntityTracking::NONE));
plan_->setProperty(list_azure_data_lake_storage_, minifi::azure::processors::ListAzureDataLakeStorage::RecurseSubdirectories.getName(), "false");
test_controller_.runSession(plan_, true);
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
@@ -123,7 +124,8 @@ TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "List all files every tim
}
TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Do not list same files the second time when timestamps are tracked", "[listAzureDataLakeStorage]") {
- plan_->setProperty(list_azure_data_lake_storage_, minifi::azure::processors::ListAzureDataLakeStorage::ListingStrategy.getName(), toString(minifi::azure::storage::EntityTracking::TIMESTAMPS));
+ plan_->setProperty(list_azure_data_lake_storage_, minifi::azure::processors::ListAzureDataLakeStorage::ListingStrategy.getName(),
+ toString(minifi::azure::processors::ListAzureDataLakeStorage::EntityTracking::TIMESTAMPS));
plan_->setProperty(list_azure_data_lake_storage_, minifi::azure::processors::ListAzureDataLakeStorage::RecurseSubdirectories.getName(), "false");
test_controller_.runSession(plan_, true);
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
diff --git a/libminifi/test/azure-tests/MockBlobStorage.h b/libminifi/test/azure-tests/MockBlobStorage.h
index f811d33ad..996fef1c8 100644
--- a/libminifi/test/azure-tests/MockBlobStorage.h
+++ b/libminifi/test/azure-tests/MockBlobStorage.h
@@ -32,6 +32,8 @@ class MockBlobStorage : public minifi::azure::storage::BlobStorageClient {
const std::string PRIMARY_URI = "http://test-uri/file";
const std::string TEST_TIMESTAMP = "Sun, 21 Oct 2018 12:16:24 GMT";
const std::string FETCHED_DATA = "test azure data for stream";
+ const std::string ITEM1_LAST_MODIFIED = "1631292120000";
+ const std::string ITEM2_LAST_MODIFIED = "1634127120000";
bool createContainerIfNotExists(const minifi::azure::storage::PutAzureBlobStorageParameters& params) override {
put_params_ = params;
@@ -53,8 +55,7 @@ class MockBlobStorage : public minifi::azure::storage::BlobStorageClient {
return result;
}
- std::string getUrl(const minifi::azure::storage::PutAzureBlobStorageParameters& params) override {
- put_params_ = params;
+ std::string getUrl(const minifi::azure::storage::AzureBlobStorageParameters& /*params*/) override {  // Mock override: the argument is unnamed and unused; put_params_ is no longer written here.
return RETURNED_PRIMARY_URI;  // Same value is returned for every call.
}
@@ -89,6 +90,33 @@ class MockBlobStorage : public minifi::azure::storage::BlobStorageClient {
return std::make_unique<org::apache::nifi::minifi::io::BufferStream>(gsl::make_span(buffer_).as_span<const std::byte>());
}
+ std::vector<Azure::Storage::Blobs::Models::BlobItem> listContainer(const minifi::azure::storage::ListAzureBlobStorageParameters& params) override {  // Mock listing: stores the received params and returns two hard-coded blob items.
+ list_params_ = params;  // Copy of the argument, exposed to tests through getPassedListParams().
+ std::vector<Azure::Storage::Blobs::Models::BlobItem> result;
+
+ Azure::Storage::Blobs::Models::BlobItem item1;  // First fixture entry: size 128, BlockBlob, application/zip, en-US.
+ item1.Name = "testdir/item1.log";
+ item1.Details.LastModified = Azure::DateTime(2021, 9, 10, 16, 42, 0);  // 2021-09-10 16:42:00; equals ITEM1_LAST_MODIFIED (1631292120000) when read as UTC epoch milliseconds.
+ item1.Details.ETag = Azure::ETag("etag1");
+ item1.Details.HttpHeaders.ContentType = "application/zip";
+ item1.Details.HttpHeaders.ContentLanguage = "en-US";
+ item1.BlobSize = 128;
+ item1.BlobType = Azure::Storage::Blobs::Models::BlobType::BlockBlob;
+
+ Azure::Storage::Blobs::Models::BlobItem item2;  // Second fixture entry: size 256, PageBlob, text/html, de-DE.
+ item2.Name = "testdir/item2.log";
+ item2.Details.LastModified = Azure::DateTime(2021, 10, 13, 12, 12, 0);  // 2021-10-13 12:12:00; equals ITEM2_LAST_MODIFIED (1634127120000) when read as UTC epoch milliseconds.
+ item2.Details.ETag = Azure::ETag("etag2");
+ item2.Details.HttpHeaders.ContentType = "text/html";
+ item2.Details.HttpHeaders.ContentLanguage = "de-DE";
+ item2.BlobSize = 256;
+ item2.BlobType = Azure::Storage::Blobs::Models::BlobType::PageBlob;
+
+ result.push_back(item1);  // Items are returned in insertion order: item1, then item2.
+ result.push_back(item2);
+ return result;
+ }
+
minifi::azure::storage::PutAzureBlobStorageParameters getPassedPutParams() const {  // Test accessor: returns put_params_ by value.
return put_params_;
}
@@ -101,6 +129,10 @@ class MockBlobStorage : public minifi::azure::storage::BlobStorageClient {
return fetch_params_;
}
+ minifi::azure::storage::ListAzureBlobStorageParameters getPassedListParams() const {  // Test accessor: returns by value the list_params_ that listContainer() assigns from its argument.
+ return list_params_;
+ }
+
bool getContainerCreated() const {  // Test accessor: returns the current container_created_ flag.
return container_created_;
}
@@ -126,6 +158,7 @@ class MockBlobStorage : public minifi::azure::storage::BlobStorageClient {
minifi::azure::storage::PutAzureBlobStorageParameters put_params_;
minifi::azure::storage::DeleteAzureBlobStorageParameters delete_params_;
minifi::azure::storage::FetchAzureBlobStorageParameters fetch_params_;
+ minifi::azure::storage::ListAzureBlobStorageParameters list_params_;
bool container_created_ = false;
bool upload_fails_ = false;
bool delete_fails_ = false;