You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/09/09 11:41:05 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1412: MINIFICPP-1923 Refactor PutUDP to use asio

fgerlits commented on code in PR #1412:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1412#discussion_r964775230


##########
extensions/standard-processors/tests/unit/PutUDPTests.cpp:
##########
@@ -24,91 +24,87 @@
 #include "Catch.h"
 #include "PutUDP.h"
 #include "core/ProcessContext.h"
-#include "utils/net/DNS.h"
-#include "utils/net/Socket.h"
+#include "utils/net/UdpServer.h"
 #include "utils/expected.h"
 #include "utils/StringUtils.h"
 
+using namespace std::literals::chrono_literals;
+
 namespace org::apache::nifi::minifi::processors {
 
 namespace {
-struct DatagramListener {
-  DatagramListener(const char* const hostname, const char* const port)
-    :resolved_names_{utils::net::resolveHost(hostname, port, utils::net::IpProtocol::UDP).value()},
-     open_socket_{utils::net::open_socket(*resolved_names_)
-        | utils::valueOrElse([=]() -> utils::net::OpenSocketResult { throw std::runtime_error{utils::StringUtils::join_pack("Failed to connect to ", hostname, " on port ", port)}; })}
-  {
-    const auto bind_result = bind(open_socket_.socket_.get(), open_socket_.selected_name->ai_addr, open_socket_.selected_name->ai_addrlen);
-    if (bind_result == utils::net::SocketError) {
-      throw std::runtime_error{utils::StringUtils::join_pack("bind: ", utils::net::get_last_socket_error().message())};
-    }
+std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer& listener, std::chrono::milliseconds timeout = 200ms, std::chrono::milliseconds interval = 10ms) {
+  auto start_time = std::chrono::system_clock::now();
+  utils::net::Message result;
+  while (start_time + timeout > std::chrono::system_clock::now()) {
+    if (listener.tryDequeue(result))
+      return result;
+    std::this_thread::sleep_for(interval);
   }
-
-  struct ReceiveResult {
-    std::string remote_address;
-    std::string message;
-  };
-
-  [[nodiscard]] ReceiveResult receive(const size_t max_message_size = 8192) const {
-    ReceiveResult result;
-    result.message.resize(max_message_size);
-    sockaddr_storage remote_address{};
-    socklen_t addrlen = sizeof(remote_address);
-    const auto recv_result = recvfrom(open_socket_.socket_.get(), result.message.data(), result.message.size(), 0, std::launder(reinterpret_cast<sockaddr*>(&remote_address)), &addrlen);
-    if (recv_result == utils::net::SocketError) {
-      throw std::runtime_error{utils::StringUtils::join_pack("recvfrom: ", utils::net::get_last_socket_error().message())};
-    }
-    result.message.resize(gsl::narrow<size_t>(recv_result));
-    result.remote_address = utils::net::sockaddr_ntop(std::launder(reinterpret_cast<sockaddr*>(&remote_address)));
-    return result;
-  }
-
-  std::unique_ptr<addrinfo, utils::net::addrinfo_deleter> resolved_names_;
-  utils::net::OpenSocketResult open_socket_;
-};
+  return std::nullopt;
+}
 }  // namespace
 
-// Testing the failure relationship is not required, because since UDP in general without guarantees, flow files are always routed to success, unless there is
-// some weird IO error with the content repo.
 TEST_CASE("PutUDP", "[putudp]") {
-  const auto putudp = std::make_shared<PutUDP>("PutUDP");
+  const auto put_udp = std::make_shared<PutUDP>("PutUDP");
   auto random_engine = std::mt19937{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
   // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
   const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
-  const auto port_str = std::to_string(port);
 
-  test::SingleProcessorTestController controller{putudp};
+  test::SingleProcessorTestController controller{put_udp};
   LogTestController::getInstance().setTrace<PutUDP>();
   LogTestController::getInstance().setTrace<core::ProcessContext>();
   LogTestController::getInstance().setLevelByClassName(spdlog::level::trace, "org::apache::nifi::minifi::core::ProcessContextExpr");
-  putudp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
-  putudp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", port_str, "')}"));
+  put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
+  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
-  DatagramListener listener{"localhost", port_str.c_str()};
+  utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>().getLogger()};
+
+  auto server_thread = std::thread([&listener]() { listener.run(); });
+  auto cleanup_server = gsl::finally([&]{
+    listener.stop();
+    server_thread.join();
+  });
 
   {
     const char* const message = "first message: hello";
     const auto result = controller.trigger(message);
     const auto& success_flow_files = result.at(PutUDP::Success);
     REQUIRE(success_flow_files.size() == 1);
-    REQUIRE(result.at(PutUDP::Failure).empty());
-    REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
-    auto receive_result = listener.receive();
-    REQUIRE(receive_result.message == message);
-    REQUIRE(!receive_result.remote_address.empty());
+    CHECK(result.at(PutUDP::Failure).empty());
+    CHECK(controller.plan->getContent(success_flow_files[0]) == message);
+    auto received_message = tryDequeueWithTimeout(listener);
+    REQUIRE(received_message);
+    CHECK(received_message->message_data == message);
+    CHECK(received_message->protocol == utils::net::IpProtocol::UDP);
+    CHECK(!received_message->sender_address.to_string().empty());
   }
 
   {
     const char* const message = "longer message AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";  // NOLINT
     const auto result = controller.trigger(message);
     const auto& success_flow_files = result.at(PutUDP::Success);
     REQUIRE(success_flow_files.size() == 1);
-    REQUIRE(result.at(PutUDP::Failure).empty());
-    REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
-    auto receive_result = listener.receive();
-    REQUIRE(receive_result.message == message);
-    REQUIRE(!receive_result.remote_address.empty());
+    CHECK(result.at(PutUDP::Failure).empty());
+    CHECK(controller.plan->getContent(success_flow_files[0]) == message);
+    auto received_message = tryDequeueWithTimeout(listener);
+    REQUIRE(received_message);
+    CHECK(received_message->message_data == message);
+    CHECK(received_message->protocol == utils::net::IpProtocol::UDP);
+    CHECK(!received_message->sender_address.to_string().empty());
   }
-}
 
+  {
+    const char* const message = "message for invalid host";
+    controller.plan->setProperty(put_udp, PutUDP::Hostname.getName(), "invalid_hostname");
+    const auto result = controller.trigger(message);
+    const auto& failure_flow_files = result.at(PutUDP::Failure);
+    auto received_message = tryDequeueWithTimeout(listener);
+    CHECK(!received_message);
+    REQUIRE(failure_flow_files.size() == 1);
+    CHECK(result.at(PutUDP::Success).empty());
+    CHECK(controller.plan->getContent(failure_flow_files[0]) == message);
+    CHECK((LogTestController::getInstance().contains("Host not found") || LogTestController::getInstance().contains("No such host is known")));

Review Comment:
   Please call `LogTestController::getInstance().clear()` at the start of this section, so that we can be sure the message was logged in this section and not earlier.



##########
extensions/standard-processors/processors/PutUDP.cpp:
##########
@@ -107,48 +98,48 @@ void PutUDP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
     return;
   }
 
-  const auto nonthrowing_sockaddr_ntop = [](const sockaddr* const sa) -> std::string {
-    return utils::try_expression([sa] { return utils::net::sockaddr_ntop(sa); }).value_or("(n/a)");
+  asio::io_context io_context;
+
+  const auto resolve_hostname = [&io_context, &hostname, &port]() -> nonstd::expected<udp::resolver::results_type, std::error_code> {
+    udp::resolver resolver(io_context);
+    std::error_code error_code;
+    auto resolved_query = resolver.resolve(udp::v4(), hostname, port, error_code);
+    if (error_code)
+      return nonstd::make_unexpected(error_code);
+    return resolved_query;
+  };
+
+  const auto debug_log_resolved_endpoint = [&hostname, &logger = this->logger_](const udp::resolver::results_type& resolved_query) -> udp::endpoint {
+    if (logger->should_log(core::logging::LOG_LEVEL::debug))
+      core::logging::LOG_WARN(logger) << "resolved " << hostname << " to: " << resolved_query->endpoint();
+    return resolved_query->endpoint();
   };
 
-  const auto debug_log_resolved_names = [&, this](const addrinfo& names) -> decltype(auto) {
-    if (logger_->should_log(core::logging::LOG_LEVEL::debug)) {
-      std::vector<std::string> names_vector;
-      for (const addrinfo* it = &names; it; it = it->ai_next) {
-        names_vector.push_back(nonthrowing_sockaddr_ntop(it->ai_addr));
-      }
-      logger_->log_debug("resolved \'%s\' to: %s",
-          hostname,
-          names_vector | ranges::views::join(',') | ranges::to<std::string>());
-    }
-    return names;
+  const auto send_data_to_endpoint = [&io_context, &data](const udp::endpoint& endpoint) -> nonstd::expected<void, std::error_code> {
+    std::error_code send_error;
+    udp::socket socket(io_context);
+    socket.open(udp::v4());
+    socket.send_to(asio::buffer(data.buffer), endpoint, udp::socket::message_flags{}, send_error);
+    if (send_error)
+      return nonstd::make_unexpected(send_error);
+    return {};
+  };
+
+  const auto transfer_to_success = [&session, &flow_file]() -> void {
+    session->transfer(flow_file, Success);
+  };
+
+  const auto transfer_to_failure = [&session, &flow_file, &logger = this->logger_](std::error_code ec) -> void {
+    gsl_Expects(ec);
+    logger->log_error("%s", ec.message());
+    session->transfer(flow_file, Failure);
   };
 
-  utils::net::resolveHost(hostname.c_str(), port.c_str(), utils::net::IpProtocol::UDP)
-      | utils::map(utils::dereference)
-      | utils::map(debug_log_resolved_names)
-      | utils::flatMap([](const auto& names) { return utils::net::open_socket(names); })
-      | utils::flatMap([&, this](utils::net::OpenSocketResult socket_handle_and_selected_name) -> nonstd::expected<void, std::error_code> {
-        const auto& [socket_handle, selected_name] = socket_handle_and_selected_name;
-        logger_->log_debug("connected to %s", nonthrowing_sockaddr_ntop(selected_name->ai_addr));
-#ifdef WIN32
-        const char* const buffer_ptr = reinterpret_cast<const char*>(data.buffer.data());
-#else
-        const void* const buffer_ptr = data.buffer.data();
-#endif
-        const auto send_result = ::sendto(socket_handle.get(), buffer_ptr, data.buffer.size(), 0, selected_name->ai_addr, selected_name->ai_addrlen);
-        logger_->log_trace("sendto returned %ld", static_cast<long>(send_result));  // NOLINT: sendto
-        if (send_result == utils::net::SocketError) {
-          return nonstd::make_unexpected(utils::net::get_last_socket_error());
-        }
-        session->transfer(flow_file, Success);
-        return {};
-      })
-      | utils::orElse([&, this](std::error_code ec) {
-        gsl_Expects(ec);
-        logger_->log_error("%s", ec.message());
-        session->transfer(flow_file, Failure);
-      });
+  resolve_hostname()
+      | utils::map(debug_log_resolved_endpoint)
+      | utils::flatMap(send_data_to_endpoint)
+      | utils::map(transfer_to_success)
+      | utils::orElse(transfer_to_failure);

Review Comment:
   👍 it's much more readable this way



##########
extensions/standard-processors/processors/PutUDP.cpp:
##########
@@ -107,48 +98,48 @@ void PutUDP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
     return;
   }
 
-  const auto nonthrowing_sockaddr_ntop = [](const sockaddr* const sa) -> std::string {
-    return utils::try_expression([sa] { return utils::net::sockaddr_ntop(sa); }).value_or("(n/a)");
+  asio::io_context io_context;
+
+  const auto resolve_hostname = [&io_context, &hostname, &port]() -> nonstd::expected<udp::resolver::results_type, std::error_code> {
+    udp::resolver resolver(io_context);
+    std::error_code error_code;
+    auto resolved_query = resolver.resolve(udp::v4(), hostname, port, error_code);
+    if (error_code)
+      return nonstd::make_unexpected(error_code);
+    return resolved_query;
+  };
+
+  const auto debug_log_resolved_endpoint = [&hostname, &logger = this->logger_](const udp::resolver::results_type& resolved_query) -> udp::endpoint {
+    if (logger->should_log(core::logging::LOG_LEVEL::debug))
+      core::logging::LOG_WARN(logger) << "resolved " << hostname << " to: " << resolved_query->endpoint();

Review Comment:
   Should this be `LOG_DEBUG`? The guard on the line above checks the debug level, but the message is emitted with `LOG_WARN`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org