You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/10/06 10:53:21 UTC

[nifi-minifi-cpp] 01/04: MINIFICPP-1923 Refactor PutUDP to use asio

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit bf5d3d25ba33a7e02eb9bee98767073f08e86464
Author: Martin Zink <ma...@apache.org>
AuthorDate: Wed Oct 5 16:45:25 2022 +0200

    MINIFICPP-1923 Refactor PutUDP to use asio
    
    MINIFICPP-1939 Enable dual stack listening on ListenTCP and ListenSyslog
    PutUDP should iterate through endpoints (both IPv4 and IPv6)
    
    Closes #1412
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../standard-processors/processors/PutUDP.cpp      |  94 +++++++--------
 extensions/standard-processors/processors/PutUDP.h |   2 -
 .../tests/unit/ListenSyslogTests.cpp               | 131 +++++++++++++++------
 .../tests/unit/ListenTcpTests.cpp                  |  78 ++++++++++--
 .../standard-processors/tests/unit/PutUDPTests.cpp | 118 ++++++++++---------
 .../include/utils/net/SessionHandlingServer.h      |   2 +-
 libminifi/src/utils/net/UdpServer.cpp              |   2 +-
 libminifi/test/Utils.h                             |  48 ++++++--
 8 files changed, 307 insertions(+), 168 deletions(-)

diff --git a/extensions/standard-processors/processors/PutUDP.cpp b/extensions/standard-processors/processors/PutUDP.cpp
index 95000011d..56af0f306 100644
--- a/extensions/standard-processors/processors/PutUDP.cpp
+++ b/extensions/standard-processors/processors/PutUDP.cpp
@@ -16,29 +16,20 @@
  */
 #include "PutUDP.h"
 
-#ifdef WIN32
-#ifndef WIN32_LEAN_AND_MEAN
-#define WIN32_LEAN_AND_MEAN
-#endif
-#include <winsock2.h>
-#else
-#include <netdb.h>
-#endif /* WIN32 */
-#include <utility>
-
-#include "range/v3/view/join.hpp"
 #include "range/v3/range/conversion.hpp"
 
 #include "utils/gsl.h"
 #include "utils/expected.h"
-#include "utils/net/DNS.h"
-#include "utils/net/Socket.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 #include "core/logging/LoggerConfiguration.h"
 
+#include "asio/ip/udp.hpp"
+
+using asio::ip::udp;
+
 namespace org::apache::nifi::minifi::processors {
 
 const core::Property PutUDP::Hostname = core::PropertyBuilder::createProperty("Hostname")
@@ -107,51 +98,54 @@ void PutUDP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
     return;
   }
 
-  const auto nonthrowing_sockaddr_ntop = [](const sockaddr* const sa) -> std::string {
-    return utils::try_expression([sa] { return utils::net::sockaddr_ntop(sa); }).value_or("(n/a)");
+  asio::io_context io_context;
+
+  const auto resolve_hostname = [&io_context, &hostname, &port]() -> nonstd::expected<udp::resolver::results_type, std::error_code> {
+    udp::resolver resolver(io_context);
+    std::error_code error_code;
+    auto results = resolver.resolve(hostname, port, error_code);
+    if (error_code)
+      return nonstd::make_unexpected(error_code);
+    return results;
   };
 
-  const auto debug_log_resolved_names = [&, this](const addrinfo& names) -> decltype(auto) {
-    if (logger_->should_log(core::logging::LOG_LEVEL::debug)) {
-      std::vector<std::string> names_vector;
-      for (const addrinfo* it = &names; it; it = it->ai_next) {
-        names_vector.push_back(nonthrowing_sockaddr_ntop(it->ai_addr));
+  const auto send_data_to_endpoint = [&io_context, &data, &logger = this->logger_](const udp::resolver::results_type& resolved_query) -> nonstd::expected<void, std::error_code> {
+    std::error_code error;
+    for (const auto& resolver_entry : resolved_query) {
+      error.clear();
+      udp::socket socket(io_context);
+      socket.open(resolver_entry.endpoint().protocol(), error);
+      if (error) {
+        logger->log_debug("opening %s socket failed due to %s ", resolver_entry.endpoint().protocol() == udp::v4() ? "IPv4" : "IPv6", error.message());
+        continue;
       }
-      logger_->log_debug("resolved \'%s\' to: %s",
-          hostname,
-          names_vector | ranges::views::join(',') | ranges::to<std::string>());
+      socket.send_to(asio::buffer(data.buffer), resolver_entry.endpoint(), udp::socket::message_flags{}, error);
+      if (error) {
+        core::logging::LOG_DEBUG(logger) << "sending to endpoint " << resolver_entry.endpoint() << " failed due to " << error.message();
+        continue;
+      }
+      core::logging::LOG_DEBUG(logger) << "sending to endpoint " << resolver_entry.endpoint() << " succeeded";
+      return {};
     }
-    return names;
+    return nonstd::make_unexpected(error);
+  };
+
+  const auto transfer_to_success = [&session, &flow_file]() -> void {
+    session->transfer(flow_file, Success);
   };
 
-  utils::net::resolveHost(hostname.c_str(), port.c_str(), utils::net::IpProtocol::UDP)
-      | utils::map(utils::dereference)
-      | utils::map(debug_log_resolved_names)
-      | utils::flatMap([](const auto& names) { return utils::net::open_socket(names); })
-      | utils::flatMap([&, this](utils::net::OpenSocketResult socket_handle_and_selected_name) -> nonstd::expected<void, std::error_code> {
-        const auto& [socket_handle, selected_name] = socket_handle_and_selected_name;
-        logger_->log_debug("connected to %s", nonthrowing_sockaddr_ntop(selected_name->ai_addr));
-#ifdef WIN32
-        const char* const buffer_ptr = reinterpret_cast<const char*>(data.buffer.data());
-#else
-        const void* const buffer_ptr = data.buffer.data();
-#endif
-        const auto send_result = ::sendto(socket_handle.get(), buffer_ptr, data.buffer.size(), 0, selected_name->ai_addr, selected_name->ai_addrlen);
-        logger_->log_trace("sendto returned %ld", static_cast<long>(send_result));  // NOLINT: sendto
-        if (send_result == utils::net::SocketError) {
-          return nonstd::make_unexpected(utils::net::get_last_socket_error());
-        }
-        session->transfer(flow_file, Success);
-        return {};
-      })
-      | utils::orElse([&, this](std::error_code ec) {
-        gsl_Expects(ec);
-        logger_->log_error("%s", ec.message());
-        session->transfer(flow_file, Failure);
-      });
+  const auto transfer_to_failure = [&session, &flow_file, &logger = this->logger_](std::error_code ec) -> void {
+    gsl_Expects(ec);
+    logger->log_error("%s", ec.message());
+    session->transfer(flow_file, Failure);
+  };
+
+  resolve_hostname()
+      | utils::flatMap(send_data_to_endpoint)
+      | utils::map(transfer_to_success)
+      | utils::orElse(transfer_to_failure);
 }
 
 REGISTER_RESOURCE(PutUDP, Processor);
 
 }  // namespace org::apache::nifi::minifi::processors
-
diff --git a/extensions/standard-processors/processors/PutUDP.h b/extensions/standard-processors/processors/PutUDP.h
index 5217519fb..cf69fce3d 100644
--- a/extensions/standard-processors/processors/PutUDP.h
+++ b/extensions/standard-processors/processors/PutUDP.h
@@ -58,8 +58,6 @@ class PutUDP final : public core::Processor {
   void onTrigger(core::ProcessContext*, core::ProcessSession*) final;
 
  private:
-  std::string hostname_;
-  std::string port_;
   std::shared_ptr<core::logging::Logger> logger_;
 };
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
index c9b8e472e..302af6c3a 100644
--- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -21,6 +21,7 @@
 #include "SingleProcessorTestController.h"
 #include "Utils.h"
 #include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
 
 using ListenSyslog = org::apache::nifi::minifi::processors::ListenSyslog;
 
@@ -29,6 +30,7 @@ using namespace std::literals::chrono_literals;
 namespace org::apache::nifi::minifi::test {
 
 constexpr uint64_t SYSLOG_PORT = 10255;
+constexpr auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
 
 struct ValidRFC5424Message {
   constexpr ValidRFC5424Message(std::string_view message,
@@ -197,21 +199,10 @@ constexpr std::string_view rfc5424_logger_example_1 = R"(<13>1 2022-03-17T10:10:
 
 constexpr std::string_view invalid_syslog = "not syslog";
 
-void sendUDPPacket(const std::string_view content, uint64_t port) {
-  asio::io_context io_context;
-  asio::ip::udp::socket socket(io_context);
-  asio::ip::udp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port);
-  socket.open(asio::ip::udp::v4());
-  std::error_code err;
-  socket.send_to(asio::buffer(content, content.size()), remote_endpoint, 0, err);
-  REQUIRE(!err);
-  socket.close();
-}
-
 void check_for_only_basic_attributes(core::FlowFile& flow_file, uint16_t port, std::string_view protocol) {
   CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port"));
   CHECK(protocol == flow_file.getAttribute("syslog.protocol"));
-  CHECK("127.0.0.1" == flow_file.getAttribute("syslog.sender"));
+  CHECK(ranges::contains(local_addresses, flow_file.getAttribute("syslog.sender")));
 
   CHECK(std::nullopt == flow_file.getAttribute("syslog.valid"));
   CHECK(std::nullopt == flow_file.getAttribute("syslog.priority"));
@@ -228,7 +219,7 @@ void check_for_only_basic_attributes(core::FlowFile& flow_file, uint16_t port, s
 void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC5424Message& original_message, uint16_t port, std::string_view protocol) {
   CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port"));
   CHECK(protocol == flow_file.getAttribute("syslog.protocol"));
-  CHECK("127.0.0.1" == flow_file.getAttribute("syslog.sender"));
+  CHECK(ranges::contains(local_addresses, flow_file.getAttribute("syslog.sender")));
 
   CHECK("true" == flow_file.getAttribute("syslog.valid"));
   CHECK(original_message.priority_ == flow_file.getAttribute("syslog.priority"));
@@ -247,7 +238,7 @@ void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC5424
 void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164Message& original_message, uint16_t port, std::string_view protocol) {
   CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port"));
   CHECK(protocol == flow_file.getAttribute("syslog.protocol"));
-  CHECK("127.0.0.1" == flow_file.getAttribute("syslog.sender"));
+  CHECK(ranges::contains(local_addresses, flow_file.getAttribute("syslog.sender")));
 
   CHECK("true" == flow_file.getAttribute("syslog.valid"));
   CHECK(original_message.priority_ == flow_file.getAttribute("syslog.priority"));
@@ -269,19 +260,37 @@ TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") {
   std::string protocol;
 
   SECTION("UDP") {
+    asio::ip::udp::endpoint endpoint;
+    SECTION("sending through IPv4", "[IPv4]") {
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+    }
+    SECTION("sending through IPv6", "[IPv6]") {
+      if (utils::isIPv6Disabled())
+        return;
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+    }
     protocol = "UDP";
     REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
     controller.plan->scheduleProcessor(listen_syslog);
-    sendUDPPacket(rfc5424_logger_example_1, SYSLOG_PORT);
-    sendUDPPacket(invalid_syslog, SYSLOG_PORT);
+    utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint);
+    utils::sendUdpDatagram(invalid_syslog, endpoint);
   }
 
   SECTION("TCP") {
+    asio::ip::tcp::endpoint endpoint;
+    SECTION("sending through IPv4", "[IPv4]") {
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+    }
+    SECTION("sending through IPv6", "[IPv6]") {
+      if (utils::isIPv6Disabled())
+        return;
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+    }
     protocol = "TCP";
     REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
     controller.plan->scheduleProcessor(listen_syslog);
-    REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, SYSLOG_PORT));
-    REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT));
+    REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint));
+    REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, endpoint));
   }
   std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
   REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
@@ -303,25 +312,43 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") {
 
   std::string protocol;
   SECTION("UDP") {
+    asio::ip::udp::endpoint endpoint;
+    SECTION("sending through IPv4", "[IPv4]") {
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+    }
+    SECTION("sending through IPv6", "[IPv6]") {
+      if (utils::isIPv6Disabled())
+        return;
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+    }
     protocol = "UDP";
     REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
     controller.plan->scheduleProcessor(listen_syslog);
     std::this_thread::sleep_for(100ms);
-    sendUDPPacket(rfc5424_doc_example_1.unparsed_, SYSLOG_PORT);
-    sendUDPPacket(rfc5424_doc_example_2.unparsed_, SYSLOG_PORT);
-    sendUDPPacket(rfc5424_doc_example_3.unparsed_, SYSLOG_PORT);
-    sendUDPPacket(rfc5424_doc_example_4.unparsed_, SYSLOG_PORT);
-
-    sendUDPPacket(rfc3164_doc_example_1.unparsed_, SYSLOG_PORT);
-    sendUDPPacket(rfc3164_doc_example_2.unparsed_, SYSLOG_PORT);
-    sendUDPPacket(rfc3164_doc_example_3.unparsed_, SYSLOG_PORT);
-    sendUDPPacket(rfc3164_doc_example_4.unparsed_, SYSLOG_PORT);
-
-    sendUDPPacket(rfc5424_logger_example_1, SYSLOG_PORT);
-    sendUDPPacket(invalid_syslog, SYSLOG_PORT);
+    utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint);
+    utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, endpoint);
+    utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, endpoint);
+    utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, endpoint);
+
+    utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, endpoint);
+    utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, endpoint);
+    utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, endpoint);
+    utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, endpoint);
+
+    utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint);
+    utils::sendUdpDatagram(invalid_syslog, endpoint);
   }
 
   SECTION("TCP") {
+    asio::ip::tcp::endpoint endpoint;
+    SECTION("sending through IPv4", "[IPv4]") {
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+    }
+    SECTION("sending through IPv6", "[IPv6]") {
+      if (utils::isIPv6Disabled())
+        return;
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+    }
     protocol = "TCP";
     REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
     controller.plan->scheduleProcessor(listen_syslog);
@@ -329,15 +356,15 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") {
     REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_,
                                        rfc5424_doc_example_2.unparsed_,
                                        rfc5424_doc_example_3.unparsed_,
-                                       rfc5424_doc_example_4.unparsed_}, SYSLOG_PORT));
+                                       rfc5424_doc_example_4.unparsed_}, endpoint));
 
     REQUIRE(utils::sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_,
                                        rfc3164_doc_example_2.unparsed_,
                                        rfc3164_doc_example_3.unparsed_,
-                                       rfc3164_doc_example_4.unparsed_}, SYSLOG_PORT));
+                                       rfc3164_doc_example_4.unparsed_}, endpoint));
 
-    REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, SYSLOG_PORT));
-    REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT));
+    REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint));
+    REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, endpoint));
   }
 
   std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
@@ -410,19 +437,37 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") {
   LogTestController::getInstance().setWarn<ListenSyslog>();
 
   SECTION("UDP") {
+    asio::ip::udp::endpoint endpoint;
+    SECTION("sending through IPv4", "[IPv4]") {
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+    }
+    SECTION("sending through IPv6", "[IPv6]") {
+      if (utils::isIPv6Disabled())
+        return;
+      endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+    }
     REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
     controller.plan->scheduleProcessor(listen_syslog);
     for (auto i = 0; i < 100; ++i) {
-      sendUDPPacket(rfc5424_doc_example_1.unparsed_, SYSLOG_PORT);
+      utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint);
     }
     CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms));
   }
 
   SECTION("TCP") {
+    asio::ip::tcp::endpoint endpoint;
+    SECTION("sending through IPv4", "[IPv4]") {
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+    }
+    SECTION("sending through IPv6", "[IPv6]") {
+      if (utils::isIPv6Disabled())
+        return;
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+    }
     REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
     controller.plan->scheduleProcessor(listen_syslog);
     for (auto i = 0; i < 100; ++i) {
-      REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, SYSLOG_PORT));
+      REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, endpoint));
     }
     CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms));
   }
@@ -435,6 +480,15 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") {
 }
 
 TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog]") {
+  asio::ip::tcp::endpoint endpoint;
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+  }
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
 
   SingleProcessorTestController controller{listen_syslog};
@@ -454,9 +508,8 @@ TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog]") {
   REQUIRE(listen_syslog->setProperty(ListenSyslog::SSLContextService, "SSLContextService"));
   ssl_context_service->enable();
   controller.plan->scheduleProcessor(listen_syslog);
-  REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, SYSLOG_PORT, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
-  REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, SYSLOG_PORT, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt")));
-
+  REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
+  REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt")));
   std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
   REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1);
diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
index 559db0c2f..247f2e6ff 100644
--- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
@@ -21,6 +21,7 @@
 #include "SingleProcessorTestController.h"
 #include "Utils.h"
 #include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
 
 using ListenTCP = org::apache::nifi::minifi::processors::ListenTCP;
 
@@ -32,10 +33,20 @@ constexpr uint64_t PORT = 10254;
 
 void check_for_attributes(core::FlowFile& flow_file) {
   CHECK(std::to_string(PORT) == flow_file.getAttribute("tcp.port"));
-  CHECK("127.0.0.1" == flow_file.getAttribute("tcp.sender"));
+  const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
+  CHECK(ranges::contains(local_addresses, flow_file.getAttribute("tcp.sender")));
 }
 
 TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") {
+  asio::ip::tcp::endpoint endpoint;
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+  }
   const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
 
   SingleProcessorTestController controller{listen_tcp};
@@ -44,8 +55,8 @@ TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") {
   REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "2"));
 
   controller.plan->scheduleProcessor(listen_tcp);
-  REQUIRE(utils::sendMessagesViaTCP({"test_message_1"}, PORT));
-  REQUIRE(utils::sendMessagesViaTCP({"another_message"}, PORT));
+  REQUIRE(utils::sendMessagesViaTCP({"test_message_1"}, endpoint));
+  REQUIRE(utils::sendMessagesViaTCP({"another_message"}, endpoint));
   ProcessorTriggerResult result;
   REQUIRE(controller.triggerUntil({{ListenTCP::Success, 2}}, result, 300s, 50ms));
   CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[0]) == "test_message_1");
@@ -68,6 +79,15 @@ TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP]") {
 }
 
 TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") {
+  asio::ip::tcp::endpoint endpoint;
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+  }
   const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
 
   SingleProcessorTestController controller{listen_tcp};
@@ -79,7 +99,7 @@ TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") {
 
   controller.plan->scheduleProcessor(listen_tcp);
   for (auto i = 0; i < 100; ++i) {
-    REQUIRE(utils::sendMessagesViaTCP({"test_message"}, PORT));
+    REQUIRE(utils::sendMessagesViaTCP({"test_message"}, endpoint));
   }
 
   CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms));
@@ -110,27 +130,61 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") {
   REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::SSLContextService.getName(), "SSLContextService"));
   std::vector<std::string> expected_successful_messages;
 
+  asio::ip::tcp::endpoint endpoint;
+
   SECTION("Without client certificate verification") {
     SECTION("Client certificate not required, Client Auth set to NONE by default") {
+      SECTION("sending through IPv4", "[IPv4]") {
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+      }
+      SECTION("sending through IPv6", "[IPv6]") {
+        if (utils::isIPv6Disabled())
+          return;
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+      }
     }
     SECTION("Client certificate not required, but validated if provided") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+      SECTION("sending through IPv4", "[IPv4]") {
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+      }
+      SECTION("sending through IPv6", "[IPv6]") {
+        if (utils::isIPv6Disabled())
+          return;
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+      }
     }
     ssl_context_service->enable();
     controller.plan->scheduleProcessor(listen_tcp);
 
     expected_successful_messages = {"test_message_1", "another_message"};
-    for (const auto& message : expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, PORT, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
+    for (const auto& message: expected_successful_messages) {
+      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
     }
   }
 
   SECTION("With client certificate provided") {
     SECTION("Client certificate required") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+      SECTION("sending through IPv4", "[IPv4]") {
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+      }
+      SECTION("sending through IPv6", "[IPv6]") {
+        if (utils::isIPv6Disabled())
+          return;
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+      }
     }
     SECTION("Client certificate not required but validated") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+      SECTION("sending through IPv4", "[IPv4]") {
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+      }
+      SECTION("sending through IPv6", "[IPv6]") {
+        if (utils::isIPv6Disabled())
+          return;
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+      }
     }
     ssl_context_service->enable();
     controller.plan->scheduleProcessor(listen_tcp);
@@ -143,16 +197,24 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") {
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message : expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, PORT, minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_cert.crt", ssl_data));
+      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_cert.crt", ssl_data));
     }
   }
 
   SECTION("Required certificate not provided") {
+    SECTION("sending through IPv4", "[IPv4]") {
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+    }
+    SECTION("sending through IPv6", "[IPv6]") {
+      if (utils::isIPv6Disabled())
+        return;
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+    }
     REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
     ssl_context_service->enable();
     controller.plan->scheduleProcessor(listen_tcp);
 
-    REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, PORT, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt")));
+    REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt")));
   }
 
   ProcessorTriggerResult result;
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
index 9e8e95247..fd313d928 100644
--- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -24,78 +24,59 @@
 #include "Catch.h"
 #include "PutUDP.h"
 #include "core/ProcessContext.h"
-#include "utils/net/DNS.h"
-#include "utils/net/Socket.h"
+#include "utils/net/UdpServer.h"
 #include "utils/expected.h"
 #include "utils/StringUtils.h"
 
+using namespace std::literals::chrono_literals;
+
 namespace org::apache::nifi::minifi::processors {
 
 namespace {
-struct DatagramListener {
-  DatagramListener(const char* const hostname, const char* const port)
-    :resolved_names_{utils::net::resolveHost(hostname, port, utils::net::IpProtocol::UDP).value()},
-     open_socket_{utils::net::open_socket(*resolved_names_)
-        | utils::valueOrElse([=]() -> utils::net::OpenSocketResult { throw std::runtime_error{utils::StringUtils::join_pack("Failed to connect to ", hostname, " on port ", port)}; })}
-  {
-    const auto bind_result = bind(open_socket_.socket_.get(), open_socket_.selected_name->ai_addr, open_socket_.selected_name->ai_addrlen);
-    if (bind_result == utils::net::SocketError) {
-      throw std::runtime_error{utils::StringUtils::join_pack("bind: ", utils::net::get_last_socket_error().message())};
-    }
-  }
-
-  struct ReceiveResult {
-    std::string remote_address;
-    std::string message;
-  };
-
-  [[nodiscard]] ReceiveResult receive(const size_t max_message_size = 8192) const {
-    ReceiveResult result;
-    result.message.resize(max_message_size);
-    sockaddr_storage remote_address{};
-    socklen_t addrlen = sizeof(remote_address);
-    const auto recv_result = recvfrom(open_socket_.socket_.get(), result.message.data(), result.message.size(), 0, std::launder(reinterpret_cast<sockaddr*>(&remote_address)), &addrlen);
-    if (recv_result == utils::net::SocketError) {
-      throw std::runtime_error{utils::StringUtils::join_pack("recvfrom: ", utils::net::get_last_socket_error().message())};
-    }
-    result.message.resize(gsl::narrow<size_t>(recv_result));
-    result.remote_address = utils::net::sockaddr_ntop(std::launder(reinterpret_cast<sockaddr*>(&remote_address)));
-    return result;
+std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer& listener, std::chrono::milliseconds timeout = 200ms, std::chrono::milliseconds interval = 10ms) {
+  auto start_time = std::chrono::system_clock::now();
+  utils::net::Message result;
+  while (start_time + timeout > std::chrono::system_clock::now()) {
+    if (listener.tryDequeue(result))
+      return result;
+    std::this_thread::sleep_for(interval);
   }
-
-  std::unique_ptr<addrinfo, utils::net::addrinfo_deleter> resolved_names_;
-  utils::net::OpenSocketResult open_socket_;
-};
+  return std::nullopt;
+}
 }  // namespace
 
-// Testing the failure relationship is not required, because since UDP in general without guarantees, flow files are always routed to success, unless there is
-// some weird IO error with the content repo.
 TEST_CASE("PutUDP", "[putudp]") {
-  const auto putudp = std::make_shared<PutUDP>("PutUDP");
+  const auto put_udp = std::make_shared<PutUDP>("PutUDP");
   auto random_engine = std::mt19937{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
   // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
   const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
-  const auto port_str = std::to_string(port);
 
-  test::SingleProcessorTestController controller{putudp};
+  test::SingleProcessorTestController controller{put_udp};
   LogTestController::getInstance().setTrace<PutUDP>();
   LogTestController::getInstance().setTrace<core::ProcessContext>();
-  LogTestController::getInstance().setLevelByClassName(spdlog::level::trace, "org::apache::nifi::minifi::core::ProcessContextExpr");
-  putudp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
-  putudp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", port_str, "')}"));
+  put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
+  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
-  DatagramListener listener{"localhost", port_str.c_str()};
+  utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>().getLogger()};
+
+  auto server_thread = std::thread([&listener]() { listener.run(); });
+  auto cleanup_server = gsl::finally([&]{
+    listener.stop();
+    server_thread.join();
+  });
 
   {
     const char* const message = "first message: hello";
     const auto result = controller.trigger(message);
     const auto& success_flow_files = result.at(PutUDP::Success);
     REQUIRE(success_flow_files.size() == 1);
-    REQUIRE(result.at(PutUDP::Failure).empty());
-    REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
-    auto receive_result = listener.receive();
-    REQUIRE(receive_result.message == message);
-    REQUIRE(!receive_result.remote_address.empty());
+    CHECK(result.at(PutUDP::Failure).empty());
+    CHECK(controller.plan->getContent(success_flow_files[0]) == message);
+    auto received_message = tryDequeueWithTimeout(listener);
+    REQUIRE(received_message);
+    CHECK(received_message->message_data == message);
+    CHECK(received_message->protocol == utils::net::IpProtocol::UDP);
+    CHECK(!received_message->sender_address.to_string().empty());
   }
 
   {
@@ -103,12 +84,39 @@ TEST_CASE("PutUDP", "[putudp]") {
     const auto result = controller.trigger(message);
     const auto& success_flow_files = result.at(PutUDP::Success);
     REQUIRE(success_flow_files.size() == 1);
-    REQUIRE(result.at(PutUDP::Failure).empty());
-    REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
-    auto receive_result = listener.receive();
-    REQUIRE(receive_result.message == message);
-    REQUIRE(!receive_result.remote_address.empty());
+    CHECK(result.at(PutUDP::Failure).empty());
+    CHECK(controller.plan->getContent(success_flow_files[0]) == message);
+    auto received_message = tryDequeueWithTimeout(listener);
+    REQUIRE(received_message);
+    CHECK(received_message->message_data == message);
+    CHECK(received_message->protocol == utils::net::IpProtocol::UDP);
+    CHECK(!received_message->sender_address.to_string().empty());
   }
-}
 
+  {
+    LogTestController::getInstance().clear();
+    auto message = std::string(65536, 'a');
+    const auto result = controller.trigger(message);
+    const auto& failure_flow_files = result.at(PutUDP::Failure);
+    REQUIRE(failure_flow_files.size() == 1);
+    CHECK(result.at(PutUDP::Success).empty());
+    CHECK(controller.plan->getContent(failure_flow_files[0]) == message);
+    CHECK((LogTestController::getInstance().contains("Message too long")
+        || LogTestController::getInstance().contains("A message sent on a datagram socket was larger than the internal message buffer")));
+  }
+
+  {
+    LogTestController::getInstance().clear();
+    const char* const message = "message for invalid host";
+    controller.plan->setProperty(put_udp, PutUDP::Hostname.getName(), "invalid_hostname");
+    const auto result = controller.trigger(message);
+    const auto& failure_flow_files = result.at(PutUDP::Failure);
+    auto received_message = tryDequeueWithTimeout(listener);
+    CHECK(!received_message);
+    REQUIRE(failure_flow_files.size() == 1);
+    CHECK(result.at(PutUDP::Success).empty());
+    CHECK(controller.plan->getContent(failure_flow_files[0]) == message);
+    CHECK((LogTestController::getInstance().contains("Host not found") || LogTestController::getInstance().contains("No such host is known")));
+  }
+}
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/libminifi/include/utils/net/SessionHandlingServer.h b/libminifi/include/utils/net/SessionHandlingServer.h
index f716a8758..173fdcb02 100644
--- a/libminifi/include/utils/net/SessionHandlingServer.h
+++ b/libminifi/include/utils/net/SessionHandlingServer.h
@@ -29,7 +29,7 @@ class SessionHandlingServer : public Server {
  public:
   SessionHandlingServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
       : Server(max_queue_size, std::move(logger)),
-        acceptor_(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) {
+        acceptor_(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port)) {
   }
 
   void run() override {
diff --git a/libminifi/src/utils/net/UdpServer.cpp b/libminifi/src/utils/net/UdpServer.cpp
index 490a081e7..6137f2ddf 100644
--- a/libminifi/src/utils/net/UdpServer.cpp
+++ b/libminifi/src/utils/net/UdpServer.cpp
@@ -22,7 +22,7 @@ UdpServer::UdpServer(std::optional<size_t> max_queue_size,
                      uint16_t port,
                      std::shared_ptr<core::logging::Logger> logger)
     : Server(max_queue_size, std::move(logger)),
-      socket_(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v4(), port)) {
+      socket_(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v6(), port)) {
   doReceive();
 }
 
diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h
index 96389f740..76d8cc533 100644
--- a/libminifi/test/Utils.h
+++ b/libminifi/test/Utils.h
@@ -111,24 +111,47 @@ bool countLogOccurrencesUntil(const std::string& pattern,
   return false;
 }
 
-bool sendMessagesViaTCP(const std::vector<std::string_view>& contents, uint64_t port) {
+bool sendMessagesViaTCP(const std::vector<std::string_view>& contents, const asio::ip::tcp::endpoint& remote_endpoint) {
   asio::io_context io_context;
   asio::ip::tcp::socket socket(io_context);
-  asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port);
   socket.connect(remote_endpoint);
   std::error_code err;
   for (auto& content : contents) {
     std::string tcp_message(content);
     tcp_message += '\n';
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
+    if (err) {
+      return false;
+    }
   }
-  if (err) {
-    return false;
-  }
-  socket.close();
   return true;
 }
 
+bool sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) {
+  asio::io_context io_context;
+  asio::ip::udp::socket socket(io_context);
+  socket.open(remote_endpoint.protocol());
+  std::error_code err;
+  socket.send_to(content, remote_endpoint, 0, err);
+  return !err;
+}
+
+bool sendUdpDatagram(const gsl::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) {
+  return sendUdpDatagram(asio::const_buffer(content.begin(), content.size()), remote_endpoint);
+}
+
+bool sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) {
+  return sendUdpDatagram(asio::buffer(content), remote_endpoint);
+}
+
+bool isIPv6Disabled() {
+  asio::io_context io_context;
+  std::error_code error_code;
+  asio::ip::tcp::socket socket_tcp(io_context);
+  socket_tcp.connect(asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), 10), error_code);
+  return error_code.value() == EADDRNOTAVAIL;
+}
+
 struct ConnectionTestAccessor {
   FIELD_ACCESSOR(queue_);
 };
@@ -143,7 +166,10 @@ struct FlowFileQueueTestAccessor {
   FIELD_ACCESSOR(queue_);
 };
 
-bool sendMessagesViaSSL(const std::vector<std::string_view>& contents, uint64_t port, const std::string& ca_cert_path, const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) {
+bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
+                        const asio::ip::tcp::endpoint& remote_endpoint,
+                        const std::string& ca_cert_path,
+                        const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) {
   asio::ssl::context ctx(asio::ssl::context::sslv23);
   ctx.load_verify_file(ca_cert_path);
   if (ssl_data) {
@@ -154,7 +180,6 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents, uint64_t
   }
   asio::io_context io_context;
   asio::ssl::stream<asio::ip::tcp::socket> socket(io_context, ctx);
-  asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port);
   asio::error_code err;
   socket.lowest_layer().connect(remote_endpoint, err);
   if (err) {
@@ -168,11 +193,10 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents, uint64_t
     std::string tcp_message(content);
     tcp_message += '\n';
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
+    if (err) {
+      return false;
+    }
   }
-  if (err) {
-    return false;
-  }
-  socket.lowest_layer().close();
   return true;
 }