You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/10/06 10:53:21 UTC
[nifi-minifi-cpp] 01/04: MINIFICPP-1923 Refactor PutUDP to use asio
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit bf5d3d25ba33a7e02eb9bee98767073f08e86464
Author: Martin Zink <ma...@apache.org>
AuthorDate: Wed Oct 5 16:45:25 2022 +0200
MINIFICPP-1923 Refactor PutUDP to use asio
MINIFICPP-1939 Enable dual stack listening on ListenTCP and ListenSyslog
PutUDP should iterate through endpoints (both IPv4 and IPv6)
Closes #1412
Signed-off-by: Marton Szasz <sz...@apache.org>
---
.../standard-processors/processors/PutUDP.cpp | 94 +++++++--------
extensions/standard-processors/processors/PutUDP.h | 2 -
.../tests/unit/ListenSyslogTests.cpp | 131 +++++++++++++++------
.../tests/unit/ListenTcpTests.cpp | 78 ++++++++++--
.../standard-processors/tests/unit/PutUDPTests.cpp | 118 ++++++++++---------
.../include/utils/net/SessionHandlingServer.h | 2 +-
libminifi/src/utils/net/UdpServer.cpp | 2 +-
libminifi/test/Utils.h | 48 ++++++--
8 files changed, 307 insertions(+), 168 deletions(-)
diff --git a/extensions/standard-processors/processors/PutUDP.cpp b/extensions/standard-processors/processors/PutUDP.cpp
index 95000011d..56af0f306 100644
--- a/extensions/standard-processors/processors/PutUDP.cpp
+++ b/extensions/standard-processors/processors/PutUDP.cpp
@@ -16,29 +16,20 @@
*/
#include "PutUDP.h"
-#ifdef WIN32
-#ifndef WIN32_LEAN_AND_MEAN
-#define WIN32_LEAN_AND_MEAN
-#endif
-#include <winsock2.h>
-#else
-#include <netdb.h>
-#endif /* WIN32 */
-#include <utility>
-
-#include "range/v3/view/join.hpp"
#include "range/v3/range/conversion.hpp"
#include "utils/gsl.h"
#include "utils/expected.h"
-#include "utils/net/DNS.h"
-#include "utils/net/Socket.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/PropertyBuilder.h"
#include "core/Resource.h"
#include "core/logging/LoggerConfiguration.h"
+#include "asio/ip/udp.hpp"
+
+using asio::ip::udp;
+
namespace org::apache::nifi::minifi::processors {
const core::Property PutUDP::Hostname = core::PropertyBuilder::createProperty("Hostname")
@@ -107,51 +98,54 @@ void PutUDP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
return;
}
- const auto nonthrowing_sockaddr_ntop = [](const sockaddr* const sa) -> std::string {
- return utils::try_expression([sa] { return utils::net::sockaddr_ntop(sa); }).value_or("(n/a)");
+ asio::io_context io_context;
+
+ const auto resolve_hostname = [&io_context, &hostname, &port]() -> nonstd::expected<udp::resolver::results_type, std::error_code> {
+ udp::resolver resolver(io_context);
+ std::error_code error_code;
+ auto results = resolver.resolve(hostname, port, error_code);
+ if (error_code)
+ return nonstd::make_unexpected(error_code);
+ return results;
};
- const auto debug_log_resolved_names = [&, this](const addrinfo& names) -> decltype(auto) {
- if (logger_->should_log(core::logging::LOG_LEVEL::debug)) {
- std::vector<std::string> names_vector;
- for (const addrinfo* it = &names; it; it = it->ai_next) {
- names_vector.push_back(nonthrowing_sockaddr_ntop(it->ai_addr));
+ const auto send_data_to_endpoint = [&io_context, &data, &logger = this->logger_](const udp::resolver::results_type& resolved_query) -> nonstd::expected<void, std::error_code> {
+ std::error_code error;
+ for (const auto& resolver_entry : resolved_query) {
+ error.clear();
+ udp::socket socket(io_context);
+ socket.open(resolver_entry.endpoint().protocol(), error);
+ if (error) {
+ logger->log_debug("opening %s socket failed due to %s ", resolver_entry.endpoint().protocol() == udp::v4() ? "IPv4" : "IPv6", error.message());
+ continue;
}
- logger_->log_debug("resolved \'%s\' to: %s",
- hostname,
- names_vector | ranges::views::join(',') | ranges::to<std::string>());
+ socket.send_to(asio::buffer(data.buffer), resolver_entry.endpoint(), udp::socket::message_flags{}, error);
+ if (error) {
+ core::logging::LOG_DEBUG(logger) << "sending to endpoint " << resolver_entry.endpoint() << " failed due to " << error.message();
+ continue;
+ }
+ core::logging::LOG_DEBUG(logger) << "sending to endpoint " << resolver_entry.endpoint() << " succeeded";
+ return {};
}
- return names;
+ return nonstd::make_unexpected(error);
+ };
+
+ const auto transfer_to_success = [&session, &flow_file]() -> void {
+ session->transfer(flow_file, Success);
};
- utils::net::resolveHost(hostname.c_str(), port.c_str(), utils::net::IpProtocol::UDP)
- | utils::map(utils::dereference)
- | utils::map(debug_log_resolved_names)
- | utils::flatMap([](const auto& names) { return utils::net::open_socket(names); })
- | utils::flatMap([&, this](utils::net::OpenSocketResult socket_handle_and_selected_name) -> nonstd::expected<void, std::error_code> {
- const auto& [socket_handle, selected_name] = socket_handle_and_selected_name;
- logger_->log_debug("connected to %s", nonthrowing_sockaddr_ntop(selected_name->ai_addr));
-#ifdef WIN32
- const char* const buffer_ptr = reinterpret_cast<const char*>(data.buffer.data());
-#else
- const void* const buffer_ptr = data.buffer.data();
-#endif
- const auto send_result = ::sendto(socket_handle.get(), buffer_ptr, data.buffer.size(), 0, selected_name->ai_addr, selected_name->ai_addrlen);
- logger_->log_trace("sendto returned %ld", static_cast<long>(send_result)); // NOLINT: sendto
- if (send_result == utils::net::SocketError) {
- return nonstd::make_unexpected(utils::net::get_last_socket_error());
- }
- session->transfer(flow_file, Success);
- return {};
- })
- | utils::orElse([&, this](std::error_code ec) {
- gsl_Expects(ec);
- logger_->log_error("%s", ec.message());
- session->transfer(flow_file, Failure);
- });
+ const auto transfer_to_failure = [&session, &flow_file, &logger = this->logger_](std::error_code ec) -> void {
+ gsl_Expects(ec);
+ logger->log_error("%s", ec.message());
+ session->transfer(flow_file, Failure);
+ };
+
+ resolve_hostname()
+ | utils::flatMap(send_data_to_endpoint)
+ | utils::map(transfer_to_success)
+ | utils::orElse(transfer_to_failure);
}
REGISTER_RESOURCE(PutUDP, Processor);
} // namespace org::apache::nifi::minifi::processors
-
diff --git a/extensions/standard-processors/processors/PutUDP.h b/extensions/standard-processors/processors/PutUDP.h
index 5217519fb..cf69fce3d 100644
--- a/extensions/standard-processors/processors/PutUDP.h
+++ b/extensions/standard-processors/processors/PutUDP.h
@@ -58,8 +58,6 @@ class PutUDP final : public core::Processor {
void onTrigger(core::ProcessContext*, core::ProcessSession*) final;
private:
- std::string hostname_;
- std::string port_;
std::shared_ptr<core::logging::Logger> logger_;
};
} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
index c9b8e472e..302af6c3a 100644
--- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -21,6 +21,7 @@
#include "SingleProcessorTestController.h"
#include "Utils.h"
#include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
using ListenSyslog = org::apache::nifi::minifi::processors::ListenSyslog;
@@ -29,6 +30,7 @@ using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::test {
constexpr uint64_t SYSLOG_PORT = 10255;
+constexpr auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
struct ValidRFC5424Message {
constexpr ValidRFC5424Message(std::string_view message,
@@ -197,21 +199,10 @@ constexpr std::string_view rfc5424_logger_example_1 = R"(<13>1 2022-03-17T10:10:
constexpr std::string_view invalid_syslog = "not syslog";
-void sendUDPPacket(const std::string_view content, uint64_t port) {
- asio::io_context io_context;
- asio::ip::udp::socket socket(io_context);
- asio::ip::udp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port);
- socket.open(asio::ip::udp::v4());
- std::error_code err;
- socket.send_to(asio::buffer(content, content.size()), remote_endpoint, 0, err);
- REQUIRE(!err);
- socket.close();
-}
-
void check_for_only_basic_attributes(core::FlowFile& flow_file, uint16_t port, std::string_view protocol) {
CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port"));
CHECK(protocol == flow_file.getAttribute("syslog.protocol"));
- CHECK("127.0.0.1" == flow_file.getAttribute("syslog.sender"));
+ CHECK(ranges::contains(local_addresses, flow_file.getAttribute("syslog.sender")));
CHECK(std::nullopt == flow_file.getAttribute("syslog.valid"));
CHECK(std::nullopt == flow_file.getAttribute("syslog.priority"));
@@ -228,7 +219,7 @@ void check_for_only_basic_attributes(core::FlowFile& flow_file, uint16_t port, s
void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC5424Message& original_message, uint16_t port, std::string_view protocol) {
CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port"));
CHECK(protocol == flow_file.getAttribute("syslog.protocol"));
- CHECK("127.0.0.1" == flow_file.getAttribute("syslog.sender"));
+ CHECK(ranges::contains(local_addresses, flow_file.getAttribute("syslog.sender")));
CHECK("true" == flow_file.getAttribute("syslog.valid"));
CHECK(original_message.priority_ == flow_file.getAttribute("syslog.priority"));
@@ -247,7 +238,7 @@ void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC5424
void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164Message& original_message, uint16_t port, std::string_view protocol) {
CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port"));
CHECK(protocol == flow_file.getAttribute("syslog.protocol"));
- CHECK("127.0.0.1" == flow_file.getAttribute("syslog.sender"));
+ CHECK(ranges::contains(local_addresses, flow_file.getAttribute("syslog.sender")));
CHECK("true" == flow_file.getAttribute("syslog.valid"));
CHECK(original_message.priority_ == flow_file.getAttribute("syslog.priority"));
@@ -269,19 +260,37 @@ TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") {
std::string protocol;
SECTION("UDP") {
+ asio::ip::udp::endpoint endpoint;
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ }
protocol = "UDP";
REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
controller.plan->scheduleProcessor(listen_syslog);
- sendUDPPacket(rfc5424_logger_example_1, SYSLOG_PORT);
- sendUDPPacket(invalid_syslog, SYSLOG_PORT);
+ utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint);
+ utils::sendUdpDatagram(invalid_syslog, endpoint);
}
SECTION("TCP") {
+ asio::ip::tcp::endpoint endpoint;
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ }
protocol = "TCP";
REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
controller.plan->scheduleProcessor(listen_syslog);
- REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, SYSLOG_PORT));
- REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT));
+ REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint));
+ REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, endpoint));
}
std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
@@ -303,25 +312,43 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") {
std::string protocol;
SECTION("UDP") {
+ asio::ip::udp::endpoint endpoint;
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ }
protocol = "UDP";
REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
controller.plan->scheduleProcessor(listen_syslog);
std::this_thread::sleep_for(100ms);
- sendUDPPacket(rfc5424_doc_example_1.unparsed_, SYSLOG_PORT);
- sendUDPPacket(rfc5424_doc_example_2.unparsed_, SYSLOG_PORT);
- sendUDPPacket(rfc5424_doc_example_3.unparsed_, SYSLOG_PORT);
- sendUDPPacket(rfc5424_doc_example_4.unparsed_, SYSLOG_PORT);
-
- sendUDPPacket(rfc3164_doc_example_1.unparsed_, SYSLOG_PORT);
- sendUDPPacket(rfc3164_doc_example_2.unparsed_, SYSLOG_PORT);
- sendUDPPacket(rfc3164_doc_example_3.unparsed_, SYSLOG_PORT);
- sendUDPPacket(rfc3164_doc_example_4.unparsed_, SYSLOG_PORT);
-
- sendUDPPacket(rfc5424_logger_example_1, SYSLOG_PORT);
- sendUDPPacket(invalid_syslog, SYSLOG_PORT);
+ utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint);
+ utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, endpoint);
+ utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, endpoint);
+ utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, endpoint);
+
+ utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, endpoint);
+ utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, endpoint);
+ utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, endpoint);
+ utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, endpoint);
+
+ utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint);
+ utils::sendUdpDatagram(invalid_syslog, endpoint);
}
SECTION("TCP") {
+ asio::ip::tcp::endpoint endpoint;
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ }
protocol = "TCP";
REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
controller.plan->scheduleProcessor(listen_syslog);
@@ -329,15 +356,15 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") {
REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_,
rfc5424_doc_example_2.unparsed_,
rfc5424_doc_example_3.unparsed_,
- rfc5424_doc_example_4.unparsed_}, SYSLOG_PORT));
+ rfc5424_doc_example_4.unparsed_}, endpoint));
REQUIRE(utils::sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_,
rfc3164_doc_example_2.unparsed_,
rfc3164_doc_example_3.unparsed_,
- rfc3164_doc_example_4.unparsed_}, SYSLOG_PORT));
+ rfc3164_doc_example_4.unparsed_}, endpoint));
- REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, SYSLOG_PORT));
- REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT));
+ REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint));
+ REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, endpoint));
}
std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
@@ -410,19 +437,37 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") {
LogTestController::getInstance().setWarn<ListenSyslog>();
SECTION("UDP") {
+ asio::ip::udp::endpoint endpoint;
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ }
REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
controller.plan->scheduleProcessor(listen_syslog);
for (auto i = 0; i < 100; ++i) {
- sendUDPPacket(rfc5424_doc_example_1.unparsed_, SYSLOG_PORT);
+ utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint);
}
CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms));
}
SECTION("TCP") {
+ asio::ip::tcp::endpoint endpoint;
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ }
REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
controller.plan->scheduleProcessor(listen_syslog);
for (auto i = 0; i < 100; ++i) {
- REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, SYSLOG_PORT));
+ REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, endpoint));
}
CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms));
}
@@ -435,6 +480,15 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") {
}
TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog]") {
+ asio::ip::tcp::endpoint endpoint;
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ }
const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
SingleProcessorTestController controller{listen_syslog};
@@ -454,9 +508,8 @@ TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog]") {
REQUIRE(listen_syslog->setProperty(ListenSyslog::SSLContextService, "SSLContextService"));
ssl_context_service->enable();
controller.plan->scheduleProcessor(listen_syslog);
- REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, SYSLOG_PORT, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
- REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, SYSLOG_PORT, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt")));
-
+ REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
+ REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt")));
std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1);
diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
index 559db0c2f..247f2e6ff 100644
--- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
@@ -21,6 +21,7 @@
#include "SingleProcessorTestController.h"
#include "Utils.h"
#include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
using ListenTCP = org::apache::nifi::minifi::processors::ListenTCP;
@@ -32,10 +33,20 @@ constexpr uint64_t PORT = 10254;
void check_for_attributes(core::FlowFile& flow_file) {
CHECK(std::to_string(PORT) == flow_file.getAttribute("tcp.port"));
- CHECK("127.0.0.1" == flow_file.getAttribute("tcp.sender"));
+ const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
+ CHECK(ranges::contains(local_addresses, flow_file.getAttribute("tcp.sender")));
}
TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") {
+ asio::ip::tcp::endpoint endpoint;
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ }
const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
SingleProcessorTestController controller{listen_tcp};
@@ -44,8 +55,8 @@ TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") {
REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "2"));
controller.plan->scheduleProcessor(listen_tcp);
- REQUIRE(utils::sendMessagesViaTCP({"test_message_1"}, PORT));
- REQUIRE(utils::sendMessagesViaTCP({"another_message"}, PORT));
+ REQUIRE(utils::sendMessagesViaTCP({"test_message_1"}, endpoint));
+ REQUIRE(utils::sendMessagesViaTCP({"another_message"}, endpoint));
ProcessorTriggerResult result;
REQUIRE(controller.triggerUntil({{ListenTCP::Success, 2}}, result, 300s, 50ms));
CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[0]) == "test_message_1");
@@ -68,6 +79,15 @@ TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP]") {
}
TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") {
+ asio::ip::tcp::endpoint endpoint;
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ }
const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
SingleProcessorTestController controller{listen_tcp};
@@ -79,7 +99,7 @@ TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") {
controller.plan->scheduleProcessor(listen_tcp);
for (auto i = 0; i < 100; ++i) {
- REQUIRE(utils::sendMessagesViaTCP({"test_message"}, PORT));
+ REQUIRE(utils::sendMessagesViaTCP({"test_message"}, endpoint));
}
CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms));
@@ -110,27 +130,61 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") {
REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::SSLContextService.getName(), "SSLContextService"));
std::vector<std::string> expected_successful_messages;
+ asio::ip::tcp::endpoint endpoint;
+
SECTION("Without client certificate verification") {
SECTION("Client certificate not required, Client Auth set to NONE by default") {
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ }
}
SECTION("Client certificate not required, but validated if provided") {
REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ }
}
ssl_context_service->enable();
controller.plan->scheduleProcessor(listen_tcp);
expected_successful_messages = {"test_message_1", "another_message"};
- for (const auto& message : expected_successful_messages) {
- REQUIRE(utils::sendMessagesViaSSL({message}, PORT, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
+ for (const auto& message: expected_successful_messages) {
+ REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
}
}
SECTION("With client certificate provided") {
SECTION("Client certificate required") {
REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ }
}
SECTION("Client certificate not required but validated") {
REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ }
}
ssl_context_service->enable();
controller.plan->scheduleProcessor(listen_tcp);
@@ -143,16 +197,24 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") {
expected_successful_messages = {"test_message_1", "another_message"};
for (const auto& message : expected_successful_messages) {
- REQUIRE(utils::sendMessagesViaSSL({message}, PORT, minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_cert.crt", ssl_data));
+ REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_cert.crt", ssl_data));
}
}
SECTION("Required certificate not provided") {
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ }
REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
ssl_context_service->enable();
controller.plan->scheduleProcessor(listen_tcp);
- REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, PORT, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt")));
+ REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt")));
}
ProcessorTriggerResult result;
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
index 9e8e95247..fd313d928 100644
--- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -24,78 +24,59 @@
#include "Catch.h"
#include "PutUDP.h"
#include "core/ProcessContext.h"
-#include "utils/net/DNS.h"
-#include "utils/net/Socket.h"
+#include "utils/net/UdpServer.h"
#include "utils/expected.h"
#include "utils/StringUtils.h"
+using namespace std::literals::chrono_literals;
+
namespace org::apache::nifi::minifi::processors {
namespace {
-struct DatagramListener {
- DatagramListener(const char* const hostname, const char* const port)
- :resolved_names_{utils::net::resolveHost(hostname, port, utils::net::IpProtocol::UDP).value()},
- open_socket_{utils::net::open_socket(*resolved_names_)
- | utils::valueOrElse([=]() -> utils::net::OpenSocketResult { throw std::runtime_error{utils::StringUtils::join_pack("Failed to connect to ", hostname, " on port ", port)}; })}
- {
- const auto bind_result = bind(open_socket_.socket_.get(), open_socket_.selected_name->ai_addr, open_socket_.selected_name->ai_addrlen);
- if (bind_result == utils::net::SocketError) {
- throw std::runtime_error{utils::StringUtils::join_pack("bind: ", utils::net::get_last_socket_error().message())};
- }
- }
-
- struct ReceiveResult {
- std::string remote_address;
- std::string message;
- };
-
- [[nodiscard]] ReceiveResult receive(const size_t max_message_size = 8192) const {
- ReceiveResult result;
- result.message.resize(max_message_size);
- sockaddr_storage remote_address{};
- socklen_t addrlen = sizeof(remote_address);
- const auto recv_result = recvfrom(open_socket_.socket_.get(), result.message.data(), result.message.size(), 0, std::launder(reinterpret_cast<sockaddr*>(&remote_address)), &addrlen);
- if (recv_result == utils::net::SocketError) {
- throw std::runtime_error{utils::StringUtils::join_pack("recvfrom: ", utils::net::get_last_socket_error().message())};
- }
- result.message.resize(gsl::narrow<size_t>(recv_result));
- result.remote_address = utils::net::sockaddr_ntop(std::launder(reinterpret_cast<sockaddr*>(&remote_address)));
- return result;
+std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer& listener, std::chrono::milliseconds timeout = 200ms, std::chrono::milliseconds interval = 10ms) {
+ auto start_time = std::chrono::system_clock::now();
+ utils::net::Message result;
+ while (start_time + timeout > std::chrono::system_clock::now()) {
+ if (listener.tryDequeue(result))
+ return result;
+ std::this_thread::sleep_for(interval);
}
-
- std::unique_ptr<addrinfo, utils::net::addrinfo_deleter> resolved_names_;
- utils::net::OpenSocketResult open_socket_;
-};
+ return std::nullopt;
+}
} // namespace
-// Testing the failure relationship is not required, because since UDP in general without guarantees, flow files are always routed to success, unless there is
-// some weird IO error with the content repo.
TEST_CASE("PutUDP", "[putudp]") {
- const auto putudp = std::make_shared<PutUDP>("PutUDP");
+ const auto put_udp = std::make_shared<PutUDP>("PutUDP");
auto random_engine = std::mt19937{std::random_device{}()}; // NOLINT: "Missing space before { [whitespace/braces] [5]"
// most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
- const auto port_str = std::to_string(port);
- test::SingleProcessorTestController controller{putudp};
+ test::SingleProcessorTestController controller{put_udp};
LogTestController::getInstance().setTrace<PutUDP>();
LogTestController::getInstance().setTrace<core::ProcessContext>();
- LogTestController::getInstance().setLevelByClassName(spdlog::level::trace, "org::apache::nifi::minifi::core::ProcessContextExpr");
- putudp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
- putudp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", port_str, "')}"));
+ put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
+ put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
- DatagramListener listener{"localhost", port_str.c_str()};
+ utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>().getLogger()};
+
+ auto server_thread = std::thread([&listener]() { listener.run(); });
+ auto cleanup_server = gsl::finally([&]{
+ listener.stop();
+ server_thread.join();
+ });
{
const char* const message = "first message: hello";
const auto result = controller.trigger(message);
const auto& success_flow_files = result.at(PutUDP::Success);
REQUIRE(success_flow_files.size() == 1);
- REQUIRE(result.at(PutUDP::Failure).empty());
- REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
- auto receive_result = listener.receive();
- REQUIRE(receive_result.message == message);
- REQUIRE(!receive_result.remote_address.empty());
+ CHECK(result.at(PutUDP::Failure).empty());
+ CHECK(controller.plan->getContent(success_flow_files[0]) == message);
+ auto received_message = tryDequeueWithTimeout(listener);
+ REQUIRE(received_message);
+ CHECK(received_message->message_data == message);
+ CHECK(received_message->protocol == utils::net::IpProtocol::UDP);
+ CHECK(!received_message->sender_address.to_string().empty());
}
{
@@ -103,12 +84,39 @@ TEST_CASE("PutUDP", "[putudp]") {
const auto result = controller.trigger(message);
const auto& success_flow_files = result.at(PutUDP::Success);
REQUIRE(success_flow_files.size() == 1);
- REQUIRE(result.at(PutUDP::Failure).empty());
- REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
- auto receive_result = listener.receive();
- REQUIRE(receive_result.message == message);
- REQUIRE(!receive_result.remote_address.empty());
+ CHECK(result.at(PutUDP::Failure).empty());
+ CHECK(controller.plan->getContent(success_flow_files[0]) == message);
+ auto received_message = tryDequeueWithTimeout(listener);
+ REQUIRE(received_message);
+ CHECK(received_message->message_data == message);
+ CHECK(received_message->protocol == utils::net::IpProtocol::UDP);
+ CHECK(!received_message->sender_address.to_string().empty());
}
-}
+ {
+ LogTestController::getInstance().clear();
+ auto message = std::string(65536, 'a');
+ const auto result = controller.trigger(message);
+ const auto& failure_flow_files = result.at(PutUDP::Failure);
+ REQUIRE(failure_flow_files.size() == 1);
+ CHECK(result.at(PutUDP::Success).empty());
+ CHECK(controller.plan->getContent(failure_flow_files[0]) == message);
+ CHECK((LogTestController::getInstance().contains("Message too long")
+ || LogTestController::getInstance().contains("A message sent on a datagram socket was larger than the internal message buffer")));
+ }
+
+ {
+ LogTestController::getInstance().clear();
+ const char* const message = "message for invalid host";
+ controller.plan->setProperty(put_udp, PutUDP::Hostname.getName(), "invalid_hostname");
+ const auto result = controller.trigger(message);
+ const auto& failure_flow_files = result.at(PutUDP::Failure);
+ auto received_message = tryDequeueWithTimeout(listener);
+ CHECK(!received_message);
+ REQUIRE(failure_flow_files.size() == 1);
+ CHECK(result.at(PutUDP::Success).empty());
+ CHECK(controller.plan->getContent(failure_flow_files[0]) == message);
+ CHECK((LogTestController::getInstance().contains("Host not found") || LogTestController::getInstance().contains("No such host is known")));
+ }
+}
} // namespace org::apache::nifi::minifi::processors
diff --git a/libminifi/include/utils/net/SessionHandlingServer.h b/libminifi/include/utils/net/SessionHandlingServer.h
index f716a8758..173fdcb02 100644
--- a/libminifi/include/utils/net/SessionHandlingServer.h
+++ b/libminifi/include/utils/net/SessionHandlingServer.h
@@ -29,7 +29,7 @@ class SessionHandlingServer : public Server {
public:
SessionHandlingServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
: Server(max_queue_size, std::move(logger)),
- acceptor_(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) {
+ acceptor_(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port)) {
}
void run() override {
diff --git a/libminifi/src/utils/net/UdpServer.cpp b/libminifi/src/utils/net/UdpServer.cpp
index 490a081e7..6137f2ddf 100644
--- a/libminifi/src/utils/net/UdpServer.cpp
+++ b/libminifi/src/utils/net/UdpServer.cpp
@@ -22,7 +22,7 @@ UdpServer::UdpServer(std::optional<size_t> max_queue_size,
uint16_t port,
std::shared_ptr<core::logging::Logger> logger)
: Server(max_queue_size, std::move(logger)),
- socket_(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v4(), port)) {
+ socket_(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v6(), port)) {
doReceive();
}
diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h
index 96389f740..76d8cc533 100644
--- a/libminifi/test/Utils.h
+++ b/libminifi/test/Utils.h
@@ -111,24 +111,47 @@ bool countLogOccurrencesUntil(const std::string& pattern,
return false;
}
-bool sendMessagesViaTCP(const std::vector<std::string_view>& contents, uint64_t port) {
+bool sendMessagesViaTCP(const std::vector<std::string_view>& contents, const asio::ip::tcp::endpoint& remote_endpoint) {
asio::io_context io_context;
asio::ip::tcp::socket socket(io_context);
- asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port);
socket.connect(remote_endpoint);
std::error_code err;
for (auto& content : contents) {
std::string tcp_message(content);
tcp_message += '\n';
asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
+ if (err) {
+ return false;
+ }
}
- if (err) {
- return false;
- }
- socket.close();
return true;
}
+// Sends `content` as a single UDP datagram to `remote_endpoint` over a freshly opened socket.
+// Returns true when the socket could be opened and the datagram was sent, false otherwise.
+bool sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) {
+ asio::io_context io_context;
+ asio::ip::udp::socket socket(io_context);
+ std::error_code err;
+ // Use the error_code overload of open(): the throwing overload would bypass the bool result.
+ socket.open(remote_endpoint.protocol(), err);
+ if (err) {
+ return false;
+ }
+ socket.send_to(content, remote_endpoint, 0, err);
+ return !err;
+}
+
+// Convenience overload: sends the bytes viewed by `content` as one UDP datagram.
+bool sendUdpDatagram(const gsl::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) {
+ // data() always yields a raw pointer; begin() is only a pointer for some span implementations.
+ return sendUdpDatagram(asio::const_buffer(content.data(), content.size()), remote_endpoint);
+}
+
+// Convenience overload: sends the characters of `content` as one UDP datagram.
+bool sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) {
+ const asio::const_buffer payload = asio::buffer(content);
+ return sendUdpDatagram(payload, remote_endpoint);
+}
+
+// Probes IPv6 availability by attempting a TCP connect to the IPv6 loopback address.
+// Returns true only when the attempt fails with "address not available".
+bool isIPv6Disabled() {
+ asio::io_context io_context;
+ std::error_code error_code;
+ asio::ip::tcp::socket socket_tcp(io_context);
+ socket_tcp.connect(asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), 10), error_code);
+ // Compare against the portable condition: the raw errno constant EADDRNOTAVAIL does not match the
+ // native code reported on Windows (WSAEADDRNOTAVAIL).
+ return error_code == std::errc::address_not_available;
+}
+
struct ConnectionTestAccessor {
FIELD_ACCESSOR(queue_);
};
@@ -143,7 +166,10 @@ struct FlowFileQueueTestAccessor {
FIELD_ACCESSOR(queue_);
};
-bool sendMessagesViaSSL(const std::vector<std::string_view>& contents, uint64_t port, const std::string& ca_cert_path, const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) {
+bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
+ const asio::ip::tcp::endpoint& remote_endpoint,
+ const std::string& ca_cert_path,
+ const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) {
asio::ssl::context ctx(asio::ssl::context::sslv23);
ctx.load_verify_file(ca_cert_path);
if (ssl_data) {
@@ -154,7 +180,6 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents, uint64_t
}
asio::io_context io_context;
asio::ssl::stream<asio::ip::tcp::socket> socket(io_context, ctx);
- asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port);
asio::error_code err;
socket.lowest_layer().connect(remote_endpoint, err);
if (err) {
@@ -168,11 +193,10 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents, uint64_t
std::string tcp_message(content);
tcp_message += '\n';
asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
+ if (err) {
+ return false;
+ }
}
- if (err) {
- return false;
- }
- socket.lowest_layer().close();
return true;
}