You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2022/01/17 11:22:51 UTC

[nifi-minifi-cpp] 01/04: MINIFICPP-1630 Create FetchAzureDataLakeStorage processor

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit a9a0ade72d2c278d457f2b952fb67d7d4eaa981a
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Thu Aug 19 11:35:13 2021 +0200

    MINIFICPP-1630 Create FetchAzureDataLakeStorage processor
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    
    This closes #1221
---
 PROCESSORS.md                                      |  28 +++++
 README.md                                          |   2 +-
 extensions/aws/processors/FetchS3Object.cpp        |   2 +-
 extensions/aws/processors/FetchS3Object.h          |  11 +-
 .../azure/processors/FetchAzureDataLakeStorage.cpp | 126 +++++++++++++++++++
 .../azure/processors/FetchAzureDataLakeStorage.h   | 100 ++++++++++++++++
 extensions/azure/storage/AzureDataLakeStorage.cpp  |  13 +-
 extensions/azure/storage/AzureDataLakeStorage.h    |   2 +
 .../azure/storage/AzureDataLakeStorageClient.cpp   |  39 +++++-
 .../azure/storage/AzureDataLakeStorageClient.h     |  29 ++++-
 extensions/azure/storage/DataLakeStorageClient.h   |   9 ++
 .../azure-tests/AzureDataLakeStorageTestsFixture.h |  33 +++--
 .../azure-tests/FetchAzureDataLakeStorageTests.cpp | 133 +++++++++++++++++++++
 .../test/azure-tests/MockDataLakeStorageClient.h   |  37 ++++++
 14 files changed, 537 insertions(+), 27 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 5b393a0..6a2dc8e 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -21,6 +21,7 @@
 - [ExecuteSQL](#executesql)
 - [ExecuteScript](#executescript)
 - [ExtractText](#extracttext)
+- [FetchAzureDataLakeStorage](#fetchazuredatalakestorage)
 - [FetchOPCProcessor](#fetchopcprocessor)
 - [FetchS3Object](#fetchs3object)
 - [FetchSFTP](#fetchsftp)
@@ -504,6 +505,33 @@ In the list below, the names of required properties appear in bold. Any other pr
 |success|success operational on the flow record|
 
 
+## FetchAzureDataLakeStorage
+
+### Description
+
+Fetch the provided file from Azure Data Lake Storage Gen 2
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Azure Storage Credentials Service**|||Name of the Azure Storage Credentials Service used to retrieve the connection string from.|
+|File Name|||The filename in Azure Storage. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**|
+|**Filesystem Name**|||Name of the Azure Storage File System. It is assumed to be already existing.<br/>**Supports Expression Language: true**|
+|Directory Name|||Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. If left empty it designates the root directory. The directory will be created if not already existing.<br/>**Supports Expression Language: true**|
+|Range Start|||The byte position at which to start reading from the object. An empty value or a value of zero will start reading at the beginning of the object.<br/>**Supports Expression Language: true**|
+|Range Length|||The number of bytes to download from the object, starting from the Range Start. An empty value or a value that extends beyond the end of the object will read to the end of the object.<br/>**Supports Expression Language: true**|
+|Number of Retries|0||The number of automatic retries to perform if the download fails.<br/>**Supports Expression Language: true**|
+
+### Relationships
+
+| Name | Description |
+| - | - |
+|failure|In case of fetch failure flowfiles are transferred to this relationship|
+|success|Files that have been successfully fetched from Azure storage are transferred to this relationship|
+
+
 ## FetchOPCProcessor
 
 ### Description
diff --git a/README.md b/README.md
index 934478b..7dac13e 100644
--- a/README.md
+++ b/README.md
@@ -77,7 +77,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension
 | ------------- |:-------------| :-----|
 | Archive Extensions    | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry)      |   -DBUILD_LIBARCHIVE=ON |
 | AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) | -DENABLE_AWS=ON  |
-| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage) | -DENABLE_AZURE=ON  |
+| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[PutAzureDataLakeStorage](PROCESSORS.md#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](PROCESSORS.md#deleteazuredatalakestorage)<br/>[FetchAzureDataLakeStorage](PROCESSORS.md#fetchazuredatalakestorage) | -DENABLE_AZURE=ON  |
 | CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp)  | -DDISABLE_CIVET=ON |
 | CURL | [InvokeHTTP](PROCESSORS.md#invokehttp)      |    -DDISABLE_CURL=ON  |
 | GPS | GetGPS      |    -DENABLE_GPS=ON  |
diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp
index c7de905..d0099b4 100644
--- a/extensions/aws/processors/FetchS3Object.cpp
+++ b/extensions/aws/processors/FetchS3Object.cpp
@@ -109,7 +109,7 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
     return;
   }
 
-  WriteCallback callback(flow_file->getSize(), *get_object_params, s3_wrapper_);
+  WriteCallback callback(*get_object_params, s3_wrapper_);
   session->write(flow_file, &callback);
 
   if (callback.result_) {
diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h
index 7e0d79c..23f2dca 100644
--- a/extensions/aws/processors/FetchS3Object.h
+++ b/extensions/aws/processors/FetchS3Object.h
@@ -65,14 +65,12 @@ class FetchS3Object : public S3Processor {
 
   class WriteCallback : public OutputStreamCallback {
    public:
-    WriteCallback(uint64_t flow_size, const minifi::aws::s3::GetObjectRequestParameters& get_object_params, aws::s3::S3Wrapper& s3_wrapper)
-      : flow_size_(flow_size)
-      , get_object_params_(get_object_params)
-      , s3_wrapper_(s3_wrapper) {
+    WriteCallback(const minifi::aws::s3::GetObjectRequestParameters& get_object_params, aws::s3::S3Wrapper& s3_wrapper)
+      : get_object_params_(get_object_params),
+        s3_wrapper_(s3_wrapper) {
     }
 
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      std::vector<uint8_t> buffer;
       result_ = s3_wrapper_.getObject(get_object_params_, *stream);
       if (!result_) {
         return 0;
@@ -81,11 +79,8 @@ class FetchS3Object : public S3Processor {
       return result_->write_size;
     }
 
-    uint64_t flow_size_;
-
     const minifi::aws::s3::GetObjectRequestParameters& get_object_params_;
     aws::s3::S3Wrapper& s3_wrapper_;
-    uint64_t write_size_ = 0;
     std::optional<minifi::aws::s3::GetObjectResult> result_;
   };
 
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
new file mode 100644
index 0000000..4885190
--- /dev/null
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
@@ -0,0 +1,192 @@
+/**
+ * @file FetchAzureDataLakeStorage.cpp
+ * FetchAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchAzureDataLakeStorage.h"
+
+#include <charconv>
+#include <optional>
+#include <string>
+#include <system_error>
+
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+namespace {
+
+// Characters treated as insignificant around numeric property values
+constexpr const char* kWhitespace = " \t\r\n";
+
+/**
+ * Parses a non-negative decimal integer, ignoring leading and trailing whitespace.
+ * Unlike std::stoull this never throws, and it rejects a sign ("-1" would otherwise wrap around)
+ * and trailing garbage ("12abc" would otherwise be read as 12).
+ * @return the parsed value, or std::nullopt if `text` is not a valid non-negative integer that fits into uint64_t
+ */
+std::optional<uint64_t> parseUnsignedInteger(const std::string& text) {
+  const auto first = text.find_first_not_of(kWhitespace);
+  if (first == std::string::npos) {
+    return std::nullopt;
+  }
+  const auto last = text.find_last_not_of(kWhitespace);
+  const char* const begin = text.data() + first;
+  const char* const end = text.data() + last + 1;
+  uint64_t result = 0;
+  const auto [parsed_until, error] = std::from_chars(begin, end, result);
+  if (error != std::errc() || parsed_until != end) {
+    return std::nullopt;
+  }
+  return result;
+}
+
+/**
+ * Reads an optional unsigned integer property, evaluating Expression Language against `flow_file`.
+ * If the property is unset or evaluates to a blank string, `target` is left untouched.
+ * @return false (after logging an error) if the value is not a valid non-negative integer, true otherwise
+ */
+bool readOptionalUnsignedProperty(core::ProcessContext& context, const core::Property& property, const std::shared_ptr<core::FlowFile>& flow_file,
+    std::optional<uint64_t>& target, core::logging::Logger& logger) {
+  std::string value;
+  if (!context.getProperty(property, value, flow_file) || value.find_first_not_of(kWhitespace) == std::string::npos) {
+    return true;
+  }
+  const auto parsed = parseUnsignedInteger(value);
+  if (!parsed) {
+    logger.log_error("Invalid value '%s' for property '%s', expected a non-negative integer", value, property.getName());
+    return false;
+  }
+  target = parsed;
+  return true;
+}
+
+}  // namespace
+
+const core::Property FetchAzureDataLakeStorage::RangeStart(
+    core::PropertyBuilder::createProperty("Range Start")
+      ->withDescription("The byte position at which to start reading from the object. An empty value or a value of zero will start reading at the beginning of the object.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchAzureDataLakeStorage::RangeLength(
+    core::PropertyBuilder::createProperty("Range Length")
+      ->withDescription("The number of bytes to download from the object, starting from the Range Start. "
+                        "An empty value or a value that extends beyond the end of the object will read to the end of the object.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchAzureDataLakeStorage::NumberOfRetries(
+    core::PropertyBuilder::createProperty("Number of Retries")
+      ->withDescription("The number of automatic retries to perform if the download fails.")
+      ->withDefaultValue<uint64_t>(0)
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Relationship FetchAzureDataLakeStorage::Success("success", "Files that have been successfully fetched from Azure storage are transferred to this relationship");
+const core::Relationship FetchAzureDataLakeStorage::Failure("failure", "In case of fetch failure flowfiles are transferred to this relationship");
+
+void FetchAzureDataLakeStorage::initialize() {
+  // Add new supported properties
+  setSupportedProperties({
+    AzureStorageCredentialsService,
+    FilesystemName,
+    DirectoryName,
+    FileName,
+    RangeStart,
+    RangeLength,
+    NumberOfRetries
+  });
+  // Set the supported relationships
+  setSupportedRelationships({
+    Success,
+    Failure
+  });
+}
+
+/**
+ * Builds the fetch parameters: the common Data Lake parameters (set by setCommonParameters)
+ * plus the optional byte range and retry count.
+ * @return std::nullopt if the common parameters cannot be set or a numeric property has an invalid value
+ */
+std::optional<storage::FetchAzureDataLakeStorageParameters> FetchAzureDataLakeStorage::buildFetchParameters(
+    core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+  storage::FetchAzureDataLakeStorageParameters params;
+  if (!setCommonParameters(params, context, flow_file)) {
+    return std::nullopt;
+  }
+
+  // Malformed values are logged and reported as std::nullopt (routing the flow file to failure)
+  // instead of letting a parsing exception escape onTrigger
+  if (!readOptionalUnsignedProperty(context, RangeStart, flow_file, params.range_start, *logger_) ||
+      !readOptionalUnsignedProperty(context, RangeLength, flow_file, params.range_length, *logger_) ||
+      !readOptionalUnsignedProperty(context, NumberOfRetries, flow_file, params.number_of_retries, *logger_)) {
+    return std::nullopt;
+  }
+
+  if (params.range_start) {
+    logger_->log_debug("Range Start property set to %llu", *params.range_start);
+  }
+  if (params.range_length) {
+    logger_->log_debug("Range Length property set to %llu", *params.range_length);
+  }
+  if (params.number_of_retries) {
+    logger_->log_debug("Number Of Retries property set to %llu", *params.number_of_retries);
+  }
+
+  return params;
+}
+
+/**
+ * Fetches the file referenced by the incoming flow file into a new flow file created from it.
+ * On success the new flow file is transferred to Success and the incoming one is removed;
+ * otherwise the incoming flow file is transferred to Failure (and the new one, if created, is removed).
+ */
+void FetchAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session);
+  logger_->log_debug("FetchAzureDataLakeStorage onTrigger");
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    context->yield();
+    return;
+  }
+
+  const auto params = buildFetchParameters(*context, flow_file);
+  if (!params) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  auto fetched_flow_file = session->create(flow_file);
+  WriteCallback callback(azure_data_lake_storage_, *params, logger_);
+  session->write(fetched_flow_file, &callback);
+
+  if (callback.getResult() == std::nullopt) {
+    logger_->log_error("Failed to fetch file '%s' from Azure Data Lake storage", params->filename);
+    session->transfer(flow_file, Failure);
+    session->remove(fetched_flow_file);
+  } else {
+    logger_->log_debug("Successfully fetched file '%s' from filesystem '%s' on Azure Data Lake storage", params->filename, params->file_system_name);
+    session->transfer(fetched_flow_file, Success);
+    session->remove(flow_file);
+  }
+}
+
+REGISTER_RESOURCE(FetchAzureDataLakeStorage, "Fetch the provided file from Azure Data Lake Storage Gen 2");
+
+}  // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.h b/extensions/azure/processors/FetchAzureDataLakeStorage.h
new file mode 100644
index 0000000..4768e49
--- /dev/null
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.h
@@ -0,0 +1,119 @@
+/**
+ * @file FetchAzureDataLakeStorage.h
+ * FetchAzureDataLakeStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <limits>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+template<typename AzureDataLakeStorageProcessor>
+class AzureDataLakeStorageTestsFixture;
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+/**
+ * Fetches a file from Azure Data Lake Storage Gen 2 into the content of a new flow file,
+ * optionally limited to a byte range (Range Start / Range Length properties).
+ */
+class FetchAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase {
+ public:
+  // Supported Properties
+  EXTENSIONAPI static const core::Property RangeStart;
+  EXTENSIONAPI static const core::Property RangeLength;
+  EXTENSIONAPI static const core::Property NumberOfRetries;
+
+  // Supported Relationships
+  static const core::Relationship Failure;
+  static const core::Relationship Success;
+
+  explicit FetchAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+    : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger()) {
+  }
+
+  ~FetchAzureDataLakeStorage() override = default;
+
+  void initialize() override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
+  friend class ::AzureDataLakeStorageTestsFixture<FetchAzureDataLakeStorage>;
+
+  /**
+   * Streams the remote file into the flow file content through AzureDataLakeStorage::fetchFile.
+   * After the write, getResult() holds the number of bytes written, or std::nullopt if the fetch failed.
+   */
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(storage::AzureDataLakeStorage& azure_data_lake_storage, const storage::FetchAzureDataLakeStorageParameters& params, std::shared_ptr<core::logging::Logger> logger)
+      : azure_data_lake_storage_(azure_data_lake_storage),
+        params_(params),
+        logger_(std::move(logger)) {
+    }
+
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+      result_size_ = azure_data_lake_storage_.fetchFile(params_, *stream);
+      if (!result_size_) {
+        return 0;
+      }
+
+      // A size that does not fit the int64_t return value cannot be a real byte count (presumably a
+      // negative error code converted to uint64_t), so report a failed fetch instead of throwing here
+      if (*result_size_ > static_cast<uint64_t>(std::numeric_limits<int64_t>::max())) {
+        logger_->log_error("Fetching '%s' reported an invalid size of %llu bytes", params_.filename, *result_size_);
+        result_size_ = std::nullopt;
+        return 0;
+      }
+
+      return static_cast<int64_t>(*result_size_);
+    }
+
+    auto getResult() const {
+      return result_size_;
+    }
+
+   private:
+    storage::AzureDataLakeStorage& azure_data_lake_storage_;
+    const storage::FetchAzureDataLakeStorageParameters& params_;
+    std::optional<uint64_t> result_size_ = std::nullopt;
+    std::shared_ptr<core::logging::Logger> logger_;
+  };
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_REQUIRED;
+  }
+
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
+  // Injects a custom DataLakeStorageClient; private, so presumably only used by the friend test fixture
+  explicit FetchAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+    : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger(), std::move(data_lake_storage_client)) {
+  }
+
+  std::optional<storage::FetchAzureDataLakeStorageParameters> buildFetchParameters(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file);
+};
+
+}  // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/storage/AzureDataLakeStorage.cpp b/extensions/azure/storage/AzureDataLakeStorage.cpp
index 2be5292..34e2d3b 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorage.cpp
@@ -21,6 +21,7 @@
 #include "AzureDataLakeStorage.h"
 
 #include "AzureDataLakeStorageClient.h"
+#include "io/StreamPipe.h"
 
 namespace org::apache::nifi::minifi::azure::storage {
 
@@ -52,7 +53,7 @@ UploadDataLakeStorageResult AzureDataLakeStorage::uploadFile(const PutAzureDataL
   }
 }
 
-bool AzureDataLakeStorage::deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params) {
+bool AzureDataLakeStorage::deleteFile(const DeleteAzureDataLakeStorageParameters& params) {
   try {
     return data_lake_storage_client_->deleteFile(params);
   } catch (const std::exception& ex) {
@@ -61,4 +62,25 @@ bool AzureDataLakeStorage::deleteFile(const storage::DeleteAzureDataLakeStorageP
   }
 }
 
+/**
+ * Downloads the file described by `params` and writes its content to `stream`.
+ * @return the number of bytes written to `stream`, or std::nullopt if the download or the copy failed
+ */
+std::optional<uint64_t> AzureDataLakeStorage::fetchFile(const FetchAzureDataLakeStorageParameters& params, io::BaseStream& stream) {
+  try {
+    auto result = data_lake_storage_client_->fetchFile(params);
+    // internal::pipe signals a read/write error with a negative count; converting that straight to
+    // uint64_t would report a bogus huge size as a successful fetch
+    const int64_t bytes_transferred = internal::pipe(result.get(), &stream);
+    if (bytes_transferred < 0) {
+      logger_->log_error("Failed to write the content of '%s/%s' of filesystem '%s' to the flow file", params.directory_name, params.filename, params.file_system_name);
+      return std::nullopt;
+    }
+    return static_cast<uint64_t>(bytes_transferred);
+  } catch (const std::exception& ex) {
+    logger_->log_error("An exception occurred while fetching '%s/%s' of filesystem '%s': %s", params.directory_name, params.filename, params.file_system_name, ex.what());
+    return std::nullopt;
+  }
+}
+
 }  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorage.h b/extensions/azure/storage/AzureDataLakeStorage.h
index 8dd9b8a..d1291a2 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.h
+++ b/extensions/azure/storage/AzureDataLakeStorage.h
@@ -28,6 +28,7 @@
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "DataLakeStorageClient.h"
+#include "azure/core/io/body_stream.hpp"
 
 namespace org::apache::nifi::minifi::azure::storage {
 
@@ -48,6 +49,7 @@ class AzureDataLakeStorage {
 
   storage::UploadDataLakeStorageResult uploadFile(const storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer);
   bool deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params);
+  std::optional<uint64_t> fetchFile(const FetchAzureDataLakeStorageParameters& params, io::BaseStream& stream);
 
  private:
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureDataLakeStorage>::getLogger()};
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
index 7a54a69..e542432 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
@@ -18,7 +18,11 @@
  * limitations under the License.
  */
 
+#include <utility>
+
 #include "AzureDataLakeStorageClient.h"
+#include "azure/core/http/http.hpp"
+#include "azure/storage/files/datalake/datalake_options.hpp"
 
 #include "azure/identity.hpp"
 
@@ -30,30 +34,35 @@ AzureDataLakeStorageClient::AzureDataLakeStorageClient() {
   utils::AzureSdkLogger::initialize();
 }
 
-void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name) {
-  if (client_ && credentials_ == credentials && file_system_name_ == file_system_name) {
+void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries) {
+  if (client_ && credentials_ == credentials && file_system_name_ == file_system_name && number_of_retries_ == number_of_retries) {
     logger_->log_debug("Azure Data Lake Storge client credentials have not changed, no need to reset client");
     return;
   }
 
+  Azure::Storage::Files::DataLake::DataLakeClientOptions options;
+  if (number_of_retries) {
+    options.Retry.MaxRetries = gsl::narrow<decltype(options.Retry.MaxRetries)>(*number_of_retries);  // throws instead of silently truncating out-of-range values
+  }
+
   if (credentials.getUseManagedIdentityCredentials()) {
     auto datalake_service_client = Azure::Storage::Files::DataLake::DataLakeServiceClient(
-      "https://" + credentials.getStorageAccountName() + ".dfs." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>());
-
+        "https://" + credentials.getStorageAccountName() + ".dfs." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>(), options);
     client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(datalake_service_client.GetFileSystemClient(file_system_name));
     logger_->log_debug("Azure Data Lake Storge client has been reset with new managed identity credentials.");
   } else {
     client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(
-    Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(credentials.buildConnectionString(), file_system_name));
+        Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(credentials.buildConnectionString(), file_system_name, options));
     logger_->log_debug("Azure Data Lake Storge client has been reset with new connection string credentials.");
   }
 
   file_system_name_ = file_system_name;
   credentials_ = credentials;
+  number_of_retries_ = number_of_retries;
 }
 
 Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const AzureDataLakeStorageParameters& params) {
-  resetClientIfNeeded(params.credentials, params.file_system_name);
+  resetClientIfNeeded(params.credentials, params.file_system_name, params.number_of_retries);
 
   auto directory_client = client_->GetDirectoryClient(params.directory_name);
   if (!params.directory_name.empty()) {
@@ -80,4 +89,22 @@ bool AzureDataLakeStorageClient::deleteFile(const DeleteAzureDataLakeStoragePara
   return result.Value.Deleted;
 }
 
+std::unique_ptr<io::InputStream> AzureDataLakeStorageClient::fetchFile(const FetchAzureDataLakeStorageParameters& params) {
+  // A Range is only attached to the request when the caller restricted the download in some way
+  Azure::Storage::Files::DataLake::DownloadFileOptions download_options;
+  const bool has_range = params.range_start.has_value() || params.range_length.has_value();
+  if (has_range) {
+    Azure::Core::Http::HttpRange requested_range;
+    requested_range.Offset = params.range_start.value_or(0);
+    if (params.range_length.has_value()) {
+      requested_range.Length = *params.range_length;
+    }
+    download_options.Range = requested_range;
+  }
+
+  auto file_client = getFileClient(params);
+  auto download_result = file_client.Download(download_options);
+  return std::make_unique<AzureDataLakeStorageInputStream>(std::move(download_result.Value));
+}
+
 }  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h b/extensions/azure/storage/AzureDataLakeStorageClient.h
index e7d933d..cd8d791 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.h
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.h
@@ -21,6 +21,7 @@
 
 #include <string>
 #include <memory>
+#include <utility>
 
 #include <azure/storage/files/datalake.hpp>
 
@@ -56,12 +57,38 @@ class AzureDataLakeStorageClient : public DataLakeStorageClient {
    */
   bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) override;
 
+  /**
+   * Fetches a file (or the requested byte range of it) from the Azure Data Lake Storage
+   * @param params Parameters required for connecting and file access on Azure
+   * @return Input stream reading the body of the Azure download result
+   */
+  std::unique_ptr<io::InputStream> fetchFile(const FetchAzureDataLakeStorageParameters& params) override;
+
  private:
-  void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name);
+  class AzureDataLakeStorageInputStream : public io::InputStream {  // exposes the body of an Azure download result as a MiNiFi io::InputStream
+   public:
+    explicit AzureDataLakeStorageInputStream(Azure::Storage::Files::DataLake::Models::DownloadFileResult&& result)
+      : result_(std::move(result)) {
+    }
+    // Length of the download body, as reported by the Azure SDK body stream
+    size_t size() const override {
+      return result_.Body->Length();
+    }
+    // Forwards reads to the download body stream; any exception thrown by Body->Read propagates to the caller
+    size_t read(uint8_t *value, size_t len) override {
+      return result_.Body->Read(value, len);
+    }
+
+   private:
+    Azure::Storage::Files::DataLake::Models::DownloadFileResult result_;  // owns the (moved-in) download result, and with it the body stream
+  };
+
+  void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries);
   Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const AzureDataLakeStorageParameters& params);
 
   AzureStorageCredentials credentials_;
   std::string file_system_name_;
+  std::optional<uint64_t> number_of_retries_;
   std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> client_;
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureDataLakeStorageClient>::getLogger()};
 };
diff --git a/extensions/azure/storage/DataLakeStorageClient.h b/extensions/azure/storage/DataLakeStorageClient.h
index 4e97d72..eb57d62 100644
--- a/extensions/azure/storage/DataLakeStorageClient.h
+++ b/extensions/azure/storage/DataLakeStorageClient.h
@@ -21,10 +21,12 @@
 
 #include <string>
 #include <optional>
+#include <memory>
 
 #include "AzureStorageCredentials.h"
 
 #include "utils/gsl.h"
+#include "io/InputStream.h"
 
 namespace org::apache::nifi::minifi::azure::storage {
 
@@ -33,6 +35,7 @@ struct AzureDataLakeStorageParameters {
   std::string file_system_name;
   std::string directory_name;
   std::string filename;
+  std::optional<uint64_t> number_of_retries;
 };
 
 struct PutAzureDataLakeStorageParameters : public AzureDataLakeStorageParameters {
@@ -41,11 +44,17 @@ struct PutAzureDataLakeStorageParameters : public AzureDataLakeStorageParameters
 
 using DeleteAzureDataLakeStorageParameters = AzureDataLakeStorageParameters;
 
+struct FetchAzureDataLakeStorageParameters : public AzureDataLakeStorageParameters {  // common Data Lake parameters plus an optional byte range to download
+  std::optional<uint64_t> range_start;   // offset of the first byte to download; unset means the beginning of the file
+  std::optional<uint64_t> range_length;  // number of bytes to download; unset means up to the end of the file
+};
+
 class DataLakeStorageClient {
  public:
   virtual bool createFile(const PutAzureDataLakeStorageParameters& params) = 0;
   virtual std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) = 0;
   virtual bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) = 0;
+  virtual std::unique_ptr<io::InputStream> fetchFile(const FetchAzureDataLakeStorageParameters& params) = 0;  // returns a stream over the fetched (possibly ranged) file content
   virtual ~DataLakeStorageClient() = default;
 };
 
diff --git a/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h b/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h
index 6878e71..1ef56b3 100644
--- a/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h
+++ b/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h
@@ -71,19 +71,32 @@ class AzureDataLakeStorageTestsFixture {
     update_attribute_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} },  true);
     plan_->addProcessor(azure_data_lake_storage_, "AzureDataLakeStorageProcessor", { {"success", "d"}, {"failure", "d"} }, true);
     auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true);
-    logattribute->setAutoTerminatedRelationships({{"success", "d"}});
 
-    putfile_ = plan_->addProcessor("PutFile", "PutFile", { {"success", "d"} }, false);
-    plan_->addConnection(azure_data_lake_storage_, {"failure", "d"}, putfile_);
-    putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
-    output_dir_ = test_controller_.createTempDirectory();
-    plan_->setProperty(putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), output_dir_);
+    success_putfile_ = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false);
+    plan_->addConnection(logattribute, {"success", "d"}, success_putfile_);
+    success_putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+    success_output_dir_ = test_controller_.createTempDirectory();
+    plan_->setProperty(success_putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), success_output_dir_);
+
+    failure_putfile_ = plan_->addProcessor("PutFile", "FailurePutFile", { {"success", "d"} }, false);
+    plan_->addConnection(azure_data_lake_storage_, {"failure", "d"}, failure_putfile_);
+    failure_putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+    failure_output_dir_ = test_controller_.createTempDirectory();
+    plan_->setProperty(failure_putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), failure_output_dir_);
 
     azure_storage_cred_service_ = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
     setDefaultProperties();
   }
 
   std::vector<std::string> getFailedFlowFileContents() {
+    return getFileContents(failure_output_dir_);  // files written by FailurePutFile, which is fed from the processor's failure relationship
+  }
+  // Contents of the files written by SuccessPutFile, which receives the processor's success flow files via LogAttribute
+  std::vector<std::string> getSuccessfulFlowFileContents() {
+    return getFileContents(success_output_dir_);
+  }
+
+  std::vector<std::string> getFileContents(const std::string& dir) {
     std::vector<std::string> file_contents;
 
     auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
@@ -92,7 +105,7 @@ class AzureDataLakeStorageTestsFixture {
       return true;
     };
 
-    utils::file::FileUtils::list_dir(output_dir_, lambda, plan_->getLogger(), false);
+    utils::file::FileUtils::list_dir(dir, lambda, plan_->getLogger(), false);
     return file_contents;
   }
 
@@ -116,7 +129,9 @@ class AzureDataLakeStorageTestsFixture {
   std::shared_ptr<core::Processor> azure_data_lake_storage_;
   std::shared_ptr<core::Processor> get_file_;
   std::shared_ptr<core::Processor> update_attribute_;
-  std::shared_ptr<core::Processor> putfile_;
+  std::shared_ptr<core::Processor> success_putfile_;
+  std::shared_ptr<core::Processor> failure_putfile_;
   std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_;
-  std::string output_dir_;
+  std::string failure_output_dir_;
+  std::string success_output_dir_;
 };
diff --git a/libminifi/test/azure-tests/FetchAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/FetchAzureDataLakeStorageTests.cpp
new file mode 100644
index 0000000..a055962
--- /dev/null
+++ b/libminifi/test/azure-tests/FetchAzureDataLakeStorageTests.cpp
@@ -0,0 +1,133 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AzureDataLakeStorageTestsFixture.h"
+#include "processors/FetchAzureDataLakeStorage.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+namespace {
+// Each test runs the flow once, then inspects the parameters recorded by the mock storage client and the routed flow files.
+using namespace std::chrono_literals;
+
+using FetchAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::FetchAzureDataLakeStorage>;
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Azure storage credentials service is empty", "[azureDataLakeStorageParameters]") {
+  plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::AzureStorageCredentialsService.getName(), "");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
+  REQUIRE(getFailedFlowFileContents().empty());
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Test Azure credentials with account name and SAS token set", "[azureDataLakeStorageParameters]") {
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), "");
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
+  CHECK(passed_params.credentials.buildConnectionString() == "AccountName=TEST_ACCOUNT;SharedAccessSignature=token");
+  CHECK(getFailedFlowFileContents().empty());
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Test Azure credentials with connection string override", "[azureDataLakeStorageParameters]") {
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), CONNECTION_STRING);
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(getFailedFlowFileContents().empty());
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Test Azure credentials with managed identity use", "[azureDataLakeStorageParameters]") {
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), "test");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::UseManagedIdentityCredentials.getName(), "true");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
+  CHECK(passed_params.credentials.buildConnectionString().empty());
+  CHECK(passed_params.credentials.getStorageAccountName() == "TEST_ACCOUNT");
+  CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net");
+  CHECK(getFailedFlowFileContents().empty());
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Filesystem name is not set", "[azureDataLakeStorageParameters]") {
+  plan_->setProperty(update_attribute_, "test.filesystemname", "", true);
+  test_controller_.runSession(plan_, true);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  CHECK(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid or empty!"));
+  auto failed_flowfiles = getFailedFlowFileContents();
+  REQUIRE(failed_flowfiles.size() == 1);
+  REQUIRE(failed_flowfiles[0] == TEST_DATA);
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Connection String is empty", "[azureDataLakeStorageParameters]") {
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), "");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
+  REQUIRE(getFailedFlowFileContents().empty());
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Fetch full file succeeds", "[azureDataLakeStorageFetch]") {
+  test_controller_.runSession(plan_, true);
+  REQUIRE(getFailedFlowFileContents().empty());
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(passed_params.directory_name == DIRECTORY_NAME);
+  CHECK(passed_params.filename == GETFILE_FILE_NAME);
+  CHECK(passed_params.range_start == std::nullopt);
+  CHECK(passed_params.range_length == std::nullopt);
+  auto success_contents = getSuccessfulFlowFileContents();
+  REQUIRE(success_contents.size() == 1);
+  REQUIRE(success_contents[0] == mock_data_lake_storage_client_ptr_->FETCHED_DATA);
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Fetch a range of the file succeeds", "[azureDataLakeStorageFetch]") {
+  plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::RangeStart.getName(), "5");
+  plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::RangeLength.getName(), "10");
+  test_controller_.runSession(plan_, true);
+  REQUIRE(getFailedFlowFileContents().empty());
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(passed_params.directory_name == DIRECTORY_NAME);
+  CHECK(passed_params.filename == GETFILE_FILE_NAME);
+  CHECK(passed_params.range_start == 5U);  // compare the optional itself: CHECK does not abort, so *range_start on an unset optional would be UB
+  CHECK(passed_params.range_length == 10U);
+  auto success_contents = getSuccessfulFlowFileContents();
+  REQUIRE(success_contents.size() == 1);
+  REQUIRE(success_contents[0] == mock_data_lake_storage_client_ptr_->FETCHED_DATA.substr(5, 10));
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Number of Retries is set", "[azureDataLakeStorageFetch]") {
+  plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::NumberOfRetries.getName(), "1");
+  test_controller_.runSession(plan_, true);
+  REQUIRE(mock_data_lake_storage_client_ptr_->getPassedFetchParams().number_of_retries == 1);
+}
+
+TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Fetch full file fails", "[azureDataLakeStorageFetch]") {
+  mock_data_lake_storage_client_ptr_->setFetchFailure(true);  // mock fetchFile() throws, so the unmodified input must end up in failure
+  test_controller_.runSession(plan_, true);
+  REQUIRE(getSuccessfulFlowFileContents().empty());
+  auto failed_contents = getFailedFlowFileContents();
+  REQUIRE(failed_contents.size() == 1);
+  REQUIRE(failed_contents[0] == TEST_DATA);
+}
+
+}  // namespace
diff --git a/libminifi/test/azure-tests/MockDataLakeStorageClient.h b/libminifi/test/azure-tests/MockDataLakeStorageClient.h
index eeac4dc..8969e1e 100644
--- a/libminifi/test/azure-tests/MockDataLakeStorageClient.h
+++ b/libminifi/test/azure-tests/MockDataLakeStorageClient.h
@@ -20,12 +20,17 @@
 
 #include <string>
 #include <stdexcept>
+#include <memory>
+#include <utility>
+#include <vector>
 
 #include "storage/DataLakeStorageClient.h"
+#include "io/BufferStream.h"
 
 class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::storage::DataLakeStorageClient {
  public:
   const std::string PRIMARY_URI = "http://test-uri/file";
+  const std::string FETCHED_DATA = "test azure data for stream";  // content served by fetchFile(), or the requested range of it
 
   bool createFile(const org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters& /*params*/) override {
     if (file_creation_error_) {
@@ -55,6 +60,27 @@ class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::stora
     return delete_result_;
   }
 
+  // Simulated fetch: throws std::runtime_error("error") if setFetchFailure(true) was called; otherwise records
+  // the parameters and returns a stream over FETCHED_DATA, or over the requested range of it.
+  std::unique_ptr<org::apache::nifi::minifi::io::InputStream> fetchFile(const org::apache::nifi::minifi::azure::storage::FetchAzureDataLakeStorageParameters& params) override {
+    if (fetch_fails_) {
+      throw std::runtime_error("error");
+    }
+
+    fetch_params_ = params;
+    // Clamp the requested range to the available data (a range past the end yields fewer or no bytes)
+    // instead of forming iterators beyond FETCHED_DATA.end(), which is undefined behavior.
+    uint64_t range_start = params.range_start.value_or(0);
+    if (range_start > FETCHED_DATA.size()) {
+      range_start = FETCHED_DATA.size();
+    }
+    // std::string::substr() itself truncates a length that runs past the end of the string.
+    const uint64_t range_length = params.range_length.value_or(std::string::npos);
+    const std::string fetched = FETCHED_DATA.substr(range_start, range_length);
+    buffer_.assign(fetched.begin(), fetched.end());
+    return std::make_unique<org::apache::nifi::minifi::io::BufferStream>(buffer_.data(), buffer_.size());
+  }
+
   void setFileCreation(bool create_file) {
     create_file_ = create_file;
   }
@@ -75,6 +101,10 @@ class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::stora
     delete_result_ = delete_result;
   }
 
+  void setFetchFailure(bool fetch_fails) {  // when true, subsequent fetchFile() calls throw std::runtime_error
+    fetch_fails_ = fetch_fails;
+  }
+
   org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters getPassedPutParams() const {
     return put_params_;
   }
@@ -83,6 +113,10 @@ class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::stora
     return delete_params_;
   }
 
+  org::apache::nifi::minifi::azure::storage::FetchAzureDataLakeStorageParameters getPassedFetchParams() const {  // params of the last fetchFile() call that did not throw
+    return fetch_params_;
+  }
+
  private:
   const std::string RETURNED_PRIMARY_URI = "http://test-uri/file?secret-sas";
   bool create_file_ = true;
@@ -90,7 +124,10 @@ class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::stora
   bool upload_fails_ = false;
   bool delete_fails_ = false;
   bool delete_result_ = true;
+  bool fetch_fails_ = false;  // set by setFetchFailure(); makes fetchFile() throw
   std::string input_data_;
+  std::vector<uint8_t> buffer_;  // bytes handed to the BufferStream returned by fetchFile()
   org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters put_params_;
   org::apache::nifi::minifi::azure::storage::DeleteAzureDataLakeStorageParameters delete_params_;
+  org::apache::nifi::minifi::azure::storage::FetchAzureDataLakeStorageParameters fetch_params_;  // recorded by fetchFile(), returned by getPassedFetchParams()
 };