Posted to commits@nifi.apache.org by sz...@apache.org on 2021/12/01 13:27:04 UTC

[nifi-minifi-cpp] branch main updated (112e95d -> 0553df4)

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git.


    from 112e95d  MINIFICPP-1676 Check build identifier in extensions
     new 767435d  MINIFICPP-1677 Add SASL options to Kafka processors
     new 6074634  MINIFICPP-1629 Add DeleteAzureDataLakeStorage processor
     new 0553df4  MINIFICPP-1692 TLSSocket: Break infinite loop when no more data can be read

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 PROCESSORS.md                                      |  44 +++-
 README.md                                          |   2 +-
 cmake/BuildTests.cmake                             |  13 ++
 docker/test/integration/features/kafka.feature     | 123 +++++++++++
 .../minifi/core/KafkaBrokerContainer.py            |   8 +-
 .../integration/resources/kafka_broker/Dockerfile  |   2 +-
 .../kafka_broker/conf/server-ssl.properties        | 155 --------------
 .../resources/kafka_broker/conf/server.properties  |  50 ++++-
 .../AzureDataLakeStorageProcessorBase.cpp          |  81 +++++++
 .../processors/AzureDataLakeStorageProcessorBase.h |  64 ++++++
 .../azure/processors/AzureStorageProcessorBase.cpp |   6 +-
 .../azure/processors/AzureStorageProcessorBase.h   |   2 +-
 .../processors/DeleteAzureDataLakeStorage.cpp      |  85 ++++++++
 .../azure/processors/DeleteAzureDataLakeStorage.h  |  67 ++++++
 .../azure/processors/PutAzureBlobStorage.cpp       |   2 +-
 .../azure/processors/PutAzureDataLakeStorage.cpp   |  55 +----
 .../azure/processors/PutAzureDataLakeStorage.h     |  28 +--
 extensions/azure/storage/AzureDataLakeStorage.cpp  |   9 +
 extensions/azure/storage/AzureDataLakeStorage.h    |   1 +
 .../azure/storage/AzureDataLakeStorageClient.cpp   |   8 +-
 .../azure/storage/AzureDataLakeStorageClient.h     |   9 +-
 extensions/azure/storage/DataLakeStorageClient.h   |   8 +-
 extensions/librdkafka/ConsumeKafka.cpp             |  76 ++-----
 extensions/librdkafka/ConsumeKafka.h               |  15 +-
 extensions/librdkafka/KafkaProcessorBase.cpp       | 135 ++++++++++++
 extensions/librdkafka/KafkaProcessorBase.h         |  69 ++++++
 extensions/librdkafka/PublishKafka.cpp             | 237 +++++----------------
 extensions/librdkafka/PublishKafka.h               |  25 ++-
 .../TLSClientSocketSupportedProtocolsTest.cpp      | 118 ++--------
 .../tests/unit/GenerateFlowFileTests.cpp           |   9 +-
 libminifi/include/utils/ProcessorConfigUtils.h     |  18 +-
 libminifi/src/io/tls/TLSSocket.cpp                 |   4 +-
 libminifi/src/utils/ProcessorConfigUtils.cpp       |  18 +-
 libminifi/test/SimpleSSLTestServer.h               | 130 +++++++++++
 .../azure-tests/AzureDataLakeStorageTestsFixture.h | 122 +++++++++++
 .../DeleteAzureDataLakeStorageTests.cpp            | 128 +++++++++++
 .../test/azure-tests/MockDataLakeStorageClient.h   |  96 +++++++++
 .../test/azure-tests/PutAzureBlobStorageTests.cpp  |   3 +-
 .../azure-tests/PutAzureDataLakeStorageTests.cpp   | 168 ++-------------
 libminifi/test/unit/tls/TLSStreamTests.cpp         |  82 +++++++
 40 files changed, 1495 insertions(+), 780 deletions(-)
 delete mode 100644 docker/test/integration/resources/kafka_broker/conf/server-ssl.properties
 create mode 100644 extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp
 create mode 100644 extensions/azure/processors/AzureDataLakeStorageProcessorBase.h
 create mode 100644 extensions/azure/processors/DeleteAzureDataLakeStorage.cpp
 create mode 100644 extensions/azure/processors/DeleteAzureDataLakeStorage.h
 create mode 100644 extensions/librdkafka/KafkaProcessorBase.cpp
 create mode 100644 extensions/librdkafka/KafkaProcessorBase.h
 create mode 100644 libminifi/test/SimpleSSLTestServer.h
 create mode 100644 libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h
 create mode 100644 libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp
 create mode 100644 libminifi/test/azure-tests/MockDataLakeStorageClient.h
 create mode 100644 libminifi/test/unit/tls/TLSStreamTests.cpp

[nifi-minifi-cpp] 03/03: MINIFICPP-1692 TLSSocket: Break infinite loop when no more data can be read

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 0553df446aefb395b34c1e4e2b5ced31f07a4e04
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Wed Dec 1 14:23:32 2021 +0100

    MINIFICPP-1692 TLSSocket: Break infinite loop when no more data can be read
    
    Closes #1218
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 cmake/BuildTests.cmake                             |  13 +++
 .../TLSClientSocketSupportedProtocolsTest.cpp      | 118 ++++---------------
 libminifi/src/io/tls/TLSSocket.cpp                 |   4 +-
 libminifi/test/SimpleSSLTestServer.h               | 130 +++++++++++++++++++++
 libminifi/test/unit/tls/TLSStreamTests.cpp         |  82 +++++++++++++
 5 files changed, 247 insertions(+), 100 deletions(-)
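
The one-character change in TLSSocket.cpp below is subtle, so here is a minimal sketch of the corrected retry pattern, assuming a bare OpenSSL SSL* handle rather than the MiNiFi TLSSocket wrapper: SSL_read() returning 0 (the peer closed the connection) must now terminate the read loop the same way a negative return does; with the old condition a 0 return neither retried nor broke out, the remaining byte count was reduced by zero, and the surrounding loop never made progress.

    // Sketch only: illustrates the fixed loop condition, not the actual
    // TLSSocket::read() body. status <= 0 now covers both fatal errors (< 0)
    // and an orderly shutdown (== 0).
    int status;
    int ssl_status;
    do {
      status = SSL_read(ssl, buf, read_size);   // may legitimately return 0 on close
      ssl_status = SSL_get_error(ssl, status);
    } while (status <= 0 && ssl_status == SSL_ERROR_WANT_READ);

    if (status <= 0) {
      // Previously only `status < 0` broke out here; a 0 return fell through
      // and the caller spun forever waiting for data that would never arrive.
      return bytes_read_so_far;
    }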

diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index ecb5c70..6d5402e 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -101,6 +101,7 @@ target_include_directories(${CATCH_MAIN_LIB} SYSTEM BEFORE PRIVATE "${CMAKE_SOUR
 SET(TEST_RESOURCES ${TEST_DIR}/resources)
 
 GETSOURCEFILES(UNIT_TESTS "${TEST_DIR}/unit/")
+GETSOURCEFILES(TLS_UNIT_TESTS "${TEST_DIR}/unit/tls/")
 GETSOURCEFILES(NANOFI_UNIT_TESTS "${NANOFI_TEST_DIR}")
 GETSOURCEFILES(INTEGRATION_TESTS "${TEST_DIR}/integration/")
 
@@ -115,6 +116,18 @@ FOREACH(testfile ${UNIT_TESTS})
 ENDFOREACH()
 message("-- Finished building ${UNIT_TEST_COUNT} unit test file(s)...")
 
+if (NOT OPENSSL_OFF)
+  SET(UNIT_TEST_COUNT 0)
+  FOREACH(testfile ${TLS_UNIT_TESTS})
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable("${testfilename}" "${TEST_DIR}/unit/tls/${testfile}")
+    createTests("${testfilename}")
+    MATH(EXPR UNIT_TEST_COUNT "${UNIT_TEST_COUNT}+1")
+    add_test(NAME "${testfilename}" COMMAND "${testfilename}" "${TEST_RESOURCES}/" WORKING_DIRECTORY ${TEST_DIR})
+  ENDFOREACH()
+  message("-- Finished building ${UNIT_TEST_COUNT} TLS unit test file(s)...")
+endif()
+
 if(NOT WIN32 AND ENABLE_NANOFI)
   SET(UNIT_TEST_COUNT 0)
   FOREACH(testfile ${NANOFI_UNIT_TESTS})
diff --git a/extensions/standard-processors/tests/integration/TLSClientSocketSupportedProtocolsTest.cpp b/extensions/standard-processors/tests/integration/TLSClientSocketSupportedProtocolsTest.cpp
index 928bc56..6cc643c 100644
--- a/extensions/standard-processors/tests/integration/TLSClientSocketSupportedProtocolsTest.cpp
+++ b/extensions/standard-processors/tests/integration/TLSClientSocketSupportedProtocolsTest.cpp
@@ -19,6 +19,7 @@
 #include <sys/stat.h>
 #include <chrono>
 #include <thread>
+#include <filesystem>
 #undef NDEBUG
 #include <cassert>
 #include <utility>
@@ -26,114 +27,34 @@
 #include <string>
 #include "properties/Configure.h"
 #include "io/tls/TLSSocket.h"
+#include "SimpleSSLTestServer.h"
 
 namespace minifi = org::apache::nifi::minifi;
 
-#ifdef WIN32
-#pragma comment(lib, "Ws2_32.lib")
-using SocketDescriptor = SOCKET;
-#else
-using SocketDescriptor = int;
-static constexpr SocketDescriptor INVALID_SOCKET = -1;
-#endif /* WIN32 */
-
-
-class SimpleSSLTestServer  {
- public:
-  SimpleSSLTestServer(const SSL_METHOD* method, const std::string& port, const std::string& path)
-      : port_(port), had_connection_(false) {
-    ctx_ = SSL_CTX_new(method);
-    configureContext(path);
-    socket_descriptor_ = createSocket(std::stoi(port_));
-  }
-
-  ~SimpleSSLTestServer() {
-      SSL_shutdown(ssl_);
-      SSL_free(ssl_);
-      SSL_CTX_free(ctx_);
-  }
-
-  void waitForConnection() {
-    server_read_thread_ = std::thread([this]() -> void {
-        SocketDescriptor client = accept(socket_descriptor_, nullptr, nullptr);
-        if (client != INVALID_SOCKET) {
-            ssl_ = SSL_new(ctx_);
-            SSL_set_fd(ssl_, client);
-            had_connection_ = (SSL_accept(ssl_) == 1);
-        }
-    });
-  }
-
-  void shutdownServer() {
-#ifdef WIN32
-    shutdown(socket_descriptor_, SD_BOTH);
-    closesocket(socket_descriptor_);
-#else
-    shutdown(socket_descriptor_, SHUT_RDWR);
-    close(socket_descriptor_);
-#endif
-    server_read_thread_.join();
-  }
-
-  bool hadConnection() const {
-    return had_connection_;
-  }
-
- private:
-  SSL_CTX *ctx_ = nullptr;
-  SSL* ssl_ = nullptr;
-  std::string port_;
-  SocketDescriptor socket_descriptor_;
-  bool had_connection_;
-  std::thread server_read_thread_;
-
-  void configureContext(const std::string& path) {
-    SSL_CTX_set_ecdh_auto(ctx_, 1);
-    /* Set the key and cert */
-    assert(SSL_CTX_use_certificate_file(ctx_, (path + "cn.crt.pem").c_str(), SSL_FILETYPE_PEM) == 1);
-    assert(SSL_CTX_use_PrivateKey_file(ctx_, (path + "cn.ckey.pem").c_str(), SSL_FILETYPE_PEM) == 1);
-  }
-
-  static SocketDescriptor createSocket(int port) {
-    struct sockaddr_in addr;
-
-    addr.sin_family = AF_INET;
-    addr.sin_port = htons(port);
-    addr.sin_addr.s_addr = htonl(INADDR_ANY);
-
-    SocketDescriptor socket_descriptor = socket(AF_INET, SOCK_STREAM, 0);
-    assert(socket_descriptor >= 0);
-    assert(bind(socket_descriptor, (struct sockaddr*)&addr, sizeof(addr)) >= 0);
-    assert(listen(socket_descriptor, 1) >= 0);
-
-    return socket_descriptor;
-  }
-};
-
 class SimpleSSLTestServerTLSv1  : public SimpleSSLTestServer {
  public:
-  SimpleSSLTestServerTLSv1(const std::string& port, const std::string& path)
-      : SimpleSSLTestServer(TLSv1_server_method(), port, path) {
+  SimpleSSLTestServerTLSv1(int port, const std::filesystem::path& key_dir)
+      : SimpleSSLTestServer(TLSv1_server_method(), port, key_dir) {
   }
 };
 
 class SimpleSSLTestServerTLSv1_1  : public SimpleSSLTestServer {
  public:
-  SimpleSSLTestServerTLSv1_1(const std::string& port, const std::string& path)
-      : SimpleSSLTestServer(TLSv1_1_server_method(), port, path) {
+  SimpleSSLTestServerTLSv1_1(int port, const std::filesystem::path& key_dir)
+      : SimpleSSLTestServer(TLSv1_1_server_method(), port, key_dir) {
   }
 };
 
 class SimpleSSLTestServerTLSv1_2  : public SimpleSSLTestServer {
  public:
-  SimpleSSLTestServerTLSv1_2(const std::string& port, const std::string& path)
-      : SimpleSSLTestServer(TLSv1_2_server_method(), port, path) {
+  SimpleSSLTestServerTLSv1_2(int port, const std::filesystem::path& key_dir)
+      : SimpleSSLTestServer(TLSv1_2_server_method(), port, key_dir) {
   }
 };
 
 class TLSClientSocketSupportedProtocolsTest {
  public:
-  explicit TLSClientSocketSupportedProtocolsTest(const std::string& key_dir)
+  explicit TLSClientSocketSupportedProtocolsTest(const std::filesystem::path& key_dir)
       : key_dir_(key_dir), configuration_(std::make_shared<minifi::Configure>()) {
   }
 
@@ -147,14 +68,13 @@ class TLSClientSocketSupportedProtocolsTest {
  protected:
   void configureSecurity() {
     host_ = minifi::io::Socket::getMyHostName();
-    port_ = "38777";
     if (!key_dir_.empty()) {
       configuration_->set(minifi::Configure::nifi_remote_input_secure, "true");
-      configuration_->set(minifi::Configure::nifi_security_client_certificate, key_dir_ + "cn.crt.pem");
-      configuration_->set(minifi::Configure::nifi_security_client_private_key, key_dir_ + "cn.ckey.pem");
-      configuration_->set(minifi::Configure::nifi_security_client_pass_phrase, key_dir_ + "cn.pass");
-      configuration_->set(minifi::Configure::nifi_security_client_ca_certificate, key_dir_ + "nifi-cert.pem");
-      configuration_->set(minifi::Configure::nifi_default_directory, key_dir_);
+      configuration_->set(minifi::Configure::nifi_security_client_certificate, (key_dir_ / "cn.crt.pem").string());
+      configuration_->set(minifi::Configure::nifi_security_client_private_key, (key_dir_ / "cn.ckey.pem").string());
+      configuration_->set(minifi::Configure::nifi_security_client_pass_phrase, (key_dir_ / "cn.pass").string());
+      configuration_->set(minifi::Configure::nifi_security_client_ca_certificate, (key_dir_ / "nifi-cert.pem").string());
+      configuration_->set(minifi::Configure::nifi_default_directory, key_dir_.string());
     }
   }
 
@@ -166,11 +86,14 @@ class TLSClientSocketSupportedProtocolsTest {
 
   template <class TLSTestSever>
   void verifyTLSProtocolCompatibility(const bool should_be_compatible) {
-    TLSTestSever server(port_, key_dir_);
+    // bind to random port
+    TLSTestSever server(0, key_dir_);
     server.waitForConnection();
 
+    int port = server.getPort();
+
     const auto socket_context = std::make_shared<minifi::io::TLSContext>(configuration_);
-    client_socket_ = std::make_unique<minifi::io::TLSSocket>(socket_context, host_, std::stoi(port_), 0);
+    client_socket_ = std::make_unique<minifi::io::TLSSocket>(socket_context, host_, port, 0);
     const bool client_initialized_successfully = (client_socket_->initialize() == 0);
     assert(client_initialized_successfully == should_be_compatible);
     server.shutdownServer();
@@ -180,8 +103,7 @@ class TLSClientSocketSupportedProtocolsTest {
  protected:
     std::unique_ptr<minifi::io::TLSSocket> client_socket_;
     std::string host_;
-    std::string port_;
-    std::string key_dir_;
+    std::filesystem::path key_dir_;
     std::shared_ptr<minifi::Configure> configuration_;
 };
 
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index 5d76e8c..af8772a 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -434,9 +434,9 @@ size_t TLSSocket::read(uint8_t *buf, size_t buflen) {
       const auto ssl_read_size = gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max())));
       status = SSL_read(fd_ssl, buf, ssl_read_size);
       sslStatus = SSL_get_error(fd_ssl, status);
-    } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
+    } while (status <= 0 && sslStatus == SSL_ERROR_WANT_READ);
 
-    if (status < 0)
+    if (status <= 0)
       break;
 
     buflen -= gsl::narrow<size_t>(status);
diff --git a/libminifi/test/SimpleSSLTestServer.h b/libminifi/test/SimpleSSLTestServer.h
new file mode 100644
index 0000000..b6cecf5
--- /dev/null
+++ b/libminifi/test/SimpleSSLTestServer.h
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+#include <filesystem>
+#include <string>
+#include "io/tls/TLSSocket.h"
+
+#ifdef WIN32
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#pragma comment(lib, "Ws2_32.lib")
+using SocketDescriptor = SOCKET;
+#else
+using SocketDescriptor = int;
+static constexpr SocketDescriptor INVALID_SOCKET = -1;
+#endif /* WIN32 */
+
+namespace minifi = org::apache::nifi::minifi;
+
+class SimpleSSLTestServer  {
+  struct SocketInitializer {
+    SocketInitializer() {
+#ifdef WIN32
+      static WSADATA s_wsaData;
+      const int iWinSockInitResult = WSAStartup(MAKEWORD(2, 2), &s_wsaData);
+      if (0 != iWinSockInitResult) {
+        throw std::runtime_error("Cannot initialize socket");
+      }
+#endif
+    }
+  };
+
+ public:
+  SimpleSSLTestServer(const SSL_METHOD* method, int port, const std::filesystem::path& key_dir)
+      : port_(port), had_connection_(false) {
+    static SocketInitializer socket_initializer{};
+    minifi::io::OpenSSLInitializer::getInstance();
+    ctx_ = SSL_CTX_new(method);
+    configureContext(key_dir);
+    socket_descriptor_ = createSocket(port_);
+  }
+
+  ~SimpleSSLTestServer() {
+    SSL_shutdown(ssl_);
+    SSL_free(ssl_);
+    SSL_CTX_free(ctx_);
+  }
+
+  void waitForConnection() {
+    server_read_thread_ = std::thread([this]() -> void {
+      SocketDescriptor client = accept(socket_descriptor_, nullptr, nullptr);
+      if (client != INVALID_SOCKET) {
+        ssl_ = SSL_new(ctx_);
+        SSL_set_fd(ssl_, client);
+        had_connection_ = (SSL_accept(ssl_) == 1);
+      }
+    });
+  }
+
+  void shutdownServer() {
+#ifdef WIN32
+    shutdown(socket_descriptor_, SD_BOTH);
+    closesocket(socket_descriptor_);
+#else
+    shutdown(socket_descriptor_, SHUT_RDWR);
+    close(socket_descriptor_);
+#endif
+    server_read_thread_.join();
+  }
+
+  bool hadConnection() const {
+    return had_connection_;
+  }
+
+  int getPort() const {
+    struct sockaddr_in addr;
+    socklen_t addr_len = sizeof(addr);
+    assert(getsockname(socket_descriptor_, (struct sockaddr*)&addr, &addr_len) == 0);
+    return ntohs(addr.sin_port);
+  }
+
+ private:
+  SSL_CTX *ctx_ = nullptr;
+  SSL* ssl_ = nullptr;
+  int port_;
+  SocketDescriptor socket_descriptor_;
+  bool had_connection_;
+  std::thread server_read_thread_;
+
+  void configureContext(const std::filesystem::path& key_dir) {
+    SSL_CTX_set_ecdh_auto(ctx_, 1);
+    /* Set the key and cert */
+    assert(SSL_CTX_use_certificate_file(ctx_, (key_dir / "cn.crt.pem").string().c_str(), SSL_FILETYPE_PEM) == 1);
+    assert(SSL_CTX_use_PrivateKey_file(ctx_, (key_dir / "cn.ckey.pem").string().c_str(), SSL_FILETYPE_PEM) == 1);
+  }
+
+  static SocketDescriptor createSocket(int port) {
+    struct sockaddr_in addr;
+
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(port);
+    addr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+    SocketDescriptor socket_descriptor = socket(AF_INET, SOCK_STREAM, 0);
+    assert(socket_descriptor >= 0);
+    assert(bind(socket_descriptor, (struct sockaddr*)&addr, sizeof(addr)) >= 0);
+    assert(listen(socket_descriptor, 1) >= 0);
+
+    return socket_descriptor;
+  }
+};
diff --git a/libminifi/test/unit/tls/TLSStreamTests.cpp b/libminifi/test/unit/tls/TLSStreamTests.cpp
new file mode 100644
index 0000000..9fc5939
--- /dev/null
+++ b/libminifi/test/unit/tls/TLSStreamTests.cpp
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef LOAD_EXTENSIONS
+#undef NDEBUG
+
+#include <cassert>
+
+#include "io/tls/TLSServerSocket.h"
+#include "io/tls/TLSSocket.h"
+#include "../../TestBase.h"
+#include "../../SimpleSSLTestServer.h"
+#include "../utils/IntegrationTestUtils.h"
+
+using namespace std::chrono_literals;
+
+static std::shared_ptr<minifi::io::TLSContext> createContext(const std::filesystem::path& key_dir) {
+  auto configuration = std::make_shared<minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_remote_input_secure, "true");
+  configuration->set(minifi::Configure::nifi_security_client_certificate, (key_dir / "cn.crt.pem").string());
+  configuration->set(minifi::Configure::nifi_security_client_private_key, (key_dir / "cn.ckey.pem").string());
+  configuration->set(minifi::Configure::nifi_security_client_pass_phrase, (key_dir / "cn.pass").string());
+  configuration->set(minifi::Configure::nifi_security_client_ca_certificate, (key_dir / "nifi-cert.pem").string());
+  configuration->set(minifi::Configure::nifi_default_directory, key_dir.string());
+
+  return std::make_shared<minifi::io::TLSContext>(configuration);
+}
+
+int main(int argc, char** argv) {
+  if (argc < 2) {
+    throw std::logic_error("Specify the key directory");
+  }
+  std::filesystem::path key_dir(argv[1]);
+
+  LogTestController::getInstance().setTrace<minifi::io::Socket>();
+  LogTestController::getInstance().setTrace<minifi::io::TLSSocket>();
+  LogTestController::getInstance().setTrace<minifi::io::TLSServerSocket>();
+  LogTestController::getInstance().setTrace<minifi::io::TLSContext>();
+
+  auto server = std::make_unique<SimpleSSLTestServer>(TLSv1_2_server_method(), 0, key_dir);
+  int port = server->getPort();
+  server->waitForConnection();
+
+  std::string host = minifi::io::Socket::getMyHostName();
+
+  auto client_ctx = createContext(key_dir);
+  assert(client_ctx->initialize(false) == 0);
+
+  minifi::io::TLSSocket client_socket(client_ctx, host, port);
+  assert(client_socket.initialize() == 0);
+
+  std::atomic_bool read_complete{false};
+
+  std::thread read_thread{[&] {
+    std::vector<uint8_t> buffer;
+    auto read_count = client_socket.read(buffer, 10);
+    assert(read_count == 0);
+    read_complete = true;
+  }};
+
+  server->shutdownServer();
+  server.reset();
+
+  assert(utils::verifyEventHappenedInPollTime(1s, [&] {return read_complete.load();}));
+
+  read_thread.join();
+}

[nifi-minifi-cpp] 01/03: MINIFICPP-1677 Add SASL options to Kafka processors

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 767435de8be46bc5f5a1ae3ea180569a3bcaf8a2
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Dec 1 14:05:08 2021 +0100

    MINIFICPP-1677 Add SASL options to Kafka processors
    
    - Add new security options: SASL_PLAIN and SASL_SSL
    - Add SASL mechanism property with GSSAPI and PLAIN support
    - Add Username and Password properties for PLAIN mechanism
    - Extract common Kafka authentication code to add the same authentication options to ConsumeKafka processor as PublishKafka
    - Add Kafka broker configuration and tests for SASL/PLAIN and SASL/SSL with username and password authentication
    
    Closes #1212
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |  15 +-
 docker/test/integration/features/kafka.feature     | 123 +++++++++++
 .../minifi/core/KafkaBrokerContainer.py            |   8 +-
 .../integration/resources/kafka_broker/Dockerfile  |   2 +-
 .../kafka_broker/conf/server-ssl.properties        | 155 --------------
 .../resources/kafka_broker/conf/server.properties  |  50 ++++-
 extensions/librdkafka/ConsumeKafka.cpp             |  76 ++-----
 extensions/librdkafka/ConsumeKafka.h               |  15 +-
 extensions/librdkafka/KafkaProcessorBase.cpp       | 135 ++++++++++++
 extensions/librdkafka/KafkaProcessorBase.h         |  69 ++++++
 extensions/librdkafka/PublishKafka.cpp             | 237 +++++----------------
 extensions/librdkafka/PublishKafka.h               |  25 ++-
 libminifi/include/utils/ProcessorConfigUtils.h     |  18 +-
 libminifi/src/utils/ProcessorConfigUtils.cpp       |  18 +-
 14 files changed, 501 insertions(+), 445 deletions(-)
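
For context on how the new properties listed in the commit message typically reach librdkafka, here is a hedged sketch (not the actual KafkaProcessorBase code) mapping the processor properties added in this commit onto the standard librdkafka configuration keys; the function name and parameters are illustrative assumptions.

    // Sketch only: shows which librdkafka keys the new SASL-related processor
    // properties correspond to; error handling is reduced to a single check.
    #include <librdkafka/rdkafka.h>
    #include <stdexcept>
    #include <string>

    void configureSasl(rd_kafka_conf_t* conf,
                       const std::string& security_protocol,  // "sasl_plaintext" or "sasl_ssl"
                       const std::string& sasl_mechanism,      // "GSSAPI" or "PLAIN"
                       const std::string& username,
                       const std::string& password) {
      char errstr[512];
      auto set = [&](const char* key, const std::string& value) {
        if (rd_kafka_conf_set(conf, key, value.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
          throw std::runtime_error(errstr);
        }
      };
      set("security.protocol", security_protocol);  // "Security Protocol" property
      set("sasl.mechanism", sasl_mechanism);        // "SASL Mechanism" property
      if (sasl_mechanism == "PLAIN") {
        set("sasl.username", username);             // "Username" property
        set("sasl.password", password);             // "Password" property
      }
      // For GSSAPI, the Kerberos Service Name / Principal / Keytab Path properties
      // would instead map to the sasl.kerberos.* configuration keys.
    }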

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 8c85ec9..2a1bcbb 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -253,17 +253,23 @@ In the list below, the names of required properties appear in bold. Any other pr
 |Headers To Add As Attributes|||A comma separated list to match against all message headers. Any message header whose name matches an item from the list will be added to the FlowFile as an Attribute. If not specified, no Header values will be added as FlowFile attributes. The behaviour on when multiple headers of the same name are present is set using the DuplicateHeaderHandling attribute.|
 |**Honor Transactions**|true||Specifies whether or not MiNiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an "isolation level" of read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If this value is true, MiNiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some la [...]
 |**Kafka Brokers**|localhost:9092||A comma-separated list of known Kafka Brokers in the format <host>:<port>.<br/>**Supports Expression Language: true**|
+|Kerberos Keytab Path|||The path to the location on the local filesystem where the kerberos keytab is located. Read permission on the file is required.|
+|Kerberos Principal|||Keberos Principal|
+|Kerberos Service Name|||Kerberos Service Name|
 |**Key Attribute Encoding**|UTF-8|Hex<br>UTF-8<br>|FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded.|
 |Max Poll Records|10000||Specifies the maximum number of records Kafka should return when polling each time the processor is triggered.|
 |**Max Poll Time**|4 seconds||Specifies the maximum amount of time the consumer can use for polling data from the brokers. Polling is a blocking operation, so the upper limit of this value is specified in 4 seconds.|
 |Message Demarcator|||Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains all Kafka messages in a single batch for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received will result in a single FlowFile which time it is triggered. <br/>**Supports Expression Langua [...]
 |Message Header Encoding|UTF-8|Hex<br>UTF-8<br>|Any message header that is found on a Kafka message will be added to the outbound FlowFile as an attribute. This property indicates the Character Encoding to use for deserializing the headers.|
 |**Offset Reset**|latest|earliest<br>latest<br>none<br>|Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.|
-|SSL Context Service|||SSL Context Service Name|
-|**Security Protocol**|plaintext|plaintext<br>ssl|Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.|
+|Password|||The password for the given username when the SASL Mechanism is sasl_plaintext|
+|SASL Mechanism|GSSAPI|GSSAPI<br/>PLAIN|The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.|
+|**Security Protocol**|plaintext|plaintext<br/>ssl<br/>sasl_plaintext<br/>sasl_ssl|Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.|
 |Session Timeout|60 seconds||Client group session and failure detection timeout. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no hearts are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the broker configuration properties group.min.session.timeout.ms and group.max.session.timeout.ms.|
+|SSL Context Service|||SSL Context Service Name|
 |**Topic Name Format**|Names|Names<br>Patterns<br>|Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression. Using regular expressions does not automatically discover Kafka topics created after the processor started.|
 |**Topic Names**|||The name of the Kafka Topic(s) to pull from. Multiple topic names are supported as a comma separated list.<br/>**Supports Expression Language: true**|
+|Username|||The username when the SASL Mechanism is sasl_plaintext|
 ### Properties
 
 | Name | Description |
@@ -1187,16 +1193,19 @@ In the list below, the names of required properties appear in bold. Any other pr
 |Kafka Key|||The key to use for the message. If not specified, the UUID of the flow file is used as the message key.<br/>**Supports Expression Language: true**|
 |Message Key Field|||DEPRECATED, does not work -- use Kafka Key instead|
 |Message Timeout|30 sec||The total time sending a message could take|
+|Password|||The password for the given username when the SASL Mechanism is sasl_plaintext|
 |Queue Buffering Max Time|||Delay to wait for messages in the producer queue to accumulate before constructing message batches|
 |Queue Max Buffer Size|||Maximum total message size sum allowed on the producer queue|
 |Queue Max Message|||Maximum number of messages allowed on the producer queue|
 |Request Timeout|10 sec||The ack timeout of the producer request|
+|SASL Mechanism|GSSAPI|GSSAPI<br/>PLAIN|The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.|
 |SSL Context Service|||SSL Context Service Name|
 |Security CA|||DEPRECATED in favor of SSL Context Service. File or directory path to CA certificate(s) for verifying the broker's key|
 |Security Cert|||DEPRECATED in favor of SSL Context Service. Path to client's public key (PEM) used for authentication|
 |Security Pass Phrase|||DEPRECATED in favor of SSL Context Service. Private key passphrase|
 |Security Private Key|||DEPRECATED in favor of SSL Context Service. Path to client's private key (PEM) used for authentication|
-|Security Protocol|||Protocol used to communicate with brokers|
+|**Security Protocol**|plaintext|plaintext<br/>ssl<br/>sasl_plaintext<br/>sasl_ssl|Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.|
+|Username|||The username when the SASL Mechanism is sasl_plaintext|
 |Target Batch Payload Size|512 KB||The target total payload size for a batch. 0 B means unlimited (Batch Size is still applied).|
 |**Topic Name**|||The Kafka Topic of interest<br/>**Supports Expression Language: true**|
 ### Relationships
diff --git a/docker/test/integration/features/kafka.feature b/docker/test/integration/features/kafka.feature
index ab4c34e..9825b55 100644
--- a/docker/test/integration/features/kafka.feature
+++ b/docker/test/integration/features/kafka.feature
@@ -87,6 +87,89 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
     # We fallback to the flowfile's uuid as message key if the Kafka Key property is not set
     And the Minifi logs match the following regex: "PublishKafka: Message Key \[[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}\]" in less than 10 seconds
 
+  Scenario: A MiNiFi instance transfers data to a kafka broker through SASL Plain security protocol
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker instance
+    And these processor properties are set:
+      | processor name | property name          | property value                         |
+      | PublishKafka   | Topic Name             | test                                   |
+      | PublishKafka   | Request Timeout        | 10 sec                                 |
+      | PublishKafka   | Message Timeout        | 12 sec                                 |
+      | PublishKafka   | Known Brokers          | kafka-broker:9094                      |
+      | PublishKafka   | Client Name            | LMN                                    |
+      | PublishKafka   | Security Protocol      | sasl_plaintext                         |
+      | PublishKafka   | SASL Mechanism         | PLAIN                                  |
+      | PublishKafka   | Username               | alice                                  |
+      | PublishKafka   | Password               | alice-secret                           |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishKafka
+    And the "success" relationship of the PublishKafka processor is connected to the PutFile
+
+    And a kafka broker is set up in correspondence with the PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+
+  Scenario: PublishKafka sends can use SASL SSL connect with security properties
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker instance
+    And these processor properties are set:
+      | processor name | property name          | property value                             |
+      | PublishKafka   | Client Name            | LMN                                        |
+      | PublishKafka   | Known Brokers          | kafka-broker:9095                          |
+      | PublishKafka   | Topic Name             | test                                       |
+      | PublishKafka   | Batch Size             | 10                                         |
+      | PublishKafka   | Compress Codec         | none                                       |
+      | PublishKafka   | Delivery Guarantee     | 1                                          |
+      | PublishKafka   | Request Timeout        | 10 sec                                     |
+      | PublishKafka   | Message Timeout        | 12 sec                                     |
+      | PublishKafka   | Security CA            | /tmp/resources/certs/ca-cert               |
+      | PublishKafka   | Security Cert          | /tmp/resources/certs/client_LMN_client.pem |
+      | PublishKafka   | Security Pass Phrase   | abcdefgh                                   |
+      | PublishKafka   | Security Private Key   | /tmp/resources/certs/client_LMN_client.key |
+      | PublishKafka   | Security Protocol      | sasl_ssl                                   |
+      | PublishKafka   | SASL Mechanism         | PLAIN                                      |
+      | PublishKafka   | Username               | alice                                      |
+      | PublishKafka   | Password               | alice-secret                               |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishKafka
+    And the "success" relationship of the PublishKafka processor is connected to the PutFile
+
+    And a kafka broker is set up in correspondence with the PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+
+  Scenario: PublishKafka sends can use SASL SSL connect with SSL Context
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker instance
+    And these processor properties are set:
+      | processor name | property name          | property value                             |
+      | PublishKafka   | Client Name            | LMN                                        |
+      | PublishKafka   | Known Brokers          | kafka-broker:9095                          |
+      | PublishKafka   | Topic Name             | test                                       |
+      | PublishKafka   | Batch Size             | 10                                         |
+      | PublishKafka   | Compress Codec         | none                                       |
+      | PublishKafka   | Delivery Guarantee     | 1                                          |
+      | PublishKafka   | Request Timeout        | 10 sec                                     |
+      | PublishKafka   | Message Timeout        | 12 sec                                     |
+      | PublishKafka   | Security Protocol      | sasl_ssl                                   |
+      | PublishKafka   | SASL Mechanism         | PLAIN                                      |
+      | PublishKafka   | Username               | alice                                      |
+      | PublishKafka   | Password               | alice-secret                               |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And an ssl context service set up for PublishKafka
+    And the "success" relationship of the GetFile processor is connected to the PublishKafka
+    And the "success" relationship of the PublishKafka processor is connected to the PutFile
+
+    And a kafka broker is set up in correspondence with the PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+
   Scenario: PublishKafka sends can use SSL connect with SSL Context Service
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a file with the content "test" is present in "/tmp/input"
@@ -327,3 +410,43 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
     And a message with content "Lewis Carroll" is published to the "ConsumeKafkaTest" topic using an ssl connection
 
     Then two flowfiles with the contents "Alice's Adventures in Wonderland" and "Lewis Carroll" are placed in the monitored directory in less than 60 seconds
+
+  Scenario: ConsumeKafka receives data via SASL SSL
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And these processor properties are set:
+      | processor name | property name        | property value                             |
+      | ConsumeKafka   | Kafka Brokers        | kafka-broker:9095                          |
+      | ConsumeKafka   | Security Protocol    | sasl_ssl                                   |
+      | ConsumeKafka   | SASL Mechanism       | PLAIN                                      |
+      | ConsumeKafka   | Username             | alice                                      |
+      | ConsumeKafka   | Password             | alice-secret                               |
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And an ssl context service set up for ConsumeKafka
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+
+    And a kafka broker is set up in correspondence with the publisher flow
+
+    When all instances start up
+    And a message with content "Alice's Adventures in Wonderland" is published to the "ConsumeKafkaTest" topic using an ssl connection
+    And a message with content "Lewis Carroll" is published to the "ConsumeKafkaTest" topic using an ssl connection
+
+    Then two flowfiles with the contents "Alice's Adventures in Wonderland" and "Lewis Carroll" are placed in the monitored directory in less than 60 seconds
+
+  Scenario: MiNiFi consumes data from a kafka topic via SASL PLAIN connection
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+    And these processor properties are set:
+      | processor name | property name        | property value                             |
+      | ConsumeKafka   | Kafka Brokers        | kafka-broker:9094                          |
+      | ConsumeKafka   | Security Protocol    | sasl_plaintext                             |
+      | ConsumeKafka   | SASL Mechanism       | PLAIN                                      |
+      | ConsumeKafka   | Username             | alice                                      |
+      | ConsumeKafka   | Password             | alice-secret                               |
+
+    And a kafka broker is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And a message with content "some test message" is published to the "ConsumeKafkaTest" topic
+
+    Then at least one flowfile with the content "some test message" is placed in the monitored directory in less than 60 seconds
diff --git a/docker/test/integration/minifi/core/KafkaBrokerContainer.py b/docker/test/integration/minifi/core/KafkaBrokerContainer.py
index df215ef..64346c3 100644
--- a/docker/test/integration/minifi/core/KafkaBrokerContainer.py
+++ b/docker/test/integration/minifi/core/KafkaBrokerContainer.py
@@ -19,15 +19,15 @@ class KafkaBrokerContainer(Container):
             detach=True,
             name='kafka-broker',
             network=self.network.name,
-            ports={'9092/tcp': 9092, '29092/tcp': 29092, '9093/tcp': 9093, '29093/tcp': 29093},
+            ports={'9092/tcp': 9092, '29092/tcp': 29092, '9093/tcp': 9093, '29093/tcp': 29093, '9094/tcp': 9094, '29094/tcp': 29094, '9094/tcp': 9094, '29095/tcp': 29095},
             environment=[
                 "KAFKA_BROKER_ID=1",
                 "ALLOW_PLAINTEXT_LISTENER=yes",
                 "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true",
-                "KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:29092",
-                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL",
+                "KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,SASL_PLAINTEXT://kafka-broker:9094,SASL_SSL://kafka-broker:9095,SSL_HOST://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:29092,SASL_PLAINTEXT_HOST://0.0.0.0:29094,SASL_SSL_HOST://0.0.0.0:29095",
+                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_PLAINTEXT_HOST:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL",
                 "KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SSL",
-                "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,PLAINTEXT_HOST://localhost:29092,SSL://kafka-broker:9093,SSL_HOST://localhost:29093",
+                "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,PLAINTEXT_HOST://localhost:29092,SSL://kafka-broker:9093,SSL_HOST://localhost:29093,SASL_PLAINTEXT://kafka-broker:9094,SASL_PLAINTEXT_HOST://localhost:29094,SASL_SSL://kafka-broker:9095,SASL_SSL_HOST://localhost:29095",
                 "KAFKA_HEAP_OPTS=-Xms512m -Xmx1g",
                 "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
                 "SSL_CLIENT_AUTH=none"])
diff --git a/docker/test/integration/resources/kafka_broker/Dockerfile b/docker/test/integration/resources/kafka_broker/Dockerfile
index 31472c9..9c332a1 100644
--- a/docker/test/integration/resources/kafka_broker/Dockerfile
+++ b/docker/test/integration/resources/kafka_broker/Dockerfile
@@ -1,3 +1,3 @@
 FROM wurstmeister/kafka:2.12-2.5.0
-ADD conf/server-ssl.properties $KAFKA_HOME/config/server.properties
+ADD conf/server.properties $KAFKA_HOME/config/server.properties
 ADD conf/ /usr/local/etc/kafka/
diff --git a/docker/test/integration/resources/kafka_broker/conf/server-ssl.properties b/docker/test/integration/resources/kafka_broker/conf/server-ssl.properties
deleted file mode 100644
index c7158fa..0000000
--- a/docker/test/integration/resources/kafka_broker/conf/server-ssl.properties
+++ /dev/null
@@ -1,155 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# see kafka.server.KafkaConfig for additional details and defaults
-
-############################# Server Basics #############################
-
-# The id of the broker. This must be set to a unique integer for each broker.
-broker.id=0
-
-############################# Socket Server Settings #############################
-
-# The address the socket server listens on. It will get the value returned from
-# java.net.InetAddress.getCanonicalHostName() if not configured.
-#   FORMAT:
-#     listeners = listener_name://host_name:port
-#   EXAMPLE:
-#     listeners = PLAINTEXT://your.host.name:9092
-#listeners=PLAINTEXT://:9092
-
-# Hostname and port the broker will advertise to producers and consumers. If not set,
-# it uses the value for "listeners" if configured.  Otherwise, it will use the value
-# returned from java.net.InetAddress.getCanonicalHostName().
-#advertised.listeners=PLAINTEXT://your.host.name:9092
-
-# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
-#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
-
-# The number of threads that the server uses for receiving requests from the network and sending responses to the network
-num.network.threads=3
-
-# The number of threads that the server uses for processing requests, which may include disk I/O
-num.io.threads=8
-
-# The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer.bytes=102400
-
-# The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer.bytes=102400
-
-# The maximum size of a request that the socket server will accept (protection against OOM)
-socket.request.max.bytes=104857600
-
-
-############################# Log Basics #############################
-
-# A comma separated list of directories under which to store log files
-log.dirs=/tmp/kafka-logs
-
-# The default number of log partitions per topic. More partitions allow greater
-# parallelism for consumption, but this will also result in more files across
-# the brokers.
-num.partitions=1
-
-# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
-# This value is recommended to be increased for installations with data dirs located in RAID array.
-num.recovery.threads.per.data.dir=1
-
-############################# Internal Topic Settings  #############################
-# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
-# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
-offsets.topic.replication.factor=1
-transaction.state.log.replication.factor=1
-transaction.state.log.min.isr=1
-
-############################# Log Flush Policy #############################
-
-# Messages are immediately written to the filesystem but by default we only fsync() to sync
-# the OS cache lazily. The following configurations control the flush of data to disk.
-# There are a few important trade-offs here:
-#    1. Durability: Unflushed data may be lost if you are not using replication.
-#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
-#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
-# The settings below allow one to configure the flush policy to flush data after a period of time or
-# every N messages (or both). This can be done globally and overridden on a per-topic basis.
-
-# The number of messages to accept before forcing a flush of data to disk
-#log.flush.interval.messages=10000
-
-# The maximum amount of time a message can sit in a log before we force a flush
-#log.flush.interval.ms=1000
-
-############################# Log Retention Policy #############################
-
-# The following configurations control the disposal of log segments. The policy can
-# be set to delete segments after a period of time, or after a given size has accumulated.
-# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
-# from the end of the log.
-
-# The minimum age of a log file to be eligible for deletion due to age
-log.retention.hours=168
-
-# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
-# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
-#log.retention.bytes=1073741824
-
-# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.segment.bytes=1073741824
-
-# The interval at which log segments are checked to see if they can be deleted according
-# to the retention policies
-log.retention.check.interval.ms=300000
-
-############################# Zookeeper #############################
-
-# Zookeeper connection string (see zookeeper docs for details).
-# This is a comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
-# You can also append an optional chroot string to the urls to specify the
-# root directory for all kafka znodes.
-zookeeper.connect=localhost:2181
-
-# Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=6000
-
-
-############################# Group Coordinator Settings #############################
-
-# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
-# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
-# The default value for this is 3 seconds.
-# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
-# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
-group.initial.rebalance.delay.ms=0
-
-listeners=SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093
-advertised.listeners=SSL://kafka-broker:9093,SSL_HOST://localhost:29093
-listener.security.protocol.map=SSL:SSL,SSL_HOST:SSL
-
-# SSL
-ssl.protocol = TLS
-ssl.enabled.protocols=TLSv1.2
-ssl.keystore.type = JKS
-ssl.keystore.location = /usr/local/etc/kafka/certs/broker_kafka-broker_server.keystore.jks
-# ssl.keystore.location = /usr/local/etc/kafka/certs/broker_localhost_server.keystore.jks
-ssl.keystore.password = abcdefgh
-ssl.key.password = abcdefgh
-ssl.truststore.type = JKS
-ssl.truststore.location = /usr/local/etc/kafka/certs/broker_kafka-broker_server.truststore.jks
-# ssl.truststore.location = /usr/local/etc/kafka/certs/broker_localhost_server.truststore.jks
-ssl.truststore.password = abcdefgh
-# To require authentication of clients use "require", else "none" or "request"
-ssl.client.auth = required
diff --git a/docker/test/integration/resources/kafka_broker/conf/server.properties b/docker/test/integration/resources/kafka_broker/conf/server.properties
index 20d9095..e752fca 100644
--- a/docker/test/integration/resources/kafka_broker/conf/server.properties
+++ b/docker/test/integration/resources/kafka_broker/conf/server.properties
@@ -22,7 +22,7 @@ broker.id=0
 
 ############################# Socket Server Settings #############################
 
-# The address the socket server listens on. It will get the value returned from 
+# The address the socket server listens on. It will get the value returned from
 # java.net.InetAddress.getCanonicalHostName() if not configured.
 #   FORMAT:
 #     listeners = listener_name://host_name:port
@@ -30,7 +30,7 @@ broker.id=0
 #     listeners = PLAINTEXT://your.host.name:9092
 #listeners=PLAINTEXT://:9092
 
-# Hostname and port the broker will advertise to producers and consumers. If not set, 
+# Hostname and port the broker will advertise to producers and consumers. If not set,
 # it uses the value for "listeners" if configured.  Otherwise, it will use the value
 # returned from java.net.InetAddress.getCanonicalHostName().
 #advertised.listeners=PLAINTEXT://your.host.name:9092
@@ -57,7 +57,7 @@ socket.request.max.bytes=104857600
 ############################# Log Basics #############################
 
 # A comma separated list of directories under which to store log files
-log.dirs=/usr/local/var/lib/kafka-logs
+log.dirs=/tmp/kafka-logs
 
 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
@@ -133,4 +133,46 @@ zookeeper.connection.timeout.ms=6000
 # The default value for this is 3 seconds.
 # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
 # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
-group.initial.rebalance.delay.ms=0
\ No newline at end of file
+group.initial.rebalance.delay.ms=0
+
+sasl.enabled.mechanisms=PLAIN
+sasl.mechanism.inter.broker.protocol=PLAIN
+confluent.metrics.reporter.sasl.mechanism=PLAIN
+listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+  username="admin" \
+  password="admin-secret" \
+  user_admin="admin-secret" \
+  user_alice="alice-secret";
+
+listener.name.sasl_plaintext_host.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+  username="admin" \
+  password="admin-secret" \
+  user_admin="admin-secret" \
+  user_alice="alice-secret";
+
+listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+  username="admin" \
+  password="admin-secret" \
+  user_admin="admin-secret" \
+  user_alice="alice-secret";
+
+listener.name.sasl_ssl_host.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+  username="admin" \
+  password="admin-secret" \
+  user_admin="admin-secret" \
+  user_alice="alice-secret";
+
+# SSL
+ssl.protocol = TLS
+ssl.enabled.protocols=TLSv1.2
+ssl.keystore.type = JKS
+ssl.keystore.location = /usr/local/etc/kafka/certs/broker_kafka-broker_server.keystore.jks
+# ssl.keystore.location = /usr/local/etc/kafka/certs/broker_localhost_server.keystore.jks
+ssl.keystore.password = abcdefgh
+ssl.key.password = abcdefgh
+ssl.truststore.type = JKS
+ssl.truststore.location = /usr/local/etc/kafka/certs/broker_kafka-broker_server.truststore.jks
+# ssl.truststore.location = /usr/local/etc/kafka/certs/broker_localhost_server.truststore.jks
+ssl.truststore.password = abcdefgh
+# To require authentication of clients use "require", else "none" or "request"
+ssl.client.auth = required
diff --git a/extensions/librdkafka/ConsumeKafka.cpp b/extensions/librdkafka/ConsumeKafka.cpp
index e27051e..eef2fb4 100644
--- a/extensions/librdkafka/ConsumeKafka.cpp
+++ b/extensions/librdkafka/ConsumeKafka.cpp
@@ -65,13 +65,6 @@ core::Property ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty(
   ->isRequired(true)
   ->build());
 
-core::Property ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security Protocol")
-  ->withDescription("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
-  ->withAllowableValues<std::string>({SECURITY_PROTOCOL_PLAINTEXT, SECURITY_PROTOCOL_SSL})
-  ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
-  ->isRequired(true)
-  ->build());
-
 core::Property ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
   ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple topic names are supported as a comma separated list.")
   ->supportsExpressionLanguage(true)
@@ -168,18 +161,19 @@ core::Property ConsumeKafka::SessionTimeout(core::PropertyBuilder::createPropert
   ->withDefaultValue<core::TimePeriodValue>("60 seconds")
   ->build());
 
-core::Property ConsumeKafka::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")
-        ->withDescription("SSL Context Service Name")
-        ->asType<minifi::controllers::SSLContextService>()
-        ->build());
-
 const core::Relationship ConsumeKafka::Success("success", "Incoming Kafka messages as flowfiles. Depending on the demarcation strategy, this can be one or multiple flowfiles per message.");
 
 void ConsumeKafka::initialize() {
   setSupportedProperties({
-    KafkaBrokers,
     SecurityProtocol,
+    SSLContextService,
+    KerberosServiceName,
+    KerberosPrincipal,
+    KerberosKeytabPath,
+    SASLMechanism,
+    Username,
+    Password,
+    KafkaBrokers,
     TopicNames,
     TopicNameFormat,
     HonorTransactions,
@@ -192,8 +186,7 @@ void ConsumeKafka::initialize() {
     DuplicateHeaderHandling,
     MaxPollRecords,
     MaxPollTime,
-    SessionTimeout,
-    SSLContextService
+    SessionTimeout
   });
   setSupportedRelationships({
     Success,
@@ -203,40 +196,22 @@ void ConsumeKafka::initialize() {
 void ConsumeKafka::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) {
   gsl_Expects(context);
   // Required properties
-  kafka_brokers_                = utils::getRequiredPropertyOrThrow(context, KafkaBrokers.getName());
-  security_protocol_            = utils::getRequiredPropertyOrThrow(context, SecurityProtocol.getName());
-  topic_names_                  = utils::listFromRequiredCommaSeparatedProperty(context, TopicNames.getName());
-  topic_name_format_            = utils::getRequiredPropertyOrThrow(context, TopicNameFormat.getName());
-  honor_transactions_           = utils::parseBooleanPropertyOrThrow(context, HonorTransactions.getName());
-  group_id_                     = utils::getRequiredPropertyOrThrow(context, GroupID.getName());
-  offset_reset_                 = utils::getRequiredPropertyOrThrow(context, OffsetReset.getName());
-  key_attribute_encoding_       = utils::getRequiredPropertyOrThrow(context, KeyAttributeEncoding.getName());
-  max_poll_time_milliseconds_   = utils::parseTimePropertyMSOrThrow(context, MaxPollTime.getName());
-  session_timeout_milliseconds_ = utils::parseTimePropertyMSOrThrow(context, SessionTimeout.getName());
+  kafka_brokers_                = utils::getRequiredPropertyOrThrow(*context, KafkaBrokers.getName());
+  topic_names_                  = utils::listFromRequiredCommaSeparatedProperty(*context, TopicNames.getName());
+  topic_name_format_            = utils::getRequiredPropertyOrThrow(*context, TopicNameFormat.getName());
+  honor_transactions_           = utils::parseBooleanPropertyOrThrow(*context, HonorTransactions.getName());
+  group_id_                     = utils::getRequiredPropertyOrThrow(*context, GroupID.getName());
+  offset_reset_                 = utils::getRequiredPropertyOrThrow(*context, OffsetReset.getName());
+  key_attribute_encoding_       = utils::getRequiredPropertyOrThrow(*context, KeyAttributeEncoding.getName());
+  max_poll_time_milliseconds_   = utils::parseTimePropertyMSOrThrow(*context, MaxPollTime.getName());
+  session_timeout_milliseconds_ = utils::parseTimePropertyMSOrThrow(*context, SessionTimeout.getName());
 
   // Optional properties
   context->getProperty(MessageDemarcator.getName(), message_demarcator_);
   context->getProperty(MessageHeaderEncoding.getName(), message_header_encoding_);
   context->getProperty(DuplicateHeaderHandling.getName(), duplicate_header_handling_);
 
-  std::string ssl_service_name;
-  std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
-  if (context->getProperty(SSLContextService.getName(), ssl_service_name) && !ssl_service_name.empty()) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(ssl_service_name);
-    if (service) {
-      ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-      ssl_data_.ca_loc = ssl_service->getCACertificate();
-      ssl_data_.cert_loc = ssl_service->getCertificateFile();
-      ssl_data_.key_loc = ssl_service->getPrivateKeyFile();
-      ssl_data_.key_pw = ssl_service->getPassphrase();
-    } else {
-      logger_->log_warn("SSL Context Service property is set to '%s', but the controller service could not be found.", ssl_service_name);
-    }
-  } else if (security_protocol_ == SECURITY_PROTOCOL_SSL) {
-    logger_->log_warn("Security protocol is set to %s, but no valid SSL Context Service property is set.", SECURITY_PROTOCOL_SSL);
-  }
-
-  headers_to_add_as_attributes_ = utils::listFromCommaSeparatedProperty(context, HeadersToAddAsAttributes.getName());
+  headers_to_add_as_attributes_ = utils::listFromCommaSeparatedProperty(*context, HeadersToAddAsAttributes.getName());
   max_poll_records_ = gsl::narrow<std::size_t>(utils::getOptionalUintProperty(*context, MaxPollRecords.getName()).value_or(DEFAULT_MAX_POLL_RECORDS));
 
   if (!utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_UTF_8, key_attribute_encoding_) && !utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_HEX, key_attribute_encoding_)) {
@@ -324,7 +299,7 @@ void ConsumeKafka::extend_config_from_dynamic_properties(const core::ProcessCont
   }
 }
 
-void ConsumeKafka::configure_new_connection(const core::ProcessContext& context) {
+void ConsumeKafka::configure_new_connection(core::ProcessContext& context) {
   using utils::setKafkaConfigurationField;
 
   conf_ = { rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() };
@@ -342,18 +317,11 @@ void ConsumeKafka::configure_new_connection(const core::ProcessContext& context)
   // logger_->log_info("Enabling all debug logs for kafka consumer.");
   // setKafkaConfigurationField(*conf_, "debug", "all");
 
+  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));
+
   setKafkaConfigurationField(*conf_, "bootstrap.servers", kafka_brokers_);
   setKafkaConfigurationField(*conf_, "allow.auto.create.topics", "true");
   setKafkaConfigurationField(*conf_, "auto.offset.reset", offset_reset_);
-
-  if (security_protocol_ == SECURITY_PROTOCOL_SSL) {
-    setKafkaConfigurationField(*conf_, "security.protocol", "ssl");
-    setKafkaConfigurationField(*conf_, "ssl.ca.location", ssl_data_.ca_loc);
-    setKafkaConfigurationField(*conf_, "ssl.certificate.location", ssl_data_.cert_loc);
-    setKafkaConfigurationField(*conf_, "ssl.key.location", ssl_data_.key_loc);
-    setKafkaConfigurationField(*conf_, "ssl.key.password", ssl_data_.key_pw);
-  }
-
   setKafkaConfigurationField(*conf_, "enable.auto.commit", "false");
   setKafkaConfigurationField(*conf_, "enable.auto.offset.store", "false");
   setKafkaConfigurationField(*conf_, "isolation.level", honor_transactions_ ? "read_committed" : "read_uncommitted");
diff --git a/extensions/librdkafka/ConsumeKafka.h b/extensions/librdkafka/ConsumeKafka.h
index 14a23d0..e8610e0 100644
--- a/extensions/librdkafka/ConsumeKafka.h
+++ b/extensions/librdkafka/ConsumeKafka.h
@@ -23,7 +23,7 @@
 #include <utility>
 #include <vector>
 
-#include "core/Processor.h"
+#include "KafkaProcessorBase.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "rdkafka.h"
 #include "rdkafka_utils.h"
@@ -35,13 +35,12 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-class ConsumeKafka : public core::Processor {
+class ConsumeKafka : public KafkaProcessorBase {
  public:
   EXTENSIONAPI static constexpr char const* ProcessorName = "ConsumeKafka";
 
   // Supported Properties
   EXTENSIONAPI static core::Property KafkaBrokers;
-  EXTENSIONAPI static core::Property SecurityProtocol;
   EXTENSIONAPI static core::Property TopicNames;
   EXTENSIONAPI static core::Property TopicNameFormat;
   EXTENSIONAPI static core::Property HonorTransactions;
@@ -55,7 +54,6 @@ class ConsumeKafka : public core::Processor {
   EXTENSIONAPI static core::Property MaxPollRecords;
   EXTENSIONAPI static core::Property MaxPollTime;
   EXTENSIONAPI static core::Property SessionTimeout;
-  EXTENSIONAPI static core::Property SSLContextService;
 
   // Supported Relationships
   EXTENSIONAPI static const core::Relationship Success;
@@ -98,7 +96,7 @@ class ConsumeKafka : public core::Processor {
   static constexpr const std::size_t METADATA_COMMUNICATIONS_TIMEOUT_MS{ 60000 };
 
   explicit ConsumeKafka(const std::string& name, const utils::Identifier& uuid = utils::Identifier()) :
-      Processor(name, uuid) {}
+      KafkaProcessorBase(name, uuid, core::logging::LoggerFactory<ConsumeKafka>::getLogger()) {}
 
   ~ConsumeKafka() override = default;
 
@@ -126,7 +124,7 @@ class ConsumeKafka : public core::Processor {
  private:
   void create_topic_partition_list();
   void extend_config_from_dynamic_properties(const core::ProcessContext& context);
-  void configure_new_connection(const core::ProcessContext& context);
+  void configure_new_connection(core::ProcessContext& context);
   std::string extract_message(const rd_kafka_message_t& rkmessage) const;
   std::vector<std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>> poll_kafka_messages();
   utils::KafkaEncoding key_attr_encoding_attr_to_enum() const;
@@ -155,7 +153,6 @@ class ConsumeKafka : public core::Processor {
   }
 
   std::string kafka_brokers_;
-  std::string security_protocol_;
   std::vector<std::string> topic_names_;
   std::string topic_name_format_;
   bool honor_transactions_;
@@ -170,8 +167,6 @@ class ConsumeKafka : public core::Processor {
   std::chrono::milliseconds max_poll_time_milliseconds_;
   std::chrono::milliseconds session_timeout_milliseconds_;
 
-  utils::SSL_data ssl_data_;
-
   std::unique_ptr<rd_kafka_t, utils::rd_kafka_consumer_deleter> consumer_;
   std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf_;
   std::unique_ptr<rd_kafka_topic_partition_list_t, utils::rd_kafka_topic_partition_list_deleter> kf_topic_partition_list_;
@@ -181,8 +176,6 @@ class ConsumeKafka : public core::Processor {
   std::vector<std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>> pending_messages_;
 
   std::mutex do_not_call_on_trigger_concurrently_;
-
-  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ConsumeKafka>::getLogger()};
 };
 
 }  // namespace processors
diff --git a/extensions/librdkafka/KafkaProcessorBase.cpp b/extensions/librdkafka/KafkaProcessorBase.cpp
new file mode 100644
index 0000000..d54404c
--- /dev/null
+++ b/extensions/librdkafka/KafkaProcessorBase.cpp
@@ -0,0 +1,135 @@
+/**
+ * @file KafkaProcessorBase.cpp
+ * KafkaProcessorBase class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "KafkaProcessorBase.h"
+
+#include "rdkafka_utils.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property KafkaProcessorBase::SecurityProtocol(
+        core::PropertyBuilder::createProperty("Security Protocol")
+        ->withDescription("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
+        ->withDefaultValue<std::string>(toString(SecurityProtocolOption::PLAINTEXT))
+        ->withAllowableValues<std::string>(SecurityProtocolOption::values())
+        ->isRequired(true)
+        ->build());
+const core::Property KafkaProcessorBase::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+        ->withDescription("SSL Context Service Name")
+        ->asType<minifi::controllers::SSLContextService>()
+        ->build());
+const core::Property KafkaProcessorBase::KerberosServiceName(
+    core::PropertyBuilder::createProperty("Kerberos Service Name")
+        ->withDescription("Kerberos Service Name")
+        ->build());
+const core::Property KafkaProcessorBase::KerberosPrincipal(
+    core::PropertyBuilder::createProperty("Kerberos Principal")
+        ->withDescription("Keberos Principal")
+        ->build());
+const core::Property KafkaProcessorBase::KerberosKeytabPath(
+    core::PropertyBuilder::createProperty("Kerberos Keytab Path")
+        ->withDescription("The path to the location on the local filesystem where the kerberos keytab is located. Read permission on the file is required.")
+        ->build());
+const core::Property KafkaProcessorBase::SASLMechanism(
+        core::PropertyBuilder::createProperty("SASL Mechanism")
+        ->withDescription("The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.")
+        ->withDefaultValue<std::string>(toString(SASLMechanismOption::GSSAPI))
+        ->withAllowableValues<std::string>(SASLMechanismOption::values())
+        ->isRequired(true)
+        ->build());
+const core::Property KafkaProcessorBase::Username(
+    core::PropertyBuilder::createProperty("Username")
+        ->withDescription("The username when the SASL Mechanism is sasl_plaintext")
+        ->build());
+const core::Property KafkaProcessorBase::Password(
+    core::PropertyBuilder::createProperty("Password")
+        ->withDescription("The password for the given username when the SASL Mechanism is sasl_plaintext")
+        ->build());
+
+std::optional<utils::SSL_data> KafkaProcessorBase::getSslData(core::ProcessContext& context) const {
+  std::string ssl_service_name;
+  if (context.getProperty(SSLContextService.getName(), ssl_service_name) && !ssl_service_name.empty()) {
+    std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(ssl_service_name);
+    if (service) {
+      auto ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
+      utils::SSL_data ssl_data;
+      ssl_data.ca_loc = ssl_service->getCACertificate();
+      ssl_data.cert_loc = ssl_service->getCertificateFile();
+      ssl_data.key_loc = ssl_service->getPrivateKeyFile();
+      ssl_data.key_pw = ssl_service->getPassphrase();
+      return ssl_data;
+    } else {
+      logger_->log_warn("SSL Context Service property is set to '%s', but the controller service could not be found.", ssl_service_name);
+      return std::nullopt;
+    }
+  } else if (security_protocol_ == SecurityProtocolOption::SSL || security_protocol_ == SecurityProtocolOption::SASL_SSL) {
+    logger_->log_warn("Security protocol is set to %s, but no valid SSL Context Service property is set.", security_protocol_.toString());
+  }
+
+  return std::nullopt;
+}
+
+void KafkaProcessorBase::setKafkaAuthenticationParameters(core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config) {
+  security_protocol_ = utils::getRequiredPropertyOrThrow<SecurityProtocolOption>(context, SecurityProtocol.getName());
+  utils::setKafkaConfigurationField(*config, "security.protocol", security_protocol_.toString());
+  logger_->log_debug("Kafka security.protocol [%s]", security_protocol_.toString());
+  if (security_protocol_ == SecurityProtocolOption::SSL || security_protocol_ == SecurityProtocolOption::SASL_SSL) {
+    auto ssl_data = getSslData(context);
+    if (ssl_data) {
+      if (ssl_data->ca_loc.empty() && ssl_data->cert_loc.empty() && ssl_data->key_loc.empty() && ssl_data->key_pw.empty()) {
+        logger_->log_warn("Security protocol is set to %s, but no valid security parameters are set in the properties or in the SSL Context Service.", security_protocol_.toString());
+      } else {
+        utils::setKafkaConfigurationField(*config, "ssl.ca.location", ssl_data->ca_loc);
+        logger_->log_debug("Kafka ssl.ca.location [%s]", ssl_data->ca_loc);
+        utils::setKafkaConfigurationField(*config, "ssl.certificate.location", ssl_data->cert_loc);
+        logger_->log_debug("Kafka ssl.certificate.location [%s]", ssl_data->cert_loc);
+        utils::setKafkaConfigurationField(*config, "ssl.key.location", ssl_data->key_loc);
+        logger_->log_debug("Kafka ssl.key.location [%s]", ssl_data->key_loc);
+        utils::setKafkaConfigurationField(*config, "ssl.key.password", ssl_data->key_pw);
+        logger_->log_debug("Kafka ssl.key.password was set");
+      }
+    }
+  }
+
+  auto sasl_mechanism = utils::getRequiredPropertyOrThrow<SASLMechanismOption>(context, SASLMechanism.getName());
+  utils::setKafkaConfigurationField(*config, "sasl.mechanism", sasl_mechanism.toString());
+  logger_->log_debug("Kafka sasl.mechanism [%s]", sasl_mechanism.toString());
+
+  auto setKafkaConfigIfNotEmpty = [this, &context, config](const std::string& property_name, const std::string& kafka_config_name, bool log_value = true) {
+    std::string value;
+    if (context.getProperty(property_name, value) && !value.empty()) {
+      utils::setKafkaConfigurationField(*config, kafka_config_name, value);
+      if (log_value) {
+        logger_->log_debug("Kafka %s [%s]", kafka_config_name, value);
+      } else {
+        logger_->log_debug("Kafka %s was set", kafka_config_name);
+      }
+    }
+  };
+
+  setKafkaConfigIfNotEmpty(KerberosServiceName.getName(), "sasl.kerberos.service.name");
+  setKafkaConfigIfNotEmpty(KerberosPrincipal.getName(), "sasl.kerberos.principal");
+  setKafkaConfigIfNotEmpty(KerberosKeytabPath.getName(), "sasl.kerberos.keytab");
+  setKafkaConfigIfNotEmpty(Username.getName(), "sasl.username");
+  setKafkaConfigIfNotEmpty(Password.getName(), "sasl.password", false);
+}
+
+}  // namespace org::apache::nifi::minifi::processors
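
The intended call pattern for setKafkaAuthenticationParameters is the one ConsumeKafka::configure_new_connection shows earlier in this diff: the derived processor creates its rd_kafka_conf_t and the base class applies every authentication-related field. A minimal sketch, assuming a hypothetical MyKafkaProcessor subclass with the same conf_ and kafka_brokers_ members as ConsumeKafka:

    // Sketch only: connection setup in a hypothetical KafkaProcessorBase subclass.
    void MyKafkaProcessor::configureNewConnection(core::ProcessContext& context) {
      conf_ = { rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() };
      if (!conf_) {
        throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object");
      }
      // Applies security.protocol, the SSL locations taken from the SSL Context Service,
      // sasl.mechanism, the Kerberos settings and sasl.username/sasl.password.
      setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));
      utils::setKafkaConfigurationField(*conf_, "bootstrap.servers", kafka_brokers_);
    }
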
diff --git a/extensions/librdkafka/KafkaProcessorBase.h b/extensions/librdkafka/KafkaProcessorBase.h
new file mode 100644
index 0000000..6879de2
--- /dev/null
+++ b/extensions/librdkafka/KafkaProcessorBase.h
@@ -0,0 +1,69 @@
+/**
+ * @file KafkaProcessorBase.h
+ * KafkaProcessorBase class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <optional>
+#include <memory>
+#include <string>
+
+#include "core/Processor.h"
+#include "rdkafka_utils.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// KafkaProcessorBase Class
+class KafkaProcessorBase : public core::Processor {
+ public:
+  EXTENSIONAPI static const core::Property SSLContextService;
+  EXTENSIONAPI static const core::Property SecurityProtocol;
+  EXTENSIONAPI static const core::Property KerberosServiceName;
+  EXTENSIONAPI static const core::Property KerberosPrincipal;
+  EXTENSIONAPI static const core::Property KerberosKeytabPath;
+  EXTENSIONAPI static const core::Property SASLMechanism;
+  EXTENSIONAPI static const core::Property Username;
+  EXTENSIONAPI static const core::Property Password;
+
+  SMART_ENUM(SecurityProtocolOption,
+    (PLAINTEXT, "plaintext"),
+    (SSL, "ssl"),
+    (SASL_PLAIN, "sasl_plaintext"),
+    (SASL_SSL, "sasl_ssl")
+  )
+
+  SMART_ENUM(SASLMechanismOption,
+    (GSSAPI, "GSSAPI"),
+    (PLAIN, "PLAIN")
+  )
+
+  KafkaProcessorBase(const std::string& name, const utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
+      : core::Processor(name, uuid),
+        logger_(logger) {
+  }
+
+ protected:
+  virtual std::optional<utils::SSL_data> getSslData(core::ProcessContext& context) const;
+  void setKafkaAuthenticationParameters(core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config);
+
+  SecurityProtocolOption security_protocol_;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+}  // namespace org::apache::nifi::minifi::processors
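
The SMART_ENUM wrappers above provide both the allowable-value list for the property definitions and the string form handed to librdkafka; this change only relies on their values() and toString() members. A short sketch of that mapping, with conf standing in for an rd_kafka_conf_t owned by the caller:

    // Sketch: enum option -> librdkafka configuration string.
    const KafkaProcessorBase::SecurityProtocolOption protocol = KafkaProcessorBase::SecurityProtocolOption::SASL_SSL;
    utils::setKafkaConfigurationField(*conf, "security.protocol", protocol.toString());  // sets "sasl_ssl"
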
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index cae955f..f021bed 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -41,23 +41,9 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-#define COMPRESSION_CODEC_NONE "none"
-#define COMPRESSION_CODEC_GZIP "gzip"
-#define COMPRESSION_CODEC_SNAPPY "snappy"
-#define ROUND_ROBIN_PARTITIONING "Round Robin"
-#define RANDOM_PARTITIONING "Random Robin"
-#define USER_DEFINED_PARTITIONING "User-Defined"
-#define DELIVERY_REPLICATED "all"
-#define DELIVERY_ONE_NODE "1"
-#define DELIVERY_BEST_EFFORT "0"
-#define SECURITY_PROTOCOL_PLAINTEXT "plaintext"
-#define SECURITY_PROTOCOL_SSL "ssl"
-#define KAFKA_KEY_ATTRIBUTE "kafka.key"
-
 const core::Property PublishKafka::SeedBrokers(
     core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
-
 const core::Property PublishKafka::Topic(
     core::PropertyBuilder::createProperty("Topic Name")->withDescription("The Kafka Topic of interest")
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
@@ -68,7 +54,6 @@ const core::Property PublishKafka::DeliveryGuarantee(
                                                                                  "-1 or all (block until message is committed by all in sync replicas) "
                                                                                  "or any concrete number of nodes.")
         ->isRequired(false)->supportsExpressionLanguage(true)->withDefaultValue(DELIVERY_ONE_NODE)->build());
-
 const core::Property PublishKafka::MaxMessageSize(
     core::PropertyBuilder::createProperty("Max Request Size")->withDescription("Maximum Kafka protocol request message size")
         ->isRequired(false)->build());
@@ -118,33 +103,14 @@ const core::Property PublishKafka::CompressCodec(
         ->withAllowableValues<std::string>({COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY})
         ->withDescription("compression codec to use for compressing message sets")
         ->build());
-
 const core::Property PublishKafka::MaxFlowSegSize(
     core::PropertyBuilder::createProperty("Max Flow Segment Size")->withDescription("Maximum flow content payload segment size for the kafka record. 0 B means unlimited.")
         ->isRequired(false)->withDefaultValue<core::DataSizeValue>("0 B")->build());
-const core::Property PublishKafka::SecurityProtocol(
-        core::PropertyBuilder::createProperty("Security Protocol")
-        ->withDescription("Protocol used to communicate with brokers")
-        ->withDefaultValue<std::string>(SECURITY_PROTOCOL_PLAINTEXT)
-        ->withAllowableValues<std::string>({SECURITY_PROTOCOL_PLAINTEXT, SECURITY_PROTOCOL_SSL})
-        ->isRequired(true)
-        ->build());
-
-const core::Property PublishKafka::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")
-        ->withDescription("SSL Context Service Name")
-        ->asType<minifi::controllers::SSLContextService>()
-        ->build());
 
 const core::Property PublishKafka::SecurityCA("Security CA", "DEPRECATED in favor of SSL Context Service. File or directory path to CA certificate(s) for verifying the broker's key", "");
 const core::Property PublishKafka::SecurityCert("Security Cert", "DEPRECATED in favor of SSL Context Service.Path to client's public key (PEM) used for authentication", "");
 const core::Property PublishKafka::SecurityPrivateKey("Security Private Key", "DEPRECATED in favor of SSL Context Service.Path to client's private key (PEM) used for authentication", "");
 const core::Property PublishKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "DEPRECATED in favor of SSL Context Service.Private key passphrase", "");
-const core::Property PublishKafka::KerberosServiceName("Kerberos Service Name", "Kerberos Service Name", "");
-const core::Property PublishKafka::KerberosPrincipal("Kerberos Principal", "Keberos Principal", "");
-const core::Property PublishKafka::KerberosKeytabPath("Kerberos Keytab Path",
-                                                "The path to the location on the local filesystem where the kerberos keytab is located. Read permission on the file is required.", "");
-
 const core::Property PublishKafka::KafkaKey(
     core::PropertyBuilder::createProperty("Kafka Key")
         ->withDescription("The key to use for the message. If not specified, the UUID of the flow file is used as the message key.")
@@ -475,41 +441,44 @@ void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage
 
 void PublishKafka::initialize() {
   // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(SeedBrokers);
-  properties.insert(Topic);
-  properties.insert(DeliveryGuarantee);
-  properties.insert(MaxMessageSize);
-  properties.insert(RequestTimeOut);
-  properties.insert(MessageTimeOut);
-  properties.insert(ClientName);
-  properties.insert(AttributeNameRegex);
-  properties.insert(BatchSize);
-  properties.insert(TargetBatchPayloadSize);
-  properties.insert(QueueBufferMaxTime);
-  properties.insert(QueueBufferMaxSize);
-  properties.insert(QueueBufferMaxMessage);
-  properties.insert(CompressCodec);
-  properties.insert(MaxFlowSegSize);
-  properties.insert(SecurityProtocol);
-  properties.insert(SSLContextService);
-  properties.insert(SecurityCA);
-  properties.insert(SecurityCert);
-  properties.insert(SecurityPrivateKey);
-  properties.insert(SecurityPrivateKeyPassWord);
-  properties.insert(KerberosServiceName);
-  properties.insert(KerberosPrincipal);
-  properties.insert(KerberosKeytabPath);
-  properties.insert(KafkaKey);
-  properties.insert(MessageKeyField);
-  properties.insert(DebugContexts);
-  properties.insert(FailEmptyFlowFiles);
-  setSupportedProperties(properties);
+  setSupportedProperties({
+    SeedBrokers,
+    Topic,
+    DeliveryGuarantee,
+    MaxMessageSize,
+    RequestTimeOut,
+    MessageTimeOut,
+    ClientName,
+    AttributeNameRegex,
+    BatchSize,
+    TargetBatchPayloadSize,
+    QueueBufferMaxTime,
+    QueueBufferMaxSize,
+    QueueBufferMaxMessage,
+    CompressCodec,
+    MaxFlowSegSize,
+    SecurityProtocol,
+    SSLContextService,
+    SecurityCA,
+    SecurityCert,
+    SecurityPrivateKey,
+    SecurityPrivateKeyPassWord,
+    KerberosServiceName,
+    KerberosPrincipal,
+    KerberosKeytabPath,
+    KafkaKey,
+    MessageKeyField,
+    DebugContexts,
+    FailEmptyFlowFiles,
+    SASLMechanism,
+    Username,
+    Password
+  });
   // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Failure);
-  relationships.insert(Success);
-  setSupportedRelationships(relationships);
+  setSupportedRelationships({
+    Success,
+    Failure
+  });
 }
 
 void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
@@ -622,33 +591,7 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  value = "";
-  if (context->getProperty(KerberosServiceName.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "sasl.kerberos.service.name", value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value);
-    if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-    }
-  }
-  value = "";
-  if (context->getProperty(KerberosPrincipal.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "sasl.kerberos.principal", value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value);
-    if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-    }
-  }
-  value = "";
-  if (context->getProperty(KerberosKeytabPath.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "sasl.kerberos.keytab", value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value);
-    if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-    }
-  }
+
   value = "";
   if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty()) {
     result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", value.c_str(), errstr.data(), errstr.size());
@@ -709,97 +652,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  value = "";
-  if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
-    if (value == SECURITY_PROTOCOL_SSL) {
-      result = rd_kafka_conf_set(conf_.get(), "security.protocol", value.c_str(), errstr.data(), errstr.size());
-      logger_->log_debug("PublishKafka: security.protocol [%s]", value);
-      if (result != RD_KAFKA_CONF_OK) {
-        auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-      }
-
-      std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
-      if (context->getProperty(SSLContextService.getName(), value) && !value.empty()) {
-        std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-        if (service) {
-          ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-        } else {
-          logger_->log_warn("SSL Context Service property is set to '%s', but the controller service could not be found.", value);
-        }
-      }
-
-      std::string security_ca;
-      if (ssl_service) {
-        security_ca = ssl_service->getCACertificate();
-      } else {
-        context->getProperty(SecurityCA.getName(), security_ca);
-      }
-
-      std::string security_cert;
-      if (ssl_service) {
-        security_cert = ssl_service->getCertificateFile();
-      } else {
-        context->getProperty(SecurityCert.getName(), security_cert);
-      }
-
-      std::string security_private_key;
-      if (ssl_service) {
-        security_private_key = ssl_service->getPrivateKeyFile();
-      } else {
-        context->getProperty(SecurityPrivateKey.getName(), security_private_key);
-      }
-
-      std::string security_private_key_password;
-      if (ssl_service) {
-        security_private_key_password = ssl_service->getPassphrase();
-      } else {
-        context->getProperty(SecurityPrivateKeyPassWord.getName(), security_private_key_password);
-      }
-
-      if (!security_ca.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.ca.location", security_ca.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.ca.location [%s]", security_ca);
-        if (result != RD_KAFKA_CONF_OK) {
-          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-        }
-      }
-      if (!security_cert.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.certificate.location", security_cert.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", security_cert);
-        if (result != RD_KAFKA_CONF_OK) {
-          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-        }
-      }
-      if (!security_private_key.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.key.location", security_private_key.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.key.location [%s]", security_private_key);
-        if (result != RD_KAFKA_CONF_OK) {
-          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-        }
-      }
-      if (!security_private_key_password.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.key.password", security_private_key_password.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.key.password [%s]", security_private_key_password);
-        if (result != RD_KAFKA_CONF_OK) {
-          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-        }
-      }
 
-      if (security_ca.empty() && security_cert.empty() && security_private_key.empty() && security_private_key_password.empty()) {
-        logger_->log_warn("Security protocol is set to %s, but no valid security parameters are set in the properties or in the SSL Context Service.", SECURITY_PROTOCOL_SSL);
-      }
-    } else if (value == SECURITY_PROTOCOL_PLAINTEXT) {
-      // Do nothing
-    } else {
-      auto error_msg = utils::StringUtils::join_pack("PublishKafka: unknown Security Protocol: ", value);
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-    }
-  }
+  setKafkaAuthenticationParameters(*context, gsl::make_not_null(conf_.get()));
 
   // Add all of the dynamic properties as librdkafka configurations
   const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
@@ -919,6 +773,19 @@ bool PublishKafka::createNewTopic(const std::shared_ptr<core::ProcessContext> &c
   return true;
 }
 
+std::optional<utils::SSL_data> PublishKafka::getSslData(core::ProcessContext& context) const {
+  if (auto result = KafkaProcessorBase::getSslData(context); result) {
+    return result;
+  }
+
+  utils::SSL_data ssl_data;
+  context.getProperty(SecurityCA.getName(), ssl_data.ca_loc);
+  context.getProperty(SecurityCert.getName(), ssl_data.cert_loc);
+  context.getProperty(SecurityPrivateKey.getName(), ssl_data.key_loc);
+  context.getProperty(SecurityPrivateKeyPassWord.getName(), ssl_data.key_pw);
+  return ssl_data;
+}
+
 void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   // Check whether we have been interrupted
   if (interrupted_) {
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index c2a8197..3108c54 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -32,9 +32,9 @@
 #include <vector>
 #include <regex>
 
+#include "KafkaProcessorBase.h"
 #include "utils/GeneralUtils.h"
 #include "FlowFileRecord.h"
-#include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
 #include "core/Property.h"
@@ -51,7 +51,7 @@ namespace minifi {
 namespace processors {
 
 // PublishKafka Class
-class PublishKafka : public core::Processor {
+class PublishKafka : public KafkaProcessorBase {
  public:
   static constexpr char const* ProcessorName = "PublishKafka";
 
@@ -71,15 +71,10 @@ class PublishKafka : public core::Processor {
   EXTENSIONAPI static const core::Property QueueBufferMaxMessage;
   EXTENSIONAPI static const core::Property CompressCodec;
   EXTENSIONAPI static const core::Property MaxFlowSegSize;
-  EXTENSIONAPI static const core::Property SSLContextService;
-  EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
   EXTENSIONAPI static const core::Property SecurityPrivateKeyPassWord;
-  EXTENSIONAPI static const core::Property KerberosServiceName;
-  EXTENSIONAPI static const core::Property KerberosPrincipal;
-  EXTENSIONAPI static const core::Property KerberosKeytabPath;
   EXTENSIONAPI static const core::Property KafkaKey;
   EXTENSIONAPI static const core::Property MessageKeyField;
   EXTENSIONAPI static const core::Property DebugContexts;
@@ -89,8 +84,19 @@ class PublishKafka : public core::Processor {
   EXTENSIONAPI static const core::Relationship Failure;
   EXTENSIONAPI static const core::Relationship Success;
 
+  static constexpr const char* COMPRESSION_CODEC_NONE = "none";
+  static constexpr const char* COMPRESSION_CODEC_GZIP = "gzip";
+  static constexpr const char* COMPRESSION_CODEC_SNAPPY = "snappy";
+  static constexpr const char* ROUND_ROBIN_PARTITIONING = "Round Robin";
+  static constexpr const char* RANDOM_PARTITIONING = "Random Robin";
+  static constexpr const char* USER_DEFINED_PARTITIONING = "User-Defined";
+  static constexpr const char* DELIVERY_REPLICATED = "all";
+  static constexpr const char* DELIVERY_ONE_NODE = "1";
+  static constexpr const char* DELIVERY_BEST_EFFORT = "0";
+  static constexpr const char* KAFKA_KEY_ATTRIBUTE = "kafka.key";
+
   explicit PublishKafka(const std::string& name, const utils::Identifier& uuid = {})
-      : core::Processor(name, uuid) {
+      : KafkaProcessorBase(name, uuid, core::logging::LoggerFactory<PublishKafka>::getLogger()) {
   }
 
   ~PublishKafka() override = default;
@@ -113,14 +119,13 @@ class PublishKafka : public core::Processor {
  protected:
   bool configureNewConnection(const std::shared_ptr<core::ProcessContext> &context);
   bool createNewTopic(const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name, const std::shared_ptr<core::FlowFile>& flow_file);
+  std::optional<utils::SSL_data> getSslData(core::ProcessContext& context) const override;
 
  private:
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
   }
 
-  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<PublishKafka>::getLogger()};
-
   KafkaConnectionKey key_;
   std::unique_ptr<KafkaConnection> conn_;
   std::mutex connection_mutex_;
diff --git a/libminifi/include/utils/ProcessorConfigUtils.h b/libminifi/include/utils/ProcessorConfigUtils.h
index acf7fd5..9f28de8 100644
--- a/libminifi/include/utils/ProcessorConfigUtils.h
+++ b/libminifi/include/utils/ProcessorConfigUtils.h
@@ -30,11 +30,19 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, const std::string& property_name);
-std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name);
-std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name);
-bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const std::string& property_name);
-std::chrono::milliseconds parseTimePropertyMSOrThrow(core::ProcessContext* context, const std::string& property_name);
+template<typename PropertyType = std::string>
+PropertyType getRequiredPropertyOrThrow(const core::ProcessContext& context, const std::string& property_name) {
+  PropertyType value;
+  if (!context.getProperty(property_name, value)) {
+    throw std::runtime_error(property_name + " property missing or invalid");
+  }
+  return value;
+}
+
+std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext& context, const std::string& property_name);
+std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext& context, const std::string& property_name);
+bool parseBooleanPropertyOrThrow(const core::ProcessContext& context, const std::string& property_name);
+std::chrono::milliseconds parseTimePropertyMSOrThrow(const core::ProcessContext& context, const std::string& property_name);
 std::optional<uint64_t> getOptionalUintProperty(const core::ProcessContext& context, const std::string& property_name);
 std::string parsePropertyWithAllowableValuesOrThrow(const core::ProcessContext& context, const std::string& property_name, const std::set<std::string>& allowable_values);
 
diff --git a/libminifi/src/utils/ProcessorConfigUtils.cpp b/libminifi/src/utils/ProcessorConfigUtils.cpp
index a423d87..93b4ec8 100644
--- a/libminifi/src/utils/ProcessorConfigUtils.cpp
+++ b/libminifi/src/utils/ProcessorConfigUtils.cpp
@@ -28,25 +28,17 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, const std::string& property_name) {
-  std::string value;
-  if (!context->getProperty(property_name, value)) {
-    throw std::runtime_error(property_name + " property missing or invalid");
-  }
-  return value;
-}
-
-std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext& context, const std::string& property_name) {
   std::string property_string;
-  context->getProperty(property_name, property_string);
+  context.getProperty(property_name, property_string);
   return utils::StringUtils::splitAndTrim(property_string, ",");
 }
 
-std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext& context, const std::string& property_name) {
   return utils::StringUtils::splitAndTrim(getRequiredPropertyOrThrow(context, property_name), ",");
 }
 
-bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const std::string& property_name) {
+bool parseBooleanPropertyOrThrow(const core::ProcessContext& context, const std::string& property_name) {
   const std::string value_str = getRequiredPropertyOrThrow(context, property_name);
   const auto maybe_value = utils::StringUtils::toBool(value_str);
   if (!maybe_value) {
@@ -55,7 +47,7 @@ bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const std::strin
   return maybe_value.value();
 }
 
-std::chrono::milliseconds parseTimePropertyMSOrThrow(core::ProcessContext* context, const std::string& property_name) {
+std::chrono::milliseconds parseTimePropertyMSOrThrow(const core::ProcessContext& context, const std::string& property_name) {
   core::TimeUnit unit;
   uint64_t time_value_ms;
   const std::string value_str = getRequiredPropertyOrThrow(context, property_name);

[nifi-minifi-cpp] 02/03: MINIFICPP-1629 Add DeleteAzureDataLakeStorage processor

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 6074634dcdb18f86d599aa9cfaee4f09e05752c8
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Dec 1 14:07:22 2021 +0100

    MINIFICPP-1629 Add DeleteAzureDataLakeStorage processor
    
    Closes #1195
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |  29 +++-
 README.md                                          |   2 +-
 .../AzureDataLakeStorageProcessorBase.cpp          |  81 ++++++++++
 .../processors/AzureDataLakeStorageProcessorBase.h |  64 ++++++++
 .../azure/processors/AzureStorageProcessorBase.cpp |   6 +-
 .../azure/processors/AzureStorageProcessorBase.h   |   2 +-
 .../processors/DeleteAzureDataLakeStorage.cpp      |  85 +++++++++++
 .../azure/processors/DeleteAzureDataLakeStorage.h  |  67 ++++++++
 .../azure/processors/PutAzureBlobStorage.cpp       |   2 +-
 .../azure/processors/PutAzureDataLakeStorage.cpp   |  55 ++-----
 .../azure/processors/PutAzureDataLakeStorage.h     |  28 ++--
 extensions/azure/storage/AzureDataLakeStorage.cpp  |   9 ++
 extensions/azure/storage/AzureDataLakeStorage.h    |   1 +
 .../azure/storage/AzureDataLakeStorageClient.cpp   |   8 +-
 .../azure/storage/AzureDataLakeStorageClient.h     |   9 +-
 extensions/azure/storage/DataLakeStorageClient.h   |   8 +-
 .../tests/unit/GenerateFlowFileTests.cpp           |   9 +-
 .../azure-tests/AzureDataLakeStorageTestsFixture.h | 122 +++++++++++++++
 .../DeleteAzureDataLakeStorageTests.cpp            | 128 ++++++++++++++++
 .../test/azure-tests/MockDataLakeStorageClient.h   |  96 ++++++++++++
 .../test/azure-tests/PutAzureBlobStorageTests.cpp  |   3 +-
 .../azure-tests/PutAzureDataLakeStorageTests.cpp   | 168 ++-------------------
 22 files changed, 747 insertions(+), 235 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 2a1bcbb..de7eadf 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -14,6 +14,7 @@
 - [ConsumeKafka](#consumekafka)
 - [ConsumeMQTT](#consumemqtt)
 - [DefragmentText](#defragmenttext)
+- [DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)
 - [DeleteS3Object](#deletes3object)
 - [ExecuteProcess](#executeprocess)
 - [ExecutePythonProcessor](#executepythonprocessor)
@@ -328,6 +329,30 @@ In the list below, the names of required properties appear in bold. Any other pr
 |failure|Flowfiles that failed the defragmentation process|
 
 
+## DeleteAzureDataLakeStorage
+
+### Description
+
+Deletes the provided file from Azure Data Lake Storage Gen 2
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Azure Storage Credentials Service**|||Name of the Azure Storage Credentials Service used to retrieve the connection string from.|
+|File Name|||The filename in Azure Storage. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**|
+|**Filesystem Name**|||Name of the Azure Storage File System. It is assumed to be already existing.<br/>**Supports Expression Language: true**|
+|Directory Name|||Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. If left empty it designates the root directory. The directory will be created if not already existing.<br/>**Supports Expression Language: true**|
+
+### Relationships
+
+| Name | Description |
+| - | - |
+|failure|If file deletion from Azure Data Lake storage fails the flowfile is transferred to this relationship|
+|success|If file deletion from Azure Data Lake storage succeeds the flowfile is transferred to this relationship|
+
+
 ## DeleteS3Object
 
 ### Description
@@ -1287,10 +1312,10 @@ In the list below, the names of required properties appear in bold. Any other pr
 | Name | Default Value | Allowable Values | Description |
 | - | - | - | - |
 |**Azure Storage Credentials Service**|||Name of the Azure Storage Credentials Service used to retrieve the connection string from.|
+|**Conflict Resolution Strategy**|fail|fail<br/>replace<br/>ignore|Indicates what should happen when a file with the same name already exists in the output directory.|
+|File Name|||The filename in Azure Storage. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**|
 |**Filesystem Name**|||Name of the Azure Storage File System. It is assumed to be already existing.<br/>**Supports Expression Language: true**|
 |Directory Name|||Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. If left empty it designates the root directory. The directory will be created if not already existing.<br/>**Supports Expression Language: true**|
-|File Name|||The filename to be uploaded. If left empty the filename attribute will be used by default.<br/>**Supports Expression Language: true**|
-|**Conflict Resolution Strategy**|fail|fail<br/>replace<br/>ignore|Indicates what should happen when a file with the same name already exists in the output directory.|
 
 ### Relationships
 
diff --git a/README.md b/README.md
index 3aa08be..0bd2328 100644
--- a/README.md
+++ b/README.md
@@ -77,7 +77,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension
 | ------------- |:-------------| :-----|
 | Archive Extensions    | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry)      |   -DBUILD_LIBARCHIVE=ON |
 | AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) | -DENABLE_AWS=ON  |
-| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage) | -DENABLE_AZURE=ON  |
+| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage) | -DENABLE_AZURE=ON  |
 | CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp)  | -DDISABLE_CIVET=ON |
 | CURL | [InvokeHTTP](PROCESSORS.md#invokehttp)      |    -DDISABLE_CURL=ON  |
 | GPS | GetGPS      |    -DENABLE_GPS=ON  |
diff --git a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp
new file mode 100644
index 0000000..04d7664
--- /dev/null
+++ b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp
@@ -0,0 +1,81 @@
+/**
+ * @file AzureDataLakeStorageProcessorBase.cpp
+ * AzureDataLakeStorageProcessorBase class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property AzureDataLakeStorageProcessorBase::FilesystemName(
+    core::PropertyBuilder::createProperty("Filesystem Name")
+      ->withDescription("Name of the Azure Storage File System. It is assumed to be already existing.")
+      ->supportsExpressionLanguage(true)
+      ->isRequired(true)
+      ->build());
+const core::Property AzureDataLakeStorageProcessorBase::DirectoryName(
+    core::PropertyBuilder::createProperty("Directory Name")
+      ->withDescription("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. "
+                        "If left empty it designates the root directory. The directory will be created if not already existing.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+const core::Property AzureDataLakeStorageProcessorBase::FileName(
+    core::PropertyBuilder::createProperty("File Name")
+      ->withDescription("The filename in Azure Storage. If left empty the filename attribute will be used by default.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+void AzureDataLakeStorageProcessorBase::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
+  gsl_Expects(context);
+  std::optional<storage::AzureStorageCredentials> credentials;
+  std::tie(std::ignore, credentials) = getCredentialsFromControllerService(*context);
+  if (!credentials) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service property missing or invalid");
+  }
+
+  if (!credentials->isValid()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Credentials set in the Azure Storage credentials service are invalid");
+  }
+
+  credentials_ = *credentials;
+}
+
+bool AzureDataLakeStorageProcessorBase::setCommonParameters(
+    storage::AzureDataLakeStorageParameters& params, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+  params.credentials = credentials_;
+
+  if (!context.getProperty(FilesystemName, params.file_system_name, flow_file) || params.file_system_name.empty()) {
+    logger_->log_error("Filesystem Name '%s' is invalid or empty!", params.file_system_name);
+    return false;
+  }
+
+  context.getProperty(DirectoryName, params.directory_name, flow_file);
+
+  context.getProperty(FileName, params.filename, flow_file);
+  if (params.filename.empty() && (!flow_file->getAttribute("filename", params.filename) || params.filename.empty())) {
+    logger_->log_error("No File Name is set and default object key 'filename' attribute could not be found!");
+    return false;
+  }
+
+  return true;
+}
+
+}  // namespace org::apache::nifi::minifi::azure::processors
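
setCommonParameters is the hook a concrete processor uses while assembling its request parameters; DeleteAzureDataLakeStorage in this commit follows that pattern. A minimal sketch from inside such a subclass, where buildDeleteParameters is an illustrative helper name rather than code from this change:

    // Sketch only: building the request parameters inside a derived processor's onTrigger().
    std::optional<storage::AzureDataLakeStorageParameters> buildDeleteParameters(
        core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
      storage::AzureDataLakeStorageParameters params;
      if (!setCommonParameters(params, context, flow_file)) {
        return std::nullopt;  // missing Filesystem Name or File Name; route the flow file to failure
      }
      return params;
    }
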
diff --git a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h
new file mode 100644
index 0000000..14f2a92
--- /dev/null
+++ b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h
@@ -0,0 +1,64 @@
+/**
+ * @file AzureDataLakeStorageProcessorBase.h
+ * AzureDataLakeStorageProcessorBase class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <utility>
+#include <string>
+#include <memory>
+#include <optional>
+
+#include "core/Property.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "storage/AzureDataLakeStorage.h"
+#include "AzureStorageProcessorBase.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class AzureDataLakeStorageProcessorBase : public AzureStorageProcessorBase {
+ public:
+  // Supported Properties
+  EXTENSIONAPI static const core::Property FilesystemName;
+  EXTENSIONAPI static const core::Property DirectoryName;
+  EXTENSIONAPI static const core::Property FileName;
+
+  explicit AzureDataLakeStorageProcessorBase(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<core::logging::Logger> &logger)
+    : AzureStorageProcessorBase(name, uuid, logger) {
+  }
+
+  ~AzureDataLakeStorageProcessorBase() override = default;
+
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+
+ protected:
+  explicit AzureDataLakeStorageProcessorBase(const std::string& name, const minifi::utils::Identifier& uuid, const std::shared_ptr<core::logging::Logger> &logger,
+    std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+    : AzureStorageProcessorBase(name, uuid, logger),
+      azure_data_lake_storage_(std::move(data_lake_storage_client)) {
+  }
+
+  bool setCommonParameters(storage::AzureDataLakeStorageParameters& params, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file);
+
+  storage::AzureStorageCredentials credentials_;
+  storage::AzureDataLakeStorage azure_data_lake_storage_;
+};
+
+}  // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/AzureStorageProcessorBase.cpp b/extensions/azure/processors/AzureStorageProcessorBase.cpp
index 63b230e..a5a2426 100644
--- a/extensions/azure/processors/AzureStorageProcessorBase.cpp
+++ b/extensions/azure/processors/AzureStorageProcessorBase.cpp
@@ -33,13 +33,13 @@ const core::Property AzureStorageProcessorBase::AzureStorageCredentialsService(
     ->build());
 
 std::tuple<AzureStorageProcessorBase::GetCredentialsFromControllerResult, std::optional<storage::AzureStorageCredentials>> AzureStorageProcessorBase::getCredentialsFromControllerService(
-    const std::shared_ptr<core::ProcessContext> &context) const {
+    core::ProcessContext &context) const {
   std::string service_name;
-  if (!context->getProperty(AzureStorageCredentialsService.getName(), service_name) || service_name.empty()) {
+  if (!context.getProperty(AzureStorageCredentialsService.getName(), service_name) || service_name.empty()) {
     return std::make_tuple(GetCredentialsFromControllerResult::CONTROLLER_NAME_EMPTY, std::nullopt);
   }
 
-  std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(service_name);
+  std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(service_name);
   if (nullptr == service) {
     logger_->log_error("Azure Storage credentials service with name: '%s' could not be found", service_name);
     return std::make_tuple(GetCredentialsFromControllerResult::CONTROLLER_NAME_INVALID, std::nullopt);
diff --git a/extensions/azure/processors/AzureStorageProcessorBase.h b/extensions/azure/processors/AzureStorageProcessorBase.h
index f87e827..a85ae7b 100644
--- a/extensions/azure/processors/AzureStorageProcessorBase.h
+++ b/extensions/azure/processors/AzureStorageProcessorBase.h
@@ -49,7 +49,7 @@ class AzureStorageProcessorBase : public core::Processor {
     CONTROLLER_NAME_INVALID
   };
 
-  std::tuple<GetCredentialsFromControllerResult, std::optional<storage::AzureStorageCredentials>> getCredentialsFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const;
+  std::tuple<GetCredentialsFromControllerResult, std::optional<storage::AzureStorageCredentials>> getCredentialsFromControllerService(core::ProcessContext &context) const;
 
   std::shared_ptr<core::logging::Logger> logger_;
 };
diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp b/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp
new file mode 100644
index 0000000..6146a31
--- /dev/null
+++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp
@@ -0,0 +1,85 @@
+/**
+ * @file DeleteAzureDataLakeStorage.cpp
+ * DeleteAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DeleteAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Relationship DeleteAzureDataLakeStorage::Success("success", "If file deletion from Azure storage succeeds, the flowfile is transferred to this relationship");
+const core::Relationship DeleteAzureDataLakeStorage::Failure("failure", "If file deletion from Azure storage fails, the flowfile is transferred to this relationship");
+
+void DeleteAzureDataLakeStorage::initialize() {
+  // Set the supported properties
+  setSupportedProperties({
+    AzureStorageCredentialsService,
+    FilesystemName,
+    DirectoryName,
+    FileName
+  });
+
+  // Set the supported relationships
+  setSupportedRelationships({
+    Success,
+    Failure
+  });
+}
+
+std::optional<storage::DeleteAzureDataLakeStorageParameters> DeleteAzureDataLakeStorage::buildDeleteParameters(
+    core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+  storage::DeleteAzureDataLakeStorageParameters params;
+  if (!setCommonParameters(params, context, flow_file)) {
+    return std::nullopt;
+  }
+
+  return params;
+}
+
+void DeleteAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session);
+  logger_->log_trace("DeleteAzureDataLakeStorage onTrigger");
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    context->yield();
+    return;
+  }
+
+  const auto params = buildDeleteParameters(*context, flow_file);
+  if (!params) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  auto result = azure_data_lake_storage_.deleteFile(*params);
+  if (!result) {
+    logger_->log_error("Failed to delete file '%s' to Azure Data Lake storage", params->filename);
+    session->transfer(flow_file, Failure);
+  } else {
+    logger_->log_debug("Successfully deleted file '%s' of filesystem '%s' on Azure Data Lake storage", params->filename, params->file_system_name);
+    session->transfer(flow_file, Success);
+  }
+}
+
+REGISTER_RESOURCE(DeleteAzureDataLakeStorage, "Deletes the provided file from Azure Data Lake Storage");
+
+}  // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.h b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
new file mode 100644
index 0000000..cb26a2f
--- /dev/null
+++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
@@ -0,0 +1,67 @@
+/**
+ * @file DeleteAzureDataLakeStorage.h
+ * DeleteAzureDataLakeStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+template<typename AzureDataLakeStorageProcessor>
+class AzureDataLakeStorageTestsFixture;
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class DeleteAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase {
+ public:
+  // Supported Relationships
+  static const core::Relationship Failure;
+  static const core::Relationship Success;
+
+  explicit DeleteAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+    : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger()) {
+  }
+
+  ~DeleteAzureDataLakeStorage() override = default;
+
+  void initialize() override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
+  friend class ::AzureDataLakeStorageTestsFixture<DeleteAzureDataLakeStorage>;
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_REQUIRED;
+  }
+
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
+  explicit DeleteAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+    : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger(), std::move(data_lake_storage_client)) {
+  }
+
+  std::optional<storage::DeleteAzureDataLakeStorageParameters> buildDeleteParameters(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file);
+};
+
+}  // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/PutAzureBlobStorage.cpp b/extensions/azure/processors/PutAzureBlobStorage.cpp
index e5f66fd..7913ca8 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.cpp
+++ b/extensions/azure/processors/PutAzureBlobStorage.cpp
@@ -201,7 +201,7 @@ std::optional<storage::PutAzureBlobStorageParameters> PutAzureBlobStorage::build
 std::optional<storage::AzureStorageCredentials> PutAzureBlobStorage::getCredentials(
     const std::shared_ptr<core::ProcessContext> &context,
     const std::shared_ptr<core::FlowFile> &flow_file) const {
-  auto [result, controller_service_creds] = getCredentialsFromControllerService(context);
+  auto [result, controller_service_creds] = getCredentialsFromControllerService(*context);
   if (controller_service_creds) {
     if (controller_service_creds->isValid()) {
       logger_->log_debug("Azure credentials read from credentials controller service!");
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.cpp b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
index 5620737..d197d2f 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
@@ -20,30 +20,14 @@
 
 #include "PutAzureDataLakeStorage.h"
 
+#include <vector>
+
 #include "utils/ProcessorConfigUtils.h"
 #include "utils/gsl.h"
-#include "controllerservices/AzureStorageCredentialsService.h"
 #include "core/Resource.h"
 
 namespace org::apache::nifi::minifi::azure::processors {
 
-const core::Property PutAzureDataLakeStorage::FilesystemName(
-    core::PropertyBuilder::createProperty("Filesystem Name")
-      ->withDescription("Name of the Azure Storage File System. It is assumed to be already existing.")
-      ->supportsExpressionLanguage(true)
-      ->isRequired(true)
-      ->build());
-const core::Property PutAzureDataLakeStorage::DirectoryName(
-    core::PropertyBuilder::createProperty("Directory Name")
-      ->withDescription("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. "
-                        "If left empty it designates the root directory. The directory will be created if not already existing.")
-      ->supportsExpressionLanguage(true)
-      ->build());
-const core::Property PutAzureDataLakeStorage::FileName(
-    core::PropertyBuilder::createProperty("File Name")
-      ->withDescription("The filename to be uploaded. If left empty the filename attribute will be used by default.")
-      ->supportsExpressionLanguage(true)
-      ->build());
 const core::Property PutAzureDataLakeStorage::ConflictResolutionStrategy(
     core::PropertyBuilder::createProperty("Conflict Resolution Strategy")
       ->withDescription("Indicates what should happen when a file with the same name already exists in the output directory.")
@@ -71,46 +55,27 @@ void PutAzureDataLakeStorage::initialize() {
   });
 }
 
-void PutAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
-  std::optional<storage::AzureStorageCredentials> credentials;
-  std::tie(std::ignore, credentials) = getCredentialsFromControllerService(context);
-  if (!credentials) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service property missing or invalid");
-  }
-
-  if (!credentials->isValid()) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service properties are not set or invalid");
-  }
-
-  credentials_ = *credentials;
+void PutAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  gsl_Expects(context && sessionFactory);
+  AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
 
   conflict_resolution_strategy_ = FileExistsResolutionStrategy::parse(
     utils::parsePropertyWithAllowableValuesOrThrow(*context, ConflictResolutionStrategy.getName(), FileExistsResolutionStrategy::values()).c_str());
 }
 
 std::optional<storage::PutAzureDataLakeStorageParameters> PutAzureDataLakeStorage::buildUploadParameters(
-    const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+    core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
   storage::PutAzureDataLakeStorageParameters params;
-  params.credentials = credentials_;
-  params.replace_file = conflict_resolution_strategy_ == FileExistsResolutionStrategy::REPLACE_FILE;
-
-  if (!context->getProperty(FilesystemName, params.file_system_name, flow_file) || params.file_system_name.empty()) {
-    logger_->log_error("Filesystem Name '%s' is invalid or empty!", params.file_system_name);
-    return std::nullopt;
-  }
-
-  context->getProperty(DirectoryName, params.directory_name, flow_file);
-
-  context->getProperty(FileName, params.filename, flow_file);
-  if (params.filename.empty() && (!flow_file->getAttribute("filename", params.filename) || params.filename.empty())) {
-    logger_->log_error("No File Name is set and default object key 'filename' attribute could not be found!");
+  if (!setCommonParameters(params, context, flow_file)) {
     return std::nullopt;
   }
+  params.replace_file = conflict_resolution_strategy_ == FileExistsResolutionStrategy::REPLACE_FILE;
 
   return params;
 }
 
 void PutAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session);
   logger_->log_trace("PutAzureDataLakeStorage onTrigger");
   std::shared_ptr<core::FlowFile> flow_file = session->get();
   if (!flow_file) {
@@ -118,7 +83,7 @@ void PutAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessConte
     return;
   }
 
-  const auto params = buildUploadParameters(context, flow_file);
+  const auto params = buildUploadParameters(*context, flow_file);
   if (!params) {
     session->transfer(flow_file, Failure);
     return;
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h b/extensions/azure/processors/PutAzureDataLakeStorage.h
index 67ae6c9..a268711 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -20,30 +20,23 @@
 
 #pragma once
 
-#include <utility>
 #include <string>
+#include <utility>
 #include <memory>
-#include <optional>
-#include <vector>
 
-#include "core/Property.h"
-#include "core/logging/Logger.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "storage/AzureDataLakeStorage.h"
+#include "AzureDataLakeStorageProcessorBase.h"
+
 #include "utils/Enum.h"
 #include "utils/Export.h"
-#include "AzureStorageProcessorBase.h"
 
-class PutAzureDataLakeStorageTestsFixture;
+template<typename AzureDataLakeStorageProcessor>
+class AzureDataLakeStorageTestsFixture;
 
 namespace org::apache::nifi::minifi::azure::processors {
 
-class PutAzureDataLakeStorage final : public AzureStorageProcessorBase {
+class PutAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase {
  public:
   // Supported Properties
-  EXTENSIONAPI static const core::Property FilesystemName;
-  EXTENSIONAPI static const core::Property DirectoryName;
-  EXTENSIONAPI static const core::Property FileName;
   EXTENSIONAPI static const core::Property ConflictResolutionStrategy;
 
   // Supported Relationships
@@ -65,7 +58,7 @@ class PutAzureDataLakeStorage final : public AzureStorageProcessorBase {
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
  private:
-  friend class ::PutAzureDataLakeStorageTestsFixture;
+  friend class ::AzureDataLakeStorageTestsFixture<PutAzureDataLakeStorage>;
 
   class ReadCallback : public InputStreamCallback {
    public:
@@ -93,15 +86,12 @@ class PutAzureDataLakeStorage final : public AzureStorageProcessorBase {
   }
 
   explicit PutAzureDataLakeStorage(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
-    : AzureStorageProcessorBase(name, uuid, core::logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger()),
-      azure_data_lake_storage_(std::move(data_lake_storage_client)) {
+    : AzureDataLakeStorageProcessorBase(name, uuid, core::logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger(), std::move(data_lake_storage_client)) {
   }
 
-  std::optional<storage::PutAzureDataLakeStorageParameters> buildUploadParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file);
+  std::optional<storage::PutAzureDataLakeStorageParameters> buildUploadParameters(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file);
 
-  storage::AzureStorageCredentials credentials_;
   FileExistsResolutionStrategy conflict_resolution_strategy_;
-  storage::AzureDataLakeStorage azure_data_lake_storage_;
 };
 
 }  // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/storage/AzureDataLakeStorage.cpp b/extensions/azure/storage/AzureDataLakeStorage.cpp
index 5562933..2be5292 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorage.cpp
@@ -52,4 +52,13 @@ UploadDataLakeStorageResult AzureDataLakeStorage::uploadFile(const PutAzureDataL
   }
 }
 
+bool AzureDataLakeStorage::deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params) {
+  try {
+    return data_lake_storage_client_->deleteFile(params);
+  } catch (const std::exception& ex) {
+    logger_->log_error("An exception occurred while deleting '%s/%s' of filesystem '%s': %s", params.directory_name, params.filename, params.file_system_name, ex.what());
+    return false;
+  }
+}
+
 }  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorage.h b/extensions/azure/storage/AzureDataLakeStorage.h
index a1ec8f6..8dd9b8a 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.h
+++ b/extensions/azure/storage/AzureDataLakeStorage.h
@@ -47,6 +47,7 @@ class AzureDataLakeStorage {
   explicit AzureDataLakeStorage(std::unique_ptr<DataLakeStorageClient> data_lake_storage_client = nullptr);
 
   storage::UploadDataLakeStorageResult uploadFile(const storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer);
+  bool deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params);
 
  private:
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureDataLakeStorage>::getLogger()};
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
index e56b967..7a54a69 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
@@ -52,7 +52,7 @@ void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentia
   credentials_ = credentials;
 }
 
-Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const PutAzureDataLakeStorageParameters& params) {
+Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const AzureDataLakeStorageParameters& params) {
   resetClientIfNeeded(params.credentials, params.file_system_name);
 
   auto directory_client = client_->GetDirectoryClient(params.directory_name);
@@ -74,4 +74,10 @@ std::string AzureDataLakeStorageClient::uploadFile(const PutAzureDataLakeStorage
   return file_client.GetUrl();
 }
 
+bool AzureDataLakeStorageClient::deleteFile(const DeleteAzureDataLakeStorageParameters& params) {
+  auto file_client = getFileClient(params);
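+  // the Delete() response's Value.Deleted flag reports whether the file was actually removed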
+  auto result = file_client.Delete();
+  return result.Value.Deleted;
+}
+
 }  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h b/extensions/azure/storage/AzureDataLakeStorageClient.h
index 3f13fe6..e7d933d 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.h
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.h
@@ -49,9 +49,16 @@ class AzureDataLakeStorageClient : public DataLakeStorageClient {
    */
   std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) override;
 
+  /**
+   * Deletes a file from Azure Data Lake Storage
+   * @param params Parameters required for connecting to Azure and accessing the file
+   * @return True if the file was deleted, false otherwise
+   */
+  bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) override;
+
  private:
   void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name);
-  Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const PutAzureDataLakeStorageParameters& params);
+  Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const AzureDataLakeStorageParameters& params);
 
   AzureStorageCredentials credentials_;
   std::string file_system_name_;
diff --git a/extensions/azure/storage/DataLakeStorageClient.h b/extensions/azure/storage/DataLakeStorageClient.h
index b0f470d..4e97d72 100644
--- a/extensions/azure/storage/DataLakeStorageClient.h
+++ b/extensions/azure/storage/DataLakeStorageClient.h
@@ -28,18 +28,24 @@
 
 namespace org::apache::nifi::minifi::azure::storage {
 
-struct PutAzureDataLakeStorageParameters {
+struct AzureDataLakeStorageParameters {
   AzureStorageCredentials credentials;
   std::string file_system_name;
   std::string directory_name;
   std::string filename;
+};
+
+struct PutAzureDataLakeStorageParameters : public AzureDataLakeStorageParameters {
   bool replace_file = false;
 };
 
+using DeleteAzureDataLakeStorageParameters = AzureDataLakeStorageParameters;
+
 class DataLakeStorageClient {
  public:
   virtual bool createFile(const PutAzureDataLakeStorageParameters& params) = 0;
   virtual std::string uploadFile(const PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) = 0;
+  virtual bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) = 0;
   virtual ~DataLakeStorageClient() = default;
 };
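
As a minimal illustrative sketch (not part of the patch itself): after this refactor the common connection fields live in AzureDataLakeStorageParameters, PutAzureDataLakeStorageParameters only adds the put-specific replace_file flag, and the delete parameters are an alias for the common struct. The field values and the function name below are hypothetical, used only to show the relationship between the structs.

    #include "storage/DataLakeStorageClient.h"

    namespace storage = org::apache::nifi::minifi::azure::storage;

    void buildExampleParams() {
      storage::DeleteAzureDataLakeStorageParameters delete_params;  // alias of AzureDataLakeStorageParameters
      delete_params.file_system_name = "testfilesystem";            // hypothetical values
      delete_params.directory_name = "testdir";                     // empty would designate the root directory
      delete_params.filename = "testfile.txt";

      storage::PutAzureDataLakeStorageParameters put_params;        // inherits the common fields above
      put_params.file_system_name = delete_params.file_system_name;
      put_params.filename = delete_params.filename;
      put_params.replace_file = true;                               // the only put-specific field
    }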
 
diff --git a/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp b/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp
index 5dd99c9..16e2b47 100644
--- a/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/GenerateFlowFileTests.cpp
@@ -58,8 +58,7 @@ TEST_CASE("GenerateFlowFileTest", "[generateflowfiletest]") {
 
   auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
     std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
-    std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
-    file_contents.push_back(file_content);
+    file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
     return true;
   };
 
@@ -188,8 +187,7 @@ TEST_CASE("GenerateFlowFileCustomTextTest", "[generateflowfiletest]") {
 
   auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
     std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
-    std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
-    file_contents.push_back(file_content);
+    file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
     return true;
   };
 
@@ -226,8 +224,7 @@ TEST_CASE("GenerateFlowFileCustomTextEmptyTest", "[generateflowfiletest]") {
 
   auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
     std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
-    std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
-    file_contents.push_back(file_content);
+    file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
     return true;
   };
 
diff --git a/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h b/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h
new file mode 100644
index 0000000..6878e71
--- /dev/null
+++ b/libminifi/test/azure-tests/AzureDataLakeStorageTestsFixture.h
@@ -0,0 +1,122 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <utility>
+#include <vector>
+#include <memory>
+#include <string>
+
+#include "MockDataLakeStorageClient.h"
+#include "../TestBase.h"
+#include "utils/TestUtils.h"
+#include "utils/IntegrationTestUtils.h"
+#include "core/Processor.h"
+#include "processors/GetFile.h"
+#include "processors/PutFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "utils/file/FileUtils.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+const std::string FILESYSTEM_NAME = "testfilesystem";
+const std::string DIRECTORY_NAME = "testdir";
+const std::string FILE_NAME = "testfile.txt";
+const std::string CONNECTION_STRING = "test-connectionstring";
+const std::string TEST_DATA = "data123";
+const std::string GETFILE_FILE_NAME = "input_data.log";
+
+template<typename AzureDataLakeStorageProcessor>
+class AzureDataLakeStorageTestsFixture {
+ public:
+  AzureDataLakeStorageTestsFixture() {
+    LogTestController::getInstance().setDebug<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setTrace<minifi::processors::GetFile>();
+    LogTestController::getInstance().setTrace<minifi::processors::PutFile>();
+    LogTestController::getInstance().setDebug<minifi::processors::UpdateAttribute>();
+    LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<AzureDataLakeStorageProcessor>();
+
+    // Build MiNiFi processing graph
+    plan_ = test_controller_.createPlan();
+    auto mock_data_lake_storage_client = std::make_unique<MockDataLakeStorageClient>();
+    mock_data_lake_storage_client_ptr_ = mock_data_lake_storage_client.get();
+    azure_data_lake_storage_ = std::shared_ptr<AzureDataLakeStorageProcessor>(
+      new AzureDataLakeStorageProcessor("AzureDataLakeStorageProcessor", utils::Identifier(), std::move(mock_data_lake_storage_client)));
+    auto input_dir = test_controller_.createTempDirectory();
+    utils::putFileToDir(input_dir, GETFILE_FILE_NAME, TEST_DATA);
+
+    get_file_ = plan_->addProcessor("GetFile", "GetFile");
+    plan_->setProperty(get_file_, minifi::processors::GetFile::Directory.getName(), input_dir);
+    plan_->setProperty(get_file_, minifi::processors::GetFile::KeepSourceFile.getName(), "false");
+
+    update_attribute_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} },  true);
+    plan_->addProcessor(azure_data_lake_storage_, "AzureDataLakeStorageProcessor", { {"success", "d"}, {"failure", "d"} }, true);
+    auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true);
+    logattribute->setAutoTerminatedRelationships({{"success", "d"}});
+
+    putfile_ = plan_->addProcessor("PutFile", "PutFile", { {"success", "d"} }, false);
+    plan_->addConnection(azure_data_lake_storage_, {"failure", "d"}, putfile_);
+    putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+    output_dir_ = test_controller_.createTempDirectory();
+    plan_->setProperty(putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), output_dir_);
+
+    azure_storage_cred_service_ = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
+    setDefaultProperties();
+  }
+
+  std::vector<std::string> getFailedFlowFileContents() {
+    std::vector<std::string> file_contents;
+
+    auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
+      std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
+      file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
+      return true;
+    };
+
+    utils::file::FileUtils::list_dir(output_dir_, lambda, plan_->getLogger(), false);
+    return file_contents;
+  }
+
+  void setDefaultProperties() {
+    plan_->setProperty(azure_data_lake_storage_, AzureDataLakeStorageProcessor::AzureStorageCredentialsService.getName(), "AzureStorageCredentialsService");
+    plan_->setProperty(update_attribute_, "test.filesystemname", FILESYSTEM_NAME, true);
+    plan_->setProperty(azure_data_lake_storage_, AzureDataLakeStorageProcessor::FilesystemName.getName(), "${test.filesystemname}");
+    plan_->setProperty(update_attribute_, "test.directoryname", DIRECTORY_NAME, true);
+    plan_->setProperty(azure_data_lake_storage_, AzureDataLakeStorageProcessor::DirectoryName.getName(), "${test.directoryname}");
+    plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), CONNECTION_STRING);
+  }
+
+  virtual ~AzureDataLakeStorageTestsFixture() {
+    LogTestController::getInstance().reset();
+  }
+
+ protected:
+  TestController test_controller_;
+  std::shared_ptr<TestPlan> plan_;
+  MockDataLakeStorageClient* mock_data_lake_storage_client_ptr_;
+  std::shared_ptr<core::Processor> azure_data_lake_storage_;
+  std::shared_ptr<core::Processor> get_file_;
+  std::shared_ptr<core::Processor> update_attribute_;
+  std::shared_ptr<core::Processor> putfile_;
+  std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_;
+  std::string output_dir_;
+};
diff --git a/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp
new file mode 100644
index 0000000..ea485f5
--- /dev/null
+++ b/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp
@@ -0,0 +1,128 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AzureDataLakeStorageTestsFixture.h"
+#include "processors/DeleteAzureDataLakeStorage.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+namespace {
+
+using namespace std::chrono_literals;
+
+using DeleteAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::DeleteAzureDataLakeStorage>;
+
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Azure storage credentials service is empty", "[azureDataLakeStorageParameters]") {
+  plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::DeleteAzureDataLakeStorage::AzureStorageCredentialsService.getName(), "");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
+  REQUIRE(getFailedFlowFileContents().size() == 0);
+}
+
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Test Azure credentials with account name and SAS token set", "[azureDataLakeStorageParameters]") {
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), "");
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams();
+  CHECK(passed_params.credentials.buildConnectionString() == "AccountName=TEST_ACCOUNT;SharedAccessSignature=token");
+  CHECK(getFailedFlowFileContents().size() == 0);
+}
+
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Test Azure credentials with connection string override", "[azureDataLakeStorageParameters]") {
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), CONNECTION_STRING);
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams();
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(getFailedFlowFileContents().size() == 0);
+}
+
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Test Azure credentials with managed identity use", "[azureDataLakeStorageParameters]") {
+  setDefaultProperties();
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), "test");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::UseManagedIdentityCredentials.getName(), "true");
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams();
+  CHECK(passed_params.credentials.buildConnectionString().empty());
+  CHECK(passed_params.credentials.getStorageAccountName() == "TEST_ACCOUNT");
+  CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net");
+  CHECK(getFailedFlowFileContents().size() == 0);
+}
+
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Filesystem name is not set", "[azureDataLakeStorageParameters]") {
+  plan_->setProperty(update_attribute_, "test.filesystemname", "", true);
+  test_controller_.runSession(plan_, true);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  auto failed_flowfiles = getFailedFlowFileContents();
+  CHECK(failed_flowfiles.size() == 1);
+  CHECK(failed_flowfiles[0] == TEST_DATA);
+  CHECK(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid or empty!"));
+}
+
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Connection String is empty", "[azureDataLakeStorageParameters]") {
+  plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), "");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
+  REQUIRE(getFailedFlowFileContents().size() == 0);
+}
+
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Delete file succeeds", "[azureDataLakeStorageDelete]") {
+  test_controller_.runSession(plan_, true);
+  REQUIRE(getFailedFlowFileContents().size() == 0);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams();
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(passed_params.directory_name == DIRECTORY_NAME);
+  CHECK(passed_params.filename == GETFILE_FILE_NAME);
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:filename value:" + GETFILE_FILE_NAME));
+}
+
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Delete file fails", "[azureDataLakeStorageDelete]") {
+  mock_data_lake_storage_client_ptr_->setDeleteFailure(true);
+  test_controller_.runSession(plan_, true);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams();
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(passed_params.directory_name == DIRECTORY_NAME);
+  CHECK(passed_params.filename == GETFILE_FILE_NAME);
+  CHECK_FALSE(LogTestController::getInstance().contains("key:filename value:", 0s, 0ms));
+  auto failed_flowfiles = getFailedFlowFileContents();
+  REQUIRE(failed_flowfiles.size() == 1);
+  REQUIRE(failed_flowfiles[0] == TEST_DATA);
+}
+
+TEST_CASE_METHOD(DeleteAzureDataLakeStorageTestsFixture, "Delete result is false", "[azureDataLakeStorageDelete]") {
+  mock_data_lake_storage_client_ptr_->setDeleteResult(false);
+  test_controller_.runSession(plan_, true);
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedDeleteParams();
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
+  CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(passed_params.directory_name == DIRECTORY_NAME);
+  CHECK(passed_params.filename == GETFILE_FILE_NAME);
+  CHECK_FALSE(LogTestController::getInstance().contains("key:filename value:", 0s, 0ms));
+  auto failed_flowfiles = getFailedFlowFileContents();
+  REQUIRE(failed_flowfiles.size() == 1);
+  REQUIRE(failed_flowfiles[0] == TEST_DATA);
+}
+
+}  // namespace
diff --git a/libminifi/test/azure-tests/MockDataLakeStorageClient.h b/libminifi/test/azure-tests/MockDataLakeStorageClient.h
new file mode 100644
index 0000000..eeac4dc
--- /dev/null
+++ b/libminifi/test/azure-tests/MockDataLakeStorageClient.h
@@ -0,0 +1,96 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <stdexcept>
+
+#include "storage/DataLakeStorageClient.h"
+
+class MockDataLakeStorageClient : public org::apache::nifi::minifi::azure::storage::DataLakeStorageClient {
+ public:
+  const std::string PRIMARY_URI = "http://test-uri/file";
+
+  bool createFile(const org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters& /*params*/) override {
+    if (file_creation_error_) {
+      throw std::runtime_error("error");
+    }
+    return create_file_;
+  }
+
+  std::string uploadFile(const org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) override {
+    input_data_ = std::string(buffer.begin(), buffer.end());
+    put_params_ = params;
+
+    if (upload_fails_) {
+      throw std::runtime_error("error");
+    }
+
+    return RETURNED_PRIMARY_URI;
+  }
+
+  bool deleteFile(const org::apache::nifi::minifi::azure::storage::DeleteAzureDataLakeStorageParameters& params) override {
+    delete_params_ = params;
+
+    if (delete_fails_) {
+      throw std::runtime_error("error");
+    }
+
+    return delete_result_;
+  }
+
+  void setFileCreation(bool create_file) {
+    create_file_ = create_file;
+  }
+
+  void setFileCreationError(bool file_creation_error) {
+    file_creation_error_ = file_creation_error;
+  }
+
+  void setUploadFailure(bool upload_fails) {
+    upload_fails_ = upload_fails;
+  }
+
+  void setDeleteFailure(bool delete_fails) {
+    delete_fails_ = delete_fails;
+  }
+
+  void setDeleteResult(bool delete_result) {
+    delete_result_ = delete_result;
+  }
+
+  org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters getPassedPutParams() const {
+    return put_params_;
+  }
+
+  org::apache::nifi::minifi::azure::storage::DeleteAzureDataLakeStorageParameters getPassedDeleteParams() const {
+    return delete_params_;
+  }
+
+ private:
+  const std::string RETURNED_PRIMARY_URI = "http://test-uri/file?secret-sas";
+  bool create_file_ = true;
+  bool file_creation_error_ = false;
+  bool upload_fails_ = false;
+  bool delete_fails_ = false;
+  bool delete_result_ = true;
+  std::string input_data_;
+  org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters put_params_;
+  org::apache::nifi::minifi::azure::storage::DeleteAzureDataLakeStorageParameters delete_params_;
+};
diff --git a/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp b/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp
index 816de5d..9c2b771 100644
--- a/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp
+++ b/libminifi/test/azure-tests/PutAzureBlobStorageTests.cpp
@@ -145,8 +145,7 @@ class PutAzureBlobStorageTestsFixture {
 
     auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
       std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
-      std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
-      file_contents.push_back(file_content);
+      file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
       return true;
     };
 
diff --git a/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp
index a0520d4..39463c4 100644
--- a/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp
+++ b/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp
@@ -16,156 +16,18 @@
  * limitations under the License.
  */
 
-#include "../TestBase.h"
-#include "utils/IntegrationTestUtils.h"
-#include "utils/TestUtils.h"
-#include "core/Processor.h"
+#include "AzureDataLakeStorageTestsFixture.h"
 #include "processors/PutAzureDataLakeStorage.h"
-#include "processors/GetFile.h"
-#include "processors/PutFile.h"
-#include "processors/LogAttribute.h"
-#include "processors/UpdateAttribute.h"
-#include "storage/DataLakeStorageClient.h"
-#include "utils/file/FileUtils.h"
 #include "controllerservices/AzureStorageCredentialsService.h"
 
-using namespace std::chrono_literals;
-
-const std::string FILESYSTEM_NAME = "testfilesystem";
-const std::string DIRECTORY_NAME = "testdir";
-const std::string FILE_NAME = "testfile.txt";
-const std::string CONNECTION_STRING = "test-connectionstring";
-const std::string TEST_DATA = "data123";
-const std::string GETFILE_FILE_NAME = "input_data.log";
-
-class MockDataLakeStorageClient : public minifi::azure::storage::DataLakeStorageClient {
- public:
-  const std::string PRIMARY_URI = "http://test-uri/file";
-
-  bool createFile(const minifi::azure::storage::PutAzureDataLakeStorageParameters& /*params*/) override {
-    if (file_creation_error_) {
-      throw std::runtime_error("error");
-    }
-    return create_file_;
-  }
-
-  std::string uploadFile(const minifi::azure::storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> buffer) override {
-    input_data_ = std::string(buffer.begin(), buffer.end());
-    params_ = params;
-
-    if (upload_fails_) {
-      throw std::runtime_error("error");
-    }
-
-    return RETURNED_PRIMARY_URI;
-  }
-
-  void setFileCreation(bool create_file) {
-    create_file_ = create_file;
-  }
-
-  void setFileCreationError(bool file_creation_error) {
-    file_creation_error_ = file_creation_error;
-  }
-
-  void setUploadFailure(bool upload_fails) {
-    upload_fails_ = upload_fails;
-  }
-
-  minifi::azure::storage::PutAzureDataLakeStorageParameters getPassedParams() const {
-    return params_;
-  }
-
- private:
-  const std::string RETURNED_PRIMARY_URI = "http://test-uri/file?secret-sas";
-  bool create_file_ = true;
-  bool file_creation_error_ = false;
-  bool upload_fails_ = false;
-  std::string input_data_;
-  minifi::azure::storage::PutAzureDataLakeStorageParameters params_;
-};
+namespace {
 
-class PutAzureDataLakeStorageTestsFixture {
- public:
-  PutAzureDataLakeStorageTestsFixture() {
-    LogTestController::getInstance().setDebug<TestPlan>();
-    LogTestController::getInstance().setDebug<minifi::core::Processor>();
-    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
-    LogTestController::getInstance().setTrace<minifi::processors::GetFile>();
-    LogTestController::getInstance().setTrace<minifi::processors::PutFile>();
-    LogTestController::getInstance().setDebug<minifi::processors::UpdateAttribute>();
-    LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<minifi::azure::processors::PutAzureDataLakeStorage>();
-
-    // Build MiNiFi processing graph
-    plan_ = test_controller_.createPlan();
-    auto mock_data_lake_storage_client = std::make_unique<MockDataLakeStorageClient>();
-    mock_data_lake_storage_client_ptr_ = mock_data_lake_storage_client.get();
-    put_azure_data_lake_storage_ = std::shared_ptr<minifi::azure::processors::PutAzureDataLakeStorage>(
-      new minifi::azure::processors::PutAzureDataLakeStorage("PutAzureDataLakeStorage", utils::Identifier(), std::move(mock_data_lake_storage_client)));
-    auto input_dir = test_controller_.createTempDirectory();
-    utils::putFileToDir(input_dir, GETFILE_FILE_NAME, TEST_DATA);
-
-    get_file_ = plan_->addProcessor("GetFile", "GetFile");
-    plan_->setProperty(get_file_, minifi::processors::GetFile::Directory.getName(), input_dir);
-    plan_->setProperty(get_file_, minifi::processors::GetFile::KeepSourceFile.getName(), "false");
-
-    update_attribute_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} },  true);
-    plan_->addProcessor(put_azure_data_lake_storage_, "PutAzureDataLakeStorage", { {"success", "d"}, {"failure", "d"} }, true);
-    auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true);
-    logattribute->setAutoTerminatedRelationships({{"success", "d"}});
-
-    putfile_ = plan_->addProcessor("PutFile", "PutFile", { {"success", "d"} }, false);
-    plan_->addConnection(put_azure_data_lake_storage_, {"failure", "d"}, putfile_);
-    putfile_->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
-    output_dir_ = test_controller_.createTempDirectory();
-    plan_->setProperty(putfile_, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), output_dir_);
-
-    azure_storage_cred_service_ = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService");
-    setDefaultProperties();
-  }
-
-  std::vector<std::string> getFailedFlowFileContents() {
-    std::vector<std::string> file_contents;
-
-    auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
-      std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
-      std::string file_content((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
-      file_contents.push_back(file_content);
-      return true;
-    };
-
-    utils::file::FileUtils::list_dir(output_dir_, lambda, plan_->getLogger(), false);
-    return file_contents;
-  }
-
-  void setDefaultProperties() {
-    plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::AzureStorageCredentialsService.getName(), "AzureStorageCredentialsService");
-    plan_->setProperty(update_attribute_, "test.filesystemname", FILESYSTEM_NAME, true);
-    plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::FilesystemName.getName(), "${test.filesystemname}");
-    plan_->setProperty(update_attribute_, "test.directoryname", DIRECTORY_NAME, true);
-    plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName.getName(), "${test.directoryname}");
-    plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), CONNECTION_STRING);
-  }
-
-  virtual ~PutAzureDataLakeStorageTestsFixture() {
-    LogTestController::getInstance().reset();
-  }
+using namespace std::chrono_literals;
 
- protected:
-  TestController test_controller_;
-  std::shared_ptr<TestPlan> plan_;
-  MockDataLakeStorageClient* mock_data_lake_storage_client_ptr_;
-  std::shared_ptr<core::Processor> put_azure_data_lake_storage_;
-  std::shared_ptr<core::Processor> get_file_;
-  std::shared_ptr<core::Processor> update_attribute_;
-  std::shared_ptr<core::Processor> putfile_;
-  std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_;
-  std::string output_dir_;
-};
+using PutAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::PutAzureDataLakeStorage>;
 
 TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Azure storage credentials service is empty", "[azureDataLakeStorageParameters]") {
-  plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::AzureStorageCredentialsService.getName(), "");
+  plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::AzureStorageCredentialsService.getName(), "");
   REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
   REQUIRE(getFailedFlowFileContents().size() == 0);
 }
@@ -176,7 +38,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials wi
   plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
   plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(), "");
   test_controller_.runSession(plan_, true);
-  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
   CHECK(passed_params.credentials.buildConnectionString() == "AccountName=TEST_ACCOUNT;SharedAccessSignature=token");
   REQUIRE(getFailedFlowFileContents().size() == 0);
 }
@@ -187,7 +49,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials wi
   plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken.getName(), "token");
   plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
   test_controller_.runSession(plan_, true);
-  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
   CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
   REQUIRE(getFailedFlowFileContents().size() == 0);
 }
@@ -198,7 +60,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials wi
   plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::UseManagedIdentityCredentials.getName(), "true");
   plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName.getName(), "TEST_ACCOUNT");
   test_controller_.runSession(plan_, true);
-  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
   CHECK(passed_params.credentials.buildConnectionString().empty());
   CHECK(passed_params.credentials.getStorageAccountName() == "TEST_ACCOUNT");
   CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net");
@@ -223,7 +85,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Connection String is empt
 
 TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Upload to Azure Data Lake Storage with default parameters", "[azureDataLakeStorageUpload]") {
   test_controller_.runSession(plan_, true);
-  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
   CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
   CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
   CHECK(passed_params.directory_name == DIRECTORY_NAME);
@@ -263,7 +125,7 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to failure on 'f
 }
 
 TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to success on 'ignore' resolution strategy if file exists", "[azureDataLakeStorageUpload]") {
-  plan_->setProperty(put_azure_data_lake_storage_,
+  plan_->setProperty(azure_data_lake_storage_,
     minifi::azure::processors::PutAzureDataLakeStorage::ConflictResolutionStrategy.getName(),
     toString(minifi::azure::processors::PutAzureDataLakeStorage::FileExistsResolutionStrategy::IGNORE_REQUEST));
   mock_data_lake_storage_client_ptr_->setFileCreation(false);
@@ -275,12 +137,12 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to success on 'i
 }
 
 TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Replace old file on 'replace' resolution strategy if file exists", "[azureDataLakeStorageUpload]") {
-  plan_->setProperty(put_azure_data_lake_storage_,
+  plan_->setProperty(azure_data_lake_storage_,
     minifi::azure::processors::PutAzureDataLakeStorage::ConflictResolutionStrategy.getName(),
     toString(minifi::azure::processors::PutAzureDataLakeStorage::FileExistsResolutionStrategy::REPLACE_FILE));
   mock_data_lake_storage_client_ptr_->setFileCreation(false);
   test_controller_.runSession(plan_, true);
-  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
   CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
   CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
   CHECK(passed_params.directory_name == DIRECTORY_NAME);
@@ -296,11 +158,13 @@ TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Replace old file on 'repl
 }
 
 TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Upload to Azure Data Lake Storage with empty directory is accepted", "[azureDataLakeStorageUpload]") {
-  plan_->setProperty(put_azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName.getName(), "");
+  plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName.getName(), "");
   test_controller_.runSession(plan_, true);
-  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedParams();
+  auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
   CHECK(passed_params.directory_name == "");
   REQUIRE(getFailedFlowFileContents().size() == 0);
   using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
   CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:\n"));
 }
+
+}  // namespace