You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/02/08 19:33:55 UTC

[nifi-minifi-cpp] branch main updated (e9ca206f9 -> 650f7c28b)

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from e9ca206f9 MINIFICPP-1840 - Add support for MQTT 5 Closes #1432
     new f2f561ce2 MINIFICPP-1979 Use Coroutines with asio
     new d25cd5d89 MINIFICPP-1975 Volatile and persistent combination of repositories should be avoided
     new 3c44ebaff MINIFICPP-2009 CWEL should add resolved attributes with json output as well
     new 650f7c28b MINIFICPP-2043 Reset flowfile repo checkpoint during initialization

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci.yml                           |   6 +-
 CMakeLists.txt                                     |   3 +
 cmake/BuildTests.cmake                             |   1 +
 run_shellcheck.sh => cmake/Coroutines.cmake        |  10 +-
 controller/Controller.h                            |  15 +-
 extensions/rocksdb-repos/FlowFileRepository.cpp    |   1 +
 extensions/standard-processors/CMakeLists.txt      |   3 +
 .../processors/ListenSyslog.cpp                    |   5 +-
 .../standard-processors/processors/ListenTCP.cpp   |   5 +-
 .../processors/NetworkListenerProcessor.cpp        |   8 +-
 .../processors/NetworkListenerProcessor.h          |   7 +-
 .../standard-processors/processors/PutTCP.cpp      | 484 +++++++--------------
 extensions/standard-processors/processors/PutTCP.h |  32 +-
 .../standard-processors/processors/PutUDP.cpp      |   2 +-
 .../standard-processors/tests/CMakeLists.txt       |   2 +
 .../tests/unit/ListenSyslogTests.cpp               | 204 +++++----
 .../tests/unit/ListenTcpTests.cpp                  | 187 +++++---
 .../tests/unit/ListenUDPTests.cpp                  |  53 +--
 .../standard-processors/tests/unit/PutTCPTests.cpp | 162 +++----
 .../standard-processors/tests/unit/PutUDPTests.cpp |  13 +-
 .../windows-event-log/ConsumeWindowsEventLog.cpp   |  54 ++-
 .../windows-event-log/ConsumeWindowsEventLog.h     |   9 +-
 .../tests/ConsumeWindowsEventLogTests.cpp          |  12 +
 libminifi/include/controllers/SSLContextService.h  |  13 +-
 libminifi/include/utils/net/AsioCoro.h             |  75 ++++
 libminifi/include/utils/net/Server.h               |  15 +-
 .../include/utils/net/SessionHandlingServer.h      |  67 ---
 libminifi/include/utils/net/Ssl.h                  |  16 +
 libminifi/include/utils/net/SslServer.h            |  65 ---
 libminifi/include/utils/net/TcpServer.h            |  38 +-
 libminifi/include/utils/net/UdpServer.h            |  12 +-
 libminifi/src/controllers/SSLContextService.cpp    |  10 +-
 libminifi/src/core/RepositoryFactory.cpp           |  32 +-
 libminifi/src/utils/net/SslServer.cpp              |  90 ----
 libminifi/src/utils/net/TcpServer.cpp              |  90 ++--
 libminifi/src/utils/net/UdpServer.cpp              |  41 +-
 libminifi/test/Catch.h                             |  46 +-
 libminifi/test/Utils.h                             |  75 ++--
 minifi_main/MiNiFiMain.cpp                         |   8 +
 39 files changed, 941 insertions(+), 1030 deletions(-)
 copy run_shellcheck.sh => cmake/Coroutines.cmake (77%)
 mode change 100755 => 100644
 create mode 100644 libminifi/include/utils/net/AsioCoro.h
 delete mode 100644 libminifi/include/utils/net/SessionHandlingServer.h
 delete mode 100644 libminifi/include/utils/net/SslServer.h
 delete mode 100644 libminifi/src/utils/net/SslServer.cpp


[nifi-minifi-cpp] 01/04: MINIFICPP-1979 Use Coroutines with asio

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit f2f561ce2b4635e721b1d5b08b34bf8d450c9f6f
Author: Martin Zink <ma...@apache.org>
AuthorDate: Wed Feb 8 16:26:01 2023 +0100

    MINIFICPP-1979 Use Coroutines with asio
    
    MINIFICPP-1985 Run asio related tests on random ports
    
    Closes #1457
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .github/workflows/ci.yml                           |   6 +-
 CMakeLists.txt                                     |   3 +
 cmake/BuildTests.cmake                             |   1 +
 .../CMakeLists.txt => cmake/Coroutines.cmake       |  21 +-
 extensions/standard-processors/CMakeLists.txt      |   3 +
 .../processors/ListenSyslog.cpp                    |   5 +-
 .../standard-processors/processors/ListenTCP.cpp   |   5 +-
 .../processors/NetworkListenerProcessor.cpp        |   8 +-
 .../processors/NetworkListenerProcessor.h          |   7 +-
 .../standard-processors/processors/PutTCP.cpp      | 484 +++++++--------------
 extensions/standard-processors/processors/PutTCP.h |  32 +-
 .../standard-processors/processors/PutUDP.cpp      |   2 +-
 .../standard-processors/tests/CMakeLists.txt       |   2 +
 .../tests/unit/ListenSyslogTests.cpp               | 204 +++++----
 .../tests/unit/ListenTcpTests.cpp                  | 187 +++++---
 .../tests/unit/ListenUDPTests.cpp                  |  53 +--
 .../standard-processors/tests/unit/PutTCPTests.cpp | 162 +++----
 .../standard-processors/tests/unit/PutUDPTests.cpp |  13 +-
 libminifi/include/controllers/SSLContextService.h  |  13 +-
 libminifi/include/utils/net/AsioCoro.h             |  75 ++++
 libminifi/include/utils/net/Server.h               |  15 +-
 .../include/utils/net/SessionHandlingServer.h      |  67 ---
 libminifi/include/utils/net/Ssl.h                  |  16 +
 libminifi/include/utils/net/SslServer.h            |  65 ---
 libminifi/include/utils/net/TcpServer.h            |  38 +-
 libminifi/include/utils/net/UdpServer.h            |  12 +-
 libminifi/src/controllers/SSLContextService.cpp    |  10 +-
 libminifi/src/utils/net/SslServer.cpp              |  90 ----
 libminifi/src/utils/net/TcpServer.cpp              |  90 ++--
 libminifi/src/utils/net/UdpServer.cpp              |  41 +-
 libminifi/test/Catch.h                             |  46 +-
 libminifi/test/Utils.h                             |  75 ++--
 32 files changed, 861 insertions(+), 990 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c3017324e..9aa080554 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -3,7 +3,7 @@ on: [push, pull_request, workflow_dispatch]
 jobs:
   macos_xcode:
     name: "macos-xcode"
-    runs-on: macos-11
+    runs-on: macos-12
     timeout-minutes: 180
     env:
       CCACHE_BASEDIR: ${{ GITHUB.WORKSPACE }}
@@ -29,8 +29,8 @@ jobs:
         run: |
           echo "PATH=/usr/lib/ccache:/usr/local/opt/ccache/bin:/usr/local/opt/ccache/libexec:$PATH" >> $GITHUB_ENV
           echo -e "127.0.0.1\t$HOSTNAME" | sudo tee -a /etc/hosts > /dev/null
-          # https://github.com/actions/virtual-environments/blob/main/images/macos/macos-11-Readme.md#xcode
-          sudo xcode-select -switch /Applications/Xcode_13.2.1.app
+          # https://github.com/actions/virtual-environments/blob/main/images/macos/macos-12-Readme.md#xcode
+          sudo xcode-select -switch /Applications/Xcode_14.0.1.app
       - name: build
         run: |
           export PATH="/usr/local/opt/flex/bin:$PATH"
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 35cf83db7..513afe7ae 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -286,6 +286,9 @@ target_include_directories(RapidJSON SYSTEM INTERFACE "${CMAKE_CURRENT_SOURCE_DI
 # cxxopts
 include(CxxOpts)
 
+include(Coroutines)
+enable_coroutines()
+
 # gsl-lite
 include(GslLite)
 
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index bf21b589f..238b1f87b 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -91,6 +91,7 @@ endif()
 SET(CATCH_MAIN_LIB catch_main)
 add_library(${CATCH_MAIN_LIB} STATIC "${TEST_DIR}/CatchMain.cpp")
 target_include_directories(${CATCH_MAIN_LIB} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
+target_link_libraries(${CATCH_MAIN_LIB} spdlog)  # for fmt
 
 SET(TEST_RESOURCES ${TEST_DIR}/resources)
 
diff --git a/extensions/standard-processors/CMakeLists.txt b/cmake/Coroutines.cmake
similarity index 59%
copy from extensions/standard-processors/CMakeLists.txt
copy to cmake/Coroutines.cmake
index db679c286..e027551e5 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/cmake/Coroutines.cmake
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,19 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-
-
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
-
-file(GLOB SOURCES  "processors/*.cpp" "controllers/*.cpp" )
-
-add_library(minifi-standard-processors SHARED ${SOURCES})
-
-include(RangeV3)
-include(Asio)
-target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio)
-
-register_extension(minifi-standard-processors "STANDARD PROCESSORS" STANDARD-PROCESSORS "Provides standard processors" "extensions/standard-processors/tests/")
 
-register_extension_linter(minifi-standard-processors-linter)
+function(enable_coroutines)
+    if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 11)
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines" PARENT_SCOPE)
+    endif()
+endfunction(enable_coroutines)
diff --git a/extensions/standard-processors/CMakeLists.txt b/extensions/standard-processors/CMakeLists.txt
index db679c286..d9b46400a 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/extensions/standard-processors/CMakeLists.txt
@@ -28,6 +28,9 @@ include(RangeV3)
 include(Asio)
 target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio)
 
+include(Coroutines)
+enable_coroutines()
+
 register_extension(minifi-standard-processors "STANDARD PROCESSORS" STANDARD-PROCESSORS "Provides standard processors" "extensions/standard-processors/tests/")
 
 register_extension_linter(minifi-standard-processors-linter)
diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp b/extensions/standard-processors/processors/ListenSyslog.cpp
index 41518a1d1..ecd6b7477 100644
--- a/extensions/standard-processors/processors/ListenSyslog.cpp
+++ b/extensions/standard-processors/processors/ListenSyslog.cpp
@@ -22,6 +22,7 @@
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 #include "controllers/SSLContextService.h"
+#include "utils/net/Ssl.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -67,8 +68,8 @@ const core::Property ListenSyslog::SSLContextService(
 const core::Property ListenSyslog::ClientAuth(
     core::PropertyBuilder::createProperty("Client Auth")
       ->withDescription("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
-      ->withDefaultValue<std::string>(toString(utils::net::SslServer::ClientAuthOption::NONE))
-      ->withAllowableValues<std::string>(utils::net::SslServer::ClientAuthOption::values())
+      ->withDefaultValue<std::string>(toString(utils::net::ClientAuthOption::NONE))
+      ->withAllowableValues<std::string>(utils::net::ClientAuthOption::values())
       ->build());
 
 const core::Relationship ListenSyslog::Success("success", "Incoming messages that match the expected format when parsing will be sent to this relationship. "
diff --git a/extensions/standard-processors/processors/ListenTCP.cpp b/extensions/standard-processors/processors/ListenTCP.cpp
index 94c2d9884..2b600677e 100644
--- a/extensions/standard-processors/processors/ListenTCP.cpp
+++ b/extensions/standard-processors/processors/ListenTCP.cpp
@@ -20,6 +20,7 @@
 #include "core/PropertyBuilder.h"
 #include "controllers/SSLContextService.h"
 #include "utils/ProcessorConfigUtils.h"
+#include "utils/net/Ssl.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -54,8 +55,8 @@ const core::Property ListenTCP::SSLContextService(
 const core::Property ListenTCP::ClientAuth(
     core::PropertyBuilder::createProperty("Client Auth")
       ->withDescription("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
-      ->withDefaultValue<std::string>(toString(utils::net::SslServer::ClientAuthOption::NONE))
-      ->withAllowableValues<std::string>(utils::net::SslServer::ClientAuthOption::values())
+      ->withDefaultValue<std::string>(toString(utils::net::ClientAuthOption::NONE))
+      ->withAllowableValues<std::string>(utils::net::ClientAuthOption::values())
       ->build());
 
 const core::Relationship ListenTCP::Success("success", "Messages received successfully will be sent out this relationship.");
diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
index 21b74f1ef..ebe884c1c 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
@@ -66,16 +66,16 @@ void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& contex
   auto options = readServerOptions(context);
 
   std::string ssl_value;
+  std::optional<utils::net::SslServerOptions> ssl_options;
   if (context.getProperty(ssl_context_property.getName(), ssl_value) && !ssl_value.empty()) {
     auto ssl_data = utils::net::getSslData(context, ssl_context_property, logger_);
     if (!ssl_data || !ssl_data->isValid()) {
       throw Exception(PROCESSOR_EXCEPTION, "SSL Context Service is set, but no valid SSL data was found!");
     }
-    auto client_auth = utils::parseEnumProperty<utils::net::SslServer::ClientAuthOption>(context, client_auth_property);
-    server_ = std::make_unique<utils::net::SslServer>(options.max_queue_size, options.port, logger_, *ssl_data, client_auth);
-  } else {
-    server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, options.port, logger_);
+    auto client_auth = utils::parseEnumProperty<utils::net::ClientAuthOption>(context, client_auth_property);
+    ssl_options.emplace(std::move(*ssl_data), client_auth);
   }
+  server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, options.port, logger_, ssl_options);
 
   startServer(options, utils::net::IpProtocol::TCP);
 }
diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.h b/extensions/standard-processors/processors/NetworkListenerProcessor.h
index 1799a3fcb..447a59322 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.h
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.h
@@ -26,7 +26,6 @@
 #include "core/ProcessSession.h"
 #include "core/Property.h"
 #include "utils/net/Server.h"
-#include "utils/net/SslServer.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -51,6 +50,12 @@ class NetworkListenerProcessor : public core::Processor {
     stopServer();
   }
 
+  uint16_t getPort() {
+    if (server_)
+      return server_->getPort();
+    return 0;
+  }
+
  protected:
   void startTcpServer(const core::ProcessContext& context, const core::Property& ssl_context_property, const core::Property& client_auth_property);
   void startUdpServer(const core::ProcessContext& context);
diff --git a/extensions/standard-processors/processors/PutTCP.cpp b/extensions/standard-processors/processors/PutTCP.cpp
index 0b992ebf6..c5c2d7717 100644
--- a/extensions/standard-processors/processors/PutTCP.cpp
+++ b/extensions/standard-processors/processors/PutTCP.cpp
@@ -16,30 +16,29 @@
  */
 #include "PutTCP.h"
 
-#include <algorithm>
 #include <utility>
+#include <tuple>
 
 #include "range/v3/range/conversion.hpp"
 
 #include "utils/gsl.h"
-#include "utils/expected.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 #include "core/logging/Logger.h"
-#include "controllers/SSLContextService.h"
 
-#include "asio/ssl.hpp"
-#include "asio/ip/tcp.hpp"
-#include "asio/write.hpp"
-#include "asio/high_resolution_timer.hpp"
+#include "utils/net/AsioCoro.h"
 
 using asio::ip::tcp;
-using TcpSocket = asio::ip::tcp::socket;
-using SslSocket = asio::ssl::stream<tcp::socket>;
 
 using namespace std::literals::chrono_literals;
+using std::chrono::steady_clock;
+using org::apache::nifi::minifi::utils::net::use_nothrow_awaitable;
+using org::apache::nifi::minifi::utils::net::HandshakeType;
+using org::apache::nifi::minifi::utils::net::TcpSocket;
+using org::apache::nifi::minifi::utils::net::SslSocket;
+using org::apache::nifi::minifi::utils::net::asyncOperationWithTimeout;
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -114,6 +113,21 @@ void PutTCP::initialize() {
 
 void PutTCP::notifyStop() {}
 
+namespace {
+asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service) {
+  asio::ssl::context ssl_context(asio::ssl::context::tls_client);
+  ssl_context.set_options(asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);
+  ssl_context.load_verify_file(ssl_context_service.getCACertificate().string());
+  ssl_context.set_verify_mode(asio::ssl::verify_peer);
+  if (const auto& cert_file = ssl_context_service.getCertificateFile(); !cert_file.empty())
+    ssl_context.use_certificate_file(cert_file.string(), asio::ssl::context::pem);
+  if (const auto& private_key_file = ssl_context_service.getPrivateKeyFile(); !private_key_file.empty())
+    ssl_context.use_private_key_file(private_key_file.string(), asio::ssl::context::pem);
+  ssl_context.set_password_callback([password = ssl_context_service.getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
+  return ssl_context;
+}
+}  // namespace
+
 void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessionFactory*) {
   gsl_Expects(context);
 
@@ -130,29 +144,31 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
     idle_connection_expiration_.reset();
 
   if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms)
-    timeout_ = timeout->getMilliseconds();
+    timeout_duration_ = timeout->getMilliseconds();
   else
-    timeout_ = 15s;
+    timeout_duration_ = 15s;
+
+  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
 
   std::string context_name;
-  ssl_context_service_.reset();
+  ssl_context_.reset();
   if (context->getProperty(SSLContextService.getName(), context_name) && !IsNullOrEmpty(context_name)) {
     if (auto controller_service = context->getControllerService(context_name)) {
-      ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
-      if (!ssl_context_service_)
-        logger_->log_error("%s is not a SSL Context Service", context_name);
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name))) {
+        ssl_context_ = getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, context_name + " is not an SSL Context Service");
+      }
     } else {
-      logger_->log_error("Invalid controller service: %s", context_name);
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + context_name);
     }
   }
 
   delimiter_ = utils::span_to<std::vector>(gsl::make_span(context->getProperty(OutgoingMessageDelimiter).value_or(std::string{})).as_span<const std::byte>());
 
-  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
-    connections_.reset();
-  else
-    connections_.emplace();
-
   if (auto max_size_of_socket_send_buffer = context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer))
     max_size_of_socket_send_buffer_ = max_size_of_socket_send_buffer->getValue();
   else
@@ -160,6 +176,16 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
@@ -167,332 +193,131 @@ class ConnectionHandler : public ConnectionHandlerBase {
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    asio::ssl::context* ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+      const std::vector<std::byte>& delimiter,
+      asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
-
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const { return socket_ && socket_->lowest_layer().is_open(); }
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  asio::ssl::context* ssl_context_;
 };
 
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
-  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
-  if (socket_ && socket_->lowest_layer().is_open())
-    return socket_;
-  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
-  if (!new_socket)
-    return nonstd::make_unexpected(new_socket.error());
-  socket_ = std::move(*new_socket);
-  return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
-  if (error_code != asio::error::operation_aborted) {
-    deadline_.expires_at(asio::steady_timer::time_point::max());
-    last_error_ = asio::error::timed_out;
-    deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
-    socket->lowest_layer().close();
-  }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
-  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
-    logger_->log_trace("No more endpoints to try");
-    deadline_.cancel();
-    return;
-  }
-
-  last_error_.clear();
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
-      [&socket, endpoint_iter, this](std::error_code err) {
-        handleConnect(err, endpoint_iter, socket);
-      });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
-                                                  tcp::resolver::results_type::iterator endpoint_iter,
-                                                  const std::shared_ptr<SocketType>& socket) {
-  bool connection_failed_before_deadline = error.operator bool();
-  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (connection_failed_due_to_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (connection_failed_before_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
-    last_error_ = error;
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (max_size_of_socket_send_buffer_)
-    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
-  handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
-                                                    const tcp::resolver::results_type::iterator&,
-                                                    const std::shared_ptr<SocketType>&) {
-  throw std::invalid_argument("Handshake called without SSL");
-}
-
 template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
-                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                   const std::shared_ptr<SslSocket>& socket) {
-  if (!error) {
-    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
-    deadline_.cancel();
-    return;
-  }
-  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
-  last_error_ = error;
-  socket->lowest_layer().close();
-  startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(!ssl_context_);
+  return TcpSocket{io_context_};
 }
 
 template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<TcpSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->lowest_layer().non_blocking(true);
-  deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<SslSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
-    handleHandshake(handshake_error, endpoint_iter, socket);
-  });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(ssl_context_);
+  return {io_context_, *ssl_context_};
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
-                                                std::size_t bytes_written,
-                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                const std::vector<std::byte>& delimiter,
-                                                const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing flowfile to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
-  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
-    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
-      handleDelimiterWrite(error, bytes_written, socket);
-    });
-  } else {
-    std::vector<std::byte> data_chunk;
-    data_chunk.resize(chunk_size);
-    gsl::span<std::byte> buffer{data_chunk};
-    size_t num_read = flow_file_content_stream->read(buffer);
-    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-    });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context) {
+  auto socket = createNewSocket(io_context);
+  std::error_code last_error;
+  for (const auto& endpoint : endpoints) {
+    auto [connection_error] = co_await asyncOperationWithTimeout(socket.lowest_layer().async_connect(endpoint, use_nothrow_awaitable), timeout_duration_);
+    if (connection_error) {
+      core::logging::LOG_DEBUG(logger_) << "Connecting to " << endpoint.endpoint() << " failed due to " << connection_error.message();
+      last_error = connection_error;
+      continue;
+    }
+    auto [handshake_error] = co_await handshake(socket, timeout_duration_);
+    if (handshake_error) {
+      core::logging::LOG_DEBUG(logger_) << "Handshake with " << endpoint.endpoint() << " failed due to " << handshake_error.message();
+      last_error = handshake_error;
+      continue;
+    }
+    if (max_size_of_socket_send_buffer_)
+      socket.lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+    socket_.emplace(std::move(socket));
+    co_return std::error_code();
   }
+  co_return last_error;
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing delimiter to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing delimiter to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", bytes_written);
-  deadline_.cancel();
-}
-
-
-template<>
-nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> ConnectionHandler<TcpSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto socket = std::make_shared<TcpSocket>(io_context_);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
-}
-
-asio::ssl::context getSslContext(const auto& ssl_context_service) {
-  gsl_Expects(ssl_context_service);
-  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
-  ssl_context.load_verify_file(ssl_context_service->getCACertificate().string());
-  ssl_context.set_verify_mode(asio::ssl::verify_peer);
-  if (auto cert_file = ssl_context_service->getCertificateFile(); !cert_file.empty())
-    ssl_context.use_certificate_file(cert_file.string(), asio::ssl::context::pem);
-  if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); !private_key_file.empty())
-    ssl_context.use_private_key_file(private_key_file.string(), asio::ssl::context::pem);
-  ssl_context.set_password_callback([password = ssl_context_service->getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
-  return ssl_context;
+[[nodiscard]] asio::awaitable<std::error_code> ConnectionHandler<SocketType>::setupUsableSocket(asio::io_context& io_context) {
+  if (hasUsableSocket())
+    co_return std::error_code();
+  tcp::resolver resolver(io_context);
+  auto [resolve_error, resolve_result] = co_await asyncOperationWithTimeout(resolver.async_resolve(connection_id_.getHostname(), connection_id_.getPort(), use_nothrow_awaitable), timeout_duration_);
+  if (resolve_error)
+    co_return resolve_error;
+  co_return co_await establishNewConnection(resolve_result, io_context);
 }
 
-template<>
-nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> ConnectionHandler<SslSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto ssl_context = getSslContext(ssl_context_service_);
-  auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
+template<class SocketType>
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+    const std::vector<std::byte>& delimiter,
+    asio::io_context& io_context) {
+  if (auto connection_error = co_await setupUsableSocket(io_context))  // NOLINT
+    co_return connection_error;
+  co_return co_await send(stream_to_send, delimiter);
 }
 
 template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                                                        const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                                                        const std::vector<std::byte>& delimiter) {
-  if (!socket || !socket->lowest_layer().is_open())
-    return nonstd::make_unexpected(asio::error::not_socket);
-
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.restart();
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter) {
+  gsl_Expects(hasUsableSocket());
 
   std::vector<std::byte> data_chunk;
   data_chunk.resize(chunk_size);
-
   gsl::span<std::byte> buffer{data_chunk};
-  size_t num_read = flow_file_content_stream->read(buffer);
-  logger_->log_trace("read %zu bytes from flowfile", num_read);
-  asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-    handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-  });
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  last_used_ = std::chrono::steady_clock::now();
-  return {};
-}
+  while (stream_to_send->tell() < stream_to_send->size()) {
+    size_t num_read = stream_to_send->read(buffer);
+    if (io::isError(num_read))
+      co_return std::make_error_code(std::errc::io_error);
+    auto [write_error, bytes_written] = co_await asyncOperationWithTimeout(asio::async_write(*socket_, asio::buffer(data_chunk, num_read), use_nothrow_awaitable), timeout_duration_);
+    if (write_error)
+      co_return write_error;
+    logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
+  }
+  auto [delimiter_write_error, delimiter_bytes_written] = co_await asyncOperationWithTimeout(asio::async_write(*socket_, asio::buffer(delimiter), use_nothrow_awaitable), timeout_duration_);
+  if (delimiter_write_error)
+    co_return delimiter_write_error;
+  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", delimiter_bytes_written);
 
-template<class SocketType>
-nonstd::expected<tcp::resolver::results_type, std::error_code> ConnectionHandler<SocketType>::resolveHostname() {
-  tcp::resolver resolver(io_context_);
-  std::error_code error_code;
-  auto resolved_query = resolver.resolve(connection_id_.getHostname(), connection_id_.getPort(), error_code);
-  if (error_code)
-    return nonstd::make_unexpected(error_code);
-  return resolved_query;
+  last_used_ = steady_clock::now();
+  co_return std::error_code();
 }
 }  // namespace
 
@@ -517,19 +342,13 @@ void PutTCP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
     return;
   }
 
-  auto flow_file_content_stream = session->getFlowFileContentStream(flow_file);
-  if (!flow_file_content_stream) {
-    session->transfer(flow_file, Failure);
-    return;
-  }
-
   auto connection_id = detail::ConnectionId(std::move(hostname), std::move(port));
   std::shared_ptr<ConnectionHandlerBase> handler;
   if (!connections_ || !connections_->contains(connection_id)) {
-    if (ssl_context_service_)
-      handler = std::make_shared<ConnectionHandler<SslSocket>>(connection_id, timeout_, logger_, max_size_of_socket_send_buffer_, ssl_context_service_);
+    if (ssl_context_)
+      handler = std::make_shared<ConnectionHandler<SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_);
     else
-      handler = std::make_shared<ConnectionHandler<TcpSocket>>(connection_id, timeout_, logger_, max_size_of_socket_send_buffer_, nullptr);
+      handler = std::make_shared<ConnectionHandler<TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr);
     if (connections_)
       (*connections_)[connection_id] = handler;
   } else {
@@ -538,7 +357,7 @@ void PutTCP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
 
   gsl_Expects(handler);
 
-  processFlowFile(handler, flow_file_content_stream, *session, flow_file);
+  processFlowFile(handler, *session, flow_file);
 }
 
 void PutTCP::removeExpiredConnections() {
@@ -550,30 +369,79 @@ void PutTCP::removeExpiredConnections() {
   }
 }
 
+/**
+ * Spawns the handler's send coroutine and runs io_context_ until that attempt has finished.
+ *
+ * @param connection_handler handler whose sendStreamWithDelimiter() coroutine is spawned
+ * @param flow_file_content_stream content to transmit; delimiter_ is handed to the handler alongside it
+ * @return the error_code the attempt ended with; default-constructed (falsy) on success
+ */
+std::error_code PutTCP::sendFlowFileContent(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
+    const std::shared_ptr<io::InputStream>& flow_file_content_stream) {
+  std::error_code operation_error;
+  // run() leaves the context stopped once it is out of work, so it has to be restarted before every attempt.
+  io_context_.restart();
+  asio::co_spawn(io_context_,
+      connection_handler->sendStreamWithDelimiter(flow_file_content_stream, delimiter_, io_context_),
+      [&operation_error](const std::exception_ptr& exception, std::error_code error_code) {
+        operation_error = error_code;
+        if (!exception)
+          return;
+        // The coroutine ended with an exception: the error_code delivered alongside it is default-constructed,
+        // i.e. it looks like success. Translate the exception so the FlowFile is not routed to Success.
+        operation_error = std::make_error_code(std::errc::io_error);
+        try {
+          std::rethrow_exception(exception);
+        } catch (const std::system_error& thrown) {
+          if (thrown.code())
+            operation_error = thrown.code();
+        } catch (...) {
+          // not a system_error: keep the generic io_error set above
+        }
+      });
+  io_context_.run();
+  return operation_error;
+}
+
+/**
+ * Sends the content of flow_file through connection_handler and routes the FlowFile to Success or Failure.
+ * A failure on a handler that has already been used (hasBeenUsed()) is retried once after reset().
+ *
+ * @param connection_handler handler used for the transmission; reset() is called on it when sending fails
+ * @param session session used to open the content stream and to transfer the FlowFile
+ * @param flow_file the FlowFile whose content is sent
+ */
 void PutTCP::processFlowFile(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
-                             const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                             core::ProcessSession& session,
-                             const std::shared_ptr<core::FlowFile>& flow_file) {
-  auto result = connection_handler->sendData(flow_file_content_stream, delimiter_);
+    core::ProcessSession& session,
+    const std::shared_ptr<core::FlowFile>& flow_file) {
+  auto flow_file_content_stream = session.getFlowFileContentStream(flow_file);
+  if (!flow_file_content_stream) {
+    session.transfer(flow_file, Failure);
+    return;
+  }
 
-  if (!result && connection_handler->hasBeenUsed()) {
-    logger_->log_warn("%s with reused connection, retrying...", result.error().message());
+  std::error_code operation_error = sendFlowFileContent(connection_handler, flow_file_content_stream);
+
+  if (operation_error && connection_handler->hasBeenUsed()) {
+    logger_->log_warn("%s with reused connection, retrying...", operation_error.message());
     connection_handler->reset();
-    result = connection_handler->sendData(flow_file_content_stream, delimiter_);
+    // The failed attempt has already read from the stream (send() loops while tell() < size()), so reusing it
+    // would transmit only the unread remainder. NOTE(review): assumes a newly opened stream starts at offset 0.
+    flow_file_content_stream = session.getFlowFileContentStream(flow_file);
+    if (!flow_file_content_stream) {
+      session.transfer(flow_file, Failure);
+      return;
+    }
+    operation_error = sendFlowFileContent(connection_handler, flow_file_content_stream);
   }
 
-  const auto transfer_to_success = [&session, &flow_file]() -> void {
-    session.transfer(flow_file, Success);
-  };
-
-  const auto transfer_to_failure = [&session, &flow_file, &logger = logger_, &connection_handler](std::error_code ec) -> void {
-    gsl_Expects(ec);
+  if (operation_error) {
     connection_handler->reset();
-    logger->log_error("%s", ec.message());
+    logger_->log_error("%s", operation_error.message());
     session.transfer(flow_file, Failure);
-  };
-
-  result | utils::map(transfer_to_success) | utils::orElse(transfer_to_failure);
+  } else {
+    session.transfer(flow_file, Success);
+  }
 }
 
 REGISTER_RESOURCE(PutTCP, Processor);
diff --git a/extensions/standard-processors/processors/PutTCP.h b/extensions/standard-processors/processors/PutTCP.h
index 1f6f7fb58..58ae28f94 100644
--- a/extensions/standard-processors/processors/PutTCP.h
+++ b/extensions/standard-processors/processors/PutTCP.h
@@ -32,6 +32,10 @@
 #include "utils/expected.h"
 #include "utils/StringUtils.h"  // for string <=> on libc++
 
+#include <asio/io_context.hpp>
+#include <asio/awaitable.hpp>
+#include <asio/ssl/context.hpp>
+
 namespace org::apache::nifi::minifi::processors::detail {
 
 class ConnectionId {
@@ -50,12 +54,12 @@ class ConnectionId {
 }  // namespace org::apache::nifi::minifi::processors::detail
 
 namespace std {
-template <>
+template<>
 struct hash<org::apache::nifi::minifi::processors::detail::ConnectionId> {
   size_t operator()(const org::apache::nifi::minifi::processors::detail::ConnectionId& connection_id) const {
     return org::apache::nifi::minifi::utils::hash_combine(
         std::hash<std::string_view>{}(connection_id.getHostname()),
-        std::hash <std::string_view>{}(connection_id.getPort()));
+        std::hash<std::string_view>{}(connection_id.getPort()));
   }
 };
 }  // namespace std
@@ -64,16 +68,19 @@ namespace org::apache::nifi::minifi::processors {
 class ConnectionHandlerBase {
  public:
   virtual ~ConnectionHandlerBase() = default;
+  virtual void reset() = 0;
 
   [[nodiscard]] virtual bool hasBeenUsed() const = 0;
   [[nodiscard]] virtual bool hasBeenUsedIn(std::chrono::milliseconds dur) const = 0;
-  virtual nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) = 0;
-  virtual void reset() = 0;
+  [[nodiscard]] virtual asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+      const std::vector<std::byte>& delimiter,
+      asio::io_context& io_context) = 0;
 };
 
 class PutTCP final : public core::Processor {
  public:
-  EXTENSIONAPI static constexpr const char* Description = "The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. "
+  EXTENSIONAPI static constexpr const char* Description =
+      "The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. "
       "By default, the FlowFiles are transmitted over the same TCP connection. To assist the TCP server with determining message boundaries, "
       "an optional \"Outgoing Message Delimiter\" string can be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. "
       "An optional \"Connection Per FlowFile\" parameter can be specified to change the behaviour so that each FlowFiles content is transmitted over a single TCP connection "
@@ -107,22 +114,25 @@ class PutTCP final : public core::Processor {
 
   void initialize() final;
   void notifyStop() final;
-  void onSchedule(core::ProcessContext*, core::ProcessSessionFactory *) final;
+  void onSchedule(core::ProcessContext*, core::ProcessSessionFactory*) final;
   void onTrigger(core::ProcessContext*, core::ProcessSession*) final;
 
  private:
   void removeExpiredConnections();
   void processFlowFile(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
-                       const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                       core::ProcessSession& session,
-                       const std::shared_ptr<core::FlowFile>& flow_file);
+      core::ProcessSession& session,
+      const std::shared_ptr<core::FlowFile>& flow_file);
+
+  std::error_code sendFlowFileContent(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
+      const std::shared_ptr<io::InputStream>& flow_file_content_stream);
 
   std::vector<std::byte> delimiter_;
+  asio::io_context io_context_;
   std::optional<std::unordered_map<detail::ConnectionId, std::shared_ptr<ConnectionHandlerBase>>> connections_;
   std::optional<std::chrono::milliseconds> idle_connection_expiration_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
-  std::chrono::milliseconds timeout_ = std::chrono::seconds(15);
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+  std::chrono::milliseconds timeout_duration_ = std::chrono::seconds(15);
+  std::optional<asio::ssl::context> ssl_context_;
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutTCP>::getLogger(uuid_);
 };
 
diff --git a/extensions/standard-processors/processors/PutUDP.cpp b/extensions/standard-processors/processors/PutUDP.cpp
index c59a86e80..90910dcd1 100644
--- a/extensions/standard-processors/processors/PutUDP.cpp
+++ b/extensions/standard-processors/processors/PutUDP.cpp
@@ -93,7 +93,7 @@ void PutUDP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
   }
 
   const auto data = session->readBuffer(flow_file);
-  if (data.status < 0) {
+  if (io::isError(data.status)) {
     session->transfer(flow_file, Failure);
     return;
   }
diff --git a/extensions/standard-processors/tests/CMakeLists.txt b/extensions/standard-processors/tests/CMakeLists.txt
index 784de1824..353b7bbca 100644
--- a/extensions/standard-processors/tests/CMakeLists.txt
+++ b/extensions/standard-processors/tests/CMakeLists.txt
@@ -17,6 +17,8 @@
 # under the License.
 #
 
+include(Coroutines)
+enable_coroutines()
 
 file(GLOB PROCESSOR_UNIT_TESTS  "unit/*.cpp")
 file(GLOB PROCESSOR_INTEGRATION_TESTS "integration/*.cpp")
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
index 9a4aa65a5..c63a67b22 100644
--- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -29,7 +29,6 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::test {
 
-constexpr uint64_t SYSLOG_PORT = 10255;
 constexpr auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
 
 struct ValidRFC5424Message {
@@ -249,56 +248,61 @@ void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164
   CHECK(original_message.msg_ == flow_file.getAttribute("syslog.msg"));
 }
 
-TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog][NetworkListenerProcessor]") {
+TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
 
   SingleProcessorTestController controller{listen_syslog};
   LogTestController::getInstance().setTrace<ListenSyslog>();
-  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
   std::string protocol;
+  uint16_t port = 0;
 
   SECTION("UDP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+    protocol = "UDP";
+
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
     asio::ip::udp::endpoint endpoint;
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
     }
     protocol = "UDP";
-    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
-    controller.plan->scheduleProcessor(listen_syslog);
-    utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint);
-    utils::sendUdpDatagram(invalid_syslog, endpoint);
+    CHECK_THAT(utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(invalid_syslog, endpoint), MatchesSuccess());
   }
 
   SECTION("TCP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+    protocol = "TCP";
+
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
     asio::ip::tcp::endpoint endpoint;
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    protocol = "TCP";
-    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
-    controller.plan->scheduleProcessor(listen_syslog);
-    REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint));
-    REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, endpoint));
+    CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendMessagesViaTCP({invalid_syslog}, endpoint), MatchesSuccess());
   }
   std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
   REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1);
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[1]) == invalid_syslog);
 
-  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], SYSLOG_PORT, protocol);
-  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], SYSLOG_PORT, protocol);
+  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], port, protocol);
+  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], port, protocol);
 }
 
 TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog][NetworkListenerProcessor]") {
@@ -306,75 +310,80 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog][NetworkListenerProce
 
   SingleProcessorTestController controller{listen_syslog};
   LogTestController::getInstance().setTrace<ListenSyslog>();
-  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "100"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "true"));
 
   std::string protocol;
+  uint16_t port = 0;
   SECTION("UDP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+    protocol = "UDP";
+
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
     asio::ip::udp::endpoint endpoint;
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    protocol = "UDP";
-    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
-    controller.plan->scheduleProcessor(listen_syslog);
-    std::this_thread::sleep_for(100ms);
-    utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint);
-    utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, endpoint);
-    utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, endpoint);
-    utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, endpoint);
-
-    utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, endpoint);
-    utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, endpoint);
-    utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, endpoint);
-    utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, endpoint);
-
-    utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint);
-    utils::sendUdpDatagram(invalid_syslog, endpoint);
+
+    CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, endpoint), MatchesSuccess());
+
+    CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, endpoint), MatchesSuccess());
+
+    CHECK_THAT(utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(invalid_syslog, endpoint), MatchesSuccess());
   }
 
   SECTION("TCP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+    protocol = "TCP";
+
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
     asio::ip::tcp::endpoint endpoint;
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    protocol = "TCP";
-    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
-    controller.plan->scheduleProcessor(listen_syslog);
-    std::this_thread::sleep_for(100ms);
-    REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_,
-                                       rfc5424_doc_example_2.unparsed_,
-                                       rfc5424_doc_example_3.unparsed_,
-                                       rfc5424_doc_example_4.unparsed_}, endpoint));
-
-    REQUIRE(utils::sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_,
-                                       rfc3164_doc_example_2.unparsed_,
-                                       rfc3164_doc_example_3.unparsed_,
-                                       rfc3164_doc_example_4.unparsed_}, endpoint));
-
-    REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint));
-    REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, endpoint));
+
+    CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_,
+                                          rfc5424_doc_example_2.unparsed_,
+                                          rfc5424_doc_example_3.unparsed_,
+                                          rfc5424_doc_example_4.unparsed_}, endpoint), MatchesSuccess());
+
+    CHECK_THAT(utils::sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_,
+                                          rfc3164_doc_example_2.unparsed_,
+                                          rfc3164_doc_example_3.unparsed_,
+                                          rfc3164_doc_example_4.unparsed_}, endpoint), MatchesSuccess());
+
+    CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendMessagesViaTCP({invalid_syslog}, endpoint), MatchesSuccess());
   }
 
   std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
-  REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 9}, {ListenSyslog::Invalid, 1}}, result, 300ms, 50ms));
+  REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 9},
+                                   {ListenSyslog::Invalid, 1}}, result, 300ms, 50ms));
   REQUIRE(result.at(ListenSyslog::Success).size() == 9);
   REQUIRE(result.at(ListenSyslog::Invalid).size() == 1);
 
   std::unordered_map<std::string, core::FlowFile&> success_flow_files;
 
-  for (auto& flow_file : result.at(ListenSyslog::Success)) {
+  for (auto& flow_file: result.at(ListenSyslog::Success)) {
     success_flow_files.insert({controller.plan->getContent(flow_file), *flow_file});
   }
 
@@ -388,26 +397,25 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog][NetworkListenerProce
   REQUIRE(success_flow_files.contains(std::string(rfc3164_doc_example_3.unparsed_)));
   REQUIRE(success_flow_files.contains(std::string(rfc3164_doc_example_4.unparsed_)));
 
-  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_1.unparsed_)), rfc5424_doc_example_1, SYSLOG_PORT, protocol);
-  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_2.unparsed_)), rfc5424_doc_example_2, SYSLOG_PORT, protocol);
-  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_3.unparsed_)), rfc5424_doc_example_3, SYSLOG_PORT, protocol);
-  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_4.unparsed_)), rfc5424_doc_example_4, SYSLOG_PORT, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_1.unparsed_)), rfc5424_doc_example_1, port, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_2.unparsed_)), rfc5424_doc_example_2, port, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_3.unparsed_)), rfc5424_doc_example_3, port, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_4.unparsed_)), rfc5424_doc_example_4, port, protocol);
 
-  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_1.unparsed_)), rfc3164_doc_example_1, SYSLOG_PORT, protocol);
-  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_2.unparsed_)), rfc3164_doc_example_2, SYSLOG_PORT, protocol);
-  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_3.unparsed_)), rfc3164_doc_example_3, SYSLOG_PORT, protocol);
-  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_4.unparsed_)), rfc3164_doc_example_4, SYSLOG_PORT, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_1.unparsed_)), rfc3164_doc_example_1, port, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_2.unparsed_)), rfc3164_doc_example_2, port, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_3.unparsed_)), rfc3164_doc_example_3, port, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_4.unparsed_)), rfc3164_doc_example_4, port, protocol);
 
   REQUIRE(success_flow_files.contains(std::string(rfc5424_logger_example_1)));
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Invalid)[0]) == invalid_syslog);
 }
 
-
 TEST_CASE("ListenSyslog can be rescheduled", "[ListenSyslog][NetworkListenerProcessor]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
   SingleProcessorTestController controller{listen_syslog};
   LogTestController::getInstance().setTrace<ListenSyslog>();
-  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, "0"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "100"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "true"));
   SECTION("UDP") {
@@ -429,45 +437,48 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog][Netw
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
 
   SingleProcessorTestController controller{listen_syslog};
-  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "10"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxQueueSize, "50"));
 
   LogTestController::getInstance().setWarn<ListenSyslog>();
 
+  uint16_t port = 0;
+
   SECTION("UDP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
     asio::ip::udp::endpoint endpoint;
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
-    controller.plan->scheduleProcessor(listen_syslog);
     for (auto i = 0; i < 100; ++i) {
-      utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint);
+      CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint), MatchesSuccess());
     }
     CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms));
   }
 
   SECTION("TCP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
     asio::ip::tcp::endpoint endpoint;
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
-    controller.plan->scheduleProcessor(listen_syslog);
     for (auto i = 0; i < 100; ++i) {
-      REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, endpoint));
+      CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, endpoint), MatchesSuccess());
     }
     CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms));
   }
@@ -480,41 +491,44 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog][Netw
 }
 
 TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog][NetworkListenerProcessor]") {
-  asio::ip::tcp::endpoint endpoint;
-  SECTION("sending through IPv4", "[IPv4]") {
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
-  }
-  SECTION("sending through IPv6", "[IPv6]") {
-    if (utils::isIPv6Disabled())
-      return;
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
-  }
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
-
   SingleProcessorTestController controller{listen_syslog};
+
   auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
   const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(), (executable_dir / "resources" / "ca_A.crt").string()));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
+  ssl_context_service->enable();
+
   LogTestController::getInstance().setTrace<ListenSyslog>();
-  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::SSLContextService, "SSLContextService"));
-  ssl_context_service->enable();
-  controller.plan->scheduleProcessor(listen_syslog);
-  REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, executable_dir / "resources" / "ca_A.crt"));
-  REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+
+  auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
+  asio::ip::tcp::endpoint endpoint;
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
+  }
+
+  CHECK_THAT(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesSuccess());
+  CHECK_THAT(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesSuccess());
 
   std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
   REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1);
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[1]) == invalid_syslog);
 
-  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], SYSLOG_PORT, "TCP");
-  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], SYSLOG_PORT, "TCP");
+  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], port, "TCP");
+  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], port, "TCP");
 }
 
 }  // namespace org::apache::nifi::minifi::test
diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
index 158e1a072..32b0c6558 100644
--- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
@@ -29,48 +29,45 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::test {
 
-constexpr uint64_t PORT = 10254;
-
-void check_for_attributes(core::FlowFile& flow_file) {
-  CHECK(std::to_string(PORT) == flow_file.getAttribute("tcp.port"));
+void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
+  CHECK(std::to_string(port) == flow_file.getAttribute("tcp.port"));
   const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
   CHECK(ranges::contains(local_addresses, flow_file.getAttribute("tcp.sender")));
 }
 
 TEST_CASE("ListenTCP test multiple messages", "[ListenTCP][NetworkListenerProcessor]") {
+  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+  SingleProcessorTestController controller{listen_tcp};
+  LogTestController::getInstance().setTrace<ListenTCP>();
+  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "2"));
+  auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
+
   asio::ip::tcp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
   }
   SECTION("sending through IPv6", "[IPv6]") {
     if (utils::isIPv6Disabled())
       return;
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
   }
-  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
-
-  SingleProcessorTestController controller{listen_tcp};
-  LogTestController::getInstance().setTrace<ListenTCP>();
-  REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT)));
-  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "2"));
 
-  controller.plan->scheduleProcessor(listen_tcp);
-  REQUIRE(utils::sendMessagesViaTCP({"test_message_1"}, endpoint));
-  REQUIRE(utils::sendMessagesViaTCP({"another_message"}, endpoint));
+  CHECK_THAT(utils::sendMessagesViaTCP({"test_message_1"}, endpoint), MatchesSuccess());
+  CHECK_THAT(utils::sendMessagesViaTCP({"another_message"}, endpoint), MatchesSuccess());
   ProcessorTriggerResult result;
   REQUIRE(controller.triggerUntil({{ListenTCP::Success, 2}}, result, 300s, 50ms));
   CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[0]) == "test_message_1");
   CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[1]) == "another_message");
 
-  check_for_attributes(*result.at(ListenTCP::Success)[0]);
-  check_for_attributes(*result.at(ListenTCP::Success)[1]);
+  check_for_attributes(*result.at(ListenTCP::Success)[0], port);
+  check_for_attributes(*result.at(ListenTCP::Success)[1], port);
 }
 
 TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP][NetworkListenerProcessor]") {
   const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
   SingleProcessorTestController controller{listen_tcp};
   LogTestController::getInstance().setTrace<ListenTCP>();
-  REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT)));
+  REQUIRE(listen_tcp->setProperty(ListenTCP::Port, "0"));
   REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "100"));
 
   REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_tcp));
@@ -79,27 +76,27 @@ TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP][NetworkListenerProcessor]
 }
 
 TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP][NetworkListenerProcessor]") {
+  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+  SingleProcessorTestController controller{listen_tcp};
+  LogTestController::getInstance().setTrace<ListenTCP>();
+  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "10"));
+  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxQueueSize, "50"));
+  auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
+
   asio::ip::tcp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
   }
   SECTION("sending through IPv6", "[IPv6]") {
     if (utils::isIPv6Disabled())
       return;
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
   }
-  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
-
-  SingleProcessorTestController controller{listen_tcp};
-  REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT)));
-  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "10"));
-  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxQueueSize, "50"));
 
   LogTestController::getInstance().setWarn<ListenTCP>();
 
-  controller.plan->scheduleProcessor(listen_tcp);
   for (auto i = 0; i < 100; ++i) {
-    REQUIRE(utils::sendMessagesViaTCP({"test_message"}, endpoint));
+    CHECK_THAT(utils::sendMessagesViaTCP({"test_message"}, endpoint), MatchesSuccess());
   }
 
   CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms));
@@ -113,7 +110,7 @@ TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP][NetworkLis
 
 TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProcessor]") {
   const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
-
+  uint16_t port = 0;
   SingleProcessorTestController controller{listen_tcp};
   auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
   LogTestController::getInstance().setTrace<ListenTCP>();
@@ -122,7 +119,6 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::Passphrase.getName(), "Password12"));
-  REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::Port.getName(), std::to_string(PORT)));
   REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::MaxBatchSize.getName(), "2"));
   REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::SSLContextService.getName(), "SSLContextService"));
   std::vector<std::string> expected_successful_messages;
@@ -131,60 +127,64 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
 
   SECTION("Without client certificate verification") {
     SECTION("Client certificate not required, Client Auth set to NONE by default") {
+      ssl_context_service->enable();
+      port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
     SECTION("Client certificate not required, but validated if provided") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+      ssl_context_service->enable();
+      port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
-    ssl_context_service->enable();
-    controller.plan->scheduleProcessor(listen_tcp);
 
     expected_successful_messages = {"test_message_1", "another_message"};
-    for (const auto& message : expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+    for (const auto& message: expected_successful_messages) {
+      CHECK_THAT(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesSuccess());
     }
   }
 
   SECTION("With client certificate provided") {
     SECTION("Client certificate required") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+      ssl_context_service->enable();
+      port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
     SECTION("Client certificate not required but validated") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+      ssl_context_service->enable();
+      port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
-    ssl_context_service->enable();
-    controller.plan->scheduleProcessor(listen_tcp);
 
     minifi::utils::net::SslData ssl_data;
     ssl_data.ca_loc = executable_dir / "resources" / "ca_A.crt";
@@ -194,31 +194,114 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message : expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data));
+      CHECK_THAT(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data), MatchesSuccess());
     }
   }
 
   SECTION("Required certificate not provided") {
+    ssl_context_service->enable();
+    REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
-    ssl_context_service->enable();
-    controller.plan->scheduleProcessor(listen_tcp);
 
-    REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+    CHECK_THAT(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesError());
   }
 
   ProcessorTriggerResult result;
   REQUIRE(controller.triggerUntil({{ListenTCP::Success, expected_successful_messages.size()}}, result, 300ms, 50ms));
   for (std::size_t i = 0; i < expected_successful_messages.size(); ++i) {
     CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[i]) == expected_successful_messages[i]);
-    check_for_attributes(*result.at(ListenTCP::Success)[i]);
+    check_for_attributes(*result.at(ListenTCP::Success)[i], port);
+  }
+}
+
+namespace {
+bool isSslMethodAvailable(asio::ssl::context::method method) {
+  try {
+    [[maybe_unused]] asio::ssl::context ctx(method);
+    return true;
+  } catch (const asio::system_error& err) {
+    if (err.code() == asio::error::invalid_argument) {
+      return false;
+    } else {
+      throw;
+    }
+  }
+}
+}  // namespace
+
+TEST_CASE("Test ListenTCP SSL/TLS compatibility", "[ListenTCP][NetworkListenerProcessor]") {
+  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+  SingleProcessorTestController controller{listen_tcp};
+  auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
+  LogTestController::getInstance().setTrace<ListenTCP>();
+  const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
+  REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(), (executable_dir / "resources" / "ca_A.crt").string()));
+  REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
+  REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
+  REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::Passphrase.getName(), "Password12"));
+  REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::MaxBatchSize.getName(), "2"));
+  REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::SSLContextService.getName(), "SSLContextService"));
+  REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+
+  ssl_context_service->enable();
+  uint16_t port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
+  asio::ip::tcp::endpoint endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
+
+  minifi::utils::net::SslData ssl_data;
+  ssl_data.ca_loc = executable_dir / "resources" / "ca_A.crt";
+  ssl_data.cert_loc = executable_dir / "resources" / "localhost_by_A.pem";
+  ssl_data.key_loc = executable_dir / "resources" / "localhost_by_A.pem";
+  ssl_data.key_pw = "Password12";
+
+
+  asio::ssl::context::method client_method;
+  bool expected_to_work;
+
+  SECTION("sslv2 should be disabled") {
+    client_method = asio::ssl::context::method::sslv2_client;
+    expected_to_work = false;
+  }
+
+  SECTION("sslv3 should be disabled") {
+    client_method = asio::ssl::context::method::sslv3_client;
+    expected_to_work = false;
+  }
+
+  SECTION("tlsv11 should be disabled") {
+    client_method = asio::ssl::context::method::tlsv11_client;
+    expected_to_work = false;
+  }
+
+  SECTION("tlsv12 should be enabled") {
+    client_method = asio::ssl::context::method::tlsv12_client;
+    expected_to_work = true;
+  }
+
+  SECTION("tlsv13 should be enabled") {
+    client_method = asio::ssl::context::method::tlsv13_client;
+    expected_to_work = true;
+  }
+
+  if (!isSslMethodAvailable(client_method))
+    return;
+
+  auto send_result = utils::sendMessagesViaSSL({"message"}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data, client_method);
+  if (expected_to_work) {
+    CHECK_THAT(send_result, MatchesSuccess());
+    ProcessorTriggerResult result;
+    CHECK(controller.triggerUntil({{ListenTCP::Success, 1}}, result, 300ms, 50ms));
+  } else {
+    CHECK_THAT(send_result, MatchesError());
+    ProcessorTriggerResult result;
+    CHECK_FALSE(controller.triggerUntil({{ListenTCP::Success, 1}}, result, 300ms, 50ms));
   }
 }
 
diff --git a/extensions/standard-processors/tests/unit/ListenUDPTests.cpp b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
index ecce9120d..ca1c0ec25 100644
--- a/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
@@ -29,49 +29,49 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::test {
 
-constexpr uint64_t PORT = 10256;
-
-void check_for_attributes(core::FlowFile& flow_file) {
+void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
   const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
-  CHECK(std::to_string(PORT) == flow_file.getAttribute("udp.port"));
+  CHECK(std::to_string(port) == flow_file.getAttribute("udp.port"));
   CHECK(ranges::contains(local_addresses, flow_file.getAttribute("udp.sender")));
 }
 
 TEST_CASE("ListenUDP test multiple messages", "[ListenUDP][NetworkListenerProcessor]") {
+  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+  SingleProcessorTestController controller{listen_udp};
+  LogTestController::getInstance().setTrace<ListenUDP>();
+
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "2"));
+
+  auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_udp);
+
   asio::ip::udp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
-    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), PORT);
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
   }
   SECTION("sending through IPv6", "[IPv6]") {
     if (utils::isIPv6Disabled())
       return;
-    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), PORT);
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
   }
-  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
-
-  SingleProcessorTestController controller{listen_udp};
-  LogTestController::getInstance().setTrace<ListenUDP>();
-  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
-  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "2"));
 
   controller.plan->scheduleProcessor(listen_udp);
-  REQUIRE(utils::sendUdpDatagram({"test_message_1"}, endpoint));
-  REQUIRE(utils::sendUdpDatagram({"another_message"}, endpoint));
+  CHECK_THAT(utils::sendUdpDatagram({"test_message_1"}, endpoint), MatchesSuccess());
+  CHECK_THAT(utils::sendUdpDatagram({"another_message"}, endpoint), MatchesSuccess());
   ProcessorTriggerResult result;
   REQUIRE(controller.triggerUntil({{ListenUDP::Success, 2}}, result, 300ms, 50ms));
   CHECK(result.at(ListenUDP::Success).size() == 2);
   CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[0]) == "test_message_1");
   CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[1]) == "another_message");
 
-  check_for_attributes(*result.at(ListenUDP::Success)[0]);
-  check_for_attributes(*result.at(ListenUDP::Success)[1]);
+  check_for_attributes(*result.at(ListenUDP::Success)[0], port);
+  check_for_attributes(*result.at(ListenUDP::Success)[1], port);
 }
 
 TEST_CASE("ListenUDP can be rescheduled", "[ListenUDP][NetworkListenerProcessor]") {
   const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
   SingleProcessorTestController controller{listen_udp};
   LogTestController::getInstance().setTrace<ListenUDP>();
-  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
+  REQUIRE(listen_udp->setProperty(ListenUDP::Port, "0"));
   REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "100"));
 
   REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_udp));
@@ -80,27 +80,28 @@ TEST_CASE("ListenUDP can be rescheduled", "[ListenUDP][NetworkListenerProcessor]
 }
 
 TEST_CASE("ListenUDP max queue and max batch size test", "[ListenUDP][NetworkListenerProcessor]") {
+  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+  SingleProcessorTestController controller{listen_udp};
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "10"));
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxQueueSize, "50"));
+
+  auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_udp);
+
   asio::ip::udp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
-    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), PORT);
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
   }
   SECTION("sending through IPv6", "[IPv6]") {
     if (utils::isIPv6Disabled())
       return;
-    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), PORT);
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
   }
-  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
-
-  SingleProcessorTestController controller{listen_udp};
-  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
-  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "10"));
-  REQUIRE(listen_udp->setProperty(ListenUDP::MaxQueueSize, "50"));
 
   LogTestController::getInstance().setWarn<ListenUDP>();
 
   controller.plan->scheduleProcessor(listen_udp);
   for (auto i = 0; i < 100; ++i) {
-    REQUIRE(utils::sendUdpDatagram({"test_message"}, endpoint));
+    CHECK_THAT(utils::sendUdpDatagram({"test_message"}, endpoint), MatchesSuccess());
   }
 
   CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms));
diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
index 3396a50b4..ac44c4db8 100644
--- a/extensions/standard-processors/tests/unit/PutTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
@@ -27,9 +27,10 @@
 #include "controllers/SSLContextService.h"
 #include "core/ProcessSession.h"
 #include "utils/net/TcpServer.h"
-#include "utils/net/SslServer.h"
+#include "utils/net/AsioCoro.h"
 #include "utils/expected.h"
 #include "utils/StringUtils.h"
+#include "IntegrationTestUtils.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -38,69 +39,48 @@ namespace org::apache::nifi::minifi::processors {
 using controllers::SSLContextService;
 
 namespace {
-using utils::net::TcpSession;
-using utils::net::TcpServer;
 
-using utils::net::SslSession;
-using utils::net::SslServer;
-
-class ISessionAwareServer {
+class CancellableTcpServer : public utils::net::TcpServer {
  public:
-  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
-  virtual void closeSessions() = 0;
-};
+  using utils::net::TcpServer::TcpServer;
 
-template<class SessionType>
-class SessionAwareServer : public ISessionAwareServer {
- protected:
-  size_t getNumberOfSessions() const override {
-    std::lock_guard lock_guard{mutex_};
-    return sessions_.size();
-  }
-
-  void closeSessions() override {
-    std::lock_guard lock_guard{mutex_};
-    for (const auto& session_weak : sessions_) {
-      if (auto session = session_weak.lock()) {
-        auto& socket = session->getSocket();
-        if (socket.is_open()) {
-          socket.shutdown(asio::ip::tcp::socket::shutdown_both);
-          session->getSocket().close();
-        }
-      }
-    }
+  size_t getNumberOfSessions() const {
+    return cancellable_timers_.size();
   }
 
-  mutable std::mutex mutex_;
-  std::vector<std::weak_ptr<SessionType>> sessions_;
-};
+  void cancelEverything() {
+    for (auto& timer : cancellable_timers_)
+      io_context_.post([=]{timer->cancel();});
+  }
 
-class SessionAwareTcpServer : public TcpServer, public SessionAwareServer<TcpSession> {
- public:
-  using TcpServer::TcpServer;
+  asio::awaitable<void> doReceive() override {
+    using asio::experimental::awaitable_operators::operator||;
 
- protected:
-  std::shared_ptr<TcpSession> createSession() override {
-    std::lock_guard lock_guard{mutex_};
-    auto session = TcpServer::createSession();
-    logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get());
-    sessions_.emplace_back(session);
-    return session;
+    asio::ip::tcp::acceptor acceptor(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port_));
+    if (port_ == 0)
+      port_ = acceptor.local_endpoint().port();
+    while (true) {
+      auto [accept_error, socket] = co_await acceptor.async_accept(utils::net::use_nothrow_awaitable);
+      if (accept_error) {
+        logger_->log_error("Error during accepting new connection: %s", accept_error.message());
+        break;
+      }
+      auto cancellable_timer = std::make_shared<asio::steady_timer>(io_context_);
+      cancellable_timers_.push_back(cancellable_timer);
+      if (ssl_data_)
+        co_spawn(io_context_, secureSession(std::move(socket)) || wait_until_cancelled(cancellable_timer), asio::detached);
+      else
+        co_spawn(io_context_, insecureSession(std::move(socket)) || wait_until_cancelled(cancellable_timer), asio::detached);
+    }
   }
-};
-
-class SessionAwareSslServer : public SslServer, public SessionAwareServer<SslSession> {
- public:
-  using SslServer::SslServer;
 
- protected:
-  std::shared_ptr<SslSession> createSession() override {
-    std::lock_guard lock_guard{mutex_};
-    auto session = SslServer::createSession();
-    logger_->log_trace("SessionAwareSslServer::createSession %p", session.get());
-    sessions_.emplace_back(session);
-    return session;
+ private:
+  static asio::awaitable<void> wait_until_cancelled(std::shared_ptr<asio::steady_timer> timer) {
+    timer->expires_at(asio::steady_timer::time_point::max());
+    co_await utils::net::async_wait(*timer);
   }
+
+  std::vector<std::shared_ptr<asio::steady_timer>> cancellable_timers_;
 };
 
 utils::net::SslData createSslDataForServer() {
@@ -129,28 +109,28 @@ class PutTCPTestFixture {
   }
 
   void stopServers() {
-    for (auto& [port, server] : listeners_) {
-      auto& listener = server.listener_;
+    for (auto& [port, server] : servers_) {
+      auto& cancellable_server = server.cancellable_server;
       auto& server_thread = server.server_thread_;
-      if (listener)
-        listener->stop();
+      if (cancellable_server)
+        cancellable_server->stop();
       if (server_thread.joinable())
         server_thread.join();
-      listener.reset();
+      cancellable_server.reset();
     }
   }
 
   size_t getNumberOfActiveSessions(std::optional<uint16_t> port = std::nullopt) {
-    if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(getListener(port))) {
-      return session_aware_listener->getNumberOfSessions() - 1;  // There is always one inactive session waiting for a new connection
+    if (auto cancellable_tcp_server = getServer(port)) {
+      return cancellable_tcp_server->getNumberOfSessions();
     }
     return -1;
   }
 
   void closeActiveConnections() {
-    for (auto& [port, server] : listeners_) {
-      if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(server.listener_.get())) {
-        session_aware_listener->closeSessions();
+    for (auto& [port, server] : servers_) {
+      if (auto cancellable_tcp_server = getServer(port)) {
+        cancellable_tcp_server->cancelEverything();
       }
     }
     std::this_thread::sleep_for(200ms);
@@ -171,7 +151,7 @@ class PutTCPTestFixture {
     auto start_time = std::chrono::system_clock::now();
     utils::net::Message result;
     while (start_time + timeout > std::chrono::system_clock::now()) {
-      if (getListener(port)->tryDequeue(result))
+      if (getServer(port)->tryDequeue(result))
         return result;
       std::this_thread::sleep_for(interval);
     }
@@ -204,14 +184,17 @@ class PutTCPTestFixture {
   }
 
   uint16_t addTCPServer() {
-    uint16_t port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine_);
-    listeners_[port].startTCPServer(port);
+    Server server;
+    uint16_t port = server.startTCPServer(std::nullopt);
+    servers_[port] = std::move(server);
     return port;
   }
 
   uint16_t addSSLServer() {
-    uint16_t port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine_);
-    listeners_[port].startSSLServer(port);
+    auto ssl_server_options = utils::net::SslServerOptions{createSslDataForServer(), utils::net::ClientAuthOption::REQUIRED};
+    Server server;
+    uint16_t port = server.startTCPServer(ssl_server_options);
+    servers_[port] = std::move(server);
     return port;
   }
 
@@ -224,47 +207,36 @@ class PutTCPTestFixture {
   }
 
   [[nodiscard]] uint16_t getSinglePort() const {
-    gsl_Expects(listeners_.size() == 1);
-    return listeners_.begin()->first;
+    gsl_Expects(servers_.size() == 1);
+    return servers_.begin()->first;
   }
 
  private:
-  utils::net::Server* getListener(std::optional<uint16_t> port) {
+  CancellableTcpServer* getServer(std::optional<uint16_t> port) {
     if (!port)
       port = getSinglePort();
-    return listeners_.at(*port).listener_.get();
+    return servers_.at(*port).cancellable_server.get();
   }
 
   const std::shared_ptr<PutTCP> put_tcp_ = std::make_shared<PutTCP>("PutTCP");
   test::SingleProcessorTestController controller_{put_tcp_};
 
-  std::mt19937 random_engine_{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
-  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
-
   class Server {
    public:
     Server() = default;
 
-    void startTCPServer(uint16_t port) {
-      gsl_Expects(!listener_ && !server_thread_.joinable());
-      listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port, core::logging::LoggerFactory<utils::net::Server>::getLogger());
-      server_thread_ = std::thread([this]() { listener_->run(); });
-    }
-
-    void startSSLServer(uint16_t port) {
-      gsl_Expects(!listener_ && !server_thread_.joinable());
-      listener_ = std::make_unique<SessionAwareSslServer>(std::nullopt,
-                                                          port,
-                                                          core::logging::LoggerFactory<utils::net::Server>::getLogger(),
-                                                          createSslDataForServer(),
-                                                          utils::net::SslServer::ClientAuthOption::REQUIRED);
-      server_thread_ = std::thread([this]() { listener_->run(); });
+    uint16_t startTCPServer(std::optional<utils::net::SslServerOptions> ssl_server_options) {
+      gsl_Expects(!cancellable_server && !server_thread_.joinable());
+      cancellable_server = std::make_unique<CancellableTcpServer>(std::nullopt, 0, core::logging::LoggerFactory<utils::net::Server>::getLogger(), std::move(ssl_server_options));
+      server_thread_ = std::thread([this]() { cancellable_server->run(); });
+      REQUIRE(utils::verifyEventHappenedInPollTime(250ms, [this] { return cancellable_server->getPort() != 0; }, 20ms));
+      return cancellable_server->getPort();
     }
 
-    std::unique_ptr<utils::net::Server> listener_;
+    std::unique_ptr<CancellableTcpServer> cancellable_server;
     std::thread server_thread_;
   };
-  std::unordered_map<uint16_t, Server> listeners_;
+  std::unordered_map<uint16_t, Server> servers_;
 };
 
 void trigger_expect_success(PutTCPTestFixture& test_fixture, const std::string_view message, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
@@ -382,7 +354,8 @@ TEST_CASE("PutTCP test invalid host", "[PutTCP]") {
   trigger_expect_failure(test_fixture, "message for invalid host");
 
   CHECK((LogTestController::getInstance().contains("Host not found", 0ms)
-      || LogTestController::getInstance().contains("No such host is known", 0ms)));
+      || LogTestController::getInstance().contains("No such host is known", 0ms)
+      || LogTestController::getInstance().contains("A connection attempt failed because the connected party did not properly respond", 0ms)));
 }
 
 TEST_CASE("PutTCP test invalid server", "[PutTCP]") {
@@ -412,7 +385,8 @@ TEST_CASE("PutTCP test non-routable server", "[PutTCP]") {
   test_fixture.setPutTCPPort(1235);
   trigger_expect_failure(test_fixture, "message for non-routable server");
 
-  CHECK((LogTestController::getInstance().contains("Connection timed out", 0ms)
+  CHECK((LogTestController::getInstance().contains("No route to host", 0ms)
+      || LogTestController::getInstance().contains("Connection timed out", 0ms)
       || LogTestController::getInstance().contains("Operation timed out", 0ms)
       || LogTestController::getInstance().contains("host has failed to respond", 0ms)
       || LogTestController::getInstance().contains("No route to host", 0ms)));
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
index ca63f5da5..b22ad6746 100644
--- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -47,23 +47,26 @@ std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer&
 
 TEST_CASE("PutUDP", "[putudp]") {
   const auto put_udp = std::make_shared<PutUDP>("PutUDP");
-  auto random_engine = std::mt19937{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
-  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
-  const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
 
   test::SingleProcessorTestController controller{put_udp};
   LogTestController::getInstance().setTrace<PutUDP>();
   LogTestController::getInstance().setTrace<core::ProcessContext>();
   put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
-  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
-  utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
+  utils::net::UdpServer listener{std::nullopt, 0, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
 
   auto server_thread = std::thread([&listener]() { listener.run(); });
+  uint16_t port = listener.getPort();
+  auto deadline = std::chrono::steady_clock::now() + 200ms;
+  while (port == 0 && deadline > std::chrono::steady_clock::now()) {
+    std::this_thread::sleep_for(20ms);
+    port = listener.getPort();
+  }
   auto cleanup_server = gsl::finally([&]{
     listener.stop();
     server_thread.join();
   });
+  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
   {
     const char* const message = "first message: hello";
diff --git a/libminifi/include/controllers/SSLContextService.h b/libminifi/include/controllers/SSLContextService.h
index a9c7a8c55..300db0266 100644
--- a/libminifi/include/controllers/SSLContextService.h
+++ b/libminifi/include/controllers/SSLContextService.h
@@ -141,13 +141,10 @@ class SSLContextService : public core::controller::ControllerService {
 
   std::unique_ptr<SSLContext> createSSLContext();
 
-  const std::filesystem::path& getCertificateFile();
-
-  const std::string& getPassphrase();
-
-  const std::filesystem::path& getPrivateKeyFile();
-
-  const std::filesystem::path& getCACertificate();
+  const std::filesystem::path& getCertificateFile() const;
+  const std::string& getPassphrase() const;
+  const std::filesystem::path& getPrivateKeyFile() const;
+  const std::filesystem::path& getCACertificate() const;
 
   void yield() override {
   }
@@ -203,7 +200,7 @@ class SSLContextService : public core::controller::ControllerService {
  protected:
   virtual void initializeProperties();
 
-  std::mutex initialization_mutex_;
+  mutable std::mutex initialization_mutex_;
   bool initialized_;
   std::filesystem::path certificate_;
   std::filesystem::path private_key_;
diff --git a/libminifi/include/utils/net/AsioCoro.h b/libminifi/include/utils/net/AsioCoro.h
new file mode 100644
index 000000000..5c2e5268b
--- /dev/null
+++ b/libminifi/include/utils/net/AsioCoro.h
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <tuple>
+#include <system_error>
+#include <utility>
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/steady_timer.hpp"
+#include "asio/this_coro.hpp"
+#include "asio/use_awaitable.hpp"
+#include "asio/experimental/awaitable_operators.hpp"
+#include "asio/experimental/as_tuple.hpp"
+
+namespace org::apache::nifi::minifi::utils::net {
+
+constexpr auto use_nothrow_awaitable = asio::experimental::as_tuple(asio::use_awaitable);
+
+using HandshakeType = asio::ssl::stream_base::handshake_type;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<asio::ip::tcp::socket>;
+
+#if defined(__GNUC__) && __GNUC__ < 11
+// [coroutines] unexpected 'warning: statement has no effect [-Wunused-value]'
+// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=96749
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-value"
+#endif  // defined(__GNUC__) && __GNUC__ < 11
+inline asio::awaitable<void> async_wait(asio::steady_timer& timer) {
+  co_await timer.async_wait(utils::net::use_nothrow_awaitable);
+}
+#if defined(__GNUC__) && __GNUC__ < 11
+#pragma GCC diagnostic pop
+#endif  // defined(__GNUC__) && __GNUC__ < 11
+
+namespace detail {
+inline asio::awaitable<void> timeout(std::chrono::steady_clock::duration duration) {
+  asio::steady_timer timer(co_await asio::this_coro::executor);  // NOLINT
+  timer.expires_after(duration);
+  co_await async_wait(timer);
+}
+}  // namespace detail
+
+template<class... Types>
+asio::awaitable<std::tuple<std::error_code, Types...>> asyncOperationWithTimeout(asio::awaitable<std::tuple<std::error_code, Types...>>&& async_operation,
+    std::chrono::steady_clock::duration timeout_duration) {
+  using asio::experimental::awaitable_operators::operator||;
+  auto operation_result = co_await(std::move(async_operation) || detail::timeout(timeout_duration));
+  if (operation_result.index() == 1) {
+    std::tuple<std::error_code, Types...> result;
+    std::get<0>(result) = asio::error::timed_out;
+    co_return result;
+  }
+  co_return std::get<0>(operation_result);
+}
+}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/Server.h b/libminifi/include/utils/net/Server.h
index 5a6d7622d..e84815c3b 100644
--- a/libminifi/include/utils/net/Server.h
+++ b/libminifi/include/utils/net/Server.h
@@ -26,7 +26,9 @@
 #include "core/logging/Logger.h"
 #include "asio/ts/buffer.hpp"
 #include "asio/ts/internet.hpp"
-#include "asio/streambuf.hpp"
+#include "asio/awaitable.hpp"
+#include "asio/co_spawn.hpp"
+#include "asio/detached.hpp"
 #include "IpProtocol.h"
 
 namespace org::apache::nifi::minifi::utils::net {
@@ -50,6 +52,7 @@ struct Message {
 class Server {
  public:
   virtual void run() {
+    asio::co_spawn(io_context_, doReceive(), asio::detached);
     io_context_.run();
   }
   virtual void reset() {
@@ -68,10 +71,16 @@ class Server {
     stop();
   }
 
+  uint16_t getPort() const {
+    return port_;
+  }
+
  protected:
-  Server(std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
-      : max_queue_size_(std::move(max_queue_size)), logger_(logger) {}
+  virtual asio::awaitable<void> doReceive() = 0;
+  Server(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
+      : port_(port), max_queue_size_(max_queue_size), logger_(std::move(logger)) {}
 
+  std::atomic<uint16_t> port_;
   utils::ConcurrentQueue<Message> concurrent_queue_;
   asio::io_context io_context_;
   std::optional<size_t> max_queue_size_;
diff --git a/libminifi/include/utils/net/SessionHandlingServer.h b/libminifi/include/utils/net/SessionHandlingServer.h
deleted file mode 100644
index 173fdcb02..000000000
--- a/libminifi/include/utils/net/SessionHandlingServer.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <utility>
-#include <memory>
-
-#include "Server.h"
-#include "asio/ssl.hpp"
-
-namespace org::apache::nifi::minifi::utils::net {
-
-template<typename SessionType>
-class SessionHandlingServer : public Server {
- public:
-  SessionHandlingServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
-      : Server(max_queue_size, std::move(logger)),
-        acceptor_(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port)) {
-  }
-
-  void run() override {
-    startAccept();
-    Server::run();
-  }
-
- protected:
-  void startAccept() {
-    auto new_session = createSession();
-    acceptor_.async_accept(new_session->getSocket(),
-                           [this, new_session](const auto& error_code) -> void {
-                             handleAccept(new_session, error_code);
-                           });
-  }
-
-  void handleAccept(const std::shared_ptr<SessionType>& session, const std::error_code& error) {
-    if (error) {
-      return;
-    }
-
-    session->start();
-    auto new_session = createSession();
-    acceptor_.async_accept(new_session->getSocket(),
-                           [this, new_session](const auto& error_code) -> void {
-                             handleAccept(new_session, error_code);
-                           });
-  }
-
-  virtual std::shared_ptr<SessionType> createSession() = 0;
-
-  asio::ip::tcp::acceptor acceptor_;
-};
-
-}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/Ssl.h b/libminifi/include/utils/net/Ssl.h
index 7a32e30ac..d8ea4621a 100644
--- a/libminifi/include/utils/net/Ssl.h
+++ b/libminifi/include/utils/net/Ssl.h
@@ -23,9 +23,16 @@
 #include "core/ProcessContext.h"
 #include "core/Property.h"
 #include "core/logging/Logger.h"
+#include "utils/Enum.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
+SMART_ENUM(ClientAuthOption,
+    (NONE, "NONE"),
+    (WANT, "WANT"),
+    (REQUIRED, "REQUIRED")
+)
+
 struct SslData {
   std::filesystem::path ca_loc;
   std::filesystem::path cert_loc;
@@ -37,6 +44,15 @@ struct SslData {
   }
 };
 
+struct SslServerOptions {
+  SslData cert_data;
+  ClientAuthOption client_auth_option;
+
+  SslServerOptions(SslData cert_data, ClientAuthOption client_auth_option)
+      : cert_data(cert_data),
+      client_auth_option(client_auth_option) {}
+};
+
 std::optional<utils::net::SslData> getSslData(const core::ProcessContext& context, const core::Property& ssl_prop, const std::shared_ptr<core::logging::Logger>& logger);
 
 }  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/SslServer.h b/libminifi/include/utils/net/SslServer.h
deleted file mode 100644
index ddd04e773..000000000
--- a/libminifi/include/utils/net/SslServer.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include "SessionHandlingServer.h"
-
-#include <memory>
-#include <string>
-
-#include "Ssl.h"
-#include "asio/ssl.hpp"
-
-namespace org::apache::nifi::minifi::utils::net {
-
-using ssl_socket = asio::ssl::stream<asio::ip::tcp::socket>;
-
-class SslSession : public std::enable_shared_from_this<SslSession> {
- public:
-  SslSession(asio::io_context& io_context, asio::ssl::context& context, utils::ConcurrentQueue<Message>& concurrent_queue,
-    std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger);
-
-  ssl_socket::lowest_layer_type& getSocket();
-  void start();
-  void handleReadUntilNewLine(std::error_code error_code);
-
- protected:
-  utils::ConcurrentQueue<Message>& concurrent_queue_;
-  std::optional<size_t> max_queue_size_;
-  asio::basic_streambuf<std::allocator<char>> buffer_;
-  std::shared_ptr<core::logging::Logger> logger_;
-  ssl_socket socket_;
-};
-
-class SslServer : public SessionHandlingServer<SslSession> {
- public:
-  SMART_ENUM(ClientAuthOption,
-    (NONE, "NONE"),
-    (WANT, "WANT"),
-    (REQUIRED, "REQUIRED")
-  )
-
-  SslServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger, SslData ssl_data, ClientAuthOption client_auth);
-
- protected:
-  std::shared_ptr<SslSession> createSession() override;
-
-  asio::ssl::context context_;
-  SslData ssl_data_;
-};
-
-}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/TcpServer.h b/libminifi/include/utils/net/TcpServer.h
index a70b26bd9..717d674ed 100644
--- a/libminifi/include/utils/net/TcpServer.h
+++ b/libminifi/include/utils/net/TcpServer.h
@@ -16,36 +16,30 @@
  */
 #pragma once
 
-#include <optional>
+#include <utility>
 #include <memory>
 
-#include "SessionHandlingServer.h"
+#include "Server.h"
+#include "Ssl.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
-class TcpSession : public std::enable_shared_from_this<TcpSession> {
+class TcpServer : public Server {
  public:
-  TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger);
-  asio::ip::tcp::socket& getSocket();
-  void start();
-  void handleReadUntilNewLine(std::error_code error_code);
-
- private:
-  utils::ConcurrentQueue<Message>& concurrent_queue_;
-  std::optional<size_t> max_queue_size_;
-  asio::basic_streambuf<std::allocator<char>> buffer_;
-  asio::ip::tcp::socket socket_;
-  std::shared_ptr<core::logging::Logger> logger_;
-};
-
-class TcpServer : public SessionHandlingServer<TcpSession> {
- public:
-  TcpServer(std::optional<size_t> max_queue_size,
-            uint16_t port,
-            std::shared_ptr<core::logging::Logger> logger);
+  TcpServer(std::optional<size_t> max_queue_size_, uint16_t port, std::shared_ptr<core::logging::Logger> logger, std::optional<SslServerOptions> ssl_data)
+      : Server(max_queue_size_, port, std::move(logger)),
+        ssl_data_(std::move(ssl_data)) {
+  }
 
  protected:
-  std::shared_ptr<TcpSession> createSession() override;
+  asio::awaitable<void> doReceive() override;
+
+  asio::awaitable<void> insecureSession(asio::ip::tcp::socket socket);
+  asio::awaitable<void> secureSession(asio::ip::tcp::socket socket);
+
+  asio::awaitable<void> readLoop(auto& socket);
+
+  std::optional<SslServerOptions> ssl_data_;
 };
 
 }  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/UdpServer.h b/libminifi/include/utils/net/UdpServer.h
index e9b852b00..97cfa5737 100644
--- a/libminifi/include/utils/net/UdpServer.h
+++ b/libminifi/include/utils/net/UdpServer.h
@@ -19,14 +19,12 @@
 #include <optional>
 #include <memory>
 #include <string>
+#include <asio/awaitable.hpp>
 
 #include "Server.h"
 #include "utils/MinifiConcurrentQueue.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "asio/ts/buffer.hpp"
-#include "asio/ts/internet.hpp"
-#include "asio/streambuf.hpp"
 
 namespace org::apache::nifi::minifi::utils::net {
 
@@ -37,13 +35,7 @@ class UdpServer : public Server {
             std::shared_ptr<core::logging::Logger> logger);
 
  private:
-  void doReceive();
-
-  asio::ip::udp::socket socket_;
-  asio::ip::udp::endpoint sender_endpoint_;
-  std::string buffer_;
-
-  static constexpr size_t MAX_UDP_PACKET_SIZE = 65535;
+  asio::awaitable<void> doReceive() override;
 };
 
 }  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp
index 165805d86..635e95144 100644
--- a/libminifi/src/controllers/SSLContextService.cpp
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -31,13 +31,11 @@
 #include <fstream>
 #include <memory>
 #include <string>
-#include <set>
 
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 #include "io/validation.h"
 #include "properties/Configure.h"
-#include "utils/gsl.h"
 #include "utils/tls/CertificateUtils.h"
 #include "utils/tls/TLSUtils.h"
 #include "utils/tls/DistinguishedName.h"
@@ -407,22 +405,22 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
 #endif
 }
 
-const std::filesystem::path &SSLContextService::getCertificateFile() {
+const std::filesystem::path &SSLContextService::getCertificateFile() const {
   std::lock_guard<std::mutex> lock(initialization_mutex_);
   return certificate_;
 }
 
-const std::string &SSLContextService::getPassphrase() {
+const std::string &SSLContextService::getPassphrase() const {
   std::lock_guard<std::mutex> lock(initialization_mutex_);
   return passphrase_;
 }
 
-const std::filesystem::path &SSLContextService::getPrivateKeyFile() {
+const std::filesystem::path &SSLContextService::getPrivateKeyFile() const {
   std::lock_guard<std::mutex> lock(initialization_mutex_);
   return private_key_;
 }
 
-const std::filesystem::path &SSLContextService::getCACertificate() {
+const std::filesystem::path &SSLContextService::getCACertificate() const {
   std::lock_guard<std::mutex> lock(initialization_mutex_);
   return ca_certificate_;
 }
diff --git a/libminifi/src/utils/net/SslServer.cpp b/libminifi/src/utils/net/SslServer.cpp
deleted file mode 100644
index 7aaf4845a..000000000
--- a/libminifi/src/utils/net/SslServer.cpp
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "utils/net/SslServer.h"
-
-namespace org::apache::nifi::minifi::utils::net {
-
-SslSession::SslSession(asio::io_context& io_context, asio::ssl::context& context, utils::ConcurrentQueue<Message>& concurrent_queue,
-    std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
-  : concurrent_queue_(concurrent_queue),
-    max_queue_size_(max_queue_size),
-    logger_(std::move(logger)),
-    socket_(io_context, context) {
-}
-
-ssl_socket::lowest_layer_type& SslSession::getSocket() {
-  return socket_.lowest_layer();
-}
-
-void SslSession::start() {
-  socket_.async_handshake(asio::ssl::stream_base::server,
-    [this, self = shared_from_this()](const std::error_code& error_code) {
-      if (error_code) {
-        logger_->log_error("Error occured during SSL handshake: (%d) %s", error_code.value(), error_code.message());
-        return;
-      }
-      asio::async_read_until(socket_,
-                             buffer_,
-                             '\n',
-                             [self](const auto& error_code, size_t) -> void {
-                               self->handleReadUntilNewLine(error_code);
-                             });
-    });
-}
-
-void SslSession::handleReadUntilNewLine(std::error_code error_code) {
-  if (error_code)
-    return;
-  std::istream is(&buffer_);
-  std::string message;
-  std::getline(is, message);
-  if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
-    concurrent_queue_.enqueue(Message(message, IpProtocol::TCP, getSocket().remote_endpoint().address(), getSocket().local_endpoint().port()));
-  else
-    logger_->log_warn("Queue is full. TCP message ignored.");
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
-}
-
-SslServer::SslServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger, SslData ssl_data, ClientAuthOption client_auth)
-    : SessionHandlingServer<SslSession>(max_queue_size, port, std::move(logger)),
-      context_(asio::ssl::context::sslv23),
-      ssl_data_(std::move(ssl_data)) {
-    context_.set_options(
-        asio::ssl::context::default_workarounds
-        | asio::ssl::context::no_sslv2
-        | asio::ssl::context::single_dh_use);
-    context_.set_password_callback([this](std::size_t&, asio::ssl::context_base::password_purpose&) { return ssl_data_.key_pw; });
-    context_.use_certificate_file(ssl_data_.cert_loc.string(), asio::ssl::context::pem);
-    context_.use_private_key_file(ssl_data_.key_loc.string(), asio::ssl::context::pem);
-    context_.load_verify_file(ssl_data_.ca_loc.string());
-    if (client_auth == ClientAuthOption::REQUIRED) {
-      context_.set_verify_mode(asio::ssl::verify_peer|asio::ssl::verify_fail_if_no_peer_cert);
-    } else if (client_auth == ClientAuthOption::WANT) {
-      context_.set_verify_mode(asio::ssl::verify_peer);
-    }
-}
-
-std::shared_ptr<SslSession> SslServer::createSession() {
-  return std::make_shared<SslSession>(io_context_, context_, concurrent_queue_, max_queue_size_, logger_);
-}
-
-}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/utils/net/TcpServer.cpp b/libminifi/src/utils/net/TcpServer.cpp
index 742ef7f25..fcd02d1e4 100644
--- a/libminifi/src/utils/net/TcpServer.cpp
+++ b/libminifi/src/utils/net/TcpServer.cpp
@@ -15,53 +15,73 @@
  * limitations under the License.
  */
 #include "utils/net/TcpServer.h"
+#include "utils/net/AsioCoro.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
-TcpSession::TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
-  : concurrent_queue_(concurrent_queue),
-    max_queue_size_(max_queue_size),
-    socket_(io_context),
-    logger_(std::move(logger)) {
+asio::awaitable<void> TcpServer::doReceive() {
+  asio::ip::tcp::acceptor acceptor(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port_));
+  if (port_ == 0)
+    port_ = acceptor.local_endpoint().port();
+  while (true) {
+    auto [accept_error, socket] = co_await acceptor.async_accept(use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Error during accepting new connection: %s", accept_error.message());
+      break;
+    }
+    if (ssl_data_)
+      co_spawn(io_context_, secureSession(std::move(socket)), asio::detached);
+    else
+      co_spawn(io_context_, insecureSession(std::move(socket)), asio::detached);
+  }
 }
 
-asio::ip::tcp::socket& TcpSession::getSocket() {
-  return socket_;
-}
+asio::awaitable<void> TcpServer::readLoop(auto& socket) {
+  std::string read_message;
+  while (true) {
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, asio::dynamic_buffer(read_message), '\n', use_nothrow_awaitable);  // NOLINT
+    if (read_error || bytes_read == 0)
+      co_return;
 
-void TcpSession::start() {
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+      concurrent_queue_.enqueue(Message(read_message.substr(0, bytes_read - 1), IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().local_endpoint().port()));
+    else
+      logger_->log_warn("Queue is full. TCP message ignored.");
+    read_message.erase(0, bytes_read);
+  }
 }
 
-void TcpSession::handleReadUntilNewLine(std::error_code error_code) {
-  if (error_code)
-    return;
-  std::istream is(&buffer_);
-  std::string message;
-  std::getline(is, message);
-  if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
-    concurrent_queue_.enqueue(Message(message, IpProtocol::TCP, socket_.remote_endpoint().address(), socket_.local_endpoint().port()));
-  else
-    logger_->log_warn("Queue is full. TCP message ignored.");
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
+asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket) {
+  co_return co_await readLoop(socket);  // NOLINT
 }
 
-TcpServer::TcpServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
-    : SessionHandlingServer<TcpSession>(max_queue_size, port, std::move(logger)) {
+namespace {
+asio::ssl::context setupSslContext(SslServerOptions& ssl_data) {
+  asio::ssl::context ssl_context(asio::ssl::context::tls_server);
+  ssl_context.set_options(asio::ssl::context::default_workarounds | asio::ssl::context::single_dh_use | asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);
+  ssl_context.set_password_callback([key_pw = ssl_data.cert_data.key_pw](std::size_t&, asio::ssl::context_base::password_purpose&) { return key_pw; });
+  ssl_context.use_certificate_file(ssl_data.cert_data.cert_loc.string(), asio::ssl::context::pem);
+  ssl_context.use_private_key_file(ssl_data.cert_data.key_loc.string(), asio::ssl::context::pem);
+  ssl_context.load_verify_file(ssl_data.cert_data.ca_loc.string());
+  if (ssl_data.client_auth_option == ClientAuthOption::REQUIRED) {
+    ssl_context.set_verify_mode(asio::ssl::verify_peer|asio::ssl::verify_fail_if_no_peer_cert);
+  } else if (ssl_data.client_auth_option == ClientAuthOption::WANT) {
+    ssl_context.set_verify_mode(asio::ssl::verify_peer);
+  }
+  return ssl_context;
 }
+}  // namespace
 
-std::shared_ptr<TcpSession> TcpServer::createSession() {
-  return std::make_shared<TcpSession>(io_context_, concurrent_queue_, max_queue_size_, logger_);
+asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket) {
+  gsl_Expects(ssl_data_);
+  auto ssl_context = setupSslContext(*ssl_data_);
+  SslSocket ssl_socket(std::move(socket), ssl_context);
+  auto [handshake_error] = co_await ssl_socket.async_handshake(HandshakeType::server, use_nothrow_awaitable);
+  if (handshake_error) {
+    core::logging::LOG_WARN(logger_) << "Handshake with " << ssl_socket.lowest_layer().remote_endpoint() << " failed due to " << handshake_error.message();
+    co_return;
+  }
+  co_return co_await readLoop(ssl_socket);  // NOLINT
 }
 
 }  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/utils/net/UdpServer.cpp b/libminifi/src/utils/net/UdpServer.cpp
index 6137f2ddf..e6179db13 100644
--- a/libminifi/src/utils/net/UdpServer.cpp
+++ b/libminifi/src/utils/net/UdpServer.cpp
@@ -15,32 +15,39 @@
  * limitations under the License.
  */
 #include "utils/net/UdpServer.h"
+#include "asio/use_awaitable.hpp"
+#include "asio/detached.hpp"
+#include "utils/net/AsioCoro.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
+constexpr size_t MAX_UDP_PACKET_SIZE = 65535;
+
 UdpServer::UdpServer(std::optional<size_t> max_queue_size,
                      uint16_t port,
                      std::shared_ptr<core::logging::Logger> logger)
-    : Server(max_queue_size, std::move(logger)),
-      socket_(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v6(), port)) {
-  doReceive();
+    : Server(max_queue_size, port, std::move(logger)) {
 }
 
+asio::awaitable<void> UdpServer::doReceive() {
+  asio::ip::udp::socket socket(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v6(), port_));
+  if (port_ == 0)
+    port_ = socket.local_endpoint().port();
+  while (true) {
+    std::string buffer = std::string(MAX_UDP_PACKET_SIZE, {});
+    asio::ip::udp::endpoint sender_endpoint;
 
-void UdpServer::doReceive() {
-  buffer_.resize(MAX_UDP_PACKET_SIZE);
-  socket_.async_receive_from(asio::buffer(buffer_, MAX_UDP_PACKET_SIZE),
-                             sender_endpoint_,
-                             [this](std::error_code ec, std::size_t bytes_received) {
-                               if (!ec && bytes_received > 0) {
-                                 buffer_.resize(bytes_received);
-                                 if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
-                                   concurrent_queue_.enqueue(utils::net::Message(std::move(buffer_), IpProtocol::UDP, sender_endpoint_.address(), socket_.local_endpoint().port()));
-                                 else
-                                   logger_->log_warn("Queue is full. UDP message ignored.");
-                               }
-                               doReceive();
-                             });
+    auto [receive_error, bytes_received] = co_await socket.async_receive_from(asio::buffer(buffer, MAX_UDP_PACKET_SIZE), sender_endpoint, utils::net::use_nothrow_awaitable);
+    if (receive_error) {
+      logger_->log_warn("Error during receive: %s", receive_error.message());
+      continue;
+    }
+    buffer.resize(bytes_received);
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+      concurrent_queue_.enqueue(utils::net::Message(std::move(buffer), IpProtocol::UDP, sender_endpoint.address(), socket.local_endpoint().port()));
+    else
+      logger_->log_warn("Queue is full. UDP message ignored.");
+  }
 }
 
 }  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/test/Catch.h b/libminifi/test/Catch.h
index 925f9dc29..755496feb 100644
--- a/libminifi/test/Catch.h
+++ b/libminifi/test/Catch.h
@@ -20,15 +20,15 @@
 #define CATCH_CONFIG_FAST_COMPILE
 #include <optional>
 #include <string>
+#include "spdlog/spdlog.h"
 #include "catch.hpp"
 
-
 namespace Catch {
 template<typename T>
 struct StringMaker<std::optional<T>> {
   static std::string convert(const std::optional<T>& val) {
     if (val) {
-      return "std::optional(" + StringMaker<T>::convert(val.value()) + ")";
+      return fmt::format("std::optional({})", StringMaker<T>::convert(val.value()));
     }
     return "std::nullopt";
   }
@@ -40,4 +40,46 @@ struct StringMaker<std::nullopt_t> {
     return "std::nullopt";
   }
 };
+
+template <>
+struct StringMaker<std::error_code> {
+  static std::string convert(const std::error_code& error_code) {
+    return fmt::format("std::error_code(category:{}, value:{}, message:{})", error_code.category().name(), error_code.value(), error_code.message());
+  }
+};
 }  // namespace Catch
+
+namespace org::apache::nifi::minifi::test {
+struct MatchesSuccess : Catch::MatcherBase<std::error_code> {
+  MatchesSuccess() = default;
+
+  bool match(const std::error_code& err) const override {
+    return err.value() == 0;
+  }
+
+  std::string describe() const override {
+    return fmt::format("== {}", Catch::StringMaker<std::error_code>::convert(std::error_code{}));
+  }
+};
+
+struct MatchesError : Catch::MatcherBase<std::error_code> {
+  explicit MatchesError(std::optional<std::error_code> expected_error = std::nullopt)
+      : Catch::MatcherBase<std::error_code>(),
+        expected_error_(expected_error) {
+  }
+
+  bool match(const std::error_code& err) const override {
+    if (expected_error_)
+      return err == *expected_error_;
+    return err.value() != 0;
+  }
+
+  std::string describe() const override {
+    if (expected_error_)
+      return fmt::format("== {}", Catch::StringMaker<std::error_code>::convert(*expected_error_));
+    return fmt::format("!= {}", Catch::StringMaker<std::error_code>::convert(std::error_code{}));
+  }
+ private:
+  std::optional<std::error_code> expected_error_;
+};
+}  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h
index bb06b21b7..4f37b399e 100644
--- a/libminifi/test/Utils.h
+++ b/libminifi/test/Utils.h
@@ -19,11 +19,13 @@
 #include <string>
 #include <utility>
 #include <vector>
+#include <memory>
 
 #include "rapidjson/document.h"
 #include "asio.hpp"
 #include "asio/ssl.hpp"
 #include "net/Ssl.h"
+#include "utils/IntegrationTestUtils.h"
 
 using namespace std::chrono_literals;
 
@@ -111,36 +113,39 @@ bool countLogOccurrencesUntil(const std::string& pattern,
   return false;
 }
 
-bool sendMessagesViaTCP(const std::vector<std::string_view>& contents, const asio::ip::tcp::endpoint& remote_endpoint) {
+std::error_code sendMessagesViaTCP(const std::vector<std::string_view>& contents, const asio::ip::tcp::endpoint& remote_endpoint) {
   asio::io_context io_context;
   asio::ip::tcp::socket socket(io_context);
-  socket.connect(remote_endpoint);
   std::error_code err;
+  socket.connect(remote_endpoint, err);
+  if (err)
+    return err;
   for (auto& content : contents) {
     std::string tcp_message(content);
     tcp_message += '\n';
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
-    if (err) {
-      return false;
-    }
+    if (err)
+      return err;
   }
-  return true;
+  return std::error_code();
 }
 
-bool sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) {
+std::error_code sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) {
   asio::io_context io_context;
   asio::ip::udp::socket socket(io_context);
-  socket.open(remote_endpoint.protocol());
   std::error_code err;
+  socket.open(remote_endpoint.protocol(), err);
+  if (err)
+    return err;
   socket.send_to(content, remote_endpoint, 0, err);
-  return !err;
+  return err;
 }
 
-bool sendUdpDatagram(const gsl::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) {
+std::error_code sendUdpDatagram(const gsl::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) {
   return sendUdpDatagram(asio::const_buffer(content.begin(), content.size()), remote_endpoint);
 }
 
-bool sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) {
+std::error_code sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) {
   return sendUdpDatagram(asio::buffer(content), remote_endpoint);
 }
 
@@ -166,11 +171,12 @@ struct FlowFileQueueTestAccessor {
   FIELD_ACCESSOR(queue_);
 };
 
-bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
-                        const asio::ip::tcp::endpoint& remote_endpoint,
-                        const std::filesystem::path& ca_cert_path,
-                        const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) {
-  asio::ssl::context ctx(asio::ssl::context::sslv23);
+std::error_code sendMessagesViaSSL(const std::vector<std::string_view>& contents,
+    const asio::ip::tcp::endpoint& remote_endpoint,
+    const std::filesystem::path& ca_cert_path,
+    const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt,
+    asio::ssl::context::method method = asio::ssl::context::tlsv12_client) {
+  asio::ssl::context ctx(method);
   ctx.load_verify_file(ca_cert_path.string());
   if (ssl_data) {
     ctx.set_verify_mode(asio::ssl::verify_peer);
@@ -183,33 +189,48 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
   asio::error_code err;
   socket.lowest_layer().connect(remote_endpoint, err);
   if (err) {
-    return false;
+    return err;
   }
   socket.handshake(asio::ssl::stream_base::client, err);
   if (err) {
-    return false;
+    return err;
   }
   for (auto& content : contents) {
     std::string tcp_message(content);
     tcp_message += '\n';
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
     if (err) {
-      return false;
+      return err;
     }
   }
-  return true;
+  return std::error_code();
 }
 
 #ifdef WIN32
 inline std::error_code hide_file(const std::filesystem::path& file_name) {
-    const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
-    if (!success) {
-      // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
-      // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
-      return { static_cast<int>(GetLastError()), std::system_category() };
-    }
-    return {};
+  const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
+  if (!success) {
+    // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
+    // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
+    return { static_cast<int>(GetLastError()), std::system_category() };
   }
+  return {};
+}
 #endif /* WIN32 */
 
+template<typename T>
+concept NetworkingProcessor = std::derived_from<T, minifi::core::Processor>
+    && requires(T x) {
+      {T::Port} -> std::convertible_to<core::Property>;
+      {x.getPort()} -> std::convertible_to<uint16_t>;
+    };  // NOLINT(readability/braces)
+
+template<NetworkingProcessor T>
+uint16_t scheduleProcessorOnRandomPort(const std::shared_ptr<TestPlan>& test_plan, const std::shared_ptr<T>& processor) {
+  REQUIRE(processor->setProperty(T::Port, "0"));
+  test_plan->scheduleProcessor(processor);
+  REQUIRE(minifi::utils::verifyEventHappenedInPollTime(250ms, [&processor] { return processor->getPort() != 0; }, 20ms));
+  return processor->getPort();
+}
+
 }  // namespace org::apache::nifi::minifi::test::utils


[nifi-minifi-cpp] 02/04: MINIFICPP-1975 Volatile and persistent combination of repositories should be avoided

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit d25cd5d89706620076116050b6402f911ecfc4e9
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Feb 8 16:28:08 2023 +0100

    MINIFICPP-1975 Volatile and persistent combination of repositories should be avoided
    
    Closes #1465
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 controller/Controller.h                  | 15 ++++++++++++---
 libminifi/src/core/RepositoryFactory.cpp | 32 ++++++++++++++------------------
 minifi_main/MiNiFiMain.cpp               |  8 ++++++++
 3 files changed, 34 insertions(+), 21 deletions(-)

diff --git a/controller/Controller.h b/controller/Controller.h
index 11b941925..836357977 100644
--- a/controller/Controller.h
+++ b/controller/Controller.h
@@ -27,6 +27,8 @@
 #include "utils/gsl.h"
 #include "Exception.h"
 #include "FlowController.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "core/repository/VolatileFlowFileRepository.h"
 
 /**
  * Sends a single argument comment
@@ -258,13 +260,16 @@ std::shared_ptr<org::apache::nifi::minifi::core::controller::ControllerService>
   if (!flow_repo) {
     throw org::apache::nifi::minifi::Exception(org::apache::nifi::minifi::REPOSITORY_EXCEPTION, "Could not create flowfile repository");
   }
-
   flow_repo->initialize(configuration);
 
-
   configuration->get(org::apache::nifi::minifi::Configure::nifi_content_repository_class_name, content_repo_class);
 
   const std::shared_ptr content_repo = org::apache::nifi::minifi::core::createContentRepository(content_repo_class, true, "content");
+  const bool is_flow_repo_non_persistent = flow_repo->isNoop() || std::dynamic_pointer_cast<org::apache::nifi::minifi::core::repository::VolatileFlowFileRepository>(flow_repo) != nullptr;
+  const bool is_content_repo_non_persistent = std::dynamic_pointer_cast<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(content_repo) != nullptr;
+  if (is_flow_repo_non_persistent != is_content_repo_non_persistent) {
+    throw org::apache::nifi::minifi::Exception(org::apache::nifi::minifi::REPOSITORY_EXCEPTION, "Both or neither of flowfile and content repositories must be persistent!");
+  }
 
   content_repo->initialize(configuration);
 
@@ -315,12 +320,16 @@ void printManifest(const std::shared_ptr<org::apache::nifi::minifi::Configure> &
   if (!flow_repo) {
     throw org::apache::nifi::minifi::Exception(org::apache::nifi::minifi::REPOSITORY_EXCEPTION, "Could not create flowfile repository");
   }
-
   flow_repo->initialize(configuration);
 
   configuration->get(org::apache::nifi::minifi::Configure::nifi_content_repository_class_name, content_repo_class);
 
   const std::shared_ptr content_repo = org::apache::nifi::minifi::core::createContentRepository(content_repo_class, true, "content");
+  const bool is_flow_repo_non_persistent = flow_repo->isNoop() || std::dynamic_pointer_cast<org::apache::nifi::minifi::core::repository::VolatileFlowFileRepository>(flow_repo) != nullptr;
+  const bool is_content_repo_non_persistent = std::dynamic_pointer_cast<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(content_repo) != nullptr;
+  if (is_flow_repo_non_persistent != is_content_repo_non_persistent) {
+    throw org::apache::nifi::minifi::Exception(org::apache::nifi::minifi::REPOSITORY_EXCEPTION, "Both or neither of flowfile and content repositories must be persistent!");
+  }
 
   content_repo->initialize(configuration);
 
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
index f376dbeb1..fd1cb305e 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -84,25 +84,21 @@ class NoOpThreadedRepository : public core::ThreadedRepository {
 std::unique_ptr<core::Repository> createRepository(const std::string& configuration_class_name, const std::string& repo_name) {
   std::string class_name_lc = configuration_class_name;
   std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
-  try {
-    auto return_obj = core::ClassLoader::getDefaultClassLoader().instantiate<core::ThreadedRepository>(class_name_lc,
-                                                                                                       class_name_lc);
-    if (return_obj) {
-      return_obj->setName(repo_name);
-      return return_obj;
-    }
-    // if the desired repos don't exist, we can try doing string matches and rely on volatile repositories
-    if (class_name_lc == "flowfilerepository" || class_name_lc == "volatileflowfilerepository") {
-      return instantiate<repository::VolatileFlowFileRepository>(repo_name);
-    } else if (class_name_lc == "provenancerepository" || class_name_lc == "volatileprovenancefilerepository") {
-      return instantiate<repository::VolatileProvenanceRepository>(repo_name);
-    } else if (class_name_lc == "nooprepository") {
-      return std::make_unique<core::NoOpThreadedRepository>(repo_name);
-    }
-    return {};
-  } catch (const std::runtime_error&) {
-    throw;
+  auto return_obj = core::ClassLoader::getDefaultClassLoader().instantiate<core::ThreadedRepository>(class_name_lc,
+                                                                                                     class_name_lc);
+  if (return_obj) {
+    return_obj->setName(repo_name);
+    return return_obj;
+  }
+  // if the desired repos don't exist, we can try doing string matches and rely on volatile repositories
+  if (class_name_lc == "flowfilerepository" || class_name_lc == "volatileflowfilerepository") {
+    return instantiate<repository::VolatileFlowFileRepository>(repo_name);
+  } else if (class_name_lc == "provenancerepository" || class_name_lc == "volatileprovenancefilerepository") {
+    return instantiate<repository::VolatileProvenanceRepository>(repo_name);
+  } else if (class_name_lc == "nooprepository") {
+    return std::make_unique<core::NoOpThreadedRepository>(repo_name);
   }
+  return {};
 }
 
 }  // namespace org::apache::nifi::minifi::core
diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp
index f0d4fe769..63ee396b2 100644
--- a/minifi_main/MiNiFiMain.cpp
+++ b/minifi_main/MiNiFiMain.cpp
@@ -55,6 +55,8 @@
 #include "core/ConfigurationFactory.h"
 #include "core/RepositoryFactory.h"
 #include "core/extension/ExtensionManager.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "core/repository/VolatileFlowFileRepository.h"
 #include "DiskSpaceWatchdog.h"
 #include "properties/Decryptor.h"
 #include "utils/file/PathUtils.h"
@@ -343,6 +345,12 @@ int main(int argc, char **argv) {
       logger->log_error("Content repository failed to initialize, exiting..");
       exit(1);
     }
+    const bool is_flow_repo_non_persistent = flow_repo->isNoop() || std::dynamic_pointer_cast<core::repository::VolatileFlowFileRepository>(flow_repo) != nullptr;
+    const bool is_content_repo_non_persistent = std::dynamic_pointer_cast<core::repository::VolatileContentRepository>(content_repo) != nullptr;
+    if (is_flow_repo_non_persistent != is_content_repo_non_persistent) {
+      logger->log_error("Both or neither of flowfile and content repositories must be persistent! Exiting..");
+      exit(1);
+    }
 
     std::string content_repo_path;
     if (configure->get(minifi::Configure::nifi_dbcontent_repository_directory_default, content_repo_path) && !content_repo_path.empty()) {


[nifi-minifi-cpp] 04/04: MINIFICPP-2043 Reset flowfile repo checkpoint during initialization

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 650f7c28b8935ffd7d1ed49a5ce86a812d7ae8c8
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Wed Feb 8 16:29:47 2023 +0100

    MINIFICPP-2043 Reset flowfile repo checkpoint during initialization
    
    After a C2 update and reload, the flowfile checkpoint pointer was
    pointing to outdated information, causing the flowfile repo to load
    stale data.
    
    Closes #1505
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 extensions/rocksdb-repos/FlowFileRepository.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 23695536f..15487359a 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -218,6 +218,7 @@ bool FlowFileRepository::need_checkpoint(minifi::internal::OpenRocksDb& opendb)
   return it->Valid();
 }
 void FlowFileRepository::initialize_repository() {
+  checkpoint_.reset();
   auto opendb = db_->open();
   if (!opendb) {
     logger_->log_trace("Couldn't open database, no way to checkpoint");


[nifi-minifi-cpp] 03/04: MINIFICPP-2009 CWEL should add resolved attributes with json output as well

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 3c44ebaff0ace8454b2245ee3fe23fa484cd4a5a
Author: Martin Zink <ma...@apache.org>
AuthorDate: Wed Feb 8 16:28:55 2023 +0100

    MINIFICPP-2009 CWEL should add resolved attributes with json output as well
    
    Closes #1482
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../windows-event-log/ConsumeWindowsEventLog.cpp   | 54 ++++++++++------------
 .../windows-event-log/ConsumeWindowsEventLog.h     |  9 +++-
 .../tests/ConsumeWindowsEventLogTests.cpp          | 12 +++++
 3 files changed, 45 insertions(+), 30 deletions(-)

diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index 714023acd..fa48bf2ac 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -629,10 +629,10 @@ nonstd::expected<EventRender, std::string> ConsumeWindowsEventLog::createEventRe
   if (output_.xml || output_.json) {
     substituteXMLPercentageItems(doc);
     logger_->log_trace("Finish substituting %% in XML");
+  }
 
-    if (resolve_as_attributes_) {
-      result.matched_fields = walker.getFieldValues();
-    }
+  if (resolve_as_attributes_) {
+    result.matched_fields = walker.getFieldValues();
   }
 
   if (output_.xml) {
@@ -699,42 +699,38 @@ void ConsumeWindowsEventLog::refreshTimeZoneData() {
 }
 
 void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender& eventRender, core::ProcessSession& session) const {
-  auto commitFlowFile = [&] (const std::shared_ptr<core::FlowFile>& flowFile, const std::string& content, const std::string& mimeType) {
-    session.writeBuffer(flowFile, content);
-    session.putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
-    session.putAttribute(flowFile, "timezone.name", timezone_name_);
-    session.putAttribute(flowFile, "timezone.offset", timezone_offset_);
-    session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0ms);
-    session.transfer(flowFile, Success);
+  auto commitFlowFile = [&] (const std::string& content, const std::string& mimeType) {
+    auto flow_file = session.create();
+    addMatchedFieldsAsAttributes(eventRender, session, flow_file);
+    session.writeBuffer(flow_file, content);
+    session.putAttribute(flow_file, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
+    session.putAttribute(flow_file, "timezone.name", timezone_name_);
+    session.putAttribute(flow_file, "timezone.offset", timezone_offset_);
+    session.getProvenanceReporter()->receive(flow_file, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0ms);
+    session.transfer(flow_file, Success);
   };
 
   if (output_.xml) {
-    auto flowFile = session.create();
     logger_->log_trace("Writing rendered XML to a flow file");
-
-    for (const auto &fieldMapping : eventRender.matched_fields) {
-      if (!fieldMapping.second.empty()) {
-        session.putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
-      }
-    }
-
-    commitFlowFile(flowFile, eventRender.xml, "application/xml");
+    commitFlowFile(eventRender.xml, "application/xml");
   }
 
   if (output_.plaintext) {
     logger_->log_trace("Writing rendered plain text to a flow file");
-    commitFlowFile(session.create(), eventRender.plaintext, "text/plain");
+    commitFlowFile(eventRender.plaintext, "text/plain");
   }
 
-  if (output_.json.type == JSONType::Raw) {
-    logger_->log_trace("Writing rendered raw JSON to a flow file");
-    commitFlowFile(session.create(), eventRender.json, "application/json");
-  } else if (output_.json.type == JSONType::Simple) {
-    logger_->log_trace("Writing rendered simple JSON to a flow file");
-    commitFlowFile(session.create(), eventRender.json, "application/json");
-  } else if (output_.json.type == JSONType::Flattened) {
-    logger_->log_trace("Writing rendered flattened JSON to a flow file");
-    commitFlowFile(session.create(), eventRender.json, "application/json");
+  if (output_.json) {
+    logger_->log_trace("Writing rendered %s JSON to a flow file", output_.json.type.toString());
+    commitFlowFile(eventRender.json, "application/json");
+  }
+}
+
+void ConsumeWindowsEventLog::addMatchedFieldsAsAttributes(const EventRender& eventRender, core::ProcessSession& session, const std::shared_ptr<core::FlowFile>& flowFile) const {
+  for (const auto &fieldMapping : eventRender.matched_fields) {
+    if (!fieldMapping.second.empty()) {
+      session.putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
+    }
   }
 }
 
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index ef430204c..258846b1e 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -42,6 +42,7 @@
 #include "FlowFileRecord.h"
 #include "concurrentqueue.h"
 #include "pugixml.hpp"
+#include "utils/Enum.h"
 #include "utils/Export.h"
 #include "utils/RegexUtils.h"
 
@@ -145,6 +146,8 @@ class ConsumeWindowsEventLog : public core::Processor {
                                                     const std::shared_ptr<core::ProcessSession>& session,
                                                     const EVT_HANDLE& event_query_results);
 
+  void addMatchedFieldsAsAttributes(const EventRender &eventRender, core::ProcessSession &session, const std::shared_ptr<core::FlowFile> &flowFile) const;
+
   std::shared_ptr<core::logging::Logger> logger_;
   core::StateManager* state_manager_{nullptr};
   wel::METADATA_NAMES header_names_;
@@ -161,7 +164,11 @@ class ConsumeWindowsEventLog : public core::Processor {
   std::map<std::string, wel::WindowsEventLogHandler> providers_;
   uint64_t batch_commit_size_{};
 
-  enum class JSONType { None, Raw, Simple, Flattened };
+  SMART_ENUM(JSONType,
+      (None, "None"),
+      (Raw, "Raw"),
+      (Simple, "Simple"),
+      (Flattened, "Flattened"))
 
   struct OutputFormat {
     bool xml{false};
diff --git a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
index c16767ee5..c5f9d052d 100644
--- a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
+++ b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
@@ -238,6 +238,18 @@ TEST_CASE("ConsumeWindowsEventLog extracts some attributes by default", "[onTrig
   auto logger_processor = test_plan->addProcessor("LogAttribute", "logger", Success, true);
   test_plan->setProperty(logger_processor, LogAttribute::FlowFilesToLog.getName(), "0");
 
+  SECTION("XML output") {
+    REQUIRE(test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::OutputFormat.getName(), "XML"));
+  }
+
+  SECTION("Json output") {
+    REQUIRE(test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::OutputFormat.getName(), "JSON"));
+  }
+
+  SECTION("Plaintext output") {
+    REQUIRE(test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::OutputFormat.getName(), "Plaintext"));
+  }
+
   // 0th event, only to create a bookmark
   {
     reportEvent(APPLICATION_CHANNEL, "Event zero: this is in the past");