You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/02/08 19:33:56 UTC

[nifi-minifi-cpp] 01/04: MINIFICPP-1979 Use Coroutines with asio

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit f2f561ce2b4635e721b1d5b08b34bf8d450c9f6f
Author: Martin Zink <ma...@apache.org>
AuthorDate: Wed Feb 8 16:26:01 2023 +0100

    MINIFICPP-1979 Use Coroutines with asio
    
    MINIFICPP-1985 Run asio related tests on random ports
    
    Closes #1457
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .github/workflows/ci.yml                           |   6 +-
 CMakeLists.txt                                     |   3 +
 cmake/BuildTests.cmake                             |   1 +
 .../CMakeLists.txt => cmake/Coroutines.cmake       |  21 +-
 extensions/standard-processors/CMakeLists.txt      |   3 +
 .../processors/ListenSyslog.cpp                    |   5 +-
 .../standard-processors/processors/ListenTCP.cpp   |   5 +-
 .../processors/NetworkListenerProcessor.cpp        |   8 +-
 .../processors/NetworkListenerProcessor.h          |   7 +-
 .../standard-processors/processors/PutTCP.cpp      | 484 +++++++--------------
 extensions/standard-processors/processors/PutTCP.h |  32 +-
 .../standard-processors/processors/PutUDP.cpp      |   2 +-
 .../standard-processors/tests/CMakeLists.txt       |   2 +
 .../tests/unit/ListenSyslogTests.cpp               | 204 +++++----
 .../tests/unit/ListenTcpTests.cpp                  | 187 +++++---
 .../tests/unit/ListenUDPTests.cpp                  |  53 +--
 .../standard-processors/tests/unit/PutTCPTests.cpp | 162 +++----
 .../standard-processors/tests/unit/PutUDPTests.cpp |  13 +-
 libminifi/include/controllers/SSLContextService.h  |  13 +-
 libminifi/include/utils/net/AsioCoro.h             |  75 ++++
 libminifi/include/utils/net/Server.h               |  15 +-
 .../include/utils/net/SessionHandlingServer.h      |  67 ---
 libminifi/include/utils/net/Ssl.h                  |  16 +
 libminifi/include/utils/net/SslServer.h            |  65 ---
 libminifi/include/utils/net/TcpServer.h            |  38 +-
 libminifi/include/utils/net/UdpServer.h            |  12 +-
 libminifi/src/controllers/SSLContextService.cpp    |  10 +-
 libminifi/src/utils/net/SslServer.cpp              |  90 ----
 libminifi/src/utils/net/TcpServer.cpp              |  90 ++--
 libminifi/src/utils/net/UdpServer.cpp              |  41 +-
 libminifi/test/Catch.h                             |  46 +-
 libminifi/test/Utils.h                             |  75 ++--
 32 files changed, 861 insertions(+), 990 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c3017324e..9aa080554 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -3,7 +3,7 @@ on: [push, pull_request, workflow_dispatch]
 jobs:
   macos_xcode:
     name: "macos-xcode"
-    runs-on: macos-11
+    runs-on: macos-12
     timeout-minutes: 180
     env:
       CCACHE_BASEDIR: ${{ GITHUB.WORKSPACE }}
@@ -29,8 +29,8 @@ jobs:
         run: |
           echo "PATH=/usr/lib/ccache:/usr/local/opt/ccache/bin:/usr/local/opt/ccache/libexec:$PATH" >> $GITHUB_ENV
           echo -e "127.0.0.1\t$HOSTNAME" | sudo tee -a /etc/hosts > /dev/null
-          # https://github.com/actions/virtual-environments/blob/main/images/macos/macos-11-Readme.md#xcode
-          sudo xcode-select -switch /Applications/Xcode_13.2.1.app
+          # https://github.com/actions/virtual-environments/blob/main/images/macos/macos-12-Readme.md#xcode
+          sudo xcode-select -switch /Applications/Xcode_14.0.1.app
       - name: build
         run: |
           export PATH="/usr/local/opt/flex/bin:$PATH"
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 35cf83db7..513afe7ae 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -286,6 +286,9 @@ target_include_directories(RapidJSON SYSTEM INTERFACE "${CMAKE_CURRENT_SOURCE_DI
 # cxxopts
 include(CxxOpts)
 
+include(Coroutines)
+enable_coroutines()
+
 # gsl-lite
 include(GslLite)
 
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index bf21b589f..238b1f87b 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -91,6 +91,7 @@ endif()
 SET(CATCH_MAIN_LIB catch_main)
 add_library(${CATCH_MAIN_LIB} STATIC "${TEST_DIR}/CatchMain.cpp")
 target_include_directories(${CATCH_MAIN_LIB} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
+target_link_libraries(${CATCH_MAIN_LIB} spdlog)  # for fmt
 
 SET(TEST_RESOURCES ${TEST_DIR}/resources)
 
diff --git a/extensions/standard-processors/CMakeLists.txt b/cmake/Coroutines.cmake
similarity index 59%
copy from extensions/standard-processors/CMakeLists.txt
copy to cmake/Coroutines.cmake
index db679c286..e027551e5 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/cmake/Coroutines.cmake
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,19 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-
-
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
-
-file(GLOB SOURCES  "processors/*.cpp" "controllers/*.cpp" )
-
-add_library(minifi-standard-processors SHARED ${SOURCES})
-
-include(RangeV3)
-include(Asio)
-target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio)
-
-register_extension(minifi-standard-processors "STANDARD PROCESSORS" STANDARD-PROCESSORS "Provides standard processors" "extensions/standard-processors/tests/")
 
-register_extension_linter(minifi-standard-processors-linter)
+function(enable_coroutines)
+    if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 11)
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines" PARENT_SCOPE)
+    endif()
+endfunction(enable_coroutines)
diff --git a/extensions/standard-processors/CMakeLists.txt b/extensions/standard-processors/CMakeLists.txt
index db679c286..d9b46400a 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/extensions/standard-processors/CMakeLists.txt
@@ -28,6 +28,9 @@ include(RangeV3)
 include(Asio)
 target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio)
 
+include(Coroutines)
+enable_coroutines()
+
 register_extension(minifi-standard-processors "STANDARD PROCESSORS" STANDARD-PROCESSORS "Provides standard processors" "extensions/standard-processors/tests/")
 
 register_extension_linter(minifi-standard-processors-linter)
diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp b/extensions/standard-processors/processors/ListenSyslog.cpp
index 41518a1d1..ecd6b7477 100644
--- a/extensions/standard-processors/processors/ListenSyslog.cpp
+++ b/extensions/standard-processors/processors/ListenSyslog.cpp
@@ -22,6 +22,7 @@
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 #include "controllers/SSLContextService.h"
+#include "utils/net/Ssl.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -67,8 +68,8 @@ const core::Property ListenSyslog::SSLContextService(
 const core::Property ListenSyslog::ClientAuth(
     core::PropertyBuilder::createProperty("Client Auth")
       ->withDescription("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
-      ->withDefaultValue<std::string>(toString(utils::net::SslServer::ClientAuthOption::NONE))
-      ->withAllowableValues<std::string>(utils::net::SslServer::ClientAuthOption::values())
+      ->withDefaultValue<std::string>(toString(utils::net::ClientAuthOption::NONE))
+      ->withAllowableValues<std::string>(utils::net::ClientAuthOption::values())
       ->build());
 
 const core::Relationship ListenSyslog::Success("success", "Incoming messages that match the expected format when parsing will be sent to this relationship. "
diff --git a/extensions/standard-processors/processors/ListenTCP.cpp b/extensions/standard-processors/processors/ListenTCP.cpp
index 94c2d9884..2b600677e 100644
--- a/extensions/standard-processors/processors/ListenTCP.cpp
+++ b/extensions/standard-processors/processors/ListenTCP.cpp
@@ -20,6 +20,7 @@
 #include "core/PropertyBuilder.h"
 #include "controllers/SSLContextService.h"
 #include "utils/ProcessorConfigUtils.h"
+#include "utils/net/Ssl.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -54,8 +55,8 @@ const core::Property ListenTCP::SSLContextService(
 const core::Property ListenTCP::ClientAuth(
     core::PropertyBuilder::createProperty("Client Auth")
       ->withDescription("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
-      ->withDefaultValue<std::string>(toString(utils::net::SslServer::ClientAuthOption::NONE))
-      ->withAllowableValues<std::string>(utils::net::SslServer::ClientAuthOption::values())
+      ->withDefaultValue<std::string>(toString(utils::net::ClientAuthOption::NONE))
+      ->withAllowableValues<std::string>(utils::net::ClientAuthOption::values())
       ->build());
 
 const core::Relationship ListenTCP::Success("success", "Messages received successfully will be sent out this relationship.");
diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
index 21b74f1ef..ebe884c1c 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
@@ -66,16 +66,16 @@ void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& contex
   auto options = readServerOptions(context);
 
   std::string ssl_value;
+  std::optional<utils::net::SslServerOptions> ssl_options;
   if (context.getProperty(ssl_context_property.getName(), ssl_value) && !ssl_value.empty()) {
     auto ssl_data = utils::net::getSslData(context, ssl_context_property, logger_);
     if (!ssl_data || !ssl_data->isValid()) {
       throw Exception(PROCESSOR_EXCEPTION, "SSL Context Service is set, but no valid SSL data was found!");
     }
-    auto client_auth = utils::parseEnumProperty<utils::net::SslServer::ClientAuthOption>(context, client_auth_property);
-    server_ = std::make_unique<utils::net::SslServer>(options.max_queue_size, options.port, logger_, *ssl_data, client_auth);
-  } else {
-    server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, options.port, logger_);
+    auto client_auth = utils::parseEnumProperty<utils::net::ClientAuthOption>(context, client_auth_property);
+    ssl_options.emplace(std::move(*ssl_data), client_auth);
   }
+  server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, options.port, logger_, ssl_options);
 
   startServer(options, utils::net::IpProtocol::TCP);
 }
diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.h b/extensions/standard-processors/processors/NetworkListenerProcessor.h
index 1799a3fcb..447a59322 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.h
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.h
@@ -26,7 +26,6 @@
 #include "core/ProcessSession.h"
 #include "core/Property.h"
 #include "utils/net/Server.h"
-#include "utils/net/SslServer.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -51,6 +50,12 @@ class NetworkListenerProcessor : public core::Processor {
     stopServer();
   }
 
+  uint16_t getPort() {
+    if (server_)
+      return server_->getPort();
+    return 0;
+  }
+
  protected:
   void startTcpServer(const core::ProcessContext& context, const core::Property& ssl_context_property, const core::Property& client_auth_property);
   void startUdpServer(const core::ProcessContext& context);
diff --git a/extensions/standard-processors/processors/PutTCP.cpp b/extensions/standard-processors/processors/PutTCP.cpp
index 0b992ebf6..c5c2d7717 100644
--- a/extensions/standard-processors/processors/PutTCP.cpp
+++ b/extensions/standard-processors/processors/PutTCP.cpp
@@ -16,30 +16,29 @@
  */
 #include "PutTCP.h"
 
-#include <algorithm>
 #include <utility>
+#include <tuple>
 
 #include "range/v3/range/conversion.hpp"
 
 #include "utils/gsl.h"
-#include "utils/expected.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 #include "core/logging/Logger.h"
-#include "controllers/SSLContextService.h"
 
-#include "asio/ssl.hpp"
-#include "asio/ip/tcp.hpp"
-#include "asio/write.hpp"
-#include "asio/high_resolution_timer.hpp"
+#include "utils/net/AsioCoro.h"
 
 using asio::ip::tcp;
-using TcpSocket = asio::ip::tcp::socket;
-using SslSocket = asio::ssl::stream<tcp::socket>;
 
 using namespace std::literals::chrono_literals;
+using std::chrono::steady_clock;
+using org::apache::nifi::minifi::utils::net::use_nothrow_awaitable;
+using org::apache::nifi::minifi::utils::net::HandshakeType;
+using org::apache::nifi::minifi::utils::net::TcpSocket;
+using org::apache::nifi::minifi::utils::net::SslSocket;
+using org::apache::nifi::minifi::utils::net::asyncOperationWithTimeout;
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -114,6 +113,21 @@ void PutTCP::initialize() {
 
 void PutTCP::notifyStop() {}
 
+namespace {
+asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service) {
+  asio::ssl::context ssl_context(asio::ssl::context::tls_client);
+  ssl_context.set_options(asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);
+  ssl_context.load_verify_file(ssl_context_service.getCACertificate().string());
+  ssl_context.set_verify_mode(asio::ssl::verify_peer);
+  if (const auto& cert_file = ssl_context_service.getCertificateFile(); !cert_file.empty())
+    ssl_context.use_certificate_file(cert_file.string(), asio::ssl::context::pem);
+  if (const auto& private_key_file = ssl_context_service.getPrivateKeyFile(); !private_key_file.empty())
+    ssl_context.use_private_key_file(private_key_file.string(), asio::ssl::context::pem);
+  ssl_context.set_password_callback([password = ssl_context_service.getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
+  return ssl_context;
+}
+}  // namespace
+
 void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessionFactory*) {
   gsl_Expects(context);
 
@@ -130,29 +144,31 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
     idle_connection_expiration_.reset();
 
   if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms)
-    timeout_ = timeout->getMilliseconds();
+    timeout_duration_ = timeout->getMilliseconds();
   else
-    timeout_ = 15s;
+    timeout_duration_ = 15s;
+
+  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
 
   std::string context_name;
-  ssl_context_service_.reset();
+  ssl_context_.reset();
   if (context->getProperty(SSLContextService.getName(), context_name) && !IsNullOrEmpty(context_name)) {
     if (auto controller_service = context->getControllerService(context_name)) {
-      ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
-      if (!ssl_context_service_)
-        logger_->log_error("%s is not a SSL Context Service", context_name);
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name))) {
+        ssl_context_ = getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, context_name + " is not an SSL Context Service");
+      }
     } else {
-      logger_->log_error("Invalid controller service: %s", context_name);
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + context_name);
     }
   }
 
   delimiter_ = utils::span_to<std::vector>(gsl::make_span(context->getProperty(OutgoingMessageDelimiter).value_or(std::string{})).as_span<const std::byte>());
 
-  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
-    connections_.reset();
-  else
-    connections_.emplace();
-
   if (auto max_size_of_socket_send_buffer = context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer))
     max_size_of_socket_send_buffer_ = max_size_of_socket_send_buffer->getValue();
   else
@@ -160,6 +176,16 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
@@ -167,332 +193,131 @@ class ConnectionHandler : public ConnectionHandlerBase {
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    asio::ssl::context* ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+      const std::vector<std::byte>& delimiter,
+      asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
-
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const { return socket_ && socket_->lowest_layer().is_open(); }
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  asio::ssl::context* ssl_context_;
 };
 
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
-  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
-  if (socket_ && socket_->lowest_layer().is_open())
-    return socket_;
-  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
-  if (!new_socket)
-    return nonstd::make_unexpected(new_socket.error());
-  socket_ = std::move(*new_socket);
-  return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
-  if (error_code != asio::error::operation_aborted) {
-    deadline_.expires_at(asio::steady_timer::time_point::max());
-    last_error_ = asio::error::timed_out;
-    deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
-    socket->lowest_layer().close();
-  }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
-  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
-    logger_->log_trace("No more endpoints to try");
-    deadline_.cancel();
-    return;
-  }
-
-  last_error_.clear();
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
-      [&socket, endpoint_iter, this](std::error_code err) {
-        handleConnect(err, endpoint_iter, socket);
-      });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
-                                                  tcp::resolver::results_type::iterator endpoint_iter,
-                                                  const std::shared_ptr<SocketType>& socket) {
-  bool connection_failed_before_deadline = error.operator bool();
-  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (connection_failed_due_to_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (connection_failed_before_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
-    last_error_ = error;
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (max_size_of_socket_send_buffer_)
-    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
-  handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
-                                                    const tcp::resolver::results_type::iterator&,
-                                                    const std::shared_ptr<SocketType>&) {
-  throw std::invalid_argument("Handshake called without SSL");
-}
-
 template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
-                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                   const std::shared_ptr<SslSocket>& socket) {
-  if (!error) {
-    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
-    deadline_.cancel();
-    return;
-  }
-  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
-  last_error_ = error;
-  socket->lowest_layer().close();
-  startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(!ssl_context_);
+  return TcpSocket{io_context_};
 }
 
 template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<TcpSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->lowest_layer().non_blocking(true);
-  deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<SslSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
-    handleHandshake(handshake_error, endpoint_iter, socket);
-  });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(ssl_context_);
+  return {io_context_, *ssl_context_};
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
-                                                std::size_t bytes_written,
-                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                const std::vector<std::byte>& delimiter,
-                                                const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing flowfile to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
-  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
-    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
-      handleDelimiterWrite(error, bytes_written, socket);
-    });
-  } else {
-    std::vector<std::byte> data_chunk;
-    data_chunk.resize(chunk_size);
-    gsl::span<std::byte> buffer{data_chunk};
-    size_t num_read = flow_file_content_stream->read(buffer);
-    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-    });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context) {
+  auto socket = createNewSocket(io_context);
+  std::error_code last_error;
+  for (const auto& endpoint : endpoints) {
+    auto [connection_error] = co_await asyncOperationWithTimeout(socket.lowest_layer().async_connect(endpoint, use_nothrow_awaitable), timeout_duration_);
+    if (connection_error) {
+      core::logging::LOG_DEBUG(logger_) << "Connecting to " << endpoint.endpoint() << " failed due to " << connection_error.message();
+      last_error = connection_error;
+      continue;
+    }
+    auto [handshake_error] = co_await handshake(socket, timeout_duration_);
+    if (handshake_error) {
+      core::logging::LOG_DEBUG(logger_) << "Handshake with " << endpoint.endpoint() << " failed due to " << handshake_error.message();
+      last_error = handshake_error;
+      continue;
+    }
+    if (max_size_of_socket_send_buffer_)
+      socket.lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+    socket_.emplace(std::move(socket));
+    co_return std::error_code();
   }
+  co_return last_error;
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing delimiter to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing delimiter to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", bytes_written);
-  deadline_.cancel();
-}
-
-
-template<>
-nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> ConnectionHandler<TcpSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto socket = std::make_shared<TcpSocket>(io_context_);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
-}
-
-asio::ssl::context getSslContext(const auto& ssl_context_service) {
-  gsl_Expects(ssl_context_service);
-  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
-  ssl_context.load_verify_file(ssl_context_service->getCACertificate().string());
-  ssl_context.set_verify_mode(asio::ssl::verify_peer);
-  if (auto cert_file = ssl_context_service->getCertificateFile(); !cert_file.empty())
-    ssl_context.use_certificate_file(cert_file.string(), asio::ssl::context::pem);
-  if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); !private_key_file.empty())
-    ssl_context.use_private_key_file(private_key_file.string(), asio::ssl::context::pem);
-  ssl_context.set_password_callback([password = ssl_context_service->getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
-  return ssl_context;
+[[nodiscard]] asio::awaitable<std::error_code> ConnectionHandler<SocketType>::setupUsableSocket(asio::io_context& io_context) {
+  if (hasUsableSocket())
+    co_return std::error_code();
+  tcp::resolver resolver(io_context);
+  auto [resolve_error, resolve_result] = co_await asyncOperationWithTimeout(resolver.async_resolve(connection_id_.getHostname(), connection_id_.getPort(), use_nothrow_awaitable), timeout_duration_);
+  if (resolve_error)
+    co_return resolve_error;
+  co_return co_await establishNewConnection(resolve_result, io_context);
 }
 
-template<>
-nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> ConnectionHandler<SslSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto ssl_context = getSslContext(ssl_context_service_);
-  auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
+template<class SocketType>
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+    const std::vector<std::byte>& delimiter,
+    asio::io_context& io_context) {
+  if (auto connection_error = co_await setupUsableSocket(io_context))  // NOLINT
+    co_return connection_error;
+  co_return co_await send(stream_to_send, delimiter);
 }
 
 template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                                                        const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                                                        const std::vector<std::byte>& delimiter) {
-  if (!socket || !socket->lowest_layer().is_open())
-    return nonstd::make_unexpected(asio::error::not_socket);
-
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.restart();
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter) {
+  gsl_Expects(hasUsableSocket());
 
   std::vector<std::byte> data_chunk;
   data_chunk.resize(chunk_size);
-
   gsl::span<std::byte> buffer{data_chunk};
-  size_t num_read = flow_file_content_stream->read(buffer);
-  logger_->log_trace("read %zu bytes from flowfile", num_read);
-  asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-    handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-  });
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  last_used_ = std::chrono::steady_clock::now();
-  return {};
-}
+  while (stream_to_send->tell() < stream_to_send->size()) {
+    size_t num_read = stream_to_send->read(buffer);
+    if (io::isError(num_read))
+      co_return std::make_error_code(std::errc::io_error);
+    auto [write_error, bytes_written] = co_await asyncOperationWithTimeout(asio::async_write(*socket_, asio::buffer(data_chunk, num_read), use_nothrow_awaitable), timeout_duration_);
+    if (write_error)
+      co_return write_error;
+    logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
+  }
+  auto [delimiter_write_error, delimiter_bytes_written] = co_await asyncOperationWithTimeout(asio::async_write(*socket_, asio::buffer(delimiter), use_nothrow_awaitable), timeout_duration_);
+  if (delimiter_write_error)
+    co_return delimiter_write_error;
+  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", delimiter_bytes_written);
 
-template<class SocketType>
-nonstd::expected<tcp::resolver::results_type, std::error_code> ConnectionHandler<SocketType>::resolveHostname() {
-  tcp::resolver resolver(io_context_);
-  std::error_code error_code;
-  auto resolved_query = resolver.resolve(connection_id_.getHostname(), connection_id_.getPort(), error_code);
-  if (error_code)
-    return nonstd::make_unexpected(error_code);
-  return resolved_query;
+  last_used_ = steady_clock::now();
+  co_return std::error_code();
 }
 }  // namespace
 
@@ -517,19 +342,13 @@ void PutTCP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
     return;
   }
 
-  auto flow_file_content_stream = session->getFlowFileContentStream(flow_file);
-  if (!flow_file_content_stream) {
-    session->transfer(flow_file, Failure);
-    return;
-  }
-
   auto connection_id = detail::ConnectionId(std::move(hostname), std::move(port));
   std::shared_ptr<ConnectionHandlerBase> handler;
   if (!connections_ || !connections_->contains(connection_id)) {
-    if (ssl_context_service_)
-      handler = std::make_shared<ConnectionHandler<SslSocket>>(connection_id, timeout_, logger_, max_size_of_socket_send_buffer_, ssl_context_service_);
+    if (ssl_context_)
+      handler = std::make_shared<ConnectionHandler<SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_);
     else
-      handler = std::make_shared<ConnectionHandler<TcpSocket>>(connection_id, timeout_, logger_, max_size_of_socket_send_buffer_, nullptr);
+      handler = std::make_shared<ConnectionHandler<TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr);
     if (connections_)
       (*connections_)[connection_id] = handler;
   } else {
@@ -538,7 +357,7 @@ void PutTCP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
 
   gsl_Expects(handler);
 
-  processFlowFile(handler, flow_file_content_stream, *session, flow_file);
+  processFlowFile(handler, *session, flow_file);
 }
 
 void PutTCP::removeExpiredConnections() {
@@ -550,30 +369,43 @@ void PutTCP::removeExpiredConnections() {
   }
 }
 
+std::error_code PutTCP::sendFlowFileContent(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
+    const std::shared_ptr<io::InputStream>& flow_file_content_stream) {
+  std::error_code operation_error;
+  io_context_.restart();
+  asio::co_spawn(io_context_,
+      connection_handler->sendStreamWithDelimiter(flow_file_content_stream, delimiter_, io_context_),
+      [&operation_error](const std::exception_ptr&, std::error_code error_code) {
+        operation_error = error_code;
+      });
+  io_context_.run();
+  return operation_error;
+}
+
 void PutTCP::processFlowFile(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
-                             const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                             core::ProcessSession& session,
-                             const std::shared_ptr<core::FlowFile>& flow_file) {
-  auto result = connection_handler->sendData(flow_file_content_stream, delimiter_);
+    core::ProcessSession& session,
+    const std::shared_ptr<core::FlowFile>& flow_file) {
+  auto flow_file_content_stream = session.getFlowFileContentStream(flow_file);
+  if (!flow_file_content_stream) {
+    session.transfer(flow_file, Failure);
+    return;
+  }
 
-  if (!result && connection_handler->hasBeenUsed()) {
-    logger_->log_warn("%s with reused connection, retrying...", result.error().message());
+  std::error_code operation_error = sendFlowFileContent(connection_handler, flow_file_content_stream);
+
+  if (operation_error && connection_handler->hasBeenUsed()) {
+    logger_->log_warn("%s with reused connection, retrying...", operation_error.message());
     connection_handler->reset();
-    result = connection_handler->sendData(flow_file_content_stream, delimiter_);
+    operation_error = sendFlowFileContent(connection_handler, flow_file_content_stream);
   }
 
-  const auto transfer_to_success = [&session, &flow_file]() -> void {
-    session.transfer(flow_file, Success);
-  };
-
-  const auto transfer_to_failure = [&session, &flow_file, &logger = logger_, &connection_handler](std::error_code ec) -> void {
-    gsl_Expects(ec);
+  if (operation_error) {
     connection_handler->reset();
-    logger->log_error("%s", ec.message());
+    logger_->log_error("%s", operation_error.message());
     session.transfer(flow_file, Failure);
-  };
-
-  result | utils::map(transfer_to_success) | utils::orElse(transfer_to_failure);
+  } else {
+    session.transfer(flow_file, Success);
+  }
 }
 
 REGISTER_RESOURCE(PutTCP, Processor);
diff --git a/extensions/standard-processors/processors/PutTCP.h b/extensions/standard-processors/processors/PutTCP.h
index 1f6f7fb58..58ae28f94 100644
--- a/extensions/standard-processors/processors/PutTCP.h
+++ b/extensions/standard-processors/processors/PutTCP.h
@@ -32,6 +32,10 @@
 #include "utils/expected.h"
 #include "utils/StringUtils.h"  // for string <=> on libc++
 
+#include <asio/io_context.hpp>
+#include <asio/awaitable.hpp>
+#include <asio/ssl/context.hpp>
+
 namespace org::apache::nifi::minifi::processors::detail {
 
 class ConnectionId {
@@ -50,12 +54,12 @@ class ConnectionId {
 }  // namespace org::apache::nifi::minifi::processors::detail
 
 namespace std {
-template <>
+template<>
 struct hash<org::apache::nifi::minifi::processors::detail::ConnectionId> {
   size_t operator()(const org::apache::nifi::minifi::processors::detail::ConnectionId& connection_id) const {
     return org::apache::nifi::minifi::utils::hash_combine(
         std::hash<std::string_view>{}(connection_id.getHostname()),
-        std::hash <std::string_view>{}(connection_id.getPort()));
+        std::hash<std::string_view>{}(connection_id.getPort()));
   }
 };
 }  // namespace std
@@ -64,16 +68,19 @@ namespace org::apache::nifi::minifi::processors {
 class ConnectionHandlerBase {
  public:
   virtual ~ConnectionHandlerBase() = default;
+  virtual void reset() = 0;
 
   [[nodiscard]] virtual bool hasBeenUsed() const = 0;
   [[nodiscard]] virtual bool hasBeenUsedIn(std::chrono::milliseconds dur) const = 0;
-  virtual nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) = 0;
-  virtual void reset() = 0;
+  [[nodiscard]] virtual asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+      const std::vector<std::byte>& delimiter,
+      asio::io_context& io_context) = 0;
 };
 
 class PutTCP final : public core::Processor {
  public:
-  EXTENSIONAPI static constexpr const char* Description = "The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. "
+  EXTENSIONAPI static constexpr const char* Description =
+      "The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. "
       "By default, the FlowFiles are transmitted over the same TCP connection. To assist the TCP server with determining message boundaries, "
       "an optional \"Outgoing Message Delimiter\" string can be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. "
       "An optional \"Connection Per FlowFile\" parameter can be specified to change the behaviour so that each FlowFiles content is transmitted over a single TCP connection "
@@ -107,22 +114,25 @@ class PutTCP final : public core::Processor {
 
   void initialize() final;
   void notifyStop() final;
-  void onSchedule(core::ProcessContext*, core::ProcessSessionFactory *) final;
+  void onSchedule(core::ProcessContext*, core::ProcessSessionFactory*) final;
   void onTrigger(core::ProcessContext*, core::ProcessSession*) final;
 
  private:
   void removeExpiredConnections();
   void processFlowFile(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
-                       const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                       core::ProcessSession& session,
-                       const std::shared_ptr<core::FlowFile>& flow_file);
+      core::ProcessSession& session,
+      const std::shared_ptr<core::FlowFile>& flow_file);
+
+  std::error_code sendFlowFileContent(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
+      const std::shared_ptr<io::InputStream>& flow_file_content_stream);
 
   std::vector<std::byte> delimiter_;
+  asio::io_context io_context_;
   std::optional<std::unordered_map<detail::ConnectionId, std::shared_ptr<ConnectionHandlerBase>>> connections_;
   std::optional<std::chrono::milliseconds> idle_connection_expiration_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
-  std::chrono::milliseconds timeout_ = std::chrono::seconds(15);
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+  std::chrono::milliseconds timeout_duration_ = std::chrono::seconds(15);
+  std::optional<asio::ssl::context> ssl_context_;
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutTCP>::getLogger(uuid_);
 };
 
diff --git a/extensions/standard-processors/processors/PutUDP.cpp b/extensions/standard-processors/processors/PutUDP.cpp
index c59a86e80..90910dcd1 100644
--- a/extensions/standard-processors/processors/PutUDP.cpp
+++ b/extensions/standard-processors/processors/PutUDP.cpp
@@ -93,7 +93,7 @@ void PutUDP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
   }
 
   const auto data = session->readBuffer(flow_file);
-  if (data.status < 0) {
+  if (io::isError(data.status)) {
     session->transfer(flow_file, Failure);
     return;
   }
diff --git a/extensions/standard-processors/tests/CMakeLists.txt b/extensions/standard-processors/tests/CMakeLists.txt
index 784de1824..353b7bbca 100644
--- a/extensions/standard-processors/tests/CMakeLists.txt
+++ b/extensions/standard-processors/tests/CMakeLists.txt
@@ -17,6 +17,8 @@
 # under the License.
 #
 
+include(Coroutines)
+enable_coroutines()
 
 file(GLOB PROCESSOR_UNIT_TESTS  "unit/*.cpp")
 file(GLOB PROCESSOR_INTEGRATION_TESTS "integration/*.cpp")
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
index 9a4aa65a5..c63a67b22 100644
--- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -29,7 +29,6 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::test {
 
-constexpr uint64_t SYSLOG_PORT = 10255;
 constexpr auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
 
 struct ValidRFC5424Message {
@@ -249,56 +248,61 @@ void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164
   CHECK(original_message.msg_ == flow_file.getAttribute("syslog.msg"));
 }
 
-TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog][NetworkListenerProcessor]") {
+TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
 
   SingleProcessorTestController controller{listen_syslog};
   LogTestController::getInstance().setTrace<ListenSyslog>();
-  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
   std::string protocol;
+  uint16_t port = 0;
 
   SECTION("UDP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+    protocol = "UDP";
+
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
     asio::ip::udp::endpoint endpoint;
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
     }
     protocol = "UDP";
-    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
-    controller.plan->scheduleProcessor(listen_syslog);
-    utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint);
-    utils::sendUdpDatagram(invalid_syslog, endpoint);
+    CHECK_THAT(utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(invalid_syslog, endpoint), MatchesSuccess());
   }
 
   SECTION("TCP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+    protocol = "TCP";
+
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
     asio::ip::tcp::endpoint endpoint;
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    protocol = "TCP";
-    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
-    controller.plan->scheduleProcessor(listen_syslog);
-    REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint));
-    REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, endpoint));
+    CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendMessagesViaTCP({invalid_syslog}, endpoint), MatchesSuccess());
   }
   std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
   REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1);
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[1]) == invalid_syslog);
 
-  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], SYSLOG_PORT, protocol);
-  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], SYSLOG_PORT, protocol);
+  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], port, protocol);
+  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], port, protocol);
 }
 
 TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog][NetworkListenerProcessor]") {
@@ -306,75 +310,80 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog][NetworkListenerProce
 
   SingleProcessorTestController controller{listen_syslog};
   LogTestController::getInstance().setTrace<ListenSyslog>();
-  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "100"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "true"));
 
   std::string protocol;
+  uint16_t port = 0;
   SECTION("UDP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+    protocol = "UDP";
+
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
     asio::ip::udp::endpoint endpoint;
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    protocol = "UDP";
-    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
-    controller.plan->scheduleProcessor(listen_syslog);
-    std::this_thread::sleep_for(100ms);
-    utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint);
-    utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, endpoint);
-    utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, endpoint);
-    utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, endpoint);
-
-    utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, endpoint);
-    utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, endpoint);
-    utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, endpoint);
-    utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, endpoint);
-
-    utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint);
-    utils::sendUdpDatagram(invalid_syslog, endpoint);
+
+    CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, endpoint), MatchesSuccess());
+
+    CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, endpoint), MatchesSuccess());
+
+    CHECK_THAT(utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendUdpDatagram(invalid_syslog, endpoint), MatchesSuccess());
   }
 
   SECTION("TCP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+    protocol = "TCP";
+
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
     asio::ip::tcp::endpoint endpoint;
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    protocol = "TCP";
-    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
-    controller.plan->scheduleProcessor(listen_syslog);
-    std::this_thread::sleep_for(100ms);
-    REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_,
-                                       rfc5424_doc_example_2.unparsed_,
-                                       rfc5424_doc_example_3.unparsed_,
-                                       rfc5424_doc_example_4.unparsed_}, endpoint));
-
-    REQUIRE(utils::sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_,
-                                       rfc3164_doc_example_2.unparsed_,
-                                       rfc3164_doc_example_3.unparsed_,
-                                       rfc3164_doc_example_4.unparsed_}, endpoint));
-
-    REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint));
-    REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, endpoint));
+
+    CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_,
+                                          rfc5424_doc_example_2.unparsed_,
+                                          rfc5424_doc_example_3.unparsed_,
+                                          rfc5424_doc_example_4.unparsed_}, endpoint), MatchesSuccess());
+
+    CHECK_THAT(utils::sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_,
+                                          rfc3164_doc_example_2.unparsed_,
+                                          rfc3164_doc_example_3.unparsed_,
+                                          rfc3164_doc_example_4.unparsed_}, endpoint), MatchesSuccess());
+
+    CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint), MatchesSuccess());
+    CHECK_THAT(utils::sendMessagesViaTCP({invalid_syslog}, endpoint), MatchesSuccess());
   }
 
   std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
-  REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 9}, {ListenSyslog::Invalid, 1}}, result, 300ms, 50ms));
+  REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 9},
+                                   {ListenSyslog::Invalid, 1}}, result, 300ms, 50ms));
   REQUIRE(result.at(ListenSyslog::Success).size() == 9);
   REQUIRE(result.at(ListenSyslog::Invalid).size() == 1);
 
   std::unordered_map<std::string, core::FlowFile&> success_flow_files;
 
-  for (auto& flow_file : result.at(ListenSyslog::Success)) {
+  for (auto& flow_file: result.at(ListenSyslog::Success)) {
     success_flow_files.insert({controller.plan->getContent(flow_file), *flow_file});
   }
 
@@ -388,26 +397,25 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog][NetworkListenerProce
   REQUIRE(success_flow_files.contains(std::string(rfc3164_doc_example_3.unparsed_)));
   REQUIRE(success_flow_files.contains(std::string(rfc3164_doc_example_4.unparsed_)));
 
-  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_1.unparsed_)), rfc5424_doc_example_1, SYSLOG_PORT, protocol);
-  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_2.unparsed_)), rfc5424_doc_example_2, SYSLOG_PORT, protocol);
-  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_3.unparsed_)), rfc5424_doc_example_3, SYSLOG_PORT, protocol);
-  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_4.unparsed_)), rfc5424_doc_example_4, SYSLOG_PORT, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_1.unparsed_)), rfc5424_doc_example_1, port, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_2.unparsed_)), rfc5424_doc_example_2, port, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_3.unparsed_)), rfc5424_doc_example_3, port, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_4.unparsed_)), rfc5424_doc_example_4, port, protocol);
 
-  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_1.unparsed_)), rfc3164_doc_example_1, SYSLOG_PORT, protocol);
-  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_2.unparsed_)), rfc3164_doc_example_2, SYSLOG_PORT, protocol);
-  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_3.unparsed_)), rfc3164_doc_example_3, SYSLOG_PORT, protocol);
-  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_4.unparsed_)), rfc3164_doc_example_4, SYSLOG_PORT, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_1.unparsed_)), rfc3164_doc_example_1, port, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_2.unparsed_)), rfc3164_doc_example_2, port, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_3.unparsed_)), rfc3164_doc_example_3, port, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_4.unparsed_)), rfc3164_doc_example_4, port, protocol);
 
   REQUIRE(success_flow_files.contains(std::string(rfc5424_logger_example_1)));
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Invalid)[0]) == invalid_syslog);
 }
 
-
 TEST_CASE("ListenSyslog can be rescheduled", "[ListenSyslog][NetworkListenerProcessor]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
   SingleProcessorTestController controller{listen_syslog};
   LogTestController::getInstance().setTrace<ListenSyslog>();
-  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, "0"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "100"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "true"));
   SECTION("UDP") {
@@ -429,45 +437,48 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog][Netw
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
 
   SingleProcessorTestController controller{listen_syslog};
-  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "10"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxQueueSize, "50"));
 
   LogTestController::getInstance().setWarn<ListenSyslog>();
 
+  uint16_t port = 0;
+
   SECTION("UDP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
     asio::ip::udp::endpoint endpoint;
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
-    controller.plan->scheduleProcessor(listen_syslog);
     for (auto i = 0; i < 100; ++i) {
-      utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint);
+      CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint), MatchesSuccess());
     }
     CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms));
   }
 
   SECTION("TCP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
     asio::ip::tcp::endpoint endpoint;
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
-    controller.plan->scheduleProcessor(listen_syslog);
     for (auto i = 0; i < 100; ++i) {
-      REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, endpoint));
+      CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, endpoint), MatchesSuccess());
     }
     CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms));
   }
@@ -480,41 +491,44 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog][Netw
 }
 
 TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog][NetworkListenerProcessor]") {
-  asio::ip::tcp::endpoint endpoint;
-  SECTION("sending through IPv4", "[IPv4]") {
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
-  }
-  SECTION("sending through IPv6", "[IPv6]") {
-    if (utils::isIPv6Disabled())
-      return;
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
-  }
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
-
   SingleProcessorTestController controller{listen_syslog};
+
   auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
   const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(), (executable_dir / "resources" / "ca_A.crt").string()));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
+  ssl_context_service->enable();
+
   LogTestController::getInstance().setTrace<ListenSyslog>();
-  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::SSLContextService, "SSLContextService"));
-  ssl_context_service->enable();
-  controller.plan->scheduleProcessor(listen_syslog);
-  REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, executable_dir / "resources" / "ca_A.crt"));
-  REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+
+  auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
+  asio::ip::tcp::endpoint endpoint;
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
+  }
+
+  CHECK_THAT(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesSuccess());
+  CHECK_THAT(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesSuccess());
 
   std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
   REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1);
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[1]) == invalid_syslog);
 
-  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], SYSLOG_PORT, "TCP");
-  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], SYSLOG_PORT, "TCP");
+  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], port, "TCP");
+  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], port, "TCP");
 }
 
 }  // namespace org::apache::nifi::minifi::test
diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
index 158e1a072..32b0c6558 100644
--- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
@@ -29,48 +29,45 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::test {
 
-constexpr uint64_t PORT = 10254;
-
-void check_for_attributes(core::FlowFile& flow_file) {
-  CHECK(std::to_string(PORT) == flow_file.getAttribute("tcp.port"));
+void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
+  CHECK(std::to_string(port) == flow_file.getAttribute("tcp.port"));
   const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
   CHECK(ranges::contains(local_addresses, flow_file.getAttribute("tcp.sender")));
 }
 
 TEST_CASE("ListenTCP test multiple messages", "[ListenTCP][NetworkListenerProcessor]") {
+  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+  SingleProcessorTestController controller{listen_tcp};
+  LogTestController::getInstance().setTrace<ListenTCP>();
+  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "2"));
+  auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
+
   asio::ip::tcp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
   }
   SECTION("sending through IPv6", "[IPv6]") {
     if (utils::isIPv6Disabled())
       return;
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
   }
-  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
-
-  SingleProcessorTestController controller{listen_tcp};
-  LogTestController::getInstance().setTrace<ListenTCP>();
-  REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT)));
-  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "2"));
 
-  controller.plan->scheduleProcessor(listen_tcp);
-  REQUIRE(utils::sendMessagesViaTCP({"test_message_1"}, endpoint));
-  REQUIRE(utils::sendMessagesViaTCP({"another_message"}, endpoint));
+  CHECK_THAT(utils::sendMessagesViaTCP({"test_message_1"}, endpoint), MatchesSuccess());
+  CHECK_THAT(utils::sendMessagesViaTCP({"another_message"}, endpoint), MatchesSuccess());
   ProcessorTriggerResult result;
   REQUIRE(controller.triggerUntil({{ListenTCP::Success, 2}}, result, 300s, 50ms));
   CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[0]) == "test_message_1");
   CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[1]) == "another_message");
 
-  check_for_attributes(*result.at(ListenTCP::Success)[0]);
-  check_for_attributes(*result.at(ListenTCP::Success)[1]);
+  check_for_attributes(*result.at(ListenTCP::Success)[0], port);
+  check_for_attributes(*result.at(ListenTCP::Success)[1], port);
 }
 
 TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP][NetworkListenerProcessor]") {
   const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
   SingleProcessorTestController controller{listen_tcp};
   LogTestController::getInstance().setTrace<ListenTCP>();
-  REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT)));
+  REQUIRE(listen_tcp->setProperty(ListenTCP::Port, "0"));
   REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "100"));
 
   REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_tcp));
@@ -79,27 +76,27 @@ TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP][NetworkListenerProcessor]
 }
 
 TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP][NetworkListenerProcessor]") {
+  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+  SingleProcessorTestController controller{listen_tcp};
+  LogTestController::getInstance().setTrace<ListenTCP>();
+  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "10"));
+  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxQueueSize, "50"));
+  auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
+
   asio::ip::tcp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
   }
   SECTION("sending through IPv6", "[IPv6]") {
     if (utils::isIPv6Disabled())
       return;
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
   }
-  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
-
-  SingleProcessorTestController controller{listen_tcp};
-  REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT)));
-  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "10"));
-  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxQueueSize, "50"));
 
   LogTestController::getInstance().setWarn<ListenTCP>();
 
-  controller.plan->scheduleProcessor(listen_tcp);
   for (auto i = 0; i < 100; ++i) {
-    REQUIRE(utils::sendMessagesViaTCP({"test_message"}, endpoint));
+    CHECK_THAT(utils::sendMessagesViaTCP({"test_message"}, endpoint), MatchesSuccess());
   }
 
   CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms));
@@ -113,7 +110,7 @@ TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP][NetworkLis
 
 TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProcessor]") {
   const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
-
+  uint16_t port = 0;
   SingleProcessorTestController controller{listen_tcp};
   auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
   LogTestController::getInstance().setTrace<ListenTCP>();
@@ -122,7 +119,6 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::Passphrase.getName(), "Password12"));
-  REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::Port.getName(), std::to_string(PORT)));
   REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::MaxBatchSize.getName(), "2"));
   REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::SSLContextService.getName(), "SSLContextService"));
   std::vector<std::string> expected_successful_messages;
@@ -131,60 +127,64 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
 
   SECTION("Without client certificate verification") {
     SECTION("Client certificate not required, Client Auth set to NONE by default") {
+      ssl_context_service->enable();
+      port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
     SECTION("Client certificate not required, but validated if provided") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+      ssl_context_service->enable();
+      port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
-    ssl_context_service->enable();
-    controller.plan->scheduleProcessor(listen_tcp);
 
     expected_successful_messages = {"test_message_1", "another_message"};
-    for (const auto& message : expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+    for (const auto& message: expected_successful_messages) {
+      CHECK_THAT(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesSuccess());
     }
   }
 
   SECTION("With client certificate provided") {
     SECTION("Client certificate required") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+      ssl_context_service->enable();
+      port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
     SECTION("Client certificate not required but validated") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+      ssl_context_service->enable();
+      port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
-    ssl_context_service->enable();
-    controller.plan->scheduleProcessor(listen_tcp);
 
     minifi::utils::net::SslData ssl_data;
     ssl_data.ca_loc = executable_dir / "resources" / "ca_A.crt";
@@ -194,31 +194,114 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message : expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data));
+      CHECK_THAT(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data), MatchesSuccess());
     }
   }
 
   SECTION("Required certificate not provided") {
+    ssl_context_service->enable();
+    REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
-    ssl_context_service->enable();
-    controller.plan->scheduleProcessor(listen_tcp);
 
-    REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+    CHECK_THAT(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesError());
   }
 
   ProcessorTriggerResult result;
   REQUIRE(controller.triggerUntil({{ListenTCP::Success, expected_successful_messages.size()}}, result, 300ms, 50ms));
   for (std::size_t i = 0; i < expected_successful_messages.size(); ++i) {
     CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[i]) == expected_successful_messages[i]);
-    check_for_attributes(*result.at(ListenTCP::Success)[i]);
+    check_for_attributes(*result.at(ListenTCP::Success)[i], port);
+  }
+}
+
+namespace {
+bool isSslMethodAvailable(asio::ssl::context::method method) {
+  try {
+    [[maybe_unused]] asio::ssl::context ctx(method);
+    return true;
+  } catch (const asio::system_error& err) {
+    if (err.code() == asio::error::invalid_argument) {
+      return false;
+    } else {
+      throw;
+    }
+  }
+}
+}  // namespace
+
+TEST_CASE("Test ListenTCP SSL/TLS compatibility", "[ListenTCP][NetworkListenerProcessor]") {
+  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+  SingleProcessorTestController controller{listen_tcp};
+  auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
+  LogTestController::getInstance().setTrace<ListenTCP>();
+  const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
+  REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(), (executable_dir / "resources" / "ca_A.crt").string()));
+  REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
+  REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
+  REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::Passphrase.getName(), "Password12"));
+  REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::MaxBatchSize.getName(), "2"));
+  REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::SSLContextService.getName(), "SSLContextService"));
+  REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+
+  ssl_context_service->enable();
+  uint16_t port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
+  asio::ip::tcp::endpoint endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
+
+  minifi::utils::net::SslData ssl_data;
+  ssl_data.ca_loc = executable_dir / "resources" / "ca_A.crt";
+  ssl_data.cert_loc = executable_dir / "resources" / "localhost_by_A.pem";
+  ssl_data.key_loc = executable_dir / "resources" / "localhost_by_A.pem";
+  ssl_data.key_pw = "Password12";
+
+
+  asio::ssl::context::method client_method;
+  bool expected_to_work;
+
+  SECTION("sslv2 should be disabled") {
+    client_method = asio::ssl::context::method::sslv2_client;
+    expected_to_work = false;
+  }
+
+  SECTION("sslv3 should be disabled") {
+    client_method = asio::ssl::context::method::sslv3_client;
+    expected_to_work = false;
+  }
+
+  SECTION("tlsv11 should be disabled") {
+    client_method = asio::ssl::context::method::tlsv11_client;
+    expected_to_work = false;
+  }
+
+  SECTION("tlsv12 should be enabled") {
+    client_method = asio::ssl::context::method::tlsv12_client;
+    expected_to_work = true;
+  }
+
+  SECTION("tlsv13 should be enabled") {
+    client_method = asio::ssl::context::method::tlsv13_client;
+    expected_to_work = true;
+  }
+
+  if (!isSslMethodAvailable(client_method))
+    return;
+
+  auto send_result = utils::sendMessagesViaSSL({"message"}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data, client_method);
+  if (expected_to_work) {
+    CHECK_THAT(send_result, MatchesSuccess());
+    ProcessorTriggerResult result;
+    CHECK(controller.triggerUntil({{ListenTCP::Success, 1}}, result, 300ms, 50ms));
+  } else {
+    CHECK_THAT(send_result, MatchesError());
+    ProcessorTriggerResult result;
+    CHECK_FALSE(controller.triggerUntil({{ListenTCP::Success, 1}}, result, 300ms, 50ms));
   }
 }
 
diff --git a/extensions/standard-processors/tests/unit/ListenUDPTests.cpp b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
index ecce9120d..ca1c0ec25 100644
--- a/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
@@ -29,49 +29,49 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::test {
 
-constexpr uint64_t PORT = 10256;
-
-void check_for_attributes(core::FlowFile& flow_file) {
+void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
   const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
-  CHECK(std::to_string(PORT) == flow_file.getAttribute("udp.port"));
+  CHECK(std::to_string(port) == flow_file.getAttribute("udp.port"));
   CHECK(ranges::contains(local_addresses, flow_file.getAttribute("udp.sender")));
 }
 
 TEST_CASE("ListenUDP test multiple messages", "[ListenUDP][NetworkListenerProcessor]") {
+  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+  SingleProcessorTestController controller{listen_udp};
+  LogTestController::getInstance().setTrace<ListenUDP>();
+
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "2"));
+
+  auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_udp);
+
   asio::ip::udp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
-    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), PORT);
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
   }
   SECTION("sending through IPv6", "[IPv6]") {
     if (utils::isIPv6Disabled())
       return;
-    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), PORT);
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
   }
-  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
-
-  SingleProcessorTestController controller{listen_udp};
-  LogTestController::getInstance().setTrace<ListenUDP>();
-  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
-  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "2"));
 
   controller.plan->scheduleProcessor(listen_udp);
-  REQUIRE(utils::sendUdpDatagram({"test_message_1"}, endpoint));
-  REQUIRE(utils::sendUdpDatagram({"another_message"}, endpoint));
+  CHECK_THAT(utils::sendUdpDatagram({"test_message_1"}, endpoint), MatchesSuccess());
+  CHECK_THAT(utils::sendUdpDatagram({"another_message"}, endpoint), MatchesSuccess());
   ProcessorTriggerResult result;
   REQUIRE(controller.triggerUntil({{ListenUDP::Success, 2}}, result, 300ms, 50ms));
   CHECK(result.at(ListenUDP::Success).size() == 2);
   CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[0]) == "test_message_1");
   CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[1]) == "another_message");
 
-  check_for_attributes(*result.at(ListenUDP::Success)[0]);
-  check_for_attributes(*result.at(ListenUDP::Success)[1]);
+  check_for_attributes(*result.at(ListenUDP::Success)[0], port);
+  check_for_attributes(*result.at(ListenUDP::Success)[1], port);
 }
 
 TEST_CASE("ListenUDP can be rescheduled", "[ListenUDP][NetworkListenerProcessor]") {
   const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
   SingleProcessorTestController controller{listen_udp};
   LogTestController::getInstance().setTrace<ListenUDP>();
-  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
+  REQUIRE(listen_udp->setProperty(ListenUDP::Port, "0"));
   REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "100"));
 
   REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_udp));
@@ -80,27 +80,28 @@ TEST_CASE("ListenUDP can be rescheduled", "[ListenUDP][NetworkListenerProcessor]
 }
 
 TEST_CASE("ListenUDP max queue and max batch size test", "[ListenUDP][NetworkListenerProcessor]") {
+  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+  SingleProcessorTestController controller{listen_udp};
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "10"));
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxQueueSize, "50"));
+
+  auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_udp);
+
   asio::ip::udp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
-    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), PORT);
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
   }
   SECTION("sending through IPv6", "[IPv6]") {
     if (utils::isIPv6Disabled())
       return;
-    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), PORT);
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
   }
-  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
-
-  SingleProcessorTestController controller{listen_udp};
-  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
-  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "10"));
-  REQUIRE(listen_udp->setProperty(ListenUDP::MaxQueueSize, "50"));
 
   LogTestController::getInstance().setWarn<ListenUDP>();
 
   controller.plan->scheduleProcessor(listen_udp);
   for (auto i = 0; i < 100; ++i) {
-    REQUIRE(utils::sendUdpDatagram({"test_message"}, endpoint));
+    CHECK_THAT(utils::sendUdpDatagram({"test_message"}, endpoint), MatchesSuccess());
   }
 
   CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms));
diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
index 3396a50b4..ac44c4db8 100644
--- a/extensions/standard-processors/tests/unit/PutTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
@@ -27,9 +27,10 @@
 #include "controllers/SSLContextService.h"
 #include "core/ProcessSession.h"
 #include "utils/net/TcpServer.h"
-#include "utils/net/SslServer.h"
+#include "utils/net/AsioCoro.h"
 #include "utils/expected.h"
 #include "utils/StringUtils.h"
+#include "IntegrationTestUtils.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -38,69 +39,48 @@ namespace org::apache::nifi::minifi::processors {
 using controllers::SSLContextService;
 
 namespace {
-using utils::net::TcpSession;
-using utils::net::TcpServer;
 
-using utils::net::SslSession;
-using utils::net::SslServer;
-
-class ISessionAwareServer {
+class CancellableTcpServer : public utils::net::TcpServer {
  public:
-  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
-  virtual void closeSessions() = 0;
-};
+  using utils::net::TcpServer::TcpServer;
 
-template<class SessionType>
-class SessionAwareServer : public ISessionAwareServer {
- protected:
-  size_t getNumberOfSessions() const override {
-    std::lock_guard lock_guard{mutex_};
-    return sessions_.size();
-  }
-
-  void closeSessions() override {
-    std::lock_guard lock_guard{mutex_};
-    for (const auto& session_weak : sessions_) {
-      if (auto session = session_weak.lock()) {
-        auto& socket = session->getSocket();
-        if (socket.is_open()) {
-          socket.shutdown(asio::ip::tcp::socket::shutdown_both);
-          session->getSocket().close();
-        }
-      }
-    }
+  size_t getNumberOfSessions() const {
+    return cancellable_timers_.size();
   }
 
-  mutable std::mutex mutex_;
-  std::vector<std::weak_ptr<SessionType>> sessions_;
-};
+  void cancelEverything() {
+    for (auto& timer : cancellable_timers_)
+      io_context_.post([=]{timer->cancel();});
+  }
 
-class SessionAwareTcpServer : public TcpServer, public SessionAwareServer<TcpSession> {
- public:
-  using TcpServer::TcpServer;
+  asio::awaitable<void> doReceive() override {
+    using asio::experimental::awaitable_operators::operator||;
 
- protected:
-  std::shared_ptr<TcpSession> createSession() override {
-    std::lock_guard lock_guard{mutex_};
-    auto session = TcpServer::createSession();
-    logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get());
-    sessions_.emplace_back(session);
-    return session;
+    asio::ip::tcp::acceptor acceptor(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port_));
+    if (port_ == 0)
+      port_ = acceptor.local_endpoint().port();
+    while (true) {
+      auto [accept_error, socket] = co_await acceptor.async_accept(utils::net::use_nothrow_awaitable);
+      if (accept_error) {
+        logger_->log_error("Error during accepting new connection: %s", accept_error.message());
+        break;
+      }
+      auto cancellable_timer = std::make_shared<asio::steady_timer>(io_context_);
+      cancellable_timers_.push_back(cancellable_timer);
+      if (ssl_data_)
+        co_spawn(io_context_, secureSession(std::move(socket)) || wait_until_cancelled(cancellable_timer), asio::detached);
+      else
+        co_spawn(io_context_, insecureSession(std::move(socket)) || wait_until_cancelled(cancellable_timer), asio::detached);
+    }
   }
-};
-
-class SessionAwareSslServer : public SslServer, public SessionAwareServer<SslSession> {
- public:
-  using SslServer::SslServer;
 
- protected:
-  std::shared_ptr<SslSession> createSession() override {
-    std::lock_guard lock_guard{mutex_};
-    auto session = SslServer::createSession();
-    logger_->log_trace("SessionAwareSslServer::createSession %p", session.get());
-    sessions_.emplace_back(session);
-    return session;
+ private:
+  static asio::awaitable<void> wait_until_cancelled(std::shared_ptr<asio::steady_timer> timer) {
+    timer->expires_at(asio::steady_timer::time_point::max());
+    co_await utils::net::async_wait(*timer);
   }
+
+  std::vector<std::shared_ptr<asio::steady_timer>> cancellable_timers_;
 };
 
 utils::net::SslData createSslDataForServer() {
@@ -129,28 +109,28 @@ class PutTCPTestFixture {
   }
 
   void stopServers() {
-    for (auto& [port, server] : listeners_) {
-      auto& listener = server.listener_;
+    for (auto& [port, server] : servers_) {
+      auto& cancellable_server = server.cancellable_server;
       auto& server_thread = server.server_thread_;
-      if (listener)
-        listener->stop();
+      if (cancellable_server)
+        cancellable_server->stop();
       if (server_thread.joinable())
         server_thread.join();
-      listener.reset();
+      cancellable_server.reset();
     }
   }
 
   size_t getNumberOfActiveSessions(std::optional<uint16_t> port = std::nullopt) {
-    if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(getListener(port))) {
-      return session_aware_listener->getNumberOfSessions() - 1;  // There is always one inactive session waiting for a new connection
+    if (auto cancellable_tcp_server = getServer(port)) {
+      return cancellable_tcp_server->getNumberOfSessions();
     }
     return -1;
   }
 
   void closeActiveConnections() {
-    for (auto& [port, server] : listeners_) {
-      if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(server.listener_.get())) {
-        session_aware_listener->closeSessions();
+    for (auto& [port, server] : servers_) {
+      if (auto cancellable_tcp_server = getServer(port)) {
+        cancellable_tcp_server->cancelEverything();
       }
     }
     std::this_thread::sleep_for(200ms);
@@ -171,7 +151,7 @@ class PutTCPTestFixture {
     auto start_time = std::chrono::system_clock::now();
     utils::net::Message result;
     while (start_time + timeout > std::chrono::system_clock::now()) {
-      if (getListener(port)->tryDequeue(result))
+      if (getServer(port)->tryDequeue(result))
         return result;
       std::this_thread::sleep_for(interval);
     }
@@ -204,14 +184,17 @@ class PutTCPTestFixture {
   }
 
   uint16_t addTCPServer() {
-    uint16_t port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine_);
-    listeners_[port].startTCPServer(port);
+    Server server;
+    uint16_t port = server.startTCPServer(std::nullopt);
+    servers_[port] = std::move(server);
     return port;
   }
 
   uint16_t addSSLServer() {
-    uint16_t port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine_);
-    listeners_[port].startSSLServer(port);
+    auto ssl_server_options = utils::net::SslServerOptions{createSslDataForServer(), utils::net::ClientAuthOption::REQUIRED};
+    Server server;
+    uint16_t port = server.startTCPServer(ssl_server_options);
+    servers_[port] = std::move(server);
     return port;
   }
 
@@ -224,47 +207,36 @@ class PutTCPTestFixture {
   }
 
   [[nodiscard]] uint16_t getSinglePort() const {
-    gsl_Expects(listeners_.size() == 1);
-    return listeners_.begin()->first;
+    gsl_Expects(servers_.size() == 1);
+    return servers_.begin()->first;
   }
 
  private:
-  utils::net::Server* getListener(std::optional<uint16_t> port) {
+  CancellableTcpServer* getServer(std::optional<uint16_t> port) {
     if (!port)
       port = getSinglePort();
-    return listeners_.at(*port).listener_.get();
+    return servers_.at(*port).cancellable_server.get();
   }
 
   const std::shared_ptr<PutTCP> put_tcp_ = std::make_shared<PutTCP>("PutTCP");
   test::SingleProcessorTestController controller_{put_tcp_};
 
-  std::mt19937 random_engine_{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
-  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
-
   class Server {
    public:
     Server() = default;
 
-    void startTCPServer(uint16_t port) {
-      gsl_Expects(!listener_ && !server_thread_.joinable());
-      listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port, core::logging::LoggerFactory<utils::net::Server>::getLogger());
-      server_thread_ = std::thread([this]() { listener_->run(); });
-    }
-
-    void startSSLServer(uint16_t port) {
-      gsl_Expects(!listener_ && !server_thread_.joinable());
-      listener_ = std::make_unique<SessionAwareSslServer>(std::nullopt,
-                                                          port,
-                                                          core::logging::LoggerFactory<utils::net::Server>::getLogger(),
-                                                          createSslDataForServer(),
-                                                          utils::net::SslServer::ClientAuthOption::REQUIRED);
-      server_thread_ = std::thread([this]() { listener_->run(); });
+    uint16_t startTCPServer(std::optional<utils::net::SslServerOptions> ssl_server_options) {  // starts the server on port 0 and returns the port it reports once running
+      gsl_Expects(!cancellable_server && !server_thread_.joinable());
+      cancellable_server = std::make_unique<CancellableTcpServer>(std::nullopt, 0, core::logging::LoggerFactory<utils::net::Server>::getLogger(), std::move(ssl_server_options));
+      server_thread_ = std::thread([server = cancellable_server.get()] { server->run(); });  // not [this]: callers move this Server into servers_ while the thread is running
+      REQUIRE(utils::verifyEventHappenedInPollTime(250ms, [this] { return cancellable_server->getPort() != 0; }, 20ms));  // wait until a non-zero port has been published
+      return cancellable_server->getPort();
     }
 
-    std::unique_ptr<utils::net::Server> listener_;
+    std::unique_ptr<CancellableTcpServer> cancellable_server;
     std::thread server_thread_;
   };
-  std::unordered_map<uint16_t, Server> listeners_;
+  std::unordered_map<uint16_t, Server> servers_;
 };
 
 void trigger_expect_success(PutTCPTestFixture& test_fixture, const std::string_view message, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
@@ -382,7 +354,8 @@ TEST_CASE("PutTCP test invalid host", "[PutTCP]") {
   trigger_expect_failure(test_fixture, "message for invalid host");
 
   CHECK((LogTestController::getInstance().contains("Host not found", 0ms)
-      || LogTestController::getInstance().contains("No such host is known", 0ms)));
+      || LogTestController::getInstance().contains("No such host is known", 0ms)
+      || LogTestController::getInstance().contains("A connection attempt failed because the connected party did not properly respond", 0ms)));
 }
 
 TEST_CASE("PutTCP test invalid server", "[PutTCP]") {
@@ -412,7 +385,8 @@ TEST_CASE("PutTCP test non-routable server", "[PutTCP]") {
   test_fixture.setPutTCPPort(1235);
   trigger_expect_failure(test_fixture, "message for non-routable server");
 
-  CHECK((LogTestController::getInstance().contains("Connection timed out", 0ms)
+  CHECK((LogTestController::getInstance().contains("No route to host", 0ms)
+      || LogTestController::getInstance().contains("Connection timed out", 0ms)
       || LogTestController::getInstance().contains("Operation timed out", 0ms)
       || LogTestController::getInstance().contains("host has failed to respond", 0ms)
       || LogTestController::getInstance().contains("No route to host", 0ms)));
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
index ca63f5da5..b22ad6746 100644
--- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -47,23 +47,26 @@ std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer&
 
 TEST_CASE("PutUDP", "[putudp]") {
   const auto put_udp = std::make_shared<PutUDP>("PutUDP");
-  auto random_engine = std::mt19937{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
-  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
-  const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
 
   test::SingleProcessorTestController controller{put_udp};
   LogTestController::getInstance().setTrace<PutUDP>();
   LogTestController::getInstance().setTrace<core::ProcessContext>();
   put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
-  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
-  utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
+  utils::net::UdpServer listener{std::nullopt, 0, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
 
   auto server_thread = std::thread([&listener]() { listener.run(); });
+  uint16_t port = listener.getPort();
+  auto deadline = std::chrono::steady_clock::now() + 200ms;
+  while (port == 0 && deadline > std::chrono::steady_clock::now()) {
+    std::this_thread::sleep_for(20ms);
+    port = listener.getPort();
+  }
   auto cleanup_server = gsl::finally([&]{
     listener.stop();
     server_thread.join();
   });
+  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
   {
     const char* const message = "first message: hello";
diff --git a/libminifi/include/controllers/SSLContextService.h b/libminifi/include/controllers/SSLContextService.h
index a9c7a8c55..300db0266 100644
--- a/libminifi/include/controllers/SSLContextService.h
+++ b/libminifi/include/controllers/SSLContextService.h
@@ -141,13 +141,10 @@ class SSLContextService : public core::controller::ControllerService {
 
   std::unique_ptr<SSLContext> createSSLContext();
 
-  const std::filesystem::path& getCertificateFile();
-
-  const std::string& getPassphrase();
-
-  const std::filesystem::path& getPrivateKeyFile();
-
-  const std::filesystem::path& getCACertificate();
+  const std::filesystem::path& getCertificateFile() const;
+  const std::string& getPassphrase() const;
+  const std::filesystem::path& getPrivateKeyFile() const;
+  const std::filesystem::path& getCACertificate() const;
 
   void yield() override {
   }
@@ -203,7 +200,7 @@ class SSLContextService : public core::controller::ControllerService {
  protected:
   virtual void initializeProperties();
 
-  std::mutex initialization_mutex_;
+  mutable std::mutex initialization_mutex_;
   bool initialized_;
   std::filesystem::path certificate_;
   std::filesystem::path private_key_;
diff --git a/libminifi/include/utils/net/AsioCoro.h b/libminifi/include/utils/net/AsioCoro.h
new file mode 100644
index 000000000..5c2e5268b
--- /dev/null
+++ b/libminifi/include/utils/net/AsioCoro.h
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <tuple>
+#include <system_error>
+#include <utility>
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/steady_timer.hpp"
+#include "asio/this_coro.hpp"
+#include "asio/use_awaitable.hpp"
+#include "asio/experimental/awaitable_operators.hpp"
+#include "asio/experimental/as_tuple.hpp"
+
+namespace org::apache::nifi::minifi::utils::net {
+
+constexpr auto use_nothrow_awaitable = asio::experimental::as_tuple(asio::use_awaitable);
+
+using HandshakeType = asio::ssl::stream_base::handshake_type;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<asio::ip::tcp::socket>;
+
+#if defined(__GNUC__) && __GNUC__ < 11
+// [coroutines] unexpected 'warning: statement has no effect [-Wunused-value]'
+// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=96749
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-value"
+#endif  // defined(__GNUC__) && __GNUC__ < 11
+inline asio::awaitable<void> async_wait(asio::steady_timer& timer) {
+  co_await timer.async_wait(utils::net::use_nothrow_awaitable);
+}
+#if defined(__GNUC__) && __GNUC__ < 11
+#pragma GCC diagnostic pop
+#endif  // defined(__GNUC__) && __GNUC__ < 11
+
+namespace detail {
+inline asio::awaitable<void> timeout(std::chrono::steady_clock::duration duration) {  // completes when a timer armed for `duration` finishes its wait
+  asio::steady_timer timer(co_await asio::this_coro::executor);  // NOLINT
+  timer.expires_after(duration);
+  co_await async_wait(timer);  // async_wait() awaits with use_nothrow_awaitable and ignores the resulting error code
+}
+}  // namespace detail
+
+// Awaits async_operation for at most timeout_duration; returns its result tuple, or a tuple whose error_code is asio::error::timed_out when the timer finishes first.
+template<class... Types>
+asio::awaitable<std::tuple<std::error_code, Types...>> asyncOperationWithTimeout(asio::awaitable<std::tuple<std::error_code, Types...>> async_operation,
+    std::chrono::steady_clock::duration timeout_duration) {  // async_operation is taken by value so the coroutine frame owns it instead of referencing a caller temporary
+  using asio::experimental::awaitable_operators::operator||;
+  auto operation_result = co_await(std::move(async_operation) || detail::timeout(timeout_duration));
+  if (operation_result.index() != 1)  // alternative 0: the operation completed before the timer
+    co_return std::get<0>(std::move(operation_result));
+  std::tuple<std::error_code, Types...> timed_out_result;  // remaining elements stay default-constructed
+  std::get<0>(timed_out_result) = asio::error::timed_out;
+  co_return timed_out_result;
+}
+}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/Server.h b/libminifi/include/utils/net/Server.h
index 5a6d7622d..e84815c3b 100644
--- a/libminifi/include/utils/net/Server.h
+++ b/libminifi/include/utils/net/Server.h
@@ -26,7 +26,9 @@
 #include "core/logging/Logger.h"
 #include "asio/ts/buffer.hpp"
 #include "asio/ts/internet.hpp"
-#include "asio/streambuf.hpp"
+#include "asio/awaitable.hpp"
+#include "asio/co_spawn.hpp"
+#include "asio/detached.hpp"
 #include "IpProtocol.h"
 
 namespace org::apache::nifi::minifi::utils::net {
@@ -50,6 +52,7 @@ struct Message {
 class Server {
  public:
   virtual void run() {
+    asio::co_spawn(io_context_, doReceive(), asio::detached);
     io_context_.run();
   }
   virtual void reset() {
@@ -68,10 +71,16 @@ class Server {
     stop();
   }
 
+  uint16_t getPort() const {
+    return port_;
+  }
+
  protected:
-  Server(std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
-      : max_queue_size_(std::move(max_queue_size)), logger_(logger) {}
+  virtual asio::awaitable<void> doReceive() = 0;
+  Server(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
+      : port_(port), max_queue_size_(max_queue_size), logger_(std::move(logger)) {}
 
+  std::atomic<uint16_t> port_;
   utils::ConcurrentQueue<Message> concurrent_queue_;
   asio::io_context io_context_;
   std::optional<size_t> max_queue_size_;
diff --git a/libminifi/include/utils/net/SessionHandlingServer.h b/libminifi/include/utils/net/SessionHandlingServer.h
deleted file mode 100644
index 173fdcb02..000000000
--- a/libminifi/include/utils/net/SessionHandlingServer.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <utility>
-#include <memory>
-
-#include "Server.h"
-#include "asio/ssl.hpp"
-
-namespace org::apache::nifi::minifi::utils::net {
-
-template<typename SessionType>
-class SessionHandlingServer : public Server {
- public:
-  SessionHandlingServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
-      : Server(max_queue_size, std::move(logger)),
-        acceptor_(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port)) {
-  }
-
-  void run() override {
-    startAccept();
-    Server::run();
-  }
-
- protected:
-  void startAccept() {
-    auto new_session = createSession();
-    acceptor_.async_accept(new_session->getSocket(),
-                           [this, new_session](const auto& error_code) -> void {
-                             handleAccept(new_session, error_code);
-                           });
-  }
-
-  void handleAccept(const std::shared_ptr<SessionType>& session, const std::error_code& error) {
-    if (error) {
-      return;
-    }
-
-    session->start();
-    auto new_session = createSession();
-    acceptor_.async_accept(new_session->getSocket(),
-                           [this, new_session](const auto& error_code) -> void {
-                             handleAccept(new_session, error_code);
-                           });
-  }
-
-  virtual std::shared_ptr<SessionType> createSession() = 0;
-
-  asio::ip::tcp::acceptor acceptor_;
-};
-
-}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/Ssl.h b/libminifi/include/utils/net/Ssl.h
index 7a32e30ac..d8ea4621a 100644
--- a/libminifi/include/utils/net/Ssl.h
+++ b/libminifi/include/utils/net/Ssl.h
@@ -23,9 +23,16 @@
 #include "core/ProcessContext.h"
 #include "core/Property.h"
 #include "core/logging/Logger.h"
+#include "utils/Enum.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
+SMART_ENUM(ClientAuthOption,
+    (NONE, "NONE"),
+    (WANT, "WANT"),
+    (REQUIRED, "REQUIRED")
+)
+
 struct SslData {
   std::filesystem::path ca_loc;
   std::filesystem::path cert_loc;
@@ -37,6 +44,15 @@ struct SslData {
   }
 };
 
+struct SslServerOptions {
+  SslData cert_data;
+  ClientAuthOption client_auth_option;
+
+  SslServerOptions(SslData cert_data, ClientAuthOption client_auth_option)
+      : cert_data(cert_data),
+      client_auth_option(client_auth_option) {}
+};
+
 std::optional<utils::net::SslData> getSslData(const core::ProcessContext& context, const core::Property& ssl_prop, const std::shared_ptr<core::logging::Logger>& logger);
 
 }  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/SslServer.h b/libminifi/include/utils/net/SslServer.h
deleted file mode 100644
index ddd04e773..000000000
--- a/libminifi/include/utils/net/SslServer.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include "SessionHandlingServer.h"
-
-#include <memory>
-#include <string>
-
-#include "Ssl.h"
-#include "asio/ssl.hpp"
-
-namespace org::apache::nifi::minifi::utils::net {
-
-using ssl_socket = asio::ssl::stream<asio::ip::tcp::socket>;
-
-class SslSession : public std::enable_shared_from_this<SslSession> {
- public:
-  SslSession(asio::io_context& io_context, asio::ssl::context& context, utils::ConcurrentQueue<Message>& concurrent_queue,
-    std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger);
-
-  ssl_socket::lowest_layer_type& getSocket();
-  void start();
-  void handleReadUntilNewLine(std::error_code error_code);
-
- protected:
-  utils::ConcurrentQueue<Message>& concurrent_queue_;
-  std::optional<size_t> max_queue_size_;
-  asio::basic_streambuf<std::allocator<char>> buffer_;
-  std::shared_ptr<core::logging::Logger> logger_;
-  ssl_socket socket_;
-};
-
-class SslServer : public SessionHandlingServer<SslSession> {
- public:
-  SMART_ENUM(ClientAuthOption,
-    (NONE, "NONE"),
-    (WANT, "WANT"),
-    (REQUIRED, "REQUIRED")
-  )
-
-  SslServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger, SslData ssl_data, ClientAuthOption client_auth);
-
- protected:
-  std::shared_ptr<SslSession> createSession() override;
-
-  asio::ssl::context context_;
-  SslData ssl_data_;
-};
-
-}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/TcpServer.h b/libminifi/include/utils/net/TcpServer.h
index a70b26bd9..717d674ed 100644
--- a/libminifi/include/utils/net/TcpServer.h
+++ b/libminifi/include/utils/net/TcpServer.h
@@ -16,36 +16,30 @@
  */
 #pragma once
 
-#include <optional>
+#include <utility>
 #include <memory>
 
-#include "SessionHandlingServer.h"
+#include "Server.h"
+#include "Ssl.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
-class TcpSession : public std::enable_shared_from_this<TcpSession> {
+class TcpServer : public Server {
  public:
-  TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger);
-  asio::ip::tcp::socket& getSocket();
-  void start();
-  void handleReadUntilNewLine(std::error_code error_code);
-
- private:
-  utils::ConcurrentQueue<Message>& concurrent_queue_;
-  std::optional<size_t> max_queue_size_;
-  asio::basic_streambuf<std::allocator<char>> buffer_;
-  asio::ip::tcp::socket socket_;
-  std::shared_ptr<core::logging::Logger> logger_;
-};
-
-class TcpServer : public SessionHandlingServer<TcpSession> {
- public:
-  TcpServer(std::optional<size_t> max_queue_size,
-            uint16_t port,
-            std::shared_ptr<core::logging::Logger> logger);
+  TcpServer(std::optional<size_t> max_queue_size_, uint16_t port, std::shared_ptr<core::logging::Logger> logger, std::optional<SslServerOptions> ssl_data)
+      : Server(max_queue_size_, port, std::move(logger)),
+        ssl_data_(std::move(ssl_data)) {
+  }
 
  protected:
-  std::shared_ptr<TcpSession> createSession() override;
+  asio::awaitable<void> doReceive() override;
+
+  asio::awaitable<void> insecureSession(asio::ip::tcp::socket socket);
+  asio::awaitable<void> secureSession(asio::ip::tcp::socket socket);
+
+  asio::awaitable<void> readLoop(auto& socket);
+
+  std::optional<SslServerOptions> ssl_data_;
 };
 
 }  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/UdpServer.h b/libminifi/include/utils/net/UdpServer.h
index e9b852b00..97cfa5737 100644
--- a/libminifi/include/utils/net/UdpServer.h
+++ b/libminifi/include/utils/net/UdpServer.h
@@ -19,14 +19,12 @@
 #include <optional>
 #include <memory>
 #include <string>
+#include <asio/awaitable.hpp>
 
 #include "Server.h"
 #include "utils/MinifiConcurrentQueue.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "asio/ts/buffer.hpp"
-#include "asio/ts/internet.hpp"
-#include "asio/streambuf.hpp"
 
 namespace org::apache::nifi::minifi::utils::net {
 
@@ -37,13 +35,7 @@ class UdpServer : public Server {
             std::shared_ptr<core::logging::Logger> logger);
 
  private:
-  void doReceive();
-
-  asio::ip::udp::socket socket_;
-  asio::ip::udp::endpoint sender_endpoint_;
-  std::string buffer_;
-
-  static constexpr size_t MAX_UDP_PACKET_SIZE = 65535;
+  asio::awaitable<void> doReceive() override;
 };
 
 }  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp
index 165805d86..635e95144 100644
--- a/libminifi/src/controllers/SSLContextService.cpp
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -31,13 +31,11 @@
 #include <fstream>
 #include <memory>
 #include <string>
-#include <set>
 
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 #include "io/validation.h"
 #include "properties/Configure.h"
-#include "utils/gsl.h"
 #include "utils/tls/CertificateUtils.h"
 #include "utils/tls/TLSUtils.h"
 #include "utils/tls/DistinguishedName.h"
@@ -407,22 +405,22 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
 #endif
 }
 
-const std::filesystem::path &SSLContextService::getCertificateFile() {
+const std::filesystem::path &SSLContextService::getCertificateFile() const {
   std::lock_guard<std::mutex> lock(initialization_mutex_);
   return certificate_;
 }
 
-const std::string &SSLContextService::getPassphrase() {
+const std::string &SSLContextService::getPassphrase() const {
   std::lock_guard<std::mutex> lock(initialization_mutex_);
   return passphrase_;
 }
 
-const std::filesystem::path &SSLContextService::getPrivateKeyFile() {
+const std::filesystem::path &SSLContextService::getPrivateKeyFile() const {
   std::lock_guard<std::mutex> lock(initialization_mutex_);
   return private_key_;
 }
 
-const std::filesystem::path &SSLContextService::getCACertificate() {
+const std::filesystem::path &SSLContextService::getCACertificate() const {
   std::lock_guard<std::mutex> lock(initialization_mutex_);
   return ca_certificate_;
 }
diff --git a/libminifi/src/utils/net/SslServer.cpp b/libminifi/src/utils/net/SslServer.cpp
deleted file mode 100644
index 7aaf4845a..000000000
--- a/libminifi/src/utils/net/SslServer.cpp
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "utils/net/SslServer.h"
-
-namespace org::apache::nifi::minifi::utils::net {
-
-SslSession::SslSession(asio::io_context& io_context, asio::ssl::context& context, utils::ConcurrentQueue<Message>& concurrent_queue,
-    std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
-  : concurrent_queue_(concurrent_queue),
-    max_queue_size_(max_queue_size),
-    logger_(std::move(logger)),
-    socket_(io_context, context) {
-}
-
-ssl_socket::lowest_layer_type& SslSession::getSocket() {
-  return socket_.lowest_layer();
-}
-
-void SslSession::start() {
-  socket_.async_handshake(asio::ssl::stream_base::server,
-    [this, self = shared_from_this()](const std::error_code& error_code) {
-      if (error_code) {
-        logger_->log_error("Error occured during SSL handshake: (%d) %s", error_code.value(), error_code.message());
-        return;
-      }
-      asio::async_read_until(socket_,
-                             buffer_,
-                             '\n',
-                             [self](const auto& error_code, size_t) -> void {
-                               self->handleReadUntilNewLine(error_code);
-                             });
-    });
-}
-
-void SslSession::handleReadUntilNewLine(std::error_code error_code) {
-  if (error_code)
-    return;
-  std::istream is(&buffer_);
-  std::string message;
-  std::getline(is, message);
-  if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
-    concurrent_queue_.enqueue(Message(message, IpProtocol::TCP, getSocket().remote_endpoint().address(), getSocket().local_endpoint().port()));
-  else
-    logger_->log_warn("Queue is full. TCP message ignored.");
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
-}
-
-SslServer::SslServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger, SslData ssl_data, ClientAuthOption client_auth)
-    : SessionHandlingServer<SslSession>(max_queue_size, port, std::move(logger)),
-      context_(asio::ssl::context::sslv23),
-      ssl_data_(std::move(ssl_data)) {
-    context_.set_options(
-        asio::ssl::context::default_workarounds
-        | asio::ssl::context::no_sslv2
-        | asio::ssl::context::single_dh_use);
-    context_.set_password_callback([this](std::size_t&, asio::ssl::context_base::password_purpose&) { return ssl_data_.key_pw; });
-    context_.use_certificate_file(ssl_data_.cert_loc.string(), asio::ssl::context::pem);
-    context_.use_private_key_file(ssl_data_.key_loc.string(), asio::ssl::context::pem);
-    context_.load_verify_file(ssl_data_.ca_loc.string());
-    if (client_auth == ClientAuthOption::REQUIRED) {
-      context_.set_verify_mode(asio::ssl::verify_peer|asio::ssl::verify_fail_if_no_peer_cert);
-    } else if (client_auth == ClientAuthOption::WANT) {
-      context_.set_verify_mode(asio::ssl::verify_peer);
-    }
-}
-
-std::shared_ptr<SslSession> SslServer::createSession() {
-  return std::make_shared<SslSession>(io_context_, context_, concurrent_queue_, max_queue_size_, logger_);
-}
-
-}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/utils/net/TcpServer.cpp b/libminifi/src/utils/net/TcpServer.cpp
index 742ef7f25..fcd02d1e4 100644
--- a/libminifi/src/utils/net/TcpServer.cpp
+++ b/libminifi/src/utils/net/TcpServer.cpp
@@ -15,53 +15,73 @@
  * limitations under the License.
  */
 #include "utils/net/TcpServer.h"
+#include "utils/net/AsioCoro.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
-TcpSession::TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
-  : concurrent_queue_(concurrent_queue),
-    max_queue_size_(max_queue_size),
-    socket_(io_context),
-    logger_(std::move(logger)) {
+asio::awaitable<void> TcpServer::doReceive() {  // accept loop: spawns one detached session coroutine per accepted connection
+  asio::ip::tcp::acceptor acceptor(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port_));  // NOTE(review): may throw on bind failure - confirm it is surfaced
+  if (port_ == 0)  // port 0 lets the OS pick the port, so store the one actually bound (getPort() returns port_)
+    port_ = acceptor.local_endpoint().port();
+  while (true) {
+    auto [accept_error, socket] = co_await acceptor.async_accept(use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Error during accepting new connection: %s", accept_error.message());
+      break;  // any accept error ends the accept loop
+    }
+    if (ssl_data_)  // SSL options present: run the TLS session, otherwise read from the plain socket
+      co_spawn(io_context_, secureSession(std::move(socket)), asio::detached);
+    else
+      co_spawn(io_context_, insecureSession(std::move(socket)), asio::detached);
+  }
 }
 
-asio::ip::tcp::socket& TcpSession::getSocket() {
-  return socket_;
-}
+asio::awaitable<void> TcpServer::readLoop(auto& socket) {
+  std::string read_message;
+  while (true) {
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, asio::dynamic_buffer(read_message), '\n', use_nothrow_awaitable);  // NOLINT
+    if (read_error || bytes_read == 0)
+      co_return;
 
-void TcpSession::start() {
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+      concurrent_queue_.enqueue(Message(read_message.substr(0, bytes_read - 1), IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().local_endpoint().port()));
+    else
+      logger_->log_warn("Queue is full. TCP message ignored.");
+    read_message.erase(0, bytes_read);
+  }
 }
 
-void TcpSession::handleReadUntilNewLine(std::error_code error_code) {
-  if (error_code)
-    return;
-  std::istream is(&buffer_);
-  std::string message;
-  std::getline(is, message);
-  if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
-    concurrent_queue_.enqueue(Message(message, IpProtocol::TCP, socket_.remote_endpoint().address(), socket_.local_endpoint().port()));
-  else
-    logger_->log_warn("Queue is full. TCP message ignored.");
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
+asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket) {
+  co_return co_await readLoop(socket);  // NOLINT
 }
 
-TcpServer::TcpServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
-    : SessionHandlingServer<TcpSession>(max_queue_size, port, std::move(logger)) {
+namespace {
+// Creates a TLS server context (TLS 1.0 and 1.1 disabled) loaded with the certificate, private key and CA file named in ssl_data.
+asio::ssl::context setupSslContext(SslServerOptions& ssl_data) {
+  asio::ssl::context tls_context(asio::ssl::context::tls_server);
+  tls_context.set_options(asio::ssl::context::default_workarounds | asio::ssl::context::single_dh_use | asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);
+  tls_context.set_password_callback([key_pw = ssl_data.cert_data.key_pw](std::size_t&, asio::ssl::context_base::password_purpose&) { return key_pw; });
+  tls_context.use_certificate_file(ssl_data.cert_data.cert_loc.string(), asio::ssl::context::pem);
+  tls_context.use_private_key_file(ssl_data.cert_data.key_loc.string(), asio::ssl::context::pem);
+  tls_context.load_verify_file(ssl_data.cert_data.ca_loc.string());
+  if (ssl_data.client_auth_option == ClientAuthOption::REQUIRED)  // a client certificate is mandatory
+    tls_context.set_verify_mode(asio::ssl::verify_peer|asio::ssl::verify_fail_if_no_peer_cert);
+  else if (ssl_data.client_auth_option == ClientAuthOption::WANT)  // verified when sent; no verify mode is set for any other option
+    tls_context.set_verify_mode(asio::ssl::verify_peer);
+  return tls_context;
 }
+}  // namespace
 
-std::shared_ptr<TcpSession> TcpServer::createSession() {
-  return std::make_shared<TcpSession>(io_context_, concurrent_queue_, max_queue_size_, logger_);
+// Performs the server-side TLS handshake on an accepted connection and, on success, hands the stream to readLoop(); a failed handshake is only logged.
+asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket) {
+  gsl_Expects(ssl_data_);
+  auto ssl_context = setupSslContext(*ssl_data_);
+  SslSocket ssl_socket(std::move(socket), ssl_context);
+  auto [handshake_error] = co_await ssl_socket.async_handshake(HandshakeType::server, use_nothrow_awaitable);
+  if (!handshake_error)
+    co_return co_await readLoop(ssl_socket);  // NOLINT
+  std::error_code endpoint_error;  // the remote_endpoint() overload without an error_code throws if the peer is already gone, which would skip the warning below
+  core::logging::LOG_WARN(logger_) << "Handshake with " << ssl_socket.lowest_layer().remote_endpoint(endpoint_error) << " failed due to " << handshake_error.message();
 }
 
 }  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/utils/net/UdpServer.cpp b/libminifi/src/utils/net/UdpServer.cpp
index 6137f2ddf..e6179db13 100644
--- a/libminifi/src/utils/net/UdpServer.cpp
+++ b/libminifi/src/utils/net/UdpServer.cpp
@@ -15,32 +15,39 @@
  * limitations under the License.
  */
 #include "utils/net/UdpServer.h"
+#include "asio/use_awaitable.hpp"
+#include "asio/detached.hpp"
+#include "utils/net/AsioCoro.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
+constexpr size_t MAX_UDP_PACKET_SIZE = 65535;
+
 UdpServer::UdpServer(std::optional<size_t> max_queue_size,
                      uint16_t port,
                      std::shared_ptr<core::logging::Logger> logger)
-    : Server(max_queue_size, std::move(logger)),
-      socket_(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v6(), port)) {
-  doReceive();
+    : Server(max_queue_size, port, std::move(logger)) {
 }
 
+asio::awaitable<void> UdpServer::doReceive() {
+  asio::ip::udp::socket socket(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v6(), port_));
+  if (port_ == 0)
+    port_ = socket.local_endpoint().port();
+  while (true) {
+    std::string buffer = std::string(MAX_UDP_PACKET_SIZE, {});
+    asio::ip::udp::endpoint sender_endpoint;
 
-void UdpServer::doReceive() {
-  buffer_.resize(MAX_UDP_PACKET_SIZE);
-  socket_.async_receive_from(asio::buffer(buffer_, MAX_UDP_PACKET_SIZE),
-                             sender_endpoint_,
-                             [this](std::error_code ec, std::size_t bytes_received) {
-                               if (!ec && bytes_received > 0) {
-                                 buffer_.resize(bytes_received);
-                                 if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
-                                   concurrent_queue_.enqueue(utils::net::Message(std::move(buffer_), IpProtocol::UDP, sender_endpoint_.address(), socket_.local_endpoint().port()));
-                                 else
-                                   logger_->log_warn("Queue is full. UDP message ignored.");
-                               }
-                               doReceive();
-                             });
+    auto [receive_error, bytes_received] = co_await socket.async_receive_from(asio::buffer(buffer, MAX_UDP_PACKET_SIZE), sender_endpoint, utils::net::use_nothrow_awaitable);
+    if (receive_error) {
+      logger_->log_warn("Error during receive: %s", receive_error.message());
+      continue;
+    }
+    buffer.resize(bytes_received);
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+      concurrent_queue_.enqueue(utils::net::Message(std::move(buffer), IpProtocol::UDP, sender_endpoint.address(), socket.local_endpoint().port()));
+    else
+      logger_->log_warn("Queue is full. UDP message ignored.");
+  }
 }
 
 }  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/test/Catch.h b/libminifi/test/Catch.h
index 925f9dc29..755496feb 100644
--- a/libminifi/test/Catch.h
+++ b/libminifi/test/Catch.h
@@ -20,15 +20,15 @@
 #define CATCH_CONFIG_FAST_COMPILE
 #include <optional>
 #include <string>
+#include "spdlog/spdlog.h"
 #include "catch.hpp"
 
-
 namespace Catch {
 template<typename T>
 struct StringMaker<std::optional<T>> {
   static std::string convert(const std::optional<T>& val) {
     if (val) {
-      return "std::optional(" + StringMaker<T>::convert(val.value()) + ")";
+      return fmt::format("std::optional({})", StringMaker<T>::convert(val.value()));
     }
     return "std::nullopt";
   }
@@ -40,4 +40,46 @@ struct StringMaker<std::nullopt_t> {
     return "std::nullopt";
   }
 };
+
+template <>
+struct StringMaker<std::error_code> {
+  static std::string convert(const std::error_code& error_code) {
+    return fmt::format("std::error_code(category:{}, value:{}, message:{})", error_code.category().name(), error_code.value(), error_code.message());
+  }
+};
 }  // namespace Catch
+
+namespace org::apache::nifi::minifi::test {
+struct MatchesSuccess : Catch::MatcherBase<std::error_code> {
+  MatchesSuccess() = default;
+
+  bool match(const std::error_code& err) const override {
+    return err.value() == 0;
+  }
+
+  std::string describe() const override {
+    return fmt::format("== {}", Catch::StringMaker<std::error_code>::convert(std::error_code{}));
+  }
+};
+
+struct MatchesError : Catch::MatcherBase<std::error_code> {
+  explicit MatchesError(std::optional<std::error_code> expected_error = std::nullopt)
+      : Catch::MatcherBase<std::error_code>(),
+        expected_error_(expected_error) {
+  }
+
+  bool match(const std::error_code& err) const override {
+    if (expected_error_)
+      return err == *expected_error_;
+    return err.value() != 0;
+  }
+
+  std::string describe() const override {
+    if (expected_error_)
+      return fmt::format("== {}", Catch::StringMaker<std::error_code>::convert(*expected_error_));
+    return fmt::format("!= {}", Catch::StringMaker<std::error_code>::convert(std::error_code{}));
+  }
+ private:
+  std::optional<std::error_code> expected_error_;
+};
+}  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h
index bb06b21b7..4f37b399e 100644
--- a/libminifi/test/Utils.h
+++ b/libminifi/test/Utils.h
@@ -19,11 +19,13 @@
 #include <string>
 #include <utility>
 #include <vector>
+#include <memory>
 
 #include "rapidjson/document.h"
 #include "asio.hpp"
 #include "asio/ssl.hpp"
 #include "net/Ssl.h"
+#include "utils/IntegrationTestUtils.h"
 
 using namespace std::chrono_literals;
 
@@ -111,36 +113,39 @@ bool countLogOccurrencesUntil(const std::string& pattern,
   return false;
 }
 
-bool sendMessagesViaTCP(const std::vector<std::string_view>& contents, const asio::ip::tcp::endpoint& remote_endpoint) {
+std::error_code sendMessagesViaTCP(const std::vector<std::string_view>& contents, const asio::ip::tcp::endpoint& remote_endpoint) {
   asio::io_context io_context;
   asio::ip::tcp::socket socket(io_context);
-  socket.connect(remote_endpoint);
   std::error_code err;
+  socket.connect(remote_endpoint, err);
+  if (err)
+    return err;
   for (auto& content : contents) {
     std::string tcp_message(content);
     tcp_message += '\n';
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
-    if (err) {
-      return false;
-    }
+    if (err)
+      return err;
   }
-  return true;
+  return std::error_code();
 }
 
-bool sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) {
+std::error_code sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) {
   asio::io_context io_context;
   asio::ip::udp::socket socket(io_context);
-  socket.open(remote_endpoint.protocol());
   std::error_code err;
+  socket.open(remote_endpoint.protocol(), err);
+  if (err)
+    return err;
   socket.send_to(content, remote_endpoint, 0, err);
-  return !err;
+  return err;
 }
 
-bool sendUdpDatagram(const gsl::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) {
+std::error_code sendUdpDatagram(const gsl::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) {
   return sendUdpDatagram(asio::const_buffer(content.begin(), content.size()), remote_endpoint);
 }
 
-bool sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) {
+std::error_code sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) {
   return sendUdpDatagram(asio::buffer(content), remote_endpoint);
 }
 
@@ -166,11 +171,12 @@ struct FlowFileQueueTestAccessor {
   FIELD_ACCESSOR(queue_);
 };
 
-bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
-                        const asio::ip::tcp::endpoint& remote_endpoint,
-                        const std::filesystem::path& ca_cert_path,
-                        const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) {
-  asio::ssl::context ctx(asio::ssl::context::sslv23);
+std::error_code sendMessagesViaSSL(const std::vector<std::string_view>& contents,
+    const asio::ip::tcp::endpoint& remote_endpoint,
+    const std::filesystem::path& ca_cert_path,
+    const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt,
+    asio::ssl::context::method method = asio::ssl::context::tlsv12_client) {
+  asio::ssl::context ctx(method);
   ctx.load_verify_file(ca_cert_path.string());
   if (ssl_data) {
     ctx.set_verify_mode(asio::ssl::verify_peer);
@@ -183,33 +189,48 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
   asio::error_code err;
   socket.lowest_layer().connect(remote_endpoint, err);
   if (err) {
-    return false;
+    return err;
   }
   socket.handshake(asio::ssl::stream_base::client, err);
   if (err) {
-    return false;
+    return err;
   }
   for (auto& content : contents) {
     std::string tcp_message(content);
     tcp_message += '\n';
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
     if (err) {
-      return false;
+      return err;
     }
   }
-  return true;
+  return std::error_code();
 }
 
 #ifdef WIN32
 inline std::error_code hide_file(const std::filesystem::path& file_name) {
-    const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
-    if (!success) {
-      // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
-      // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
-      return { static_cast<int>(GetLastError()), std::system_category() };
-    }
-    return {};
+  const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
+  if (!success) {
+    // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
+    // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
+    return { static_cast<int>(GetLastError()), std::system_category() };
   }
+  return {};
+}
 #endif /* WIN32 */
 
+template<typename T>
+concept NetworkingProcessor = std::derived_from<T, minifi::core::Processor>
+    && requires(T x) {
+      {T::Port} -> std::convertible_to<core::Property>;
+      {x.getPort()} -> std::convertible_to<uint16_t>;
+    };  // NOLINT(readability/braces)
+
+template<NetworkingProcessor T>
+uint16_t scheduleProcessorOnRandomPort(const std::shared_ptr<TestPlan>& test_plan, const std::shared_ptr<T>& processor) {
+  REQUIRE(processor->setProperty(T::Port, "0"));
+  test_plan->scheduleProcessor(processor);
+  REQUIRE(minifi::utils::verifyEventHappenedInPollTime(250ms, [&processor] { return processor->getPort() != 0; }, 20ms));
+  return processor->getPort();
+}
+
 }  // namespace org::apache::nifi::minifi::test::utils