You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/02/08 19:33:56 UTC
[nifi-minifi-cpp] 01/04: MINIFICPP-1979 Use Coroutines with asio
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit f2f561ce2b4635e721b1d5b08b34bf8d450c9f6f
Author: Martin Zink <ma...@apache.org>
AuthorDate: Wed Feb 8 16:26:01 2023 +0100
MINIFICPP-1979 Use Coroutines with asio
MINIFICPP-1985 Run asio related tests on random ports
Closes #1457
Signed-off-by: Marton Szasz <sz...@apache.org>
---
.github/workflows/ci.yml | 6 +-
CMakeLists.txt | 3 +
cmake/BuildTests.cmake | 1 +
.../CMakeLists.txt => cmake/Coroutines.cmake | 21 +-
extensions/standard-processors/CMakeLists.txt | 3 +
.../processors/ListenSyslog.cpp | 5 +-
.../standard-processors/processors/ListenTCP.cpp | 5 +-
.../processors/NetworkListenerProcessor.cpp | 8 +-
.../processors/NetworkListenerProcessor.h | 7 +-
.../standard-processors/processors/PutTCP.cpp | 484 +++++++--------------
extensions/standard-processors/processors/PutTCP.h | 32 +-
.../standard-processors/processors/PutUDP.cpp | 2 +-
.../standard-processors/tests/CMakeLists.txt | 2 +
.../tests/unit/ListenSyslogTests.cpp | 204 +++++----
.../tests/unit/ListenTcpTests.cpp | 187 +++++---
.../tests/unit/ListenUDPTests.cpp | 53 +--
.../standard-processors/tests/unit/PutTCPTests.cpp | 162 +++----
.../standard-processors/tests/unit/PutUDPTests.cpp | 13 +-
libminifi/include/controllers/SSLContextService.h | 13 +-
libminifi/include/utils/net/AsioCoro.h | 75 ++++
libminifi/include/utils/net/Server.h | 15 +-
.../include/utils/net/SessionHandlingServer.h | 67 ---
libminifi/include/utils/net/Ssl.h | 16 +
libminifi/include/utils/net/SslServer.h | 65 ---
libminifi/include/utils/net/TcpServer.h | 38 +-
libminifi/include/utils/net/UdpServer.h | 12 +-
libminifi/src/controllers/SSLContextService.cpp | 10 +-
libminifi/src/utils/net/SslServer.cpp | 90 ----
libminifi/src/utils/net/TcpServer.cpp | 90 ++--
libminifi/src/utils/net/UdpServer.cpp | 41 +-
libminifi/test/Catch.h | 46 +-
libminifi/test/Utils.h | 75 ++--
32 files changed, 861 insertions(+), 990 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c3017324e..9aa080554 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -3,7 +3,7 @@ on: [push, pull_request, workflow_dispatch]
jobs:
macos_xcode:
name: "macos-xcode"
- runs-on: macos-11
+ runs-on: macos-12
timeout-minutes: 180
env:
CCACHE_BASEDIR: ${{ GITHUB.WORKSPACE }}
@@ -29,8 +29,8 @@ jobs:
run: |
echo "PATH=/usr/lib/ccache:/usr/local/opt/ccache/bin:/usr/local/opt/ccache/libexec:$PATH" >> $GITHUB_ENV
echo -e "127.0.0.1\t$HOSTNAME" | sudo tee -a /etc/hosts > /dev/null
- # https://github.com/actions/virtual-environments/blob/main/images/macos/macos-11-Readme.md#xcode
- sudo xcode-select -switch /Applications/Xcode_13.2.1.app
+ # https://github.com/actions/virtual-environments/blob/main/images/macos/macos-12-Readme.md#xcode
+ sudo xcode-select -switch /Applications/Xcode_14.0.1.app
- name: build
run: |
export PATH="/usr/local/opt/flex/bin:$PATH"
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 35cf83db7..513afe7ae 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -286,6 +286,9 @@ target_include_directories(RapidJSON SYSTEM INTERFACE "${CMAKE_CURRENT_SOURCE_DI
# cxxopts
include(CxxOpts)
+include(Coroutines)
+enable_coroutines()
+
# gsl-lite
include(GslLite)
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index bf21b589f..238b1f87b 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -91,6 +91,7 @@ endif()
SET(CATCH_MAIN_LIB catch_main)
add_library(${CATCH_MAIN_LIB} STATIC "${TEST_DIR}/CatchMain.cpp")
target_include_directories(${CATCH_MAIN_LIB} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
+target_link_libraries(${CATCH_MAIN_LIB} spdlog) # for fmt
SET(TEST_RESOURCES ${TEST_DIR}/resources)
diff --git a/extensions/standard-processors/CMakeLists.txt b/cmake/Coroutines.cmake
similarity index 59%
copy from extensions/standard-processors/CMakeLists.txt
copy to cmake/Coroutines.cmake
index db679c286..e027551e5 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/cmake/Coroutines.cmake
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,19 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-
-
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
-
-file(GLOB SOURCES "processors/*.cpp" "controllers/*.cpp" )
-
-add_library(minifi-standard-processors SHARED ${SOURCES})
-
-include(RangeV3)
-include(Asio)
-target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio)
-
-register_extension(minifi-standard-processors "STANDARD PROCESSORS" STANDARD-PROCESSORS "Provides standard processors" "extensions/standard-processors/tests/")
-register_extension_linter(minifi-standard-processors-linter)
+function(enable_coroutines)
+ if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 11)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines" PARENT_SCOPE)
+ endif()
+endfunction(enable_coroutines)
diff --git a/extensions/standard-processors/CMakeLists.txt b/extensions/standard-processors/CMakeLists.txt
index db679c286..d9b46400a 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/extensions/standard-processors/CMakeLists.txt
@@ -28,6 +28,9 @@ include(RangeV3)
include(Asio)
target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio)
+include(Coroutines)
+enable_coroutines()
+
register_extension(minifi-standard-processors "STANDARD PROCESSORS" STANDARD-PROCESSORS "Provides standard processors" "extensions/standard-processors/tests/")
register_extension_linter(minifi-standard-processors-linter)
diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp b/extensions/standard-processors/processors/ListenSyslog.cpp
index 41518a1d1..ecd6b7477 100644
--- a/extensions/standard-processors/processors/ListenSyslog.cpp
+++ b/extensions/standard-processors/processors/ListenSyslog.cpp
@@ -22,6 +22,7 @@
#include "core/PropertyBuilder.h"
#include "core/Resource.h"
#include "controllers/SSLContextService.h"
+#include "utils/net/Ssl.h"
namespace org::apache::nifi::minifi::processors {
@@ -67,8 +68,8 @@ const core::Property ListenSyslog::SSLContextService(
const core::Property ListenSyslog::ClientAuth(
core::PropertyBuilder::createProperty("Client Auth")
->withDescription("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
- ->withDefaultValue<std::string>(toString(utils::net::SslServer::ClientAuthOption::NONE))
- ->withAllowableValues<std::string>(utils::net::SslServer::ClientAuthOption::values())
+ ->withDefaultValue<std::string>(toString(utils::net::ClientAuthOption::NONE))
+ ->withAllowableValues<std::string>(utils::net::ClientAuthOption::values())
->build());
const core::Relationship ListenSyslog::Success("success", "Incoming messages that match the expected format when parsing will be sent to this relationship. "
diff --git a/extensions/standard-processors/processors/ListenTCP.cpp b/extensions/standard-processors/processors/ListenTCP.cpp
index 94c2d9884..2b600677e 100644
--- a/extensions/standard-processors/processors/ListenTCP.cpp
+++ b/extensions/standard-processors/processors/ListenTCP.cpp
@@ -20,6 +20,7 @@
#include "core/PropertyBuilder.h"
#include "controllers/SSLContextService.h"
#include "utils/ProcessorConfigUtils.h"
+#include "utils/net/Ssl.h"
namespace org::apache::nifi::minifi::processors {
@@ -54,8 +55,8 @@ const core::Property ListenTCP::SSLContextService(
const core::Property ListenTCP::ClientAuth(
core::PropertyBuilder::createProperty("Client Auth")
->withDescription("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
- ->withDefaultValue<std::string>(toString(utils::net::SslServer::ClientAuthOption::NONE))
- ->withAllowableValues<std::string>(utils::net::SslServer::ClientAuthOption::values())
+ ->withDefaultValue<std::string>(toString(utils::net::ClientAuthOption::NONE))
+ ->withAllowableValues<std::string>(utils::net::ClientAuthOption::values())
->build());
const core::Relationship ListenTCP::Success("success", "Messages received successfully will be sent out this relationship.");
diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
index 21b74f1ef..ebe884c1c 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
@@ -66,16 +66,16 @@ void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& contex
auto options = readServerOptions(context);
std::string ssl_value;
+ std::optional<utils::net::SslServerOptions> ssl_options;
if (context.getProperty(ssl_context_property.getName(), ssl_value) && !ssl_value.empty()) {
auto ssl_data = utils::net::getSslData(context, ssl_context_property, logger_);
if (!ssl_data || !ssl_data->isValid()) {
throw Exception(PROCESSOR_EXCEPTION, "SSL Context Service is set, but no valid SSL data was found!");
}
- auto client_auth = utils::parseEnumProperty<utils::net::SslServer::ClientAuthOption>(context, client_auth_property);
- server_ = std::make_unique<utils::net::SslServer>(options.max_queue_size, options.port, logger_, *ssl_data, client_auth);
- } else {
- server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, options.port, logger_);
+ auto client_auth = utils::parseEnumProperty<utils::net::ClientAuthOption>(context, client_auth_property);
+ ssl_options.emplace(std::move(*ssl_data), client_auth);
}
+ server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, options.port, logger_, ssl_options);
startServer(options, utils::net::IpProtocol::TCP);
}
diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.h b/extensions/standard-processors/processors/NetworkListenerProcessor.h
index 1799a3fcb..447a59322 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.h
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.h
@@ -26,7 +26,6 @@
#include "core/ProcessSession.h"
#include "core/Property.h"
#include "utils/net/Server.h"
-#include "utils/net/SslServer.h"
namespace org::apache::nifi::minifi::processors {
@@ -51,6 +50,12 @@ class NetworkListenerProcessor : public core::Processor {
stopServer();
}
+ uint16_t getPort() {
+ if (server_)
+ return server_->getPort();
+ return 0;
+ }
+
protected:
void startTcpServer(const core::ProcessContext& context, const core::Property& ssl_context_property, const core::Property& client_auth_property);
void startUdpServer(const core::ProcessContext& context);
diff --git a/extensions/standard-processors/processors/PutTCP.cpp b/extensions/standard-processors/processors/PutTCP.cpp
index 0b992ebf6..c5c2d7717 100644
--- a/extensions/standard-processors/processors/PutTCP.cpp
+++ b/extensions/standard-processors/processors/PutTCP.cpp
@@ -16,30 +16,29 @@
*/
#include "PutTCP.h"
-#include <algorithm>
#include <utility>
+#include <tuple>
#include "range/v3/range/conversion.hpp"
#include "utils/gsl.h"
-#include "utils/expected.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/PropertyBuilder.h"
#include "core/Resource.h"
#include "core/logging/Logger.h"
-#include "controllers/SSLContextService.h"
-#include "asio/ssl.hpp"
-#include "asio/ip/tcp.hpp"
-#include "asio/write.hpp"
-#include "asio/high_resolution_timer.hpp"
+#include "utils/net/AsioCoro.h"
using asio::ip::tcp;
-using TcpSocket = asio::ip::tcp::socket;
-using SslSocket = asio::ssl::stream<tcp::socket>;
using namespace std::literals::chrono_literals;
+using std::chrono::steady_clock;
+using org::apache::nifi::minifi::utils::net::use_nothrow_awaitable;
+using org::apache::nifi::minifi::utils::net::HandshakeType;
+using org::apache::nifi::minifi::utils::net::TcpSocket;
+using org::apache::nifi::minifi::utils::net::SslSocket;
+using org::apache::nifi::minifi::utils::net::asyncOperationWithTimeout;
namespace org::apache::nifi::minifi::processors {
@@ -114,6 +113,21 @@ void PutTCP::initialize() {
void PutTCP::notifyStop() {}
+namespace {
+asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service) {
+ asio::ssl::context ssl_context(asio::ssl::context::tls_client);
+ ssl_context.set_options(asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);
+ ssl_context.load_verify_file(ssl_context_service.getCACertificate().string());
+ ssl_context.set_verify_mode(asio::ssl::verify_peer);
+ if (const auto& cert_file = ssl_context_service.getCertificateFile(); !cert_file.empty())
+ ssl_context.use_certificate_file(cert_file.string(), asio::ssl::context::pem);
+ if (const auto& private_key_file = ssl_context_service.getPrivateKeyFile(); !private_key_file.empty())
+ ssl_context.use_private_key_file(private_key_file.string(), asio::ssl::context::pem);
+ ssl_context.set_password_callback([password = ssl_context_service.getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
+ return ssl_context;
+}
+} // namespace
+
void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessionFactory*) {
gsl_Expects(context);
@@ -130,29 +144,31 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
idle_connection_expiration_.reset();
if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms)
- timeout_ = timeout->getMilliseconds();
+ timeout_duration_ = timeout->getMilliseconds();
else
- timeout_ = 15s;
+ timeout_duration_ = 15s;
+
+ if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+ connections_.reset();
+ else
+ connections_.emplace();
std::string context_name;
- ssl_context_service_.reset();
+ ssl_context_.reset();
if (context->getProperty(SSLContextService.getName(), context_name) && !IsNullOrEmpty(context_name)) {
if (auto controller_service = context->getControllerService(context_name)) {
- ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
- if (!ssl_context_service_)
- logger_->log_error("%s is not a SSL Context Service", context_name);
+ if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name))) {
+ ssl_context_ = getSslContext(*ssl_context_service);
+ } else {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, context_name + " is not an SSL Context Service");
+ }
} else {
- logger_->log_error("Invalid controller service: %s", context_name);
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + context_name);
}
}
delimiter_ = utils::span_to<std::vector>(gsl::make_span(context->getProperty(OutgoingMessageDelimiter).value_or(std::string{})).as_span<const std::byte>());
- if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
- connections_.reset();
- else
- connections_.emplace();
-
if (auto max_size_of_socket_send_buffer = context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer))
max_size_of_socket_send_buffer_ = max_size_of_socket_send_buffer->getValue();
else
@@ -160,6 +176,16 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
}
namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+ co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+ co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration); // NOLINT
+}
+
template<class SocketType>
class ConnectionHandler : public ConnectionHandlerBase {
public:
@@ -167,332 +193,131 @@ class ConnectionHandler : public ConnectionHandlerBase {
std::chrono::milliseconds timeout,
std::shared_ptr<core::logging::Logger> logger,
std::optional<size_t> max_size_of_socket_send_buffer,
- std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+ asio::ssl::context* ssl_context)
: connection_id_(std::move(connection_id)),
- timeout_(timeout),
+ timeout_duration_(timeout),
logger_(std::move(logger)),
max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
- ssl_context_service_(std::move(ssl_context_service)) {
+ ssl_context_(ssl_context) {
}
~ConnectionHandler() override = default;
- nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+ asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+ const std::vector<std::byte>& delimiter,
+ asio::io_context& io_context_) override;
private:
- nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
[[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
- return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+ return last_used_ && *last_used_ >= (steady_clock::now() - dur);
}
void reset() override {
last_used_.reset();
socket_.reset();
- io_context_.reset();
- last_error_.clear();
- deadline_.expires_at(asio::steady_timer::time_point::max());
}
- void checkDeadline(std::error_code error_code, SocketType* socket);
- void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
- void handleConnect(std::error_code error,
- tcp::resolver::results_type::iterator endpoint_iter,
- const std::shared_ptr<SocketType>& socket);
- void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
- const std::shared_ptr<SocketType>& socket);
- void handleHandshake(std::error_code error,
- const tcp::resolver::results_type::iterator& endpoint_iter,
- const std::shared_ptr<SocketType>& socket);
-
- void handleWrite(std::error_code error,
- std::size_t bytes_written,
- const std::shared_ptr<io::InputStream>& flow_file_content_stream,
- const std::vector<std::byte>& delimiter,
- const std::shared_ptr<SocketType>& socket);
-
- void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+ [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+ [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+ [[nodiscard]] bool hasUsableSocket() const { return socket_ && socket_->lowest_layer().is_open(); }
- nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+ asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context_);
+ asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
- [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+ SocketType createNewSocket(asio::io_context& io_context_);
detail::ConnectionId connection_id_;
- std::optional<std::chrono::steady_clock::time_point> last_used_;
- asio::io_context io_context_;
- std::error_code last_error_;
- asio::steady_timer deadline_{io_context_};
- std::chrono::milliseconds timeout_;
- std::shared_ptr<SocketType> socket_;
+ std::optional<SocketType> socket_;
+
+ std::optional<steady_clock::time_point> last_used_;
+ std::chrono::milliseconds timeout_duration_;
std::shared_ptr<core::logging::Logger> logger_;
std::optional<size_t> max_size_of_socket_send_buffer_;
- std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
- nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
- nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
- const std::shared_ptr<io::InputStream>& flow_file_content_stream,
- const std::vector<std::byte>& delimiter);
+ asio::ssl::context* ssl_context_;
};
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
- return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
- if (socket_ && socket_->lowest_layer().is_open())
- return socket_;
- auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
- if (!new_socket)
- return nonstd::make_unexpected(new_socket.error());
- socket_ = std::move(*new_socket);
- return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
- if (error_code != asio::error::operation_aborted) {
- deadline_.expires_at(asio::steady_timer::time_point::max());
- last_error_ = asio::error::timed_out;
- deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
- socket->lowest_layer().close();
- }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
- if (endpoint_iter == tcp::resolver::results_type::iterator()) {
- logger_->log_trace("No more endpoints to try");
- deadline_.cancel();
- return;
- }
-
- last_error_.clear();
- deadline_.expires_after(timeout_);
- deadline_.async_wait([&](std::error_code error_code) -> void {
- checkDeadline(error_code, socket.get());
- });
- socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
- [&socket, endpoint_iter, this](std::error_code err) {
- handleConnect(err, endpoint_iter, socket);
- });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
- tcp::resolver::results_type::iterator endpoint_iter,
- const std::shared_ptr<SocketType>& socket) {
- bool connection_failed_before_deadline = error.operator bool();
- bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
- if (connection_failed_due_to_deadline) {
- core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
- socket->lowest_layer().close();
- return startConnect(++endpoint_iter, socket);
- }
-
- if (connection_failed_before_deadline) {
- core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
- last_error_ = error;
- socket->lowest_layer().close();
- return startConnect(++endpoint_iter, socket);
- }
-
- if (max_size_of_socket_send_buffer_)
- socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
- handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
- const tcp::resolver::results_type::iterator&,
- const std::shared_ptr<SocketType>&) {
- throw std::invalid_argument("Handshake called without SSL");
-}
-
template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
- const tcp::resolver::results_type::iterator& endpoint_iter,
- const std::shared_ptr<SslSocket>& socket) {
- if (!error) {
- core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
- deadline_.cancel();
- return;
- }
- core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
- last_error_ = error;
- socket->lowest_layer().close();
- startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+ gsl_Expects(!ssl_context_);
+ return TcpSocket{io_context_};
}
template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
- const std::shared_ptr<TcpSocket>& socket) {
- core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
- socket->lowest_layer().non_blocking(true);
- deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
- const std::shared_ptr<SslSocket>& socket) {
- core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
- socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
- handleHandshake(handshake_error, endpoint_iter, socket);
- });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+ gsl_Expects(ssl_context_);
+ return {io_context_, *ssl_context_};
}
template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
- std::size_t bytes_written,
- const std::shared_ptr<io::InputStream>& flow_file_content_stream,
- const std::vector<std::byte>& delimiter,
- const std::shared_ptr<SocketType>& socket) {
- bool write_failed_before_deadline = error.operator bool();
- bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
- if (write_failed_due_to_deadline) {
- logger_->log_trace("Writing flowfile to socket timed out");
- socket->lowest_layer().close();
- deadline_.cancel();
- return;
- }
-
- if (write_failed_before_deadline) {
- last_error_ = error;
- logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
- socket->lowest_layer().close();
- deadline_.cancel();
- return;
- }
-
- logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
- if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
- asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
- handleDelimiterWrite(error, bytes_written, socket);
- });
- } else {
- std::vector<std::byte> data_chunk;
- data_chunk.resize(chunk_size);
- gsl::span<std::byte> buffer{data_chunk};
- size_t num_read = flow_file_content_stream->read(buffer);
- asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
- handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
- });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context) {
+ auto socket = createNewSocket(io_context);
+ std::error_code last_error;
+ for (const auto& endpoint : endpoints) {
+ auto [connection_error] = co_await asyncOperationWithTimeout(socket.lowest_layer().async_connect(endpoint, use_nothrow_awaitable), timeout_duration_);
+ if (connection_error) {
+ core::logging::LOG_DEBUG(logger_) << "Connecting to " << endpoint.endpoint() << " failed due to " << connection_error.message();
+ last_error = connection_error;
+ continue;
+ }
+ auto [handshake_error] = co_await handshake(socket, timeout_duration_);
+ if (handshake_error) {
+ core::logging::LOG_DEBUG(logger_) << "Handshake with " << endpoint.endpoint() << " failed due to " << handshake_error.message();
+ last_error = handshake_error;
+ continue;
+ }
+ if (max_size_of_socket_send_buffer_)
+ socket.lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+ socket_.emplace(std::move(socket));
+ co_return std::error_code();
}
+ co_return last_error;
}
template<class SocketType>
-void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
- bool write_failed_before_deadline = error.operator bool();
- bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
- if (write_failed_due_to_deadline) {
- logger_->log_trace("Writing delimiter to socket timed out");
- socket->lowest_layer().close();
- deadline_.cancel();
- return;
- }
-
- if (write_failed_before_deadline) {
- last_error_ = error;
- logger_->log_trace("Writing delimiter to socket failed due to %s", error.message());
- socket->lowest_layer().close();
- deadline_.cancel();
- return;
- }
-
- logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", bytes_written);
- deadline_.cancel();
-}
-
-
-template<>
-nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> ConnectionHandler<TcpSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
- auto socket = std::make_shared<TcpSocket>(io_context_);
- startConnect(resolved_query.begin(), socket);
- deadline_.expires_after(timeout_);
- deadline_.async_wait([&](std::error_code error_code) -> void {
- checkDeadline(error_code, socket.get());
- });
- io_context_.run();
- if (last_error_)
- return nonstd::make_unexpected(last_error_);
- return socket;
-}
-
-asio::ssl::context getSslContext(const auto& ssl_context_service) {
- gsl_Expects(ssl_context_service);
- asio::ssl::context ssl_context(asio::ssl::context::sslv23);
- ssl_context.load_verify_file(ssl_context_service->getCACertificate().string());
- ssl_context.set_verify_mode(asio::ssl::verify_peer);
- if (auto cert_file = ssl_context_service->getCertificateFile(); !cert_file.empty())
- ssl_context.use_certificate_file(cert_file.string(), asio::ssl::context::pem);
- if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); !private_key_file.empty())
- ssl_context.use_private_key_file(private_key_file.string(), asio::ssl::context::pem);
- ssl_context.set_password_callback([password = ssl_context_service->getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
- return ssl_context;
+[[nodiscard]] asio::awaitable<std::error_code> ConnectionHandler<SocketType>::setupUsableSocket(asio::io_context& io_context) {
+ if (hasUsableSocket())
+ co_return std::error_code();
+ tcp::resolver resolver(io_context);
+ auto [resolve_error, resolve_result] = co_await asyncOperationWithTimeout(resolver.async_resolve(connection_id_.getHostname(), connection_id_.getPort(), use_nothrow_awaitable), timeout_duration_);
+ if (resolve_error)
+ co_return resolve_error;
+ co_return co_await establishNewConnection(resolve_result, io_context);
}
-template<>
-nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> ConnectionHandler<SslSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
- auto ssl_context = getSslContext(ssl_context_service_);
- auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
- startConnect(resolved_query.begin(), socket);
- deadline_.async_wait([&](std::error_code error_code) -> void {
- checkDeadline(error_code, socket.get());
- });
- io_context_.run();
- if (last_error_)
- return nonstd::make_unexpected(last_error_);
- return socket;
+template<class SocketType>
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+ const std::vector<std::byte>& delimiter,
+ asio::io_context& io_context) {
+ if (auto connection_error = co_await setupUsableSocket(io_context)) // NOLINT
+ co_return connection_error;
+ co_return co_await send(stream_to_send, delimiter);
}
template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendDataToSocket(const std::shared_ptr<SocketType>& socket,
- const std::shared_ptr<io::InputStream>& flow_file_content_stream,
- const std::vector<std::byte>& delimiter) {
- if (!socket || !socket->lowest_layer().is_open())
- return nonstd::make_unexpected(asio::error::not_socket);
-
- deadline_.expires_after(timeout_);
- deadline_.async_wait([&](std::error_code error_code) -> void {
- checkDeadline(error_code, socket.get());
- });
- io_context_.restart();
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter) {
+ gsl_Expects(hasUsableSocket());
std::vector<std::byte> data_chunk;
data_chunk.resize(chunk_size);
-
gsl::span<std::byte> buffer{data_chunk};
- size_t num_read = flow_file_content_stream->read(buffer);
- logger_->log_trace("read %zu bytes from flowfile", num_read);
- asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
- handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
- });
- deadline_.async_wait([&](std::error_code error_code) -> void {
- checkDeadline(error_code, socket.get());
- });
- io_context_.run();
- if (last_error_)
- return nonstd::make_unexpected(last_error_);
- last_used_ = std::chrono::steady_clock::now();
- return {};
-}
+ while (stream_to_send->tell() < stream_to_send->size()) {
+ size_t num_read = stream_to_send->read(buffer);
+ if (io::isError(num_read))
+ co_return std::make_error_code(std::errc::io_error);
+ auto [write_error, bytes_written] = co_await asyncOperationWithTimeout(asio::async_write(*socket_, asio::buffer(data_chunk, num_read), use_nothrow_awaitable), timeout_duration_);
+ if (write_error)
+ co_return write_error;
+ logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
+ }
+ auto [delimiter_write_error, delimiter_bytes_written] = co_await asyncOperationWithTimeout(asio::async_write(*socket_, asio::buffer(delimiter), use_nothrow_awaitable), timeout_duration_);
+ if (delimiter_write_error)
+ co_return delimiter_write_error;
+ logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", delimiter_bytes_written);
-template<class SocketType>
-nonstd::expected<tcp::resolver::results_type, std::error_code> ConnectionHandler<SocketType>::resolveHostname() {
- tcp::resolver resolver(io_context_);
- std::error_code error_code;
- auto resolved_query = resolver.resolve(connection_id_.getHostname(), connection_id_.getPort(), error_code);
- if (error_code)
- return nonstd::make_unexpected(error_code);
- return resolved_query;
+ last_used_ = steady_clock::now();
+ co_return std::error_code();
}
} // namespace
@@ -517,19 +342,13 @@ void PutTCP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
return;
}
- auto flow_file_content_stream = session->getFlowFileContentStream(flow_file);
- if (!flow_file_content_stream) {
- session->transfer(flow_file, Failure);
- return;
- }
-
auto connection_id = detail::ConnectionId(std::move(hostname), std::move(port));
std::shared_ptr<ConnectionHandlerBase> handler;
if (!connections_ || !connections_->contains(connection_id)) {
- if (ssl_context_service_)
- handler = std::make_shared<ConnectionHandler<SslSocket>>(connection_id, timeout_, logger_, max_size_of_socket_send_buffer_, ssl_context_service_);
+ if (ssl_context_)
+ handler = std::make_shared<ConnectionHandler<SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_);
else
- handler = std::make_shared<ConnectionHandler<TcpSocket>>(connection_id, timeout_, logger_, max_size_of_socket_send_buffer_, nullptr);
+ handler = std::make_shared<ConnectionHandler<TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr);
if (connections_)
(*connections_)[connection_id] = handler;
} else {
@@ -538,7 +357,7 @@ void PutTCP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
gsl_Expects(handler);
- processFlowFile(handler, flow_file_content_stream, *session, flow_file);
+ processFlowFile(handler, *session, flow_file);
}
void PutTCP::removeExpiredConnections() {
@@ -550,30 +369,43 @@ void PutTCP::removeExpiredConnections() {
}
}
+std::error_code PutTCP::sendFlowFileContent(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
+ const std::shared_ptr<io::InputStream>& flow_file_content_stream) {
+ std::error_code operation_error;
+ io_context_.restart();
+ asio::co_spawn(io_context_,
+ connection_handler->sendStreamWithDelimiter(flow_file_content_stream, delimiter_, io_context_),
+ [&operation_error](const std::exception_ptr&, std::error_code error_code) {
+ operation_error = error_code;
+ });
+ io_context_.run();
+ return operation_error;
+}
+
void PutTCP::processFlowFile(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
- const std::shared_ptr<io::InputStream>& flow_file_content_stream,
- core::ProcessSession& session,
- const std::shared_ptr<core::FlowFile>& flow_file) {
- auto result = connection_handler->sendData(flow_file_content_stream, delimiter_);
+ core::ProcessSession& session,
+ const std::shared_ptr<core::FlowFile>& flow_file) {
+ auto flow_file_content_stream = session.getFlowFileContentStream(flow_file);
+ if (!flow_file_content_stream) {
+ session.transfer(flow_file, Failure);
+ return;
+ }
- if (!result && connection_handler->hasBeenUsed()) {
- logger_->log_warn("%s with reused connection, retrying...", result.error().message());
+ std::error_code operation_error = sendFlowFileContent(connection_handler, flow_file_content_stream);
+
+ if (operation_error && connection_handler->hasBeenUsed()) {
+ logger_->log_warn("%s with reused connection, retrying...", operation_error.message());
connection_handler->reset();
- result = connection_handler->sendData(flow_file_content_stream, delimiter_);
+ operation_error = sendFlowFileContent(connection_handler, flow_file_content_stream);
}
- const auto transfer_to_success = [&session, &flow_file]() -> void {
- session.transfer(flow_file, Success);
- };
-
- const auto transfer_to_failure = [&session, &flow_file, &logger = logger_, &connection_handler](std::error_code ec) -> void {
- gsl_Expects(ec);
+ if (operation_error) {
connection_handler->reset();
- logger->log_error("%s", ec.message());
+ logger_->log_error("%s", operation_error.message());
session.transfer(flow_file, Failure);
- };
-
- result | utils::map(transfer_to_success) | utils::orElse(transfer_to_failure);
+ } else {
+ session.transfer(flow_file, Success);
+ }
}
REGISTER_RESOURCE(PutTCP, Processor);
diff --git a/extensions/standard-processors/processors/PutTCP.h b/extensions/standard-processors/processors/PutTCP.h
index 1f6f7fb58..58ae28f94 100644
--- a/extensions/standard-processors/processors/PutTCP.h
+++ b/extensions/standard-processors/processors/PutTCP.h
@@ -32,6 +32,10 @@
#include "utils/expected.h"
#include "utils/StringUtils.h" // for string <=> on libc++
+#include <asio/io_context.hpp>
+#include <asio/awaitable.hpp>
+#include <asio/ssl/context.hpp>
+
namespace org::apache::nifi::minifi::processors::detail {
class ConnectionId {
@@ -50,12 +54,12 @@ class ConnectionId {
} // namespace org::apache::nifi::minifi::processors::detail
namespace std {
-template <>
+template<>
struct hash<org::apache::nifi::minifi::processors::detail::ConnectionId> {
size_t operator()(const org::apache::nifi::minifi::processors::detail::ConnectionId& connection_id) const {
return org::apache::nifi::minifi::utils::hash_combine(
std::hash<std::string_view>{}(connection_id.getHostname()),
- std::hash <std::string_view>{}(connection_id.getPort()));
+ std::hash<std::string_view>{}(connection_id.getPort()));
}
};
} // namespace std
@@ -64,16 +68,19 @@ namespace org::apache::nifi::minifi::processors {
class ConnectionHandlerBase {
public:
virtual ~ConnectionHandlerBase() = default;
+ virtual void reset() = 0;
[[nodiscard]] virtual bool hasBeenUsed() const = 0;
[[nodiscard]] virtual bool hasBeenUsedIn(std::chrono::milliseconds dur) const = 0;
- virtual nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) = 0;
- virtual void reset() = 0;
+ [[nodiscard]] virtual asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+ const std::vector<std::byte>& delimiter,
+ asio::io_context& io_context) = 0;
};
class PutTCP final : public core::Processor {
public:
- EXTENSIONAPI static constexpr const char* Description = "The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. "
+ EXTENSIONAPI static constexpr const char* Description =
+ "The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. "
"By default, the FlowFiles are transmitted over the same TCP connection. To assist the TCP server with determining message boundaries, "
"an optional \"Outgoing Message Delimiter\" string can be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. "
"An optional \"Connection Per FlowFile\" parameter can be specified to change the behaviour so that each FlowFiles content is transmitted over a single TCP connection "
@@ -107,22 +114,25 @@ class PutTCP final : public core::Processor {
void initialize() final;
void notifyStop() final;
- void onSchedule(core::ProcessContext*, core::ProcessSessionFactory *) final;
+ void onSchedule(core::ProcessContext*, core::ProcessSessionFactory*) final;
void onTrigger(core::ProcessContext*, core::ProcessSession*) final;
private:
void removeExpiredConnections();
void processFlowFile(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
- const std::shared_ptr<io::InputStream>& flow_file_content_stream,
- core::ProcessSession& session,
- const std::shared_ptr<core::FlowFile>& flow_file);
+ core::ProcessSession& session,
+ const std::shared_ptr<core::FlowFile>& flow_file);
+
+ std::error_code sendFlowFileContent(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
+ const std::shared_ptr<io::InputStream>& flow_file_content_stream);
std::vector<std::byte> delimiter_;
+ asio::io_context io_context_;
std::optional<std::unordered_map<detail::ConnectionId, std::shared_ptr<ConnectionHandlerBase>>> connections_;
std::optional<std::chrono::milliseconds> idle_connection_expiration_;
std::optional<size_t> max_size_of_socket_send_buffer_;
- std::chrono::milliseconds timeout_ = std::chrono::seconds(15);
- std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+ std::chrono::milliseconds timeout_duration_ = std::chrono::seconds(15);
+ std::optional<asio::ssl::context> ssl_context_;
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutTCP>::getLogger(uuid_);
};
diff --git a/extensions/standard-processors/processors/PutUDP.cpp b/extensions/standard-processors/processors/PutUDP.cpp
index c59a86e80..90910dcd1 100644
--- a/extensions/standard-processors/processors/PutUDP.cpp
+++ b/extensions/standard-processors/processors/PutUDP.cpp
@@ -93,7 +93,7 @@ void PutUDP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
}
const auto data = session->readBuffer(flow_file);
- if (data.status < 0) {
+ if (io::isError(data.status)) {
session->transfer(flow_file, Failure);
return;
}
diff --git a/extensions/standard-processors/tests/CMakeLists.txt b/extensions/standard-processors/tests/CMakeLists.txt
index 784de1824..353b7bbca 100644
--- a/extensions/standard-processors/tests/CMakeLists.txt
+++ b/extensions/standard-processors/tests/CMakeLists.txt
@@ -17,6 +17,8 @@
# under the License.
#
+include(Coroutines)
+enable_coroutines()
file(GLOB PROCESSOR_UNIT_TESTS "unit/*.cpp")
file(GLOB PROCESSOR_INTEGRATION_TESTS "integration/*.cpp")
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
index 9a4aa65a5..c63a67b22 100644
--- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -29,7 +29,6 @@ using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::test {
-constexpr uint64_t SYSLOG_PORT = 10255;
constexpr auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
struct ValidRFC5424Message {
@@ -249,56 +248,61 @@ void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164
CHECK(original_message.msg_ == flow_file.getAttribute("syslog.msg"));
}
-TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog][NetworkListenerProcessor]") {
+TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") {
const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
SingleProcessorTestController controller{listen_syslog};
LogTestController::getInstance().setTrace<ListenSyslog>();
- REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2"));
REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
std::string protocol;
+ uint16_t port = 0;
SECTION("UDP") {
+ REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+ protocol = "UDP";
+
+ port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
asio::ip::udp::endpoint endpoint;
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
}
protocol = "UDP";
- REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
- controller.plan->scheduleProcessor(listen_syslog);
- utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint);
- utils::sendUdpDatagram(invalid_syslog, endpoint);
+ CHECK_THAT(utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint), MatchesSuccess());
+ CHECK_THAT(utils::sendUdpDatagram(invalid_syslog, endpoint), MatchesSuccess());
}
SECTION("TCP") {
+ REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+ protocol = "TCP";
+
+ port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
asio::ip::tcp::endpoint endpoint;
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
}
- protocol = "TCP";
- REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
- controller.plan->scheduleProcessor(listen_syslog);
- REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint));
- REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, endpoint));
+ CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint), MatchesSuccess());
+ CHECK_THAT(utils::sendMessagesViaTCP({invalid_syslog}, endpoint), MatchesSuccess());
}
std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1);
CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[1]) == invalid_syslog);
- check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], SYSLOG_PORT, protocol);
- check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], SYSLOG_PORT, protocol);
+ check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], port, protocol);
+ check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], port, protocol);
}
TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog][NetworkListenerProcessor]") {
@@ -306,75 +310,80 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog][NetworkListenerProce
SingleProcessorTestController controller{listen_syslog};
LogTestController::getInstance().setTrace<ListenSyslog>();
- REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "100"));
REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "true"));
std::string protocol;
+ uint16_t port = 0;
SECTION("UDP") {
+ REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+ protocol = "UDP";
+
+ port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
asio::ip::udp::endpoint endpoint;
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
}
- protocol = "UDP";
- REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
- controller.plan->scheduleProcessor(listen_syslog);
- std::this_thread::sleep_for(100ms);
- utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint);
- utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, endpoint);
- utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, endpoint);
- utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, endpoint);
-
- utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, endpoint);
- utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, endpoint);
- utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, endpoint);
- utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, endpoint);
-
- utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint);
- utils::sendUdpDatagram(invalid_syslog, endpoint);
+
+ CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint), MatchesSuccess());
+ CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, endpoint), MatchesSuccess());
+ CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, endpoint), MatchesSuccess());
+ CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, endpoint), MatchesSuccess());
+
+ CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, endpoint), MatchesSuccess());
+ CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, endpoint), MatchesSuccess());
+ CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, endpoint), MatchesSuccess());
+ CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, endpoint), MatchesSuccess());
+
+ CHECK_THAT(utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint), MatchesSuccess());
+ CHECK_THAT(utils::sendUdpDatagram(invalid_syslog, endpoint), MatchesSuccess());
}
SECTION("TCP") {
+ REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+ protocol = "TCP";
+
+ port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
asio::ip::tcp::endpoint endpoint;
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
}
- protocol = "TCP";
- REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
- controller.plan->scheduleProcessor(listen_syslog);
- std::this_thread::sleep_for(100ms);
- REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_,
- rfc5424_doc_example_2.unparsed_,
- rfc5424_doc_example_3.unparsed_,
- rfc5424_doc_example_4.unparsed_}, endpoint));
-
- REQUIRE(utils::sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_,
- rfc3164_doc_example_2.unparsed_,
- rfc3164_doc_example_3.unparsed_,
- rfc3164_doc_example_4.unparsed_}, endpoint));
-
- REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint));
- REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, endpoint));
+
+ CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_,
+ rfc5424_doc_example_2.unparsed_,
+ rfc5424_doc_example_3.unparsed_,
+ rfc5424_doc_example_4.unparsed_}, endpoint), MatchesSuccess());
+
+ CHECK_THAT(utils::sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_,
+ rfc3164_doc_example_2.unparsed_,
+ rfc3164_doc_example_3.unparsed_,
+ rfc3164_doc_example_4.unparsed_}, endpoint), MatchesSuccess());
+
+ CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint), MatchesSuccess());
+ CHECK_THAT(utils::sendMessagesViaTCP({invalid_syslog}, endpoint), MatchesSuccess());
}
std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
- REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 9}, {ListenSyslog::Invalid, 1}}, result, 300ms, 50ms));
+ REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 9},
+ {ListenSyslog::Invalid, 1}}, result, 300ms, 50ms));
REQUIRE(result.at(ListenSyslog::Success).size() == 9);
REQUIRE(result.at(ListenSyslog::Invalid).size() == 1);
std::unordered_map<std::string, core::FlowFile&> success_flow_files;
- for (auto& flow_file : result.at(ListenSyslog::Success)) {
+ for (auto& flow_file: result.at(ListenSyslog::Success)) {
success_flow_files.insert({controller.plan->getContent(flow_file), *flow_file});
}
@@ -388,26 +397,25 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog][NetworkListenerProce
REQUIRE(success_flow_files.contains(std::string(rfc3164_doc_example_3.unparsed_)));
REQUIRE(success_flow_files.contains(std::string(rfc3164_doc_example_4.unparsed_)));
- check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_1.unparsed_)), rfc5424_doc_example_1, SYSLOG_PORT, protocol);
- check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_2.unparsed_)), rfc5424_doc_example_2, SYSLOG_PORT, protocol);
- check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_3.unparsed_)), rfc5424_doc_example_3, SYSLOG_PORT, protocol);
- check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_4.unparsed_)), rfc5424_doc_example_4, SYSLOG_PORT, protocol);
+ check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_1.unparsed_)), rfc5424_doc_example_1, port, protocol);
+ check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_2.unparsed_)), rfc5424_doc_example_2, port, protocol);
+ check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_3.unparsed_)), rfc5424_doc_example_3, port, protocol);
+ check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_4.unparsed_)), rfc5424_doc_example_4, port, protocol);
- check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_1.unparsed_)), rfc3164_doc_example_1, SYSLOG_PORT, protocol);
- check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_2.unparsed_)), rfc3164_doc_example_2, SYSLOG_PORT, protocol);
- check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_3.unparsed_)), rfc3164_doc_example_3, SYSLOG_PORT, protocol);
- check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_4.unparsed_)), rfc3164_doc_example_4, SYSLOG_PORT, protocol);
+ check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_1.unparsed_)), rfc3164_doc_example_1, port, protocol);
+ check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_2.unparsed_)), rfc3164_doc_example_2, port, protocol);
+ check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_3.unparsed_)), rfc3164_doc_example_3, port, protocol);
+ check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_4.unparsed_)), rfc3164_doc_example_4, port, protocol);
REQUIRE(success_flow_files.contains(std::string(rfc5424_logger_example_1)));
CHECK(controller.plan->getContent(result.at(ListenSyslog::Invalid)[0]) == invalid_syslog);
}
-
TEST_CASE("ListenSyslog can be rescheduled", "[ListenSyslog][NetworkListenerProcessor]") {
const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
SingleProcessorTestController controller{listen_syslog};
LogTestController::getInstance().setTrace<ListenSyslog>();
- REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
+ REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, "0"));
REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "100"));
REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "true"));
SECTION("UDP") {
@@ -429,45 +437,48 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog][Netw
const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
SingleProcessorTestController controller{listen_syslog};
- REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "10"));
REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxQueueSize, "50"));
LogTestController::getInstance().setWarn<ListenSyslog>();
+ uint16_t port = 0;
+
SECTION("UDP") {
+ REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+ port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
asio::ip::udp::endpoint endpoint;
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
}
- REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
- controller.plan->scheduleProcessor(listen_syslog);
for (auto i = 0; i < 100; ++i) {
- utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint);
+ CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint), MatchesSuccess());
}
CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms));
}
SECTION("TCP") {
+ REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+ port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
asio::ip::tcp::endpoint endpoint;
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
}
- REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
- controller.plan->scheduleProcessor(listen_syslog);
for (auto i = 0; i < 100; ++i) {
- REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, endpoint));
+ CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, endpoint), MatchesSuccess());
}
CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms));
}
@@ -480,41 +491,44 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog][Netw
}
TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog][NetworkListenerProcessor]") {
- asio::ip::tcp::endpoint endpoint;
- SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
- }
- SECTION("sending through IPv6", "[IPv6]") {
- if (utils::isIPv6Disabled())
- return;
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
- }
const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
-
SingleProcessorTestController controller{listen_syslog};
+
auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(), (executable_dir / "resources" / "ca_A.crt").string()));
REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
+ ssl_context_service->enable();
+
LogTestController::getInstance().setTrace<ListenSyslog>();
- REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2"));
REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
REQUIRE(listen_syslog->setProperty(ListenSyslog::SSLContextService, "SSLContextService"));
- ssl_context_service->enable();
- controller.plan->scheduleProcessor(listen_syslog);
- REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, executable_dir / "resources" / "ca_A.crt"));
- REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+
+ auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_syslog);
+
+ asio::ip::tcp::endpoint endpoint;
+ SECTION("sending through IPv4", "[IPv4]") {
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
+ }
+ SECTION("sending through IPv6", "[IPv6]") {
+ if (utils::isIPv6Disabled())
+ return;
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
+ }
+
+ CHECK_THAT(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesSuccess());
+ CHECK_THAT(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesSuccess());
std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1);
CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[1]) == invalid_syslog);
- check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], SYSLOG_PORT, "TCP");
- check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], SYSLOG_PORT, "TCP");
+ check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], port, "TCP");
+ check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], port, "TCP");
}
} // namespace org::apache::nifi::minifi::test
diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
index 158e1a072..32b0c6558 100644
--- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
@@ -29,48 +29,45 @@ using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::test {
-constexpr uint64_t PORT = 10254;
-
-void check_for_attributes(core::FlowFile& flow_file) {
- CHECK(std::to_string(PORT) == flow_file.getAttribute("tcp.port"));
+void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
+ CHECK(std::to_string(port) == flow_file.getAttribute("tcp.port"));
const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
CHECK(ranges::contains(local_addresses, flow_file.getAttribute("tcp.sender")));
}
TEST_CASE("ListenTCP test multiple messages", "[ListenTCP][NetworkListenerProcessor]") {
+ const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+ SingleProcessorTestController controller{listen_tcp};
+ LogTestController::getInstance().setTrace<ListenTCP>();
+ REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "2"));
+ auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
+
asio::ip::tcp::endpoint endpoint;
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
}
- const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
-
- SingleProcessorTestController controller{listen_tcp};
- LogTestController::getInstance().setTrace<ListenTCP>();
- REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT)));
- REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "2"));
- controller.plan->scheduleProcessor(listen_tcp);
- REQUIRE(utils::sendMessagesViaTCP({"test_message_1"}, endpoint));
- REQUIRE(utils::sendMessagesViaTCP({"another_message"}, endpoint));
+ CHECK_THAT(utils::sendMessagesViaTCP({"test_message_1"}, endpoint), MatchesSuccess());
+ CHECK_THAT(utils::sendMessagesViaTCP({"another_message"}, endpoint), MatchesSuccess());
ProcessorTriggerResult result;
REQUIRE(controller.triggerUntil({{ListenTCP::Success, 2}}, result, 300s, 50ms));
CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[0]) == "test_message_1");
CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[1]) == "another_message");
- check_for_attributes(*result.at(ListenTCP::Success)[0]);
- check_for_attributes(*result.at(ListenTCP::Success)[1]);
+ check_for_attributes(*result.at(ListenTCP::Success)[0], port);
+ check_for_attributes(*result.at(ListenTCP::Success)[1], port);
}
TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP][NetworkListenerProcessor]") {
const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
SingleProcessorTestController controller{listen_tcp};
LogTestController::getInstance().setTrace<ListenTCP>();
- REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT)));
+ REQUIRE(listen_tcp->setProperty(ListenTCP::Port, "0"));
REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "100"));
REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_tcp));
@@ -79,27 +76,27 @@ TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP][NetworkListenerProcessor]
}
TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP][NetworkListenerProcessor]") {
+ const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+ SingleProcessorTestController controller{listen_tcp};
+ LogTestController::getInstance().setTrace<ListenTCP>();
+ REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "10"));
+ REQUIRE(listen_tcp->setProperty(ListenTCP::MaxQueueSize, "50"));
+ auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
+
asio::ip::tcp::endpoint endpoint;
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
}
- const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
-
- SingleProcessorTestController controller{listen_tcp};
- REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT)));
- REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "10"));
- REQUIRE(listen_tcp->setProperty(ListenTCP::MaxQueueSize, "50"));
LogTestController::getInstance().setWarn<ListenTCP>();
- controller.plan->scheduleProcessor(listen_tcp);
for (auto i = 0; i < 100; ++i) {
- REQUIRE(utils::sendMessagesViaTCP({"test_message"}, endpoint));
+ CHECK_THAT(utils::sendMessagesViaTCP({"test_message"}, endpoint), MatchesSuccess());
}
CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms));
@@ -113,7 +110,7 @@ TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP][NetworkLis
TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProcessor]") {
const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
-
+ uint16_t port = 0;
SingleProcessorTestController controller{listen_tcp};
auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
LogTestController::getInstance().setTrace<ListenTCP>();
@@ -122,7 +119,6 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::Passphrase.getName(), "Password12"));
- REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::Port.getName(), std::to_string(PORT)));
REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::MaxBatchSize.getName(), "2"));
REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::SSLContextService.getName(), "SSLContextService"));
std::vector<std::string> expected_successful_messages;
@@ -131,60 +127,64 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
SECTION("Without client certificate verification") {
SECTION("Client certificate not required, Client Auth set to NONE by default") {
+ ssl_context_service->enable();
+ port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
}
}
SECTION("Client certificate not required, but validated if provided") {
REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+ ssl_context_service->enable();
+ port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
}
}
- ssl_context_service->enable();
- controller.plan->scheduleProcessor(listen_tcp);
expected_successful_messages = {"test_message_1", "another_message"};
- for (const auto& message : expected_successful_messages) {
- REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+ for (const auto& message: expected_successful_messages) {
+ CHECK_THAT(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesSuccess());
}
}
SECTION("With client certificate provided") {
SECTION("Client certificate required") {
REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+ ssl_context_service->enable();
+ port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
}
}
SECTION("Client certificate not required but validated") {
REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+ ssl_context_service->enable();
+ port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
}
}
- ssl_context_service->enable();
- controller.plan->scheduleProcessor(listen_tcp);
minifi::utils::net::SslData ssl_data;
ssl_data.ca_loc = executable_dir / "resources" / "ca_A.crt";
@@ -194,31 +194,114 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
expected_successful_messages = {"test_message_1", "another_message"};
for (const auto& message : expected_successful_messages) {
- REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data));
+ CHECK_THAT(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data), MatchesSuccess());
}
}
SECTION("Required certificate not provided") {
+ ssl_context_service->enable();
+ REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+ port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
}
- REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
- ssl_context_service->enable();
- controller.plan->scheduleProcessor(listen_tcp);
- REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+ CHECK_THAT(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesError());
}
ProcessorTriggerResult result;
REQUIRE(controller.triggerUntil({{ListenTCP::Success, expected_successful_messages.size()}}, result, 300ms, 50ms));
for (std::size_t i = 0; i < expected_successful_messages.size(); ++i) {
CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[i]) == expected_successful_messages[i]);
- check_for_attributes(*result.at(ListenTCP::Success)[i]);
+ check_for_attributes(*result.at(ListenTCP::Success)[i], port);
+ }
+}
+
+namespace {
+bool isSslMethodAvailable(asio::ssl::context::method method) {
+ try {
+ [[maybe_unused]] asio::ssl::context ctx(method);
+ return true;
+ } catch (const asio::system_error& err) {
+ if (err.code() == asio::error::invalid_argument) {
+ return false;
+ } else {
+ throw;
+ }
+ }
+}
+} // namespace
+
+TEST_CASE("Test ListenTCP SSL/TLS compatibility", "[ListenTCP][NetworkListenerProcessor]") {
+ const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+ SingleProcessorTestController controller{listen_tcp};
+ auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
+ LogTestController::getInstance().setTrace<ListenTCP>();
+ const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
+ REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(), (executable_dir / "resources" / "ca_A.crt").string()));
+ REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
+ REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
+ REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::Passphrase.getName(), "Password12"));
+ REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::MaxBatchSize.getName(), "2"));
+ REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::SSLContextService.getName(), "SSLContextService"));
+ REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+
+ ssl_context_service->enable();
+ uint16_t port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
+ asio::ip::tcp::endpoint endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
+
+ minifi::utils::net::SslData ssl_data;
+ ssl_data.ca_loc = executable_dir / "resources" / "ca_A.crt";
+ ssl_data.cert_loc = executable_dir / "resources" / "localhost_by_A.pem";
+ ssl_data.key_loc = executable_dir / "resources" / "localhost_by_A.pem";
+ ssl_data.key_pw = "Password12";
+
+
+ asio::ssl::context::method client_method;
+ bool expected_to_work;
+
+ SECTION("sslv2 should be disabled") {
+ client_method = asio::ssl::context::method::sslv2_client;
+ expected_to_work = false;
+ }
+
+ SECTION("sslv3 should be disabled") {
+ client_method = asio::ssl::context::method::sslv3_client;
+ expected_to_work = false;
+ }
+
+ SECTION("tlsv11 should be disabled") {
+ client_method = asio::ssl::context::method::tlsv11_client;
+ expected_to_work = false;
+ }
+
+ SECTION("tlsv12 should be enabled") {
+ client_method = asio::ssl::context::method::tlsv12_client;
+ expected_to_work = true;
+ }
+
+ SECTION("tlsv13 should be enabled") {
+ client_method = asio::ssl::context::method::tlsv13_client;
+ expected_to_work = true;
+ }
+
+ if (!isSslMethodAvailable(client_method))
+ return;
+
+ auto send_result = utils::sendMessagesViaSSL({"message"}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data, client_method);
+ if (expected_to_work) {
+ CHECK_THAT(send_result, MatchesSuccess());
+ ProcessorTriggerResult result;
+ CHECK(controller.triggerUntil({{ListenTCP::Success, 1}}, result, 300ms, 50ms));
+ } else {
+ CHECK_THAT(send_result, MatchesError());
+ ProcessorTriggerResult result;
+ CHECK_FALSE(controller.triggerUntil({{ListenTCP::Success, 1}}, result, 300ms, 50ms));
}
}
diff --git a/extensions/standard-processors/tests/unit/ListenUDPTests.cpp b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
index ecce9120d..ca1c0ec25 100644
--- a/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
@@ -29,49 +29,49 @@ using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::test {
-constexpr uint64_t PORT = 10256;
-
-void check_for_attributes(core::FlowFile& flow_file) {
+void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
- CHECK(std::to_string(PORT) == flow_file.getAttribute("udp.port"));
+ CHECK(std::to_string(port) == flow_file.getAttribute("udp.port"));
CHECK(ranges::contains(local_addresses, flow_file.getAttribute("udp.sender")));
}
TEST_CASE("ListenUDP test multiple messages", "[ListenUDP][NetworkListenerProcessor]") {
+ const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+ SingleProcessorTestController controller{listen_udp};
+ LogTestController::getInstance().setTrace<ListenUDP>();
+
+ REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "2"));
+
+ auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_udp);
+
asio::ip::udp::endpoint endpoint;
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
}
- const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
-
- SingleProcessorTestController controller{listen_udp};
- LogTestController::getInstance().setTrace<ListenUDP>();
- REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
- REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "2"));
controller.plan->scheduleProcessor(listen_udp);
- REQUIRE(utils::sendUdpDatagram({"test_message_1"}, endpoint));
- REQUIRE(utils::sendUdpDatagram({"another_message"}, endpoint));
+ CHECK_THAT(utils::sendUdpDatagram({"test_message_1"}, endpoint), MatchesSuccess());
+ CHECK_THAT(utils::sendUdpDatagram({"another_message"}, endpoint), MatchesSuccess());
ProcessorTriggerResult result;
REQUIRE(controller.triggerUntil({{ListenUDP::Success, 2}}, result, 300ms, 50ms));
CHECK(result.at(ListenUDP::Success).size() == 2);
CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[0]) == "test_message_1");
CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[1]) == "another_message");
- check_for_attributes(*result.at(ListenUDP::Success)[0]);
- check_for_attributes(*result.at(ListenUDP::Success)[1]);
+ check_for_attributes(*result.at(ListenUDP::Success)[0], port);
+ check_for_attributes(*result.at(ListenUDP::Success)[1], port);
}
TEST_CASE("ListenUDP can be rescheduled", "[ListenUDP][NetworkListenerProcessor]") {
const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
SingleProcessorTestController controller{listen_udp};
LogTestController::getInstance().setTrace<ListenUDP>();
- REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
+ REQUIRE(listen_udp->setProperty(ListenUDP::Port, "0"));
REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "100"));
REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_udp));
@@ -80,27 +80,28 @@ TEST_CASE("ListenUDP can be rescheduled", "[ListenUDP][NetworkListenerProcessor]
}
TEST_CASE("ListenUDP max queue and max batch size test", "[ListenUDP][NetworkListenerProcessor]") {
+ const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+ SingleProcessorTestController controller{listen_udp};
+ REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "10"));
+ REQUIRE(listen_udp->setProperty(ListenUDP::MaxQueueSize, "50"));
+
+ auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_udp);
+
asio::ip::udp::endpoint endpoint;
SECTION("sending through IPv4", "[IPv4]") {
- endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), PORT);
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port);
}
SECTION("sending through IPv6", "[IPv6]") {
if (utils::isIPv6Disabled())
return;
- endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), PORT);
+ endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), port);
}
- const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
-
- SingleProcessorTestController controller{listen_udp};
- REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
- REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "10"));
- REQUIRE(listen_udp->setProperty(ListenUDP::MaxQueueSize, "50"));
LogTestController::getInstance().setWarn<ListenUDP>();
controller.plan->scheduleProcessor(listen_udp);
for (auto i = 0; i < 100; ++i) {
- REQUIRE(utils::sendUdpDatagram({"test_message"}, endpoint));
+ CHECK_THAT(utils::sendUdpDatagram({"test_message"}, endpoint), MatchesSuccess());
}
CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms));
diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
index 3396a50b4..ac44c4db8 100644
--- a/extensions/standard-processors/tests/unit/PutTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
@@ -27,9 +27,10 @@
#include "controllers/SSLContextService.h"
#include "core/ProcessSession.h"
#include "utils/net/TcpServer.h"
-#include "utils/net/SslServer.h"
+#include "utils/net/AsioCoro.h"
#include "utils/expected.h"
#include "utils/StringUtils.h"
+#include "IntegrationTestUtils.h"
using namespace std::literals::chrono_literals;
@@ -38,69 +39,48 @@ namespace org::apache::nifi::minifi::processors {
using controllers::SSLContextService;
namespace {
-using utils::net::TcpSession;
-using utils::net::TcpServer;
-using utils::net::SslSession;
-using utils::net::SslServer;
-
-class ISessionAwareServer {
+class CancellableTcpServer : public utils::net::TcpServer {
public:
- [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
- virtual void closeSessions() = 0;
-};
+ using utils::net::TcpServer::TcpServer;
-template<class SessionType>
-class SessionAwareServer : public ISessionAwareServer {
- protected:
- size_t getNumberOfSessions() const override {
- std::lock_guard lock_guard{mutex_};
- return sessions_.size();
- }
-
- void closeSessions() override {
- std::lock_guard lock_guard{mutex_};
- for (const auto& session_weak : sessions_) {
- if (auto session = session_weak.lock()) {
- auto& socket = session->getSocket();
- if (socket.is_open()) {
- socket.shutdown(asio::ip::tcp::socket::shutdown_both);
- session->getSocket().close();
- }
- }
- }
+ size_t getNumberOfSessions() const {
+ return cancellable_timers_.size();
}
- mutable std::mutex mutex_;
- std::vector<std::weak_ptr<SessionType>> sessions_;
-};
+ void cancelEverything() {
+ for (auto& timer : cancellable_timers_)
+ io_context_.post([=]{timer->cancel();});
+ }
-class SessionAwareTcpServer : public TcpServer, public SessionAwareServer<TcpSession> {
- public:
- using TcpServer::TcpServer;
+ asio::awaitable<void> doReceive() override {
+ using asio::experimental::awaitable_operators::operator||;
- protected:
- std::shared_ptr<TcpSession> createSession() override {
- std::lock_guard lock_guard{mutex_};
- auto session = TcpServer::createSession();
- logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get());
- sessions_.emplace_back(session);
- return session;
+ asio::ip::tcp::acceptor acceptor(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port_));
+ if (port_ == 0)
+ port_ = acceptor.local_endpoint().port();
+ while (true) {
+ auto [accept_error, socket] = co_await acceptor.async_accept(utils::net::use_nothrow_awaitable);
+ if (accept_error) {
+ logger_->log_error("Error during accepting new connection: %s", accept_error.message());
+ break;
+ }
+ auto cancellable_timer = std::make_shared<asio::steady_timer>(io_context_);
+ cancellable_timers_.push_back(cancellable_timer);
+ if (ssl_data_)
+ co_spawn(io_context_, secureSession(std::move(socket)) || wait_until_cancelled(cancellable_timer), asio::detached);
+ else
+ co_spawn(io_context_, insecureSession(std::move(socket)) || wait_until_cancelled(cancellable_timer), asio::detached);
+ }
}
-};
-
-class SessionAwareSslServer : public SslServer, public SessionAwareServer<SslSession> {
- public:
- using SslServer::SslServer;
- protected:
- std::shared_ptr<SslSession> createSession() override {
- std::lock_guard lock_guard{mutex_};
- auto session = SslServer::createSession();
- logger_->log_trace("SessionAwareSslServer::createSession %p", session.get());
- sessions_.emplace_back(session);
- return session;
+ private:
+ static asio::awaitable<void> wait_until_cancelled(std::shared_ptr<asio::steady_timer> timer) {
+ timer->expires_at(asio::steady_timer::time_point::max());
+ co_await utils::net::async_wait(*timer);
}
+
+ std::vector<std::shared_ptr<asio::steady_timer>> cancellable_timers_;
};
utils::net::SslData createSslDataForServer() {
@@ -129,28 +109,28 @@ class PutTCPTestFixture {
}
void stopServers() {
- for (auto& [port, server] : listeners_) {
- auto& listener = server.listener_;
+ for (auto& [port, server] : servers_) {
+ auto& cancellable_server = server.cancellable_server;
auto& server_thread = server.server_thread_;
- if (listener)
- listener->stop();
+ if (cancellable_server)
+ cancellable_server->stop();
if (server_thread.joinable())
server_thread.join();
- listener.reset();
+ cancellable_server.reset();
}
}
size_t getNumberOfActiveSessions(std::optional<uint16_t> port = std::nullopt) {
- if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(getListener(port))) {
- return session_aware_listener->getNumberOfSessions() - 1; // There is always one inactive session waiting for a new connection
+ if (auto cancellable_tcp_server = getServer(port)) {
+ return cancellable_tcp_server->getNumberOfSessions();
}
return -1;
}
void closeActiveConnections() {
- for (auto& [port, server] : listeners_) {
- if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(server.listener_.get())) {
- session_aware_listener->closeSessions();
+ for (auto& [port, server] : servers_) {
+ if (auto cancellable_tcp_server = getServer(port)) {
+ cancellable_tcp_server->cancelEverything();
}
}
std::this_thread::sleep_for(200ms);
@@ -171,7 +151,7 @@ class PutTCPTestFixture {
auto start_time = std::chrono::system_clock::now();
utils::net::Message result;
while (start_time + timeout > std::chrono::system_clock::now()) {
- if (getListener(port)->tryDequeue(result))
+ if (getServer(port)->tryDequeue(result))
return result;
std::this_thread::sleep_for(interval);
}
@@ -204,14 +184,17 @@ class PutTCPTestFixture {
}
uint16_t addTCPServer() {
- uint16_t port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine_);
- listeners_[port].startTCPServer(port);
+ Server server;
+ uint16_t port = server.startTCPServer(std::nullopt);
+ servers_[port] = std::move(server);
return port;
}
uint16_t addSSLServer() {
- uint16_t port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine_);
- listeners_[port].startSSLServer(port);
+ auto ssl_server_options = utils::net::SslServerOptions{createSslDataForServer(), utils::net::ClientAuthOption::REQUIRED};
+ Server server;
+ uint16_t port = server.startTCPServer(ssl_server_options);
+ servers_[port] = std::move(server);
return port;
}
@@ -224,47 +207,36 @@ class PutTCPTestFixture {
}
[[nodiscard]] uint16_t getSinglePort() const {
- gsl_Expects(listeners_.size() == 1);
- return listeners_.begin()->first;
+ gsl_Expects(servers_.size() == 1);
+ return servers_.begin()->first;
}
private:
- utils::net::Server* getListener(std::optional<uint16_t> port) {
+ CancellableTcpServer* getServer(std::optional<uint16_t> port) {
if (!port)
port = getSinglePort();
- return listeners_.at(*port).listener_.get();
+ return servers_.at(*port).cancellable_server.get();
}
const std::shared_ptr<PutTCP> put_tcp_ = std::make_shared<PutTCP>("PutTCP");
test::SingleProcessorTestController controller_{put_tcp_};
- std::mt19937 random_engine_{std::random_device{}()}; // NOLINT: "Missing space before { [whitespace/braces] [5]"
- // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
-
class Server {
public:
Server() = default;
- void startTCPServer(uint16_t port) {
- gsl_Expects(!listener_ && !server_thread_.joinable());
- listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port, core::logging::LoggerFactory<utils::net::Server>::getLogger());
- server_thread_ = std::thread([this]() { listener_->run(); });
- }
-
- void startSSLServer(uint16_t port) {
- gsl_Expects(!listener_ && !server_thread_.joinable());
- listener_ = std::make_unique<SessionAwareSslServer>(std::nullopt,
- port,
- core::logging::LoggerFactory<utils::net::Server>::getLogger(),
- createSslDataForServer(),
- utils::net::SslServer::ClientAuthOption::REQUIRED);
- server_thread_ = std::thread([this]() { listener_->run(); });
+ uint16_t startTCPServer(std::optional<utils::net::SslServerOptions> ssl_server_options) {
+ gsl_Expects(!cancellable_server && !server_thread_.joinable());
+ cancellable_server = std::make_unique<CancellableTcpServer>(std::nullopt, 0, core::logging::LoggerFactory<utils::net::Server>::getLogger(), std::move(ssl_server_options));
+ server_thread_ = std::thread([this]() { cancellable_server->run(); });
+ REQUIRE(utils::verifyEventHappenedInPollTime(250ms, [this] { return cancellable_server->getPort() != 0; }, 20ms));
+ return cancellable_server->getPort();
}
- std::unique_ptr<utils::net::Server> listener_;
+ std::unique_ptr<CancellableTcpServer> cancellable_server;
std::thread server_thread_;
};
- std::unordered_map<uint16_t, Server> listeners_;
+ std::unordered_map<uint16_t, Server> servers_;
};
void trigger_expect_success(PutTCPTestFixture& test_fixture, const std::string_view message, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
@@ -382,7 +354,8 @@ TEST_CASE("PutTCP test invalid host", "[PutTCP]") {
trigger_expect_failure(test_fixture, "message for invalid host");
CHECK((LogTestController::getInstance().contains("Host not found", 0ms)
- || LogTestController::getInstance().contains("No such host is known", 0ms)));
+ || LogTestController::getInstance().contains("No such host is known", 0ms)
+ || LogTestController::getInstance().contains("A connection attempt failed because the connected party did not properly respond", 0ms)));
}
TEST_CASE("PutTCP test invalid server", "[PutTCP]") {
@@ -412,7 +385,8 @@ TEST_CASE("PutTCP test non-routable server", "[PutTCP]") {
test_fixture.setPutTCPPort(1235);
trigger_expect_failure(test_fixture, "message for non-routable server");
- CHECK((LogTestController::getInstance().contains("Connection timed out", 0ms)
+ CHECK((LogTestController::getInstance().contains("No route to host", 0ms)
+ || LogTestController::getInstance().contains("Connection timed out", 0ms)
|| LogTestController::getInstance().contains("Operation timed out", 0ms)
|| LogTestController::getInstance().contains("host has failed to respond", 0ms)
|| LogTestController::getInstance().contains("No route to host", 0ms)));
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
index ca63f5da5..b22ad6746 100644
--- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -47,23 +47,26 @@ std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer&
TEST_CASE("PutUDP", "[putudp]") {
const auto put_udp = std::make_shared<PutUDP>("PutUDP");
- auto random_engine = std::mt19937{std::random_device{}()}; // NOLINT: "Missing space before { [whitespace/braces] [5]"
- // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
- const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
test::SingleProcessorTestController controller{put_udp};
LogTestController::getInstance().setTrace<PutUDP>();
LogTestController::getInstance().setTrace<core::ProcessContext>();
put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
- put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
- utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
+ utils::net::UdpServer listener{std::nullopt, 0, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
auto server_thread = std::thread([&listener]() { listener.run(); });
+ uint16_t port = listener.getPort();
+ auto deadline = std::chrono::steady_clock::now() + 200ms;
+ while (port == 0 && deadline > std::chrono::steady_clock::now()) {
+ std::this_thread::sleep_for(20ms);
+ port = listener.getPort();
+ }
auto cleanup_server = gsl::finally([&]{
listener.stop();
server_thread.join();
});
+ put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
{
const char* const message = "first message: hello";
diff --git a/libminifi/include/controllers/SSLContextService.h b/libminifi/include/controllers/SSLContextService.h
index a9c7a8c55..300db0266 100644
--- a/libminifi/include/controllers/SSLContextService.h
+++ b/libminifi/include/controllers/SSLContextService.h
@@ -141,13 +141,10 @@ class SSLContextService : public core::controller::ControllerService {
std::unique_ptr<SSLContext> createSSLContext();
- const std::filesystem::path& getCertificateFile();
-
- const std::string& getPassphrase();
-
- const std::filesystem::path& getPrivateKeyFile();
-
- const std::filesystem::path& getCACertificate();
+ const std::filesystem::path& getCertificateFile() const;
+ const std::string& getPassphrase() const;
+ const std::filesystem::path& getPrivateKeyFile() const;
+ const std::filesystem::path& getCACertificate() const;
void yield() override {
}
@@ -203,7 +200,7 @@ class SSLContextService : public core::controller::ControllerService {
protected:
virtual void initializeProperties();
- std::mutex initialization_mutex_;
+ mutable std::mutex initialization_mutex_;
bool initialized_;
std::filesystem::path certificate_;
std::filesystem::path private_key_;
diff --git a/libminifi/include/utils/net/AsioCoro.h b/libminifi/include/utils/net/AsioCoro.h
new file mode 100644
index 000000000..5c2e5268b
--- /dev/null
+++ b/libminifi/include/utils/net/AsioCoro.h
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <tuple>
+#include <system_error>
+#include <utility>
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/steady_timer.hpp"
+#include "asio/this_coro.hpp"
+#include "asio/use_awaitable.hpp"
+#include "asio/experimental/awaitable_operators.hpp"
+#include "asio/experimental/as_tuple.hpp"
+
+namespace org::apache::nifi::minifi::utils::net {
+
+constexpr auto use_nothrow_awaitable = asio::experimental::as_tuple(asio::use_awaitable);
+
+using HandshakeType = asio::ssl::stream_base::handshake_type;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<asio::ip::tcp::socket>;
+
+#if defined(__GNUC__) && __GNUC__ < 11
+// [coroutines] unexpected 'warning: statement has no effect [-Wunused-value]'
+// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=96749
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-value"
+#endif // defined(__GNUC__) && __GNUC__ < 11
+/// Suspends the calling coroutine until the wait on `timer` completes.
+/// The wait is started with use_nothrow_awaitable, so its outcome comes back as a value;
+/// that value is deliberately ignored and the coroutine simply finishes.
+inline asio::awaitable<void> async_wait(asio::steady_timer& timer) {
+  [[maybe_unused]] const auto wait_result = co_await timer.async_wait(use_nothrow_awaitable);
+}
+#if defined(__GNUC__) && __GNUC__ < 11
+#pragma GCC diagnostic pop
+#endif // defined(__GNUC__) && __GNUC__ < 11
+
+namespace detail {
+/// Coroutine that finishes once a timer set to `duration` from now completes on the current coroutine's executor.
+inline asio::awaitable<void> timeout(std::chrono::steady_clock::duration duration) {
+  auto executor = co_await asio::this_coro::executor;  // NOLINT
+  asio::steady_timer timer(executor, duration);  // this constructor sets the expiry relative to now
+  co_await async_wait(timer);
+}
+} // namespace detail
+
+/// Races `async_operation` against a timer of `timeout_duration` and returns the operation's result tuple.
+/// @param async_operation awaitable yielding a tuple whose first element is the operation's error code
+/// @param timeout_duration how long the operation may run before the timer alternative wins
+/// @return the operation's own tuple if it finished first; otherwise a tuple whose error code is
+///         asio::error::timed_out and whose remaining elements are default-constructed
+template<class... Types>
+asio::awaitable<std::tuple<std::error_code, Types...>> asyncOperationWithTimeout(asio::awaitable<std::tuple<std::error_code, Types...>>&& async_operation,
+    std::chrono::steady_clock::duration timeout_duration) {
+  using asio::experimental::awaitable_operators::operator||;
+  auto operation_result = co_await(std::move(async_operation) || detail::timeout(timeout_duration));
+  if (operation_result.index() == 1) {
+    // Alternative 1 is the timer: the operation did not finish in time.
+    std::tuple<std::error_code, Types...> result;
+    std::get<0>(result) = asio::error::timed_out;
+    co_return result;
+  }
+  // Move the tuple out of the variant: avoids a copy and keeps move-only result types usable.
+  co_return std::get<0>(std::move(operation_result));
+}
+} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/Server.h b/libminifi/include/utils/net/Server.h
index 5a6d7622d..e84815c3b 100644
--- a/libminifi/include/utils/net/Server.h
+++ b/libminifi/include/utils/net/Server.h
@@ -26,7 +26,9 @@
#include "core/logging/Logger.h"
#include "asio/ts/buffer.hpp"
#include "asio/ts/internet.hpp"
-#include "asio/streambuf.hpp"
+#include "asio/awaitable.hpp"
+#include "asio/co_spawn.hpp"
+#include "asio/detached.hpp"
#include "IpProtocol.h"
namespace org::apache::nifi::minifi::utils::net {
@@ -50,6 +52,7 @@ struct Message {
class Server {
public:
virtual void run() {  // starts the subclass-provided receive coroutine, then runs the io_context on the calling thread
+ asio::co_spawn(io_context_, doReceive(), asio::detached);  // detached: no completion handler observes the coroutine's outcome
io_context_.run();
}
virtual void reset() {
@@ -68,10 +71,16 @@ class Server {
stop();
}
+  /// Returns the current value of port_. This is the port passed to the constructor until a
+  /// subclass's doReceive() replaces a requested port of 0 with the port it actually bound to.
+  uint16_t getPort() const { return port_.load(); }
+
protected:
- Server(std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
- : max_queue_size_(std::move(max_queue_size)), logger_(logger) {}
+ virtual asio::awaitable<void> doReceive() = 0;
+ Server(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)  // protected: only subclasses construct a Server
+ : port_(port), max_queue_size_(max_queue_size), logger_(std::move(logger)) {}  // port_ keeps the requested port; no socket is opened here
+ std::atomic<uint16_t> port_;
utils::ConcurrentQueue<Message> concurrent_queue_;
asio::io_context io_context_;
std::optional<size_t> max_queue_size_;
diff --git a/libminifi/include/utils/net/SessionHandlingServer.h b/libminifi/include/utils/net/SessionHandlingServer.h
deleted file mode 100644
index 173fdcb02..000000000
--- a/libminifi/include/utils/net/SessionHandlingServer.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <utility>
-#include <memory>
-
-#include "Server.h"
-#include "asio/ssl.hpp"
-
-namespace org::apache::nifi::minifi::utils::net {
-
-template<typename SessionType>
-class SessionHandlingServer : public Server {
- public:
- SessionHandlingServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
- : Server(max_queue_size, std::move(logger)),
- acceptor_(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port)) {
- }
-
- void run() override {
- startAccept();
- Server::run();
- }
-
- protected:
- void startAccept() {
- auto new_session = createSession();
- acceptor_.async_accept(new_session->getSocket(),
- [this, new_session](const auto& error_code) -> void {
- handleAccept(new_session, error_code);
- });
- }
-
- void handleAccept(const std::shared_ptr<SessionType>& session, const std::error_code& error) {
- if (error) {
- return;
- }
-
- session->start();
- auto new_session = createSession();
- acceptor_.async_accept(new_session->getSocket(),
- [this, new_session](const auto& error_code) -> void {
- handleAccept(new_session, error_code);
- });
- }
-
- virtual std::shared_ptr<SessionType> createSession() = 0;
-
- asio::ip::tcp::acceptor acceptor_;
-};
-
-} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/Ssl.h b/libminifi/include/utils/net/Ssl.h
index 7a32e30ac..d8ea4621a 100644
--- a/libminifi/include/utils/net/Ssl.h
+++ b/libminifi/include/utils/net/Ssl.h
@@ -23,9 +23,16 @@
#include "core/ProcessContext.h"
#include "core/Property.h"
#include "core/logging/Logger.h"
+#include "utils/Enum.h"
namespace org::apache::nifi::minifi::utils::net {
+SMART_ENUM(ClientAuthOption,  // client-certificate policy of a TLS server; read by setupSslContext in TcpServer.cpp
+ (NONE, "NONE"),  // no verify mode is set on the context
+ (WANT, "WANT"),  // verify_peer
+ (REQUIRED, "REQUIRED")  // verify_peer | verify_fail_if_no_peer_cert
+)
+
struct SslData {
std::filesystem::path ca_loc;
std::filesystem::path cert_loc;
@@ -37,6 +44,15 @@ struct SslData {
}
};
+struct SslServerOptions {  // TLS settings for a server: certificate data plus the client-certificate policy
+ SslData cert_data;
+ ClientAuthOption client_auth_option;
+
+ SslServerOptions(SslData cert_data, ClientAuthOption client_auth_option)
+ : cert_data(cert_data),  // NOTE(review): copies the by-value parameter; std::move(cert_data) would avoid the extra copy
+ client_auth_option(client_auth_option) {}
+};
+
std::optional<utils::net::SslData> getSslData(const core::ProcessContext& context, const core::Property& ssl_prop, const std::shared_ptr<core::logging::Logger>& logger);
} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/SslServer.h b/libminifi/include/utils/net/SslServer.h
deleted file mode 100644
index ddd04e773..000000000
--- a/libminifi/include/utils/net/SslServer.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include "SessionHandlingServer.h"
-
-#include <memory>
-#include <string>
-
-#include "Ssl.h"
-#include "asio/ssl.hpp"
-
-namespace org::apache::nifi::minifi::utils::net {
-
-using ssl_socket = asio::ssl::stream<asio::ip::tcp::socket>;
-
-class SslSession : public std::enable_shared_from_this<SslSession> {
- public:
- SslSession(asio::io_context& io_context, asio::ssl::context& context, utils::ConcurrentQueue<Message>& concurrent_queue,
- std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger);
-
- ssl_socket::lowest_layer_type& getSocket();
- void start();
- void handleReadUntilNewLine(std::error_code error_code);
-
- protected:
- utils::ConcurrentQueue<Message>& concurrent_queue_;
- std::optional<size_t> max_queue_size_;
- asio::basic_streambuf<std::allocator<char>> buffer_;
- std::shared_ptr<core::logging::Logger> logger_;
- ssl_socket socket_;
-};
-
-class SslServer : public SessionHandlingServer<SslSession> {
- public:
- SMART_ENUM(ClientAuthOption,
- (NONE, "NONE"),
- (WANT, "WANT"),
- (REQUIRED, "REQUIRED")
- )
-
- SslServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger, SslData ssl_data, ClientAuthOption client_auth);
-
- protected:
- std::shared_ptr<SslSession> createSession() override;
-
- asio::ssl::context context_;
- SslData ssl_data_;
-};
-
-} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/TcpServer.h b/libminifi/include/utils/net/TcpServer.h
index a70b26bd9..717d674ed 100644
--- a/libminifi/include/utils/net/TcpServer.h
+++ b/libminifi/include/utils/net/TcpServer.h
@@ -16,36 +16,30 @@
*/
#pragma once
-#include <optional>
+#include <utility>
#include <memory>
-#include "SessionHandlingServer.h"
+#include "Server.h"
+#include "Ssl.h"
namespace org::apache::nifi::minifi::utils::net {
-class TcpSession : public std::enable_shared_from_this<TcpSession> {
+class TcpServer : public Server {
public:
- TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger);
- asio::ip::tcp::socket& getSocket();
- void start();
- void handleReadUntilNewLine(std::error_code error_code);
-
- private:
- utils::ConcurrentQueue<Message>& concurrent_queue_;
- std::optional<size_t> max_queue_size_;
- asio::basic_streambuf<std::allocator<char>> buffer_;
- asio::ip::tcp::socket socket_;
- std::shared_ptr<core::logging::Logger> logger_;
-};
-
-class TcpServer : public SessionHandlingServer<TcpSession> {
- public:
- TcpServer(std::optional<size_t> max_queue_size,
- uint16_t port,
- std::shared_ptr<core::logging::Logger> logger);
+ TcpServer(std::optional<size_t> max_queue_size_, uint16_t port, std::shared_ptr<core::logging::Logger> logger, std::optional<SslServerOptions> ssl_data)  // NOTE(review): parameter max_queue_size_ is named like (and shadows) the Server member of the same name
+ : Server(max_queue_size_, port, std::move(logger)),
+ ssl_data_(std::move(ssl_data)) {  // ssl_data_ holding a value selects secureSession in doReceive(); empty selects insecureSession
+ }
protected:
- std::shared_ptr<TcpSession> createSession() override;
+ asio::awaitable<void> doReceive() override;
+
+ asio::awaitable<void> insecureSession(asio::ip::tcp::socket socket);
+ asio::awaitable<void> secureSession(asio::ip::tcp::socket socket);
+
+ asio::awaitable<void> readLoop(auto& socket);
+
+ std::optional<SslServerOptions> ssl_data_;
};
} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/UdpServer.h b/libminifi/include/utils/net/UdpServer.h
index e9b852b00..97cfa5737 100644
--- a/libminifi/include/utils/net/UdpServer.h
+++ b/libminifi/include/utils/net/UdpServer.h
@@ -19,14 +19,12 @@
#include <optional>
#include <memory>
#include <string>
+#include <asio/awaitable.hpp>
#include "Server.h"
#include "utils/MinifiConcurrentQueue.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
-#include "asio/ts/buffer.hpp"
-#include "asio/ts/internet.hpp"
-#include "asio/streambuf.hpp"
namespace org::apache::nifi::minifi::utils::net {
@@ -37,13 +35,7 @@ class UdpServer : public Server {
std::shared_ptr<core::logging::Logger> logger);
private:
- void doReceive();
-
- asio::ip::udp::socket socket_;
- asio::ip::udp::endpoint sender_endpoint_;
- std::string buffer_;
-
- static constexpr size_t MAX_UDP_PACKET_SIZE = 65535;
+ asio::awaitable<void> doReceive() override;
};
} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp
index 165805d86..635e95144 100644
--- a/libminifi/src/controllers/SSLContextService.cpp
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -31,13 +31,11 @@
#include <fstream>
#include <memory>
#include <string>
-#include <set>
#include "core/PropertyBuilder.h"
#include "core/Resource.h"
#include "io/validation.h"
#include "properties/Configure.h"
-#include "utils/gsl.h"
#include "utils/tls/CertificateUtils.h"
#include "utils/tls/TLSUtils.h"
#include "utils/tls/DistinguishedName.h"
@@ -407,22 +405,22 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
#endif
}
-const std::filesystem::path &SSLContextService::getCertificateFile() {
+const std::filesystem::path &SSLContextService::getCertificateFile() const {  // returns a reference to the stored certificate path
std::lock_guard<std::mutex> lock(initialization_mutex_);  // NOTE(review): released on return, so later reads through the returned reference are not covered by this lock
return certificate_;
}
-const std::string &SSLContextService::getPassphrase() {
+const std::string &SSLContextService::getPassphrase() const {  // returns a reference to the stored passphrase
std::lock_guard<std::mutex> lock(initialization_mutex_);  // NOTE(review): released on return, so later reads through the returned reference are not covered by this lock
return passphrase_;
}
-const std::filesystem::path &SSLContextService::getPrivateKeyFile() {
+const std::filesystem::path &SSLContextService::getPrivateKeyFile() const {  // returns a reference to the stored private key path
std::lock_guard<std::mutex> lock(initialization_mutex_);  // NOTE(review): released on return, so later reads through the returned reference are not covered by this lock
return private_key_;
}
-const std::filesystem::path &SSLContextService::getCACertificate() {
+const std::filesystem::path &SSLContextService::getCACertificate() const {  // returns a reference to the stored CA certificate path
std::lock_guard<std::mutex> lock(initialization_mutex_);  // NOTE(review): released on return, so later reads through the returned reference are not covered by this lock
return ca_certificate_;
}
diff --git a/libminifi/src/utils/net/SslServer.cpp b/libminifi/src/utils/net/SslServer.cpp
deleted file mode 100644
index 7aaf4845a..000000000
--- a/libminifi/src/utils/net/SslServer.cpp
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "utils/net/SslServer.h"
-
-namespace org::apache::nifi::minifi::utils::net {
-
-SslSession::SslSession(asio::io_context& io_context, asio::ssl::context& context, utils::ConcurrentQueue<Message>& concurrent_queue,
- std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
- : concurrent_queue_(concurrent_queue),
- max_queue_size_(max_queue_size),
- logger_(std::move(logger)),
- socket_(io_context, context) {
-}
-
-ssl_socket::lowest_layer_type& SslSession::getSocket() {
- return socket_.lowest_layer();
-}
-
-void SslSession::start() {
- socket_.async_handshake(asio::ssl::stream_base::server,
- [this, self = shared_from_this()](const std::error_code& error_code) {
- if (error_code) {
- logger_->log_error("Error occured during SSL handshake: (%d) %s", error_code.value(), error_code.message());
- return;
- }
- asio::async_read_until(socket_,
- buffer_,
- '\n',
- [self](const auto& error_code, size_t) -> void {
- self->handleReadUntilNewLine(error_code);
- });
- });
-}
-
-void SslSession::handleReadUntilNewLine(std::error_code error_code) {
- if (error_code)
- return;
- std::istream is(&buffer_);
- std::string message;
- std::getline(is, message);
- if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
- concurrent_queue_.enqueue(Message(message, IpProtocol::TCP, getSocket().remote_endpoint().address(), getSocket().local_endpoint().port()));
- else
- logger_->log_warn("Queue is full. TCP message ignored.");
- asio::async_read_until(socket_,
- buffer_,
- '\n',
- [self = shared_from_this()](const auto& error_code, size_t) -> void {
- self->handleReadUntilNewLine(error_code);
- });
-}
-
-SslServer::SslServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger, SslData ssl_data, ClientAuthOption client_auth)
- : SessionHandlingServer<SslSession>(max_queue_size, port, std::move(logger)),
- context_(asio::ssl::context::sslv23),
- ssl_data_(std::move(ssl_data)) {
- context_.set_options(
- asio::ssl::context::default_workarounds
- | asio::ssl::context::no_sslv2
- | asio::ssl::context::single_dh_use);
- context_.set_password_callback([this](std::size_t&, asio::ssl::context_base::password_purpose&) { return ssl_data_.key_pw; });
- context_.use_certificate_file(ssl_data_.cert_loc.string(), asio::ssl::context::pem);
- context_.use_private_key_file(ssl_data_.key_loc.string(), asio::ssl::context::pem);
- context_.load_verify_file(ssl_data_.ca_loc.string());
- if (client_auth == ClientAuthOption::REQUIRED) {
- context_.set_verify_mode(asio::ssl::verify_peer|asio::ssl::verify_fail_if_no_peer_cert);
- } else if (client_auth == ClientAuthOption::WANT) {
- context_.set_verify_mode(asio::ssl::verify_peer);
- }
-}
-
-std::shared_ptr<SslSession> SslServer::createSession() {
- return std::make_shared<SslSession>(io_context_, context_, concurrent_queue_, max_queue_size_, logger_);
-}
-
-} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/utils/net/TcpServer.cpp b/libminifi/src/utils/net/TcpServer.cpp
index 742ef7f25..fcd02d1e4 100644
--- a/libminifi/src/utils/net/TcpServer.cpp
+++ b/libminifi/src/utils/net/TcpServer.cpp
@@ -15,53 +15,73 @@
* limitations under the License.
*/
#include "utils/net/TcpServer.h"
+#include "utils/net/AsioCoro.h"
namespace org::apache::nifi::minifi::utils::net {
-TcpSession::TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
- : concurrent_queue_(concurrent_queue),
- max_queue_size_(max_queue_size),
- socket_(io_context),
- logger_(std::move(logger)) {
+/// Accept loop of the TCP server: opens an IPv6 acceptor on port_ and starts one detached session
+/// coroutine per accepted connection (secureSession when ssl_data_ is set, insecureSession otherwise).
+/// The first accept error is logged and ends the loop.
+asio::awaitable<void> TcpServer::doReceive() {
+  asio::ip::tcp::acceptor acceptor(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port_));
+  // When port 0 was requested, publish the port the acceptor actually bound to.
+  if (port_ == 0) {
+    port_ = acceptor.local_endpoint().port();
+  }
+  for (;;) {
+    auto [accept_error, socket] = co_await acceptor.async_accept(use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Error during accepting new connection: %s", accept_error.message());
+      co_return;
+    }
+    // Ownership of the accepted socket moves into the session coroutine.
+    auto session = ssl_data_ ? secureSession(std::move(socket)) : insecureSession(std::move(socket));
+    co_spawn(io_context_, std::move(session), asio::detached);
+  }
+}
-asio::ip::tcp::socket& TcpSession::getSocket() {
- return socket_;
-}
+asio::awaitable<void> TcpServer::readLoop(auto& socket) {  // reads '\n'-terminated messages from socket and enqueues them until a read fails
+ std::string read_message;  // receive buffer; lives across iterations of the loop
+ while (true) {
+ auto [read_error, bytes_read] = co_await asio::async_read_until(socket, asio::dynamic_buffer(read_message), '\n', use_nothrow_awaitable); // NOLINT
+ if (read_error || bytes_read == 0)  // any read error (or an empty read) ends the session
+ co_return;
-void TcpSession::start() {
- asio::async_read_until(socket_,
- buffer_,
- '\n',
- [self = shared_from_this()](const auto& error_code, size_t) -> void {
- self->handleReadUntilNewLine(error_code);
- });
+ if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())  // no limit configured, or the queue still has room
+ concurrent_queue_.enqueue(Message(read_message.substr(0, bytes_read - 1), IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().local_endpoint().port()));  // substr drops the trailing '\n'; NOTE(review): remote_endpoint()/local_endpoint() are the throwing overloads
+ else
+ logger_->log_warn("Queue is full. TCP message ignored.");  // the message is dropped, the connection stays open
+ read_message.erase(0, bytes_read);  // remove the consumed line, keep whatever follows it in the buffer
+ }
+}
-void TcpSession::handleReadUntilNewLine(std::error_code error_code) {
- if (error_code)
- return;
- std::istream is(&buffer_);
- std::string message;
- std::getline(is, message);
- if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
- concurrent_queue_.enqueue(Message(message, IpProtocol::TCP, socket_.remote_endpoint().address(), socket_.local_endpoint().port()));
- else
- logger_->log_warn("Queue is full. TCP message ignored.");
- asio::async_read_until(socket_,
- buffer_,
- '\n',
- [self = shared_from_this()](const auto& error_code, size_t) -> void {
- self->handleReadUntilNewLine(error_code);
- });
+/// Serves one plain-TCP connection by running readLoop on `socket` until it finishes.
+/// The socket is taken by value, so this coroutine owns it for the whole session.
+asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket) {
+  co_await readLoop(socket);  // NOLINT
+}
-TcpServer::TcpServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
- : SessionHandlingServer<TcpSession>(max_queue_size, port, std::move(logger)) {
+namespace {
+/// Builds a server-side TLS context from `ssl_data`.
+/// Loads the certificate, private key (using the stored key password) and CA file named in
+/// ssl_data.cert_data, disables TLS 1.0 and 1.1, and applies the client-certificate policy:
+/// REQUIRED -> verify_peer | verify_fail_if_no_peer_cert, WANT -> verify_peer, otherwise no verify mode is set.
+/// The options are only read, so they are taken by const reference.
+/// @return the configured context
+asio::ssl::context setupSslContext(const SslServerOptions& ssl_data) {
+  const auto& cert_data = ssl_data.cert_data;
+  asio::ssl::context ssl_context(asio::ssl::context::tls_server);
+  ssl_context.set_options(asio::ssl::context::default_workarounds | asio::ssl::context::single_dh_use | asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);
+  // The callback owns a copy of the password, so it does not depend on the lifetime of ssl_data.
+  ssl_context.set_password_callback([key_pw = cert_data.key_pw](std::size_t&, asio::ssl::context_base::password_purpose&) { return key_pw; });
+  ssl_context.use_certificate_file(cert_data.cert_loc.string(), asio::ssl::context::pem);
+  ssl_context.use_private_key_file(cert_data.key_loc.string(), asio::ssl::context::pem);
+  ssl_context.load_verify_file(cert_data.ca_loc.string());
+  if (ssl_data.client_auth_option == ClientAuthOption::REQUIRED) {
+    ssl_context.set_verify_mode(asio::ssl::verify_peer|asio::ssl::verify_fail_if_no_peer_cert);
+  } else if (ssl_data.client_auth_option == ClientAuthOption::WANT) {
+    ssl_context.set_verify_mode(asio::ssl::verify_peer);
+  }
+  return ssl_context;
+}
+} // namespace
-std::shared_ptr<TcpSession> TcpServer::createSession() {
- return std::make_shared<TcpSession>(io_context_, concurrent_queue_, max_queue_size_, logger_);
+/// Serves one TLS connection: performs the server-side handshake on `socket`, then runs readLoop on
+/// the encrypted stream until it finishes. A TLS context is built from ssl_data_ for this session.
+/// A failed handshake is logged as a warning and ends the session.
+asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket) {
+  gsl_Expects(ssl_data_);
+  auto ssl_context = setupSslContext(*ssl_data_);
+  SslSocket ssl_socket(std::move(socket), ssl_context);
+  auto [handshake_error] = co_await ssl_socket.async_handshake(HandshakeType::server, use_nothrow_awaitable);
+  if (handshake_error) {
+    // Query the peer with the error_code overload: after a failed handshake the peer may already be
+    // disconnected, and the throwing overload would then raise instead of letting the failure be logged.
+    // If the lookup fails, the default-constructed endpoint is logged.
+    asio::error_code endpoint_error;
+    const auto remote_endpoint = ssl_socket.lowest_layer().remote_endpoint(endpoint_error);
+    core::logging::LOG_WARN(logger_) << "Handshake with " << remote_endpoint << " failed due to " << handshake_error.message();
+    co_return;
+  }
+  co_return co_await readLoop(ssl_socket);  // NOLINT
+}
} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/utils/net/UdpServer.cpp b/libminifi/src/utils/net/UdpServer.cpp
index 6137f2ddf..e6179db13 100644
--- a/libminifi/src/utils/net/UdpServer.cpp
+++ b/libminifi/src/utils/net/UdpServer.cpp
@@ -15,32 +15,39 @@
* limitations under the License.
*/
#include "utils/net/UdpServer.h"
+#include "asio/use_awaitable.hpp"
+#include "asio/detached.hpp"
+#include "utils/net/AsioCoro.h"
namespace org::apache::nifi::minifi::utils::net {
+constexpr size_t MAX_UDP_PACKET_SIZE = 65535;
+
UdpServer::UdpServer(std::optional<size_t> max_queue_size,  // forwards its arguments to Server
uint16_t port,
std::shared_ptr<core::logging::Logger> logger)
- : Server(max_queue_size, std::move(logger)),
- socket_(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v6(), port)) {
- doReceive();
+ : Server(max_queue_size, port, std::move(logger)) {  // no socket is opened here; doReceive() creates and binds it
}
+asio::awaitable<void> UdpServer::doReceive() {  // receive loop: binds an IPv6 UDP socket on port_ and enqueues each datagram as a Message
+ asio::ip::udp::socket socket(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v6(), port_));
+ if (port_ == 0)  // port 0 was requested: publish the port the socket actually bound to
+ port_ = socket.local_endpoint().port();
+ while (true) {
+ std::string buffer = std::string(MAX_UDP_PACKET_SIZE, {});  // fresh maximum-size buffer per datagram; moved into the Message below
+ asio::ip::udp::endpoint sender_endpoint;
-void UdpServer::doReceive() {
- buffer_.resize(MAX_UDP_PACKET_SIZE);
- socket_.async_receive_from(asio::buffer(buffer_, MAX_UDP_PACKET_SIZE),
- sender_endpoint_,
- [this](std::error_code ec, std::size_t bytes_received) {
- if (!ec && bytes_received > 0) {
- buffer_.resize(bytes_received);
- if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
- concurrent_queue_.enqueue(utils::net::Message(std::move(buffer_), IpProtocol::UDP, sender_endpoint_.address(), socket_.local_endpoint().port()));
- else
- logger_->log_warn("Queue is full. UDP message ignored.");
- }
- doReceive();
- });
+ auto [receive_error, bytes_received] = co_await socket.async_receive_from(asio::buffer(buffer, MAX_UDP_PACKET_SIZE), sender_endpoint, utils::net::use_nothrow_awaitable);
+ if (receive_error) {
+ logger_->log_warn("Error during receive: %s", receive_error.message());
+ continue;  // NOTE(review): retries immediately after every error; there is no back-off or exit condition
+ }
+ buffer.resize(bytes_received);  // NOTE(review): unlike the removed callback, zero-length datagrams are no longer skipped
+ if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())  // no limit configured, or the queue still has room
+ concurrent_queue_.enqueue(utils::net::Message(std::move(buffer), IpProtocol::UDP, sender_endpoint.address(), socket.local_endpoint().port()));
+ else
+ logger_->log_warn("Queue is full. UDP message ignored.");
+ }
+}
} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/test/Catch.h b/libminifi/test/Catch.h
index 925f9dc29..755496feb 100644
--- a/libminifi/test/Catch.h
+++ b/libminifi/test/Catch.h
@@ -20,15 +20,15 @@
#define CATCH_CONFIG_FAST_COMPILE
#include <optional>
#include <string>
+#include "spdlog/spdlog.h"
#include "catch.hpp"
-
namespace Catch {
template<typename T>
struct StringMaker<std::optional<T>> {
static std::string convert(const std::optional<T>& val) {
if (val) {
- return "std::optional(" + StringMaker<T>::convert(val.value()) + ")";
+ return fmt::format("std::optional({})", StringMaker<T>::convert(val.value()));
}
return "std::nullopt";
}
@@ -40,4 +40,46 @@ struct StringMaker<std::nullopt_t> {
return "std::nullopt";
}
};
+
+/// Lets Catch print a std::error_code with its category name, numeric value and message.
+template <>
+struct StringMaker<std::error_code> {
+  static std::string convert(const std::error_code& error_code) {
+    const char* const category_name = error_code.category().name();
+    const int numeric_value = error_code.value();
+    const std::string description = error_code.message();
+    return fmt::format("std::error_code(category:{}, value:{}, message:{})", category_name, numeric_value, description);
+  }
+};
} // namespace Catch
+
+namespace org::apache::nifi::minifi::test {
+/// Catch matcher that accepts a std::error_code whose value() is 0.
+struct MatchesSuccess : Catch::MatcherBase<std::error_code> {
+  MatchesSuccess() = default;
+
+  bool match(const std::error_code& err) const override {
+    return !err;  // std::error_code converts to true exactly when value() != 0
+  }
+
+  std::string describe() const override {
+    const std::error_code success{};
+    return fmt::format("== {}", Catch::StringMaker<std::error_code>::convert(success));
+  }
+};
+
+struct MatchesError : Catch::MatcherBase<std::error_code> {
+ explicit MatchesError(std::optional<std::error_code> expected_error = std::nullopt)
+ : Catch::MatcherBase<std::error_code>(),
+ expected_error_(expected_error) {
+ }
+
+ bool match(const std::error_code& err) const override {
+ if (expected_error_)
+ return err == *expected_error_;
+ return err.value() != 0;
+ }
+
+ std::string describe() const override {
+ if (expected_error_)
+ return fmt::format("== {}", Catch::StringMaker<std::error_code>::convert(*expected_error_));
+ return fmt::format("!= {}", Catch::StringMaker<std::error_code>::convert(std::error_code{}));
+ }
+ private:
+ std::optional<std::error_code> expected_error_;
+};
+} // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h
index bb06b21b7..4f37b399e 100644
--- a/libminifi/test/Utils.h
+++ b/libminifi/test/Utils.h
@@ -19,11 +19,13 @@
#include <string>
#include <utility>
#include <vector>
+#include <memory>
#include "rapidjson/document.h"
#include "asio.hpp"
#include "asio/ssl.hpp"
#include "net/Ssl.h"
+#include "utils/IntegrationTestUtils.h"
using namespace std::chrono_literals;
@@ -111,36 +113,39 @@ bool countLogOccurrencesUntil(const std::string& pattern,
return false;
}
-bool sendMessagesViaTCP(const std::vector<std::string_view>& contents, const asio::ip::tcp::endpoint& remote_endpoint) {
+std::error_code sendMessagesViaTCP(const std::vector<std::string_view>& contents, const asio::ip::tcp::endpoint& remote_endpoint) {
asio::io_context io_context;
asio::ip::tcp::socket socket(io_context);
- socket.connect(remote_endpoint);
std::error_code err;
+ socket.connect(remote_endpoint, err);
+ if (err)
+ return err;
for (auto& content : contents) {
std::string tcp_message(content);
tcp_message += '\n';
asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
- if (err) {
- return false;
- }
+ if (err)
+ return err;
}
- return true;
+ return std::error_code();
}
-bool sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) {
+std::error_code sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) {
asio::io_context io_context;
asio::ip::udp::socket socket(io_context);
- socket.open(remote_endpoint.protocol());
std::error_code err;
+ socket.open(remote_endpoint.protocol(), err);
+ if (err)
+ return err;
socket.send_to(content, remote_endpoint, 0, err);
- return !err;
+ return err;
}
-bool sendUdpDatagram(const gsl::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) {
+std::error_code sendUdpDatagram(const gsl::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) {
return sendUdpDatagram(asio::const_buffer(content.begin(), content.size()), remote_endpoint);
}
-bool sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) {
+std::error_code sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) {
return sendUdpDatagram(asio::buffer(content), remote_endpoint);
}
@@ -166,11 +171,12 @@ struct FlowFileQueueTestAccessor {
FIELD_ACCESSOR(queue_);
};
-bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
- const asio::ip::tcp::endpoint& remote_endpoint,
- const std::filesystem::path& ca_cert_path,
- const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) {
- asio::ssl::context ctx(asio::ssl::context::sslv23);
+std::error_code sendMessagesViaSSL(const std::vector<std::string_view>& contents,
+ const asio::ip::tcp::endpoint& remote_endpoint,
+ const std::filesystem::path& ca_cert_path,
+ const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt,
+ asio::ssl::context::method method = asio::ssl::context::tlsv12_client) {
+ asio::ssl::context ctx(method);
ctx.load_verify_file(ca_cert_path.string());
if (ssl_data) {
ctx.set_verify_mode(asio::ssl::verify_peer);
@@ -183,33 +189,48 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
asio::error_code err;
socket.lowest_layer().connect(remote_endpoint, err);
if (err) {
- return false;
+ return err;
}
socket.handshake(asio::ssl::stream_base::client, err);
if (err) {
- return false;
+ return err;
}
for (auto& content : contents) {
std::string tcp_message(content);
tcp_message += '\n';
asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
if (err) {
- return false;
+ return err;
}
}
- return true;
+ return std::error_code();
}
#ifdef WIN32
inline std::error_code hide_file(const std::filesystem::path& file_name) {
- const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
- if (!success) {
- // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
- // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
- return { static_cast<int>(GetLastError()), std::system_category() };
- }
- return {};
+ const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
+ if (!success) {
+ // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
+ // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
+ return { static_cast<int>(GetLastError()), std::system_category() };
}
+ return {};
+}
#endif /* WIN32 */
+template<typename T>
+concept NetworkingProcessor = std::derived_from<T, minifi::core::Processor>
+ && requires(T x) {
+ {T::Port} -> std::convertible_to<core::Property>;
+ {x.getPort()} -> std::convertible_to<uint16_t>;
+ }; // NOLINT(readability/braces)
+
+template<NetworkingProcessor T>
+uint16_t scheduleProcessorOnRandomPort(const std::shared_ptr<TestPlan>& test_plan, const std::shared_ptr<T>& processor) {
+ REQUIRE(processor->setProperty(T::Port, "0"));
+ test_plan->scheduleProcessor(processor);
+ REQUIRE(minifi::utils::verifyEventHappenedInPollTime(250ms, [&processor] { return processor->getPort() != 0; }, 20ms));
+ return processor->getPort();
+}
+
} // namespace org::apache::nifi::minifi::test::utils