You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/11/18 14:32:08 UTC

[GitHub] [nifi-minifi-cpp] martinzink opened a new pull request, #1457: Utils net server coro

martinzink opened a new pull request, #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457

   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [x] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [x] If applicable, have you updated the LICENSE file?
   - [x] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [x] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085091264


##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -141,16 +120,16 @@ class PutTCPTestFixture {
   }
 
   size_t getNumberOfActiveSessions(std::optional<uint16_t> port = std::nullopt) {
-    if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(getListener(port))) {
-      return session_aware_listener->getNumberOfSessions() - 1;  // There is always one inactive session waiting for a new connection
+    if (auto session_aware_listener = dynamic_cast<CancellableTcpServer*>(getListener(port))) {
+      return session_aware_listener->getNumberOfSessions();
     }
     return -1;
   }
 
   void closeActiveConnections() {
     for (auto& [port, server] : listeners_) {
-      if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(server.listener_.get())) {
-        session_aware_listener->closeSessions();
+      if (auto session_aware_listener = dynamic_cast<CancellableTcpServer*>(getListener(port))) {
+        session_aware_listener->cancelEverything();

Review Comment:
   you are right, I've renamed these and a couple other things in this file to better reflect how the refactor changed them. https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/f37cffb3fe783683d38646b198dfc38f7a439ce4#



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085102784


##########
extensions/standard-processors/tests/unit/ListenTcpTests.cpp:
##########
@@ -194,31 +194,115 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message : expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data));
+      CHECK_THAT(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data), MatchesSuccess());
     }
   }
 
   SECTION("Required certificate not provided") {
+    ssl_context_service->enable();
+    REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
-    ssl_context_service->enable();
-    controller.plan->scheduleProcessor(listen_tcp);
 
-    REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+    auto send_error = utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / "resources" / "ca_A.crt");
+    CHECK(send_error);

Review Comment:
   I coudnt find any ways to readably include the expected error. (asio constructs these in a fairly complicated way, there is no enum or anything like that, that I could use)
   So I've left this with the MatchesError 
   https://github.com/apache/nifi-minifi-cpp/pull/1457/files#diff-cba941a459893c41b8743d9b8423acf61afa575e3252a61773f8cb7949dc6626R214



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085095534


##########
libminifi/test/Utils.h:
##########
@@ -183,33 +188,54 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
   asio::error_code err;
   socket.lowest_layer().connect(remote_endpoint, err);
   if (err) {
-    return false;
+    return err;
   }
   socket.handshake(asio::ssl::stream_base::client, err);
   if (err) {
-    return false;
+    return err;
   }
   for (auto& content : contents) {
     std::string tcp_message(content);
     tcp_message += '\n';
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
     if (err) {
-      return false;
+      return err;
     }
   }
-  return true;
+  return std::error_code();
 }
 
 #ifdef WIN32
 inline std::error_code hide_file(const std::filesystem::path& file_name) {
-    const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
-    if (!success) {
-      // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
-      // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
-      return { static_cast<int>(GetLastError()), std::system_category() };
-    }
-    return {};
+  const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
+  if (!success) {
+    // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
+    // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
+    return { static_cast<int>(GetLastError()), std::system_category() };
   }
+  return {};
+}
 #endif /* WIN32 */
 
+template<typename T>
+concept NetworkingProcessor = std::derived_from<T, minifi::core::Processor>
+    && requires(T x) {
+      {T::Port} -> std::convertible_to<core::Property>;
+      {x.getPort()} -> std::convertible_to<uint16_t>;
+    };  // NOLINT(readability/braces)
+
+template<NetworkingProcessor T>
+uint16_t scheduleProcessorOnRandomPort(const std::shared_ptr<TestPlan>& test_plan, const std::shared_ptr<T>& processor) {
+  REQUIRE(processor->setProperty(T::Port, "0"));
+  test_plan->scheduleProcessor(processor);
+  uint16_t port = processor->getPort();
+  auto deadline = std::chrono::steady_clock::now() + 200ms;
+  while (port == 0 && deadline > std::chrono::steady_clock::now()) {
+    std::this_thread::sleep_for(20ms);
+    port = processor->getPort();
+  }
+  REQUIRE(port != 0);
+  return port;

Review Comment:
   good idea, included this aswell in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/f37cffb3fe783683d38646b198dfc38f7a439ce4#diff-eb32155f3c36cff6b7fb634b81ae2288627fae11502496cee252b9493e45bb4aR232-R233



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1066882952


##########
libminifi/test/Utils.h:
##########
@@ -183,33 +187,51 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
   asio::error_code err;
   socket.lowest_layer().connect(remote_endpoint, err);
   if (err) {
-    return false;
+    return err;
   }
   socket.handshake(asio::ssl::stream_base::client, err);
   if (err) {
-    return false;
+    return err;
   }
   for (auto& content : contents) {
     std::string tcp_message(content);
     tcp_message += '\n';
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
     if (err) {
-      return false;
+      return err;
     }
   }
-  return true;
+  return std::error_code();
 }
 
 #ifdef WIN32
 inline std::error_code hide_file(const std::filesystem::path& file_name) {
-    const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
-    if (!success) {
-      // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
-      // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
-      return { static_cast<int>(GetLastError()), std::system_category() };
-    }
-    return {};
+  const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
+  if (!success) {
+    // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
+    // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
+    return { static_cast<int>(GetLastError()), std::system_category() };
   }
+  return {};
+}
 #endif /* WIN32 */
 
+template<class T>
+uint16_t scheduleProcessorOnRandomPort(const std::shared_ptr<TestPlan>& test_plan, const std::shared_ptr<T>& processor) {

Review Comment:
   It turns out xcode 14 improved the lackluster support for concepts so I was able to upgrade to CI and added constraints using concepts in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/b7da454f75f65e7abd1b3736fd7e583334cd8a73#



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1048381787


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -160,339 +177,145 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
   ConnectionHandler(detail::ConnectionId connection_id,
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    std::optional<asio::ssl::context>& ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
-
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const {  return socket_ && socket_->lowest_layer().is_open(); }
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  std::optional<asio::ssl::context>& ssl_context_;
 };
 
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
-  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
-  if (socket_ && socket_->lowest_layer().is_open())
-    return socket_;
-  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
-  if (!new_socket)
-    return nonstd::make_unexpected(new_socket.error());
-  socket_ = std::move(*new_socket);
-  return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
-  if (error_code != asio::error::operation_aborted) {
-    deadline_.expires_at(asio::steady_timer::time_point::max());
-    last_error_ = asio::error::timed_out;
-    deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
-    socket->lowest_layer().close();
-  }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
-  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
-    logger_->log_trace("No more endpoints to try");
-    deadline_.cancel();
-    return;
-  }
-
-  last_error_.clear();
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
-      [&socket, endpoint_iter, this](std::error_code err) {
-        handleConnect(err, endpoint_iter, socket);
-      });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
-                                                  tcp::resolver::results_type::iterator endpoint_iter,
-                                                  const std::shared_ptr<SocketType>& socket) {
-  bool connection_failed_before_deadline = error.operator bool();
-  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (connection_failed_due_to_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (connection_failed_before_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
-    last_error_ = error;
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (max_size_of_socket_send_buffer_)
-    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
-  handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
-                                                    const tcp::resolver::results_type::iterator&,
-                                                    const std::shared_ptr<SocketType>&) {
-  throw std::invalid_argument("Handshake called without SSL");
-}
-
 template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
-                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                   const std::shared_ptr<SslSocket>& socket) {
-  if (!error) {
-    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
-    deadline_.cancel();
-    return;
-  }
-  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
-  last_error_ = error;
-  socket->lowest_layer().close();
-  startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(!ssl_context_);
+  return TcpSocket{io_context_};
 }
 
 template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<TcpSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->lowest_layer().non_blocking(true);
-  deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<SslSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
-    handleHandshake(handshake_error, endpoint_iter, socket);
-  });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(ssl_context_);
+  return {io_context_, *ssl_context_};
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
-                                                std::size_t bytes_written,
-                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                const std::vector<std::byte>& delimiter,
-                                                const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing flowfile to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
-  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
-    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
-      handleDelimiterWrite(error, bytes_written, socket);
-    });
-  } else {
-    std::vector<std::byte> data_chunk;
-    data_chunk.resize(chunk_size);
-    gsl::span<std::byte> buffer{data_chunk};
-    size_t num_read = flow_file_content_stream->read(buffer);
-    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-    });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context) {
+  auto socket = createNewSocket(io_context);
+  std::error_code last_error;
+  for (const auto& endpoint : resolved_query) {
+    auto [connection_error] = co_await asyncOperationWithTimeout(socket.lowest_layer().async_connect(endpoint, use_nothrow_awaitable), timeout_duration_);
+    if (connection_error) {
+      core::logging::LOG_DEBUG(logger_) << "Connecting to " << endpoint.endpoint() << " failed due to " << connection_error.message();
+      last_error = connection_error;
+      continue;
+    }
+    auto [handshake_error] = co_await handshake(socket, timeout_duration_);
+    if (handshake_error) {
+      core::logging::LOG_DEBUG(logger_) << "Handshake with " << endpoint.endpoint() << " failed due to " << handshake_error.message();
+      last_error = handshake_error;
+      continue;
+    }
+    if (max_size_of_socket_send_buffer_)
+      socket.lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+    socket_.emplace(std::move(socket));
+    co_return std::error_code();
   }
+  co_return last_error;
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing delimiter to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing delimiter to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", bytes_written);
-  deadline_.cancel();
-}
-
-
-template<>
-nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> ConnectionHandler<TcpSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto socket = std::make_shared<TcpSocket>(io_context_);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
-}
-
-asio::ssl::context getSslContext(const auto& ssl_context_service) {
-  gsl_Expects(ssl_context_service);
-  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
-  ssl_context.load_verify_file(ssl_context_service->getCACertificate());
-  ssl_context.set_verify_mode(asio::ssl::verify_peer);
-  if (auto cert_file = ssl_context_service->getCertificateFile(); !cert_file.empty())
-    ssl_context.use_certificate_file(cert_file, asio::ssl::context::pem);
-  if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); !private_key_file.empty())
-    ssl_context.use_private_key_file(private_key_file, asio::ssl::context::pem);
-  ssl_context.set_password_callback([password = ssl_context_service->getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
-  return ssl_context;
+[[nodiscard]] asio::awaitable<std::error_code> ConnectionHandler<SocketType>::setupUsableSocket(asio::io_context& io_context) {
+  if (hasUsableSocket())
+    co_return std::error_code();
+  tcp::resolver resolver(io_context);
+  auto [resolve_error, resolve_result] = co_await asyncOperationWithTimeout(resolver.async_resolve(connection_id_.getHostname(), connection_id_.getPort(), use_nothrow_awaitable), timeout_duration_);
+  if (resolve_error)
+    co_return resolve_error;
+  co_return co_await establishNewConnection(resolve_result, io_context);
 }
 
-template<>
-nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> ConnectionHandler<SslSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto ssl_context = getSslContext(ssl_context_service_);
-  auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
+template<class SocketType>
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+                                                                                        const std::vector<std::byte>& delimiter,
+                                                                                        asio::io_context& io_context) {
+  if (auto connection_error = co_await setupUsableSocket(io_context))  // NOLINT

Review Comment:
   what was the linter error here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085089224


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -160,339 +178,147 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
   ConnectionHandler(detail::ConnectionId connection_id,
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    std::optional<asio::ssl::context>& ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const {  return socket_ && socket_->lowest_layer().is_open(); }
 
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
-
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  std::optional<asio::ssl::context>& ssl_context_;

Review Comment:
   Good idea, I've chagned it in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/f37cffb3fe783683d38646b198dfc38f7a439ce4#diff-a62ddb0368e1813dff2a90fe7433e148bfc7b21b7d7ad9c8a066fd040d209ac9R237



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1048455640


##########
extensions/standard-processors/tests/unit/ListenTcpTests.cpp:
##########
@@ -134,60 +147,64 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
 
   SECTION("Without client certificate verification") {
     SECTION("Client certificate not required, Client Auth set to NONE by default") {
+      ssl_context_service->enable();
+      port = scheduleProcessorOnRandomPort(controller, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
     SECTION("Client certificate not required, but validated if provided") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+      ssl_context_service->enable();
+      port = scheduleProcessorOnRandomPort(controller, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
-    ssl_context_service->enable();
-    controller.plan->scheduleProcessor(listen_tcp);
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message: expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));
+      check_no_error(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));

Review Comment:
   should we rename this to `CHECK_NO_ERROR`, to mimic "checker" macros?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085092236


##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -238,27 +220,22 @@ class PutTCPTestFixture {
   const std::shared_ptr<PutTCP> put_tcp_ = std::make_shared<PutTCP>("PutTCP");
   test::SingleProcessorTestController controller_{put_tcp_};
 
-  std::mt19937 random_engine_{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
-  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
-
   class Server {
    public:
     Server() = default;
 
-    void startTCPServer(uint16_t port) {
-      gsl_Expects(!listener_ && !server_thread_.joinable());
-      listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port, core::logging::LoggerFactory<utils::net::Server>::getLogger());
-      server_thread_ = std::thread([this]() { listener_->run(); });
-    }
-
-    void startSSLServer(uint16_t port) {
+    uint16_t startTCPServer(std::optional<utils::net::SslServerOptions> ssl_server_options) {
       gsl_Expects(!listener_ && !server_thread_.joinable());
-      listener_ = std::make_unique<SessionAwareSslServer>(std::nullopt,
-                                                          port,
-                                                          core::logging::LoggerFactory<utils::net::Server>::getLogger(),
-                                                          createSslDataForServer(),
-                                                          utils::net::SslServer::ClientAuthOption::REQUIRED);
+      listener_ = std::make_unique<CancellableTcpServer>(std::nullopt, 0, core::logging::LoggerFactory<utils::net::Server>::getLogger(), std::move(ssl_server_options));
       server_thread_ = std::thread([this]() { listener_->run(); });
+      uint16_t port = listener_->getPort();
+      auto deadline = std::chrono::steady_clock::now() + 200ms;
+      while (port == 0 && deadline > std::chrono::steady_clock::now()) {
+        std::this_thread::sleep_for(20ms);
+        port = listener_->getPort();
+      }
+      REQUIRE(port != 0);
+      return port;

Review Comment:
   good idea, I've included this (with a smaller check interval) in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/f37cffb3fe783683d38646b198dfc38f7a439ce4#diff-bbb56b112b398a6450c23e71e5dbbfb17bf20dad3d2fb908a1c6de26118f9ee8R232-R233



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1086664225


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -130,369 +145,179 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
     idle_connection_expiration_.reset();
 
   if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms)
-    timeout_ = timeout->getMilliseconds();
+    timeout_duration_ = timeout->getMilliseconds();
   else
-    timeout_ = 15s;
+    timeout_duration_ = 15s;
+
+  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
 
   std::string context_name;
-  ssl_context_service_.reset();
+  ssl_context_.reset();
   if (context->getProperty(SSLContextService.getName(), context_name) && !IsNullOrEmpty(context_name)) {
     if (auto controller_service = context->getControllerService(context_name)) {
-      ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
-      if (!ssl_context_service_)
-        logger_->log_error("%s is not a SSL Context Service", context_name);
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name))) {
+        ssl_context_ = getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, context_name + " is not an SSL Context Service");
+      }
     } else {
-      logger_->log_error("Invalid controller service: %s", context_name);
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + context_name);
     }
   }
 
   delimiter_ = utils::span_to<std::vector>(gsl::make_span(context->getProperty(OutgoingMessageDelimiter).value_or(std::string{})).as_span<const std::byte>());
 
-  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
-    connections_.reset();
-  else
-    connections_.emplace();
-
   if (auto max_size_of_socket_send_buffer = context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer))
     max_size_of_socket_send_buffer_ = max_size_of_socket_send_buffer->getValue();
   else
     max_size_of_socket_send_buffer_.reset();
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
   ConnectionHandler(detail::ConnectionId connection_id,
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    asio::ssl::context* ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
-
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const {  return socket_ && socket_->lowest_layer().is_open(); }
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  asio::ssl::context* ssl_context_;
 };
 
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
-  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
-  if (socket_ && socket_->lowest_layer().is_open())
-    return socket_;
-  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
-  if (!new_socket)
-    return nonstd::make_unexpected(new_socket.error());
-  socket_ = std::move(*new_socket);
-  return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
-  if (error_code != asio::error::operation_aborted) {
-    deadline_.expires_at(asio::steady_timer::time_point::max());
-    last_error_ = asio::error::timed_out;
-    deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
-    socket->lowest_layer().close();
-  }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
-  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
-    logger_->log_trace("No more endpoints to try");
-    deadline_.cancel();
-    return;
-  }
-
-  last_error_.clear();
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
-      [&socket, endpoint_iter, this](std::error_code err) {
-        handleConnect(err, endpoint_iter, socket);
-      });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
-                                                  tcp::resolver::results_type::iterator endpoint_iter,
-                                                  const std::shared_ptr<SocketType>& socket) {
-  bool connection_failed_before_deadline = error.operator bool();
-  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (connection_failed_due_to_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (connection_failed_before_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
-    last_error_ = error;
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (max_size_of_socket_send_buffer_)
-    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
-  handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
-                                                    const tcp::resolver::results_type::iterator&,
-                                                    const std::shared_ptr<SocketType>&) {
-  throw std::invalid_argument("Handshake called without SSL");
-}
-
 template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
-                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                   const std::shared_ptr<SslSocket>& socket) {
-  if (!error) {
-    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
-    deadline_.cancel();
-    return;
-  }
-  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
-  last_error_ = error;
-  socket->lowest_layer().close();
-  startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(!ssl_context_);
+  return TcpSocket{io_context_};
 }
 
 template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<TcpSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->lowest_layer().non_blocking(true);
-  deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<SslSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
-    handleHandshake(handshake_error, endpoint_iter, socket);
-  });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(ssl_context_);
+  return {io_context_, *ssl_context_};
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
-                                                std::size_t bytes_written,
-                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                const std::vector<std::byte>& delimiter,
-                                                const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing flowfile to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
-  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
-    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
-      handleDelimiterWrite(error, bytes_written, socket);
-    });
-  } else {
-    std::vector<std::byte> data_chunk;
-    data_chunk.resize(chunk_size);
-    gsl::span<std::byte> buffer{data_chunk};
-    size_t num_read = flow_file_content_stream->read(buffer);
-    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-    });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context) {
+  auto socket = createNewSocket(io_context);
+  std::error_code last_error;
+  for (const auto& endpoint : endpoints) {
+    auto [connection_error] = co_await asyncOperationWithTimeout(socket.lowest_layer().async_connect(endpoint, use_nothrow_awaitable), timeout_duration_);
+    if (connection_error) {
+      core::logging::LOG_DEBUG(logger_) << "Connecting to " << endpoint.endpoint() << " failed due to " << connection_error.message();
+      last_error = connection_error;
+      continue;
+    }
+    auto [handshake_error] = co_await handshake(socket, timeout_duration_);
+    if (handshake_error) {
+      core::logging::LOG_DEBUG(logger_) << "Handshake with " << endpoint.endpoint() << " failed due to " << handshake_error.message();
+      last_error = handshake_error;
+      continue;
+    }
+    if (max_size_of_socket_send_buffer_)
+      socket.lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+    socket_.emplace(std::move(socket));
+    co_return std::error_code();
   }
+  co_return last_error;
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing delimiter to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing delimiter to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", bytes_written);
-  deadline_.cancel();
-}
-
-
-template<>
-nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> ConnectionHandler<TcpSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto socket = std::make_shared<TcpSocket>(io_context_);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
-}
-
-asio::ssl::context getSslContext(const auto& ssl_context_service) {
-  gsl_Expects(ssl_context_service);
-  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
-  ssl_context.load_verify_file(ssl_context_service->getCACertificate().string());
-  ssl_context.set_verify_mode(asio::ssl::verify_peer);
-  if (auto cert_file = ssl_context_service->getCertificateFile(); !cert_file.empty())
-    ssl_context.use_certificate_file(cert_file.string(), asio::ssl::context::pem);
-  if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); !private_key_file.empty())
-    ssl_context.use_private_key_file(private_key_file.string(), asio::ssl::context::pem);
-  ssl_context.set_password_callback([password = ssl_context_service->getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
-  return ssl_context;
+[[nodiscard]] asio::awaitable<std::error_code> ConnectionHandler<SocketType>::setupUsableSocket(asio::io_context& io_context) {
+  if (hasUsableSocket())
+    co_return std::error_code();
+  tcp::resolver resolver(io_context);
+  auto [resolve_error, resolve_result] = co_await asyncOperationWithTimeout(resolver.async_resolve(connection_id_.getHostname(), connection_id_.getPort(), use_nothrow_awaitable), timeout_duration_);
+  if (resolve_error)
+    co_return resolve_error;
+  co_return co_await establishNewConnection(resolve_result, io_context);
 }
 
-template<>
-nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> ConnectionHandler<SslSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto ssl_context = getSslContext(ssl_context_service_);
-  auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
+template<class SocketType>
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+                                                                                        const std::vector<std::byte>& delimiter,
+                                                                                        asio::io_context& io_context) {

Review Comment:
   I think 4 space continuations would be better.
   ![image](https://user-images.githubusercontent.com/1170582/214580494-34ada21f-bb48-44a4-a9ba-b41ab0ecbb96.png)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1067893936


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -114,6 +114,21 @@ void PutTCP::initialize() {
 
 void PutTCP::notifyStop() {}
 
+namespace {
+asio::ssl::context getSslContext(const std::shared_ptr<controllers::SSLContextService>& ssl_context_service) {
+  gsl_Expects(ssl_context_service);
+  asio::ssl::context ssl_context(asio::ssl::context::sslv23);

Review Comment:
   You are right, and the test didn't catch it because our libressl doesnt have TLSv1.3 yet.
   I updated it so when we update our SSL version it should work.
   https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/1a62da1c25a15ced0c664fe1128cdb48792ba331



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1073490689


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -130,17 +146,19 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
     idle_connection_expiration_.reset();
 
   if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms)
-    timeout_ = timeout->getMilliseconds();
+    timeout_duration_ = timeout->getMilliseconds();
   else
-    timeout_ = 15s;
+    timeout_duration_ = 15s;
 
   std::string context_name;
-  ssl_context_service_.reset();
+  ssl_context_.reset();
   if (context->getProperty(SSLContextService.getName(), context_name) && !IsNullOrEmpty(context_name)) {
     if (auto controller_service = context->getControllerService(context_name)) {
-      ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
-      if (!ssl_context_service_)
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name))) {
+        ssl_context_ = getSslContext(ssl_context_service);
+      } else {
         logger_->log_error("%s is not a SSL Context Service", context_name);
+      }

Review Comment:
   It might be better to throw and avoid running without TLS if the SSLContextService is specified but invalid.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085083897


##########
extensions/standard-processors/processors/NetworkListenerProcessor.cpp:
##########
@@ -66,16 +66,16 @@ void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& contex
   auto options = readServerOptions(context);
 
   std::string ssl_value;
+  std::optional<utils::net::SslServerOptions> ssl_options;
   if (context.getProperty(ssl_context_property.getName(), ssl_value) && !ssl_value.empty()) {
     auto ssl_data = utils::net::getSslData(context, ssl_context_property, logger_);
     if (!ssl_data || !ssl_data->isValid()) {
       throw Exception(PROCESSOR_EXCEPTION, "SSL Context Service is set, but no valid SSL data was found!");
     }
-    auto client_auth = utils::parseEnumProperty<utils::net::SslServer::ClientAuthOption>(context, client_auth_property);
-    server_ = std::make_unique<utils::net::SslServer>(options.max_queue_size, options.port, logger_, *ssl_data, client_auth);
-  } else {
-    server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, options.port, logger_);
+    auto client_auth = utils::parseEnumProperty<utils::net::ClientAuthOption>(context, client_auth_property);
+    ssl_options.emplace(utils::net::SslServerOptions{std::move(*ssl_data), client_auth});

Review Comment:
   You are right, however this doesnt compile on clang without introducing a ctor to the struct. (in theory this should work on c++20, but clang doesnt seem to support it yet) Anyways, I've added this and the required ctor in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/f37cffb3fe783683d38646b198dfc38f7a439ce4#diff-ddc009fae67709576613f5bd31e984c01c5bd4e4238a864cb14ebffa9f9368d3R76
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085084336


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -114,6 +114,21 @@ void PutTCP::initialize() {
 
 void PutTCP::notifyStop() {}
 
+namespace {
+asio::ssl::context getSslContext(const std::shared_ptr<controllers::SSLContextService>& ssl_context_service) {
+  gsl_Expects(ssl_context_service);

Review Comment:
   Good idea, I've changed it in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/f37cffb3fe783683d38646b198dfc38f7a439ce4#diff-a62ddb0368e1813dff2a90fe7433e148bfc7b21b7d7ad9c8a066fd040d209ac9R118-R128



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085095088


##########
libminifi/src/utils/net/TcpServer.cpp:
##########
@@ -15,53 +15,73 @@
  * limitations under the License.
  */
 #include "utils/net/TcpServer.h"
+#include "utils/net/AsioCoro.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
-TcpSession::TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
-  : concurrent_queue_(concurrent_queue),
-    max_queue_size_(max_queue_size),
-    socket_(io_context),
-    logger_(std::move(logger)) {
+asio::awaitable<void> TcpServer::doReceive() {
+  asio::ip::tcp::acceptor acceptor(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port_));
+  if (port_ == 0)
+    port_ = acceptor.local_endpoint().port();
+  while (true) {
+    auto [accept_error, socket] = co_await acceptor.async_accept(use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Error during accepting new connection: %s", accept_error.message());
+      break;
+    }
+    if (ssl_data_)
+      co_spawn(io_context_, secureSession(std::move(socket)), asio::detached);
+    else
+      co_spawn(io_context_, insecureSession(std::move(socket)), asio::detached);
+  }
 }
 
-asio::ip::tcp::socket& TcpSession::getSocket() {
-  return socket_;
-}
+asio::awaitable<void> TcpServer::readLoop(auto& socket) {
+  std::string read_message;
+  while (true) {
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, asio::dynamic_buffer(read_message), '\n', use_nothrow_awaitable);  // NOLINT
+    if (read_error || bytes_read == 0)
+      co_return;
 
-void TcpSession::start() {
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+      concurrent_queue_.enqueue(Message(read_message.substr(0, bytes_read - 1), IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().local_endpoint().port()));

Review Comment:
   Thats shouldnt be possible based on the documentation. if the async_read_until returns before it encountered the delimiter than the read_error will be set accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085096295


##########
libminifi/test/Catch.h:
##########
@@ -40,4 +40,46 @@ struct StringMaker<std::nullopt_t> {
     return "std::nullopt";
   }
 };
+
+template <>
+struct StringMaker<std::error_code> {
+  static std::string convert(const std::error_code& error_code) {
+    return fmt::format("std::error_code(value:{}, message:{})", error_code.value(), error_code.message());

Review Comment:
   Good idea to included that aswell. Added this is in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/f37cffb3fe783683d38646b198dfc38f7a439ce4#diff-2443fe4ee36121fb137afe4324fd0b376b91b1acdff7aa9e5cd28862f619a447R47



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1061463628


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -114,6 +114,21 @@ void PutTCP::initialize() {
 
 void PutTCP::notifyStop() {}
 
+namespace {
+asio::ssl::context getSslContext(const std::shared_ptr<controllers::SSLContextService>& ssl_context_service) {
+  gsl_Expects(ssl_context_service);
+  asio::ssl::context ssl_context(asio::ssl::context::sslv23);

Review Comment:
   Can we restrict this to only allow TLS 1.2 or later? TLS 1.3 or later would be even better, but all SSL versions are hopelessly insecure, and TLS 1.0 and 1.1 are widely deprecated by now due to known attacks. Even TLS 1.2 is only considered secure with an appropriately restricted cipher suite.



##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -160,339 +177,147 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
   ConnectionHandler(detail::ConnectionId connection_id,
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    std::optional<asio::ssl::context>& ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
-
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const {  return socket_ && socket_->lowest_layer().is_open(); }
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  std::optional<asio::ssl::context>& ssl_context_;
 };
 
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
-  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
-  if (socket_ && socket_->lowest_layer().is_open())
-    return socket_;
-  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
-  if (!new_socket)
-    return nonstd::make_unexpected(new_socket.error());
-  socket_ = std::move(*new_socket);
-  return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
-  if (error_code != asio::error::operation_aborted) {
-    deadline_.expires_at(asio::steady_timer::time_point::max());
-    last_error_ = asio::error::timed_out;
-    deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
-    socket->lowest_layer().close();
-  }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
-  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
-    logger_->log_trace("No more endpoints to try");
-    deadline_.cancel();
-    return;
-  }
-
-  last_error_.clear();
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
-      [&socket, endpoint_iter, this](std::error_code err) {
-        handleConnect(err, endpoint_iter, socket);
-      });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
-                                                  tcp::resolver::results_type::iterator endpoint_iter,
-                                                  const std::shared_ptr<SocketType>& socket) {
-  bool connection_failed_before_deadline = error.operator bool();
-  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (connection_failed_due_to_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (connection_failed_before_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
-    last_error_ = error;
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (max_size_of_socket_send_buffer_)
-    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
-  handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
-                                                    const tcp::resolver::results_type::iterator&,
-                                                    const std::shared_ptr<SocketType>&) {
-  throw std::invalid_argument("Handshake called without SSL");
-}
-
 template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
-                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                   const std::shared_ptr<SslSocket>& socket) {
-  if (!error) {
-    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
-    deadline_.cancel();
-    return;
-  }
-  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
-  last_error_ = error;
-  socket->lowest_layer().close();
-  startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(!ssl_context_);
+  return TcpSocket{io_context_};
 }
 
 template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<TcpSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->lowest_layer().non_blocking(true);
-  deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<SslSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
-    handleHandshake(handshake_error, endpoint_iter, socket);
-  });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(ssl_context_);
+  return {io_context_, *ssl_context_};
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
-                                                std::size_t bytes_written,
-                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                const std::vector<std::byte>& delimiter,
-                                                const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing flowfile to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
-  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
-    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
-      handleDelimiterWrite(error, bytes_written, socket);
-    });
-  } else {
-    std::vector<std::byte> data_chunk;
-    data_chunk.resize(chunk_size);
-    gsl::span<std::byte> buffer{data_chunk};
-    size_t num_read = flow_file_content_stream->read(buffer);
-    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-    });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context) {
+  auto socket = createNewSocket(io_context);
+  std::error_code last_error;
+  for (const auto& endpoint : resolved_query) {

Review Comment:
   `resolved_query` is not a descriptive name. I suggest renaming to `endpoints`.



##########
libminifi/src/utils/net/TcpServer.cpp:
##########
@@ -15,53 +15,76 @@
  * limitations under the License.
  */
 #include "utils/net/TcpServer.h"
+#include "utils/net/AsioCoro.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
-TcpSession::TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
-  : concurrent_queue_(concurrent_queue),
-    max_queue_size_(max_queue_size),
-    socket_(io_context),
-    logger_(std::move(logger)) {
+asio::awaitable<void> TcpServer::listen() {
+  asio::ip::tcp::acceptor acceptor(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port_));
+  if (port_ == 0)
+    port_ = acceptor.local_endpoint().port();
+  while (true) {
+    auto [accept_error, socket] = co_await acceptor.async_accept(use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Error during accepting new connection: %s", accept_error.message());
+      break;
+    }
+    if (ssl_data_)
+      co_spawn(io_context_, secureSession(std::move(socket)), asio::detached);
+    else
+      co_spawn(io_context_, insecureSession(std::move(socket)), asio::detached);
+  }
 }
 
-asio::ip::tcp::socket& TcpSession::getSocket() {
-  return socket_;
-}
+asio::awaitable<void> TcpServer::readLoop(auto& socket) {
+  std::string read_message;
+  while (true) {
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, asio::dynamic_buffer(read_message), '\n', use_nothrow_awaitable);  // NOLINT
+    if (read_error || bytes_read == 0)
+      co_return;
 
-void TcpSession::start() {
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+      concurrent_queue_.enqueue(Message(read_message.substr(0, bytes_read - 1), IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().local_endpoint().port()));
+    else
+      logger_->log_warn("Queue is full. TCP message ignored.");
+    read_message.erase(0, bytes_read);
+  }
 }
 
-void TcpSession::handleReadUntilNewLine(std::error_code error_code) {
-  if (error_code)
-    return;
-  std::istream is(&buffer_);
-  std::string message;
-  std::getline(is, message);
-  if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
-    concurrent_queue_.enqueue(Message(message, IpProtocol::TCP, socket_.remote_endpoint().address(), socket_.local_endpoint().port()));
-  else
-    logger_->log_warn("Queue is full. TCP message ignored.");
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
+asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket) {
+  co_return co_await readLoop(socket);  // NOLINT
 }
 
-TcpServer::TcpServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
-    : SessionHandlingServer<TcpSession>(max_queue_size, port, std::move(logger)) {
+namespace {
+asio::ssl::context setupSslContext(SslServerOptions& ssl_data) {
+  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
+  ssl_context.set_options(
+      asio::ssl::context::default_workarounds
+      | asio::ssl::context::no_sslv2
+      | asio::ssl::context::single_dh_use);

Review Comment:
   Please consider disabling more than just SSLv2. SSLv3 is also very insecure, but even TLS 1.0 and 1.1 are widely deprecated by now.



##########
libminifi/test/Utils.h:
##########
@@ -166,10 +170,10 @@ struct FlowFileQueueTestAccessor {
   FIELD_ACCESSOR(queue_);
 };
 
-bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
-                        const asio::ip::tcp::endpoint& remote_endpoint,
-                        const std::filesystem::path& ca_cert_path,
-                        const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) {
+std::error_code sendMessagesViaSSL(const std::vector<std::string_view>& contents,
+                                   const asio::ip::tcp::endpoint& remote_endpoint,
+                                   const std::filesystem::path& ca_cert_path,
+                                   const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) {

Review Comment:
   I'm not asking for a change, but this perfectly demonstrates why I don't like aligned continuations.



##########
libminifi/src/utils/net/UdpServer.cpp:
##########
@@ -15,32 +15,39 @@
  * limitations under the License.
  */
 #include "utils/net/UdpServer.h"
+#include "asio/use_awaitable.hpp"
+#include "asio/detached.hpp"
+#include "utils/net/AsioCoro.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
+constexpr size_t MAX_UDP_PACKET_SIZE = 65535;
+
 UdpServer::UdpServer(std::optional<size_t> max_queue_size,
                      uint16_t port,
                      std::shared_ptr<core::logging::Logger> logger)
-    : Server(max_queue_size, std::move(logger)),
-      socket_(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v6(), port)) {
-  doReceive();
+    : Server(max_queue_size, port, std::move(logger)) {
 }
 
+asio::awaitable<void> UdpServer::listen() {

Review Comment:
   This is doing much more than just "listen" (which is something that doesn't even happen with UDP). I also don't see the point of adding the messages to an internal queue over just returning (or co_yield?) them.



##########
libminifi/test/Utils.h:
##########
@@ -183,33 +187,51 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
   asio::error_code err;
   socket.lowest_layer().connect(remote_endpoint, err);
   if (err) {
-    return false;
+    return err;
   }
   socket.handshake(asio::ssl::stream_base::client, err);
   if (err) {
-    return false;
+    return err;
   }
   for (auto& content : contents) {
     std::string tcp_message(content);
     tcp_message += '\n';
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
     if (err) {
-      return false;
+      return err;
     }
   }
-  return true;
+  return std::error_code();
 }
 
 #ifdef WIN32
 inline std::error_code hide_file(const std::filesystem::path& file_name) {
-    const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
-    if (!success) {
-      // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
-      // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
-      return { static_cast<int>(GetLastError()), std::system_category() };
-    }
-    return {};
+  const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
+  if (!success) {
+    // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
+    // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
+    return { static_cast<int>(GetLastError()), std::system_category() };
   }
+  return {};
+}
 #endif /* WIN32 */
 
+template<class T>
+uint16_t scheduleProcessorOnRandomPort(const std::shared_ptr<TestPlan>& test_plan, const std::shared_ptr<T>& processor) {

Review Comment:
   Could you add some static checks for the template argument?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1067157183


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -114,6 +114,21 @@ void PutTCP::initialize() {
 
 void PutTCP::notifyStop() {}
 
+namespace {
+asio::ssl::context getSslContext(const std::shared_ptr<controllers::SSLContextService>& ssl_context_service) {
+  gsl_Expects(ssl_context_service);
+  asio::ssl::context ssl_context(asio::ssl::context::sslv23);

Review Comment:
   It looks like TLS 1.2 only. TLS 1.3 should be preferred.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1066880102


##########
extensions/standard-processors/tests/unit/ListenTcpTests.cpp:
##########
@@ -134,60 +147,64 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
 
   SECTION("Without client certificate verification") {
     SECTION("Client certificate not required, Client Auth set to NONE by default") {
+      ssl_context_service->enable();
+      port = scheduleProcessorOnRandomPort(controller, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
     SECTION("Client certificate not required, but validated if provided") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+      ssl_context_service->enable();
+      port = scheduleProcessorOnRandomPort(controller, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
-    ssl_context_service->enable();
-    controller.plan->scheduleProcessor(listen_tcp);
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message: expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));
+      check_no_error(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));

Review Comment:
   I've sidestepped the issue by creating a MatchesSuccess Matcher and use that. https://github.com/apache/nifi-minifi-cpp/commit/fb2cba6cb1b8eab262e1d17b5d67d69ed0e1e9a3



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1061448842


##########
extensions/standard-processors/tests/unit/PutUDPTests.cpp:
##########
@@ -47,23 +47,26 @@ std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer&
 
 TEST_CASE("PutUDP", "[putudp]") {
   const auto put_udp = std::make_shared<PutUDP>("PutUDP");
-  auto random_engine = std::mt19937{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
-  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
-  const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
 
   test::SingleProcessorTestController controller{put_udp};
   LogTestController::getInstance().setTrace<PutUDP>();
   LogTestController::getInstance().setTrace<core::ProcessContext>();
   put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
-  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
-  utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
+  utils::net::UdpServer listener{std::nullopt, 0, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
 
   auto server_thread = std::thread([&listener]() { listener.run(); });
+  uint16_t port = listener.getPort();
+  auto deadline = std::chrono::steady_clock::now() + 200ms;
+  while (port == 0 && deadline > std::chrono::steady_clock::now()) {
+    std::this_thread::sleep_for(20ms);
+    port = listener.getPort();
+  }
   auto cleanup_server = gsl::finally([&]{
     listener.stop();
     server_thread.join();
   });
+  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));

Review Comment:
   It's only to verify expression language support IIRC.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1049623267


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -160,339 +177,145 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
   ConnectionHandler(detail::ConnectionId connection_id,
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    std::optional<asio::ssl::context>& ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
-
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const {  return socket_ && socket_->lowest_layer().is_open(); }
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  std::optional<asio::ssl::context>& ssl_context_;
 };
 
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
-  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
-  if (socket_ && socket_->lowest_layer().is_open())
-    return socket_;
-  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
-  if (!new_socket)
-    return nonstd::make_unexpected(new_socket.error());
-  socket_ = std::move(*new_socket);
-  return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
-  if (error_code != asio::error::operation_aborted) {
-    deadline_.expires_at(asio::steady_timer::time_point::max());
-    last_error_ = asio::error::timed_out;
-    deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
-    socket->lowest_layer().close();
-  }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
-  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
-    logger_->log_trace("No more endpoints to try");
-    deadline_.cancel();
-    return;
-  }
-
-  last_error_.clear();
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
-      [&socket, endpoint_iter, this](std::error_code err) {
-        handleConnect(err, endpoint_iter, socket);
-      });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
-                                                  tcp::resolver::results_type::iterator endpoint_iter,
-                                                  const std::shared_ptr<SocketType>& socket) {
-  bool connection_failed_before_deadline = error.operator bool();
-  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (connection_failed_due_to_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (connection_failed_before_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
-    last_error_ = error;
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (max_size_of_socket_send_buffer_)
-    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
-  handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
-                                                    const tcp::resolver::results_type::iterator&,
-                                                    const std::shared_ptr<SocketType>&) {
-  throw std::invalid_argument("Handshake called without SSL");
-}
-
 template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
-                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                   const std::shared_ptr<SslSocket>& socket) {
-  if (!error) {
-    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
-    deadline_.cancel();
-    return;
-  }
-  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
-  last_error_ = error;
-  socket->lowest_layer().close();
-  startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(!ssl_context_);
+  return TcpSocket{io_context_};
 }
 
 template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<TcpSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->lowest_layer().non_blocking(true);
-  deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<SslSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
-    handleHandshake(handshake_error, endpoint_iter, socket);
-  });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(ssl_context_);
+  return {io_context_, *ssl_context_};
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
-                                                std::size_t bytes_written,
-                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                const std::vector<std::byte>& delimiter,
-                                                const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing flowfile to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
-  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
-    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
-      handleDelimiterWrite(error, bytes_written, socket);
-    });
-  } else {
-    std::vector<std::byte> data_chunk;
-    data_chunk.resize(chunk_size);
-    gsl::span<std::byte> buffer{data_chunk};
-    size_t num_read = flow_file_content_stream->read(buffer);
-    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-    });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context) {
+  auto socket = createNewSocket(io_context);
+  std::error_code last_error;
+  for (const auto& endpoint : resolved_query) {
+    auto [connection_error] = co_await asyncOperationWithTimeout(socket.lowest_layer().async_connect(endpoint, use_nothrow_awaitable), timeout_duration_);
+    if (connection_error) {
+      core::logging::LOG_DEBUG(logger_) << "Connecting to " << endpoint.endpoint() << " failed due to " << connection_error.message();
+      last_error = connection_error;
+      continue;
+    }
+    auto [handshake_error] = co_await handshake(socket, timeout_duration_);
+    if (handshake_error) {
+      core::logging::LOG_DEBUG(logger_) << "Handshake with " << endpoint.endpoint() << " failed due to " << handshake_error.message();
+      last_error = handshake_error;
+      continue;
+    }
+    if (max_size_of_socket_send_buffer_)
+      socket.lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+    socket_.emplace(std::move(socket));
+    co_return std::error_code();
   }
+  co_return last_error;
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing delimiter to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing delimiter to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", bytes_written);
-  deadline_.cancel();
-}
-
-
-template<>
-nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> ConnectionHandler<TcpSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto socket = std::make_shared<TcpSocket>(io_context_);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
-}
-
-asio::ssl::context getSslContext(const auto& ssl_context_service) {
-  gsl_Expects(ssl_context_service);
-  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
-  ssl_context.load_verify_file(ssl_context_service->getCACertificate());
-  ssl_context.set_verify_mode(asio::ssl::verify_peer);
-  if (auto cert_file = ssl_context_service->getCertificateFile(); !cert_file.empty())
-    ssl_context.use_certificate_file(cert_file, asio::ssl::context::pem);
-  if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); !private_key_file.empty())
-    ssl_context.use_private_key_file(private_key_file, asio::ssl::context::pem);
-  ssl_context.set_password_callback([password = ssl_context_service->getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
-  return ssl_context;
+[[nodiscard]] asio::awaitable<std::error_code> ConnectionHandler<SocketType>::setupUsableSocket(asio::io_context& io_context) {
+  if (hasUsableSocket())
+    co_return std::error_code();
+  tcp::resolver resolver(io_context);
+  auto [resolve_error, resolve_result] = co_await asyncOperationWithTimeout(resolver.async_resolve(connection_id_.getHostname(), connection_id_.getPort(), use_nothrow_awaitable), timeout_duration_);
+  if (resolve_error)
+    co_return resolve_error;
+  co_return co_await establishNewConnection(resolve_result, io_context);
 }
 
-template<>
-nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> ConnectionHandler<SslSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto ssl_context = getSslContext(ssl_context_service_);
-  auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
+template<class SocketType>
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+                                                                                        const std::vector<std::byte>& delimiter,
+                                                                                        asio::io_context& io_context) {
+  if (auto connection_error = co_await setupUsableSocket(io_context))  // NOLINT
+    co_return connection_error;
+  co_return co_await send(stream_to_send, delimiter);
 }
 
 template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                                                        const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                                                        const std::vector<std::byte>& delimiter) {
-  if (!socket || !socket->lowest_layer().is_open())
-    return nonstd::make_unexpected(asio::error::not_socket);
-
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.restart();
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::send(const std::shared_ptr<io::InputStream>& stream_to_send,
+                                                                     const std::vector<std::byte>& delimiter) {
+  gsl_Expects(hasUsableSocket());
 
   std::vector<std::byte> data_chunk;
   data_chunk.resize(chunk_size);
-
   gsl::span<std::byte> buffer{data_chunk};
-  size_t num_read = flow_file_content_stream->read(buffer);
-  logger_->log_trace("read %zu bytes from flowfile", num_read);
-  asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-    handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-  });
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  last_used_ = std::chrono::steady_clock::now();
-  return {};
-}
+  while (stream_to_send->tell() < stream_to_send->size()) {
+    size_t num_read = stream_to_send->read(buffer);

Review Comment:
   good idea :+1: https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/70d6a798090869c4c01b2dd7dc84eeaf9dcef2a3#diff-a62ddb0368e1813dff2a90fe7433e148bfc7b21b7d7ad9c8a066fd040d209ac9R307-R308



##########
extensions/standard-processors/tests/unit/ListenTcpTests.cpp:
##########
@@ -134,60 +147,64 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
 
   SECTION("Without client certificate verification") {
     SECTION("Client certificate not required, Client Auth set to NONE by default") {
+      ssl_context_service->enable();
+      port = scheduleProcessorOnRandomPort(controller, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
     SECTION("Client certificate not required, but validated if provided") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+      ssl_context_service->enable();
+      port = scheduleProcessorOnRandomPort(controller, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
-    ssl_context_service->enable();
-    controller.plan->scheduleProcessor(listen_tcp);
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message: expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));
+      check_no_error(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));

Review Comment:
   sure thing, I've also moved it to test utils.h in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/70d6a798090869c4c01b2dd7dc84eeaf9dcef2a3#diff-eb32155f3c36cff6b7fb634b81ae2288627fae11502496cee252b9493e45bb4aR221-R223



##########
extensions/standard-processors/tests/unit/ListenUDPTests.cpp:
##########
@@ -29,49 +29,67 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::test {
 
-constexpr uint64_t PORT = 10256;
+void check_no_error(std::error_code error_code) {
+  CHECK_FALSE(error_code);
+}
 
-void check_for_attributes(core::FlowFile& flow_file) {
+void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
   const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
-  CHECK(std::to_string(PORT) == flow_file.getAttribute("udp.port"));
+  CHECK(std::to_string(port) == flow_file.getAttribute("udp.port"));
   CHECK(ranges::contains(local_addresses, flow_file.getAttribute("udp.sender")));
 }
 
+uint16_t scheduleProcessorOnRandomPort(SingleProcessorTestController& controller, const std::shared_ptr<ListenUDP>& listen_udp) {

Review Comment:
   :+1: makes sense, I've extracted to test utils in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/70d6a798090869c4c01b2dd7dc84eeaf9dcef2a3#diff-eb32155f3c36cff6b7fb634b81ae2288627fae11502496cee252b9493e45bb4aR207-R224



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -141,16 +120,16 @@ class PutTCPTestFixture {
   }
 
   size_t getNumberOfActiveSessions(std::optional<uint16_t> port = std::nullopt) {
-    if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(getListener(port))) {
-      return session_aware_listener->getNumberOfSessions() - 1;  // There is always one inactive session waiting for a new connection
+    if (auto session_aware_listener = dynamic_cast<CancellableTcpServer*>(getListener(port))) {
+      return session_aware_listener->getNumberOfSessions();  // There is always one inactive session waiting for a new connection

Review Comment:
   You are right, this comment makes no sense anymore (different method to count the sessions), I've removed the stale comment https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/70d6a798090869c4c01b2dd7dc84eeaf9dcef2a3#diff-bbb56b112b398a6450c23e71e5dbbfb17bf20dad3d2fb908a1c6de26118f9ee8R124



##########
extensions/standard-processors/tests/unit/PutUDPTests.cpp:
##########
@@ -47,23 +47,26 @@ std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer&
 
 TEST_CASE("PutUDP", "[putudp]") {
   const auto put_udp = std::make_shared<PutUDP>("PutUDP");
-  auto random_engine = std::mt19937{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
-  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
-  const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
 
   test::SingleProcessorTestController controller{put_udp};
   LogTestController::getInstance().setTrace<PutUDP>();
   LogTestController::getInstance().setTrace<core::ProcessContext>();
   put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
-  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
-  utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
+  utils::net::UdpServer listener{std::nullopt, 0, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
 
   auto server_thread = std::thread([&listener]() { listener.run(); });
+  uint16_t port = listener.getPort();
+  auto deadline = std::chrono::steady_clock::now() + 200ms;
+  while (port == 0 && deadline > std::chrono::steady_clock::now()) {
+    std::this_thread::sleep_for(20ms);
+    port = listener.getPort();
+  }
   auto cleanup_server = gsl::finally([&]{
     listener.stop();
     server_thread.join();
   });
+  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));

Review Comment:
   On one hand this verifies that the property supports expression language, on the other hand it was intruduced in https://github.com/apache/nifi-minifi-cpp/commit/feb5a1b183dc82dd83c6ce53c8c20ba819c27863#diff-f690439939269c49549e72231c05edf2e8ba912ec3fe206031d6465b7d161779R83 so for further explanation we might ask @szaszm 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1048441787


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -160,339 +177,145 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
   ConnectionHandler(detail::ConnectionId connection_id,
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    std::optional<asio::ssl::context>& ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
-
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const {  return socket_ && socket_->lowest_layer().is_open(); }
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  std::optional<asio::ssl::context>& ssl_context_;
 };
 
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
-  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
-  if (socket_ && socket_->lowest_layer().is_open())
-    return socket_;
-  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
-  if (!new_socket)
-    return nonstd::make_unexpected(new_socket.error());
-  socket_ = std::move(*new_socket);
-  return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
-  if (error_code != asio::error::operation_aborted) {
-    deadline_.expires_at(asio::steady_timer::time_point::max());
-    last_error_ = asio::error::timed_out;
-    deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
-    socket->lowest_layer().close();
-  }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
-  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
-    logger_->log_trace("No more endpoints to try");
-    deadline_.cancel();
-    return;
-  }
-
-  last_error_.clear();
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
-      [&socket, endpoint_iter, this](std::error_code err) {
-        handleConnect(err, endpoint_iter, socket);
-      });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
-                                                  tcp::resolver::results_type::iterator endpoint_iter,
-                                                  const std::shared_ptr<SocketType>& socket) {
-  bool connection_failed_before_deadline = error.operator bool();
-  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (connection_failed_due_to_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (connection_failed_before_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
-    last_error_ = error;
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (max_size_of_socket_send_buffer_)
-    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
-  handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
-                                                    const tcp::resolver::results_type::iterator&,
-                                                    const std::shared_ptr<SocketType>&) {
-  throw std::invalid_argument("Handshake called without SSL");
-}
-
 template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
-                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                   const std::shared_ptr<SslSocket>& socket) {
-  if (!error) {
-    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
-    deadline_.cancel();
-    return;
-  }
-  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
-  last_error_ = error;
-  socket->lowest_layer().close();
-  startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(!ssl_context_);
+  return TcpSocket{io_context_};
 }
 
 template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<TcpSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->lowest_layer().non_blocking(true);
-  deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<SslSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
-    handleHandshake(handshake_error, endpoint_iter, socket);
-  });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(ssl_context_);
+  return {io_context_, *ssl_context_};
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
-                                                std::size_t bytes_written,
-                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                const std::vector<std::byte>& delimiter,
-                                                const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing flowfile to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
-  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
-    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
-      handleDelimiterWrite(error, bytes_written, socket);
-    });
-  } else {
-    std::vector<std::byte> data_chunk;
-    data_chunk.resize(chunk_size);
-    gsl::span<std::byte> buffer{data_chunk};
-    size_t num_read = flow_file_content_stream->read(buffer);
-    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-    });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context) {
+  auto socket = createNewSocket(io_context);
+  std::error_code last_error;
+  for (const auto& endpoint : resolved_query) {
+    auto [connection_error] = co_await asyncOperationWithTimeout(socket.lowest_layer().async_connect(endpoint, use_nothrow_awaitable), timeout_duration_);
+    if (connection_error) {
+      core::logging::LOG_DEBUG(logger_) << "Connecting to " << endpoint.endpoint() << " failed due to " << connection_error.message();
+      last_error = connection_error;
+      continue;
+    }
+    auto [handshake_error] = co_await handshake(socket, timeout_duration_);
+    if (handshake_error) {
+      core::logging::LOG_DEBUG(logger_) << "Handshake with " << endpoint.endpoint() << " failed due to " << handshake_error.message();
+      last_error = handshake_error;
+      continue;
+    }
+    if (max_size_of_socket_send_buffer_)
+      socket.lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+    socket_.emplace(std::move(socket));
+    co_return std::error_code();
   }
+  co_return last_error;
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing delimiter to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing delimiter to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", bytes_written);
-  deadline_.cancel();
-}
-
-
-template<>
-nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> ConnectionHandler<TcpSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto socket = std::make_shared<TcpSocket>(io_context_);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
-}
-
-asio::ssl::context getSslContext(const auto& ssl_context_service) {
-  gsl_Expects(ssl_context_service);
-  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
-  ssl_context.load_verify_file(ssl_context_service->getCACertificate());
-  ssl_context.set_verify_mode(asio::ssl::verify_peer);
-  if (auto cert_file = ssl_context_service->getCertificateFile(); !cert_file.empty())
-    ssl_context.use_certificate_file(cert_file, asio::ssl::context::pem);
-  if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); !private_key_file.empty())
-    ssl_context.use_private_key_file(private_key_file, asio::ssl::context::pem);
-  ssl_context.set_password_callback([password = ssl_context_service->getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
-  return ssl_context;
+[[nodiscard]] asio::awaitable<std::error_code> ConnectionHandler<SocketType>::setupUsableSocket(asio::io_context& io_context) {
+  if (hasUsableSocket())
+    co_return std::error_code();
+  tcp::resolver resolver(io_context);
+  auto [resolve_error, resolve_result] = co_await asyncOperationWithTimeout(resolver.async_resolve(connection_id_.getHostname(), connection_id_.getPort(), use_nothrow_awaitable), timeout_duration_);
+  if (resolve_error)
+    co_return resolve_error;
+  co_return co_await establishNewConnection(resolve_result, io_context);
 }
 
-template<>
-nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> ConnectionHandler<SslSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto ssl_context = getSslContext(ssl_context_service_);
-  auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
+template<class SocketType>
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+                                                                                        const std::vector<std::byte>& delimiter,
+                                                                                        asio::io_context& io_context) {
+  if (auto connection_error = co_await setupUsableSocket(io_context))  // NOLINT

Review Comment:
   clang tidy gave false positive warnings (it seems like it doesn't handle coroutine codes very well)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1048472526


##########
extensions/standard-processors/tests/unit/PutUDPTests.cpp:
##########
@@ -47,23 +47,26 @@ std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer&
 
 TEST_CASE("PutUDP", "[putudp]") {
   const auto put_udp = std::make_shared<PutUDP>("PutUDP");
-  auto random_engine = std::mt19937{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
-  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
-  const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
 
   test::SingleProcessorTestController controller{put_udp};
   LogTestController::getInstance().setTrace<PutUDP>();
   LogTestController::getInstance().setTrace<core::ProcessContext>();
   put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
-  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
-  utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
+  utils::net::UdpServer listener{std::nullopt, 0, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
 
   auto server_thread = std::thread([&listener]() { listener.run(); });
+  uint16_t port = listener.getPort();
+  auto deadline = std::chrono::steady_clock::now() + 200ms;
+  while (port == 0 && deadline > std::chrono::steady_clock::now()) {
+    std::this_thread::sleep_for(20ms);
+    port = listener.getPort();
+  }
   auto cleanup_server = gsl::finally([&]{
     listener.stop();
     server_thread.join();
   });
+  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));

Review Comment:
   why do we wrap the Port in `${literal('...')}`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1048458529


##########
extensions/standard-processors/tests/unit/ListenUDPTests.cpp:
##########
@@ -29,49 +29,67 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::test {
 
-constexpr uint64_t PORT = 10256;
+void check_no_error(std::error_code error_code) {
+  CHECK_FALSE(error_code);
+}
 
-void check_for_attributes(core::FlowFile& flow_file) {
+void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
   const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
-  CHECK(std::to_string(PORT) == flow_file.getAttribute("udp.port"));
+  CHECK(std::to_string(port) == flow_file.getAttribute("udp.port"));
   CHECK(ranges::contains(local_addresses, flow_file.getAttribute("udp.sender")));
 }
 
+uint16_t scheduleProcessorOnRandomPort(SingleProcessorTestController& controller, const std::shared_ptr<ListenUDP>& listen_udp) {

Review Comment:
   there seem to be three implementations of this functions, differing in which kind or processor they schedule, could a single template function achieve the same?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1067003083


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -160,339 +177,145 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
   ConnectionHandler(detail::ConnectionId connection_id,
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    std::optional<asio::ssl::context>& ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
-
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const {  return socket_ && socket_->lowest_layer().is_open(); }
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  std::optional<asio::ssl::context>& ssl_context_;
 };
 
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
-  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
-  if (socket_ && socket_->lowest_layer().is_open())
-    return socket_;
-  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
-  if (!new_socket)
-    return nonstd::make_unexpected(new_socket.error());
-  socket_ = std::move(*new_socket);
-  return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
-  if (error_code != asio::error::operation_aborted) {
-    deadline_.expires_at(asio::steady_timer::time_point::max());
-    last_error_ = asio::error::timed_out;
-    deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
-    socket->lowest_layer().close();
-  }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
-  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
-    logger_->log_trace("No more endpoints to try");
-    deadline_.cancel();
-    return;
-  }
-
-  last_error_.clear();
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
-      [&socket, endpoint_iter, this](std::error_code err) {
-        handleConnect(err, endpoint_iter, socket);
-      });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
-                                                  tcp::resolver::results_type::iterator endpoint_iter,
-                                                  const std::shared_ptr<SocketType>& socket) {
-  bool connection_failed_before_deadline = error.operator bool();
-  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (connection_failed_due_to_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (connection_failed_before_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
-    last_error_ = error;
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (max_size_of_socket_send_buffer_)
-    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
-  handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
-                                                    const tcp::resolver::results_type::iterator&,
-                                                    const std::shared_ptr<SocketType>&) {
-  throw std::invalid_argument("Handshake called without SSL");
-}
-
 template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
-                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                   const std::shared_ptr<SslSocket>& socket) {
-  if (!error) {
-    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
-    deadline_.cancel();
-    return;
-  }
-  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
-  last_error_ = error;
-  socket->lowest_layer().close();
-  startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(!ssl_context_);
+  return TcpSocket{io_context_};
 }
 
 template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<TcpSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->lowest_layer().non_blocking(true);
-  deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<SslSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
-    handleHandshake(handshake_error, endpoint_iter, socket);
-  });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(ssl_context_);
+  return {io_context_, *ssl_context_};
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
-                                                std::size_t bytes_written,
-                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                const std::vector<std::byte>& delimiter,
-                                                const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing flowfile to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
-  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
-    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
-      handleDelimiterWrite(error, bytes_written, socket);
-    });
-  } else {
-    std::vector<std::byte> data_chunk;
-    data_chunk.resize(chunk_size);
-    gsl::span<std::byte> buffer{data_chunk};
-    size_t num_read = flow_file_content_stream->read(buffer);
-    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-    });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context) {
+  auto socket = createNewSocket(io_context);
+  std::error_code last_error;
+  for (const auto& endpoint : resolved_query) {
+    auto [connection_error] = co_await asyncOperationWithTimeout(socket.lowest_layer().async_connect(endpoint, use_nothrow_awaitable), timeout_duration_);
+    if (connection_error) {
+      core::logging::LOG_DEBUG(logger_) << "Connecting to " << endpoint.endpoint() << " failed due to " << connection_error.message();
+      last_error = connection_error;
+      continue;
+    }
+    auto [handshake_error] = co_await handshake(socket, timeout_duration_);
+    if (handshake_error) {
+      core::logging::LOG_DEBUG(logger_) << "Handshake with " << endpoint.endpoint() << " failed due to " << handshake_error.message();
+      last_error = handshake_error;
+      continue;
+    }
+    if (max_size_of_socket_send_buffer_)
+      socket.lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+    socket_.emplace(std::move(socket));
+    co_return std::error_code();
   }
+  co_return last_error;
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing delimiter to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing delimiter to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", bytes_written);
-  deadline_.cancel();
-}
-
-
-template<>
-nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> ConnectionHandler<TcpSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto socket = std::make_shared<TcpSocket>(io_context_);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
-}
-
-asio::ssl::context getSslContext(const auto& ssl_context_service) {
-  gsl_Expects(ssl_context_service);
-  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
-  ssl_context.load_verify_file(ssl_context_service->getCACertificate());
-  ssl_context.set_verify_mode(asio::ssl::verify_peer);
-  if (auto cert_file = ssl_context_service->getCertificateFile(); !cert_file.empty())
-    ssl_context.use_certificate_file(cert_file, asio::ssl::context::pem);
-  if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); !private_key_file.empty())
-    ssl_context.use_private_key_file(private_key_file, asio::ssl::context::pem);
-  ssl_context.set_password_callback([password = ssl_context_service->getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
-  return ssl_context;
+[[nodiscard]] asio::awaitable<std::error_code> ConnectionHandler<SocketType>::setupUsableSocket(asio::io_context& io_context) {
+  if (hasUsableSocket())
+    co_return std::error_code();
+  tcp::resolver resolver(io_context);
+  auto [resolve_error, resolve_result] = co_await asyncOperationWithTimeout(resolver.async_resolve(connection_id_.getHostname(), connection_id_.getPort(), use_nothrow_awaitable), timeout_duration_);
+  if (resolve_error)
+    co_return resolve_error;
+  co_return co_await establishNewConnection(resolve_result, io_context);
 }
 
-template<>
-nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> ConnectionHandler<SslSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto ssl_context = getSslContext(ssl_context_service_);
-  auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
+template<class SocketType>
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+                                                                                        const std::vector<std::byte>& delimiter,
+                                                                                        asio::io_context& io_context) {
+  if (auto connection_error = co_await setupUsableSocket(io_context))  // NOLINT
+    co_return connection_error;
+  co_return co_await send(stream_to_send, delimiter);
 }
 
 template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                                                        const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                                                        const std::vector<std::byte>& delimiter) {
-  if (!socket || !socket->lowest_layer().is_open())
-    return nonstd::make_unexpected(asio::error::not_socket);
-
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.restart();
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::send(const std::shared_ptr<io::InputStream>& stream_to_send,
+                                                                     const std::vector<std::byte>& delimiter) {
+  gsl_Expects(hasUsableSocket());
 
   std::vector<std::byte> data_chunk;
   data_chunk.resize(chunk_size);
-
   gsl::span<std::byte> buffer{data_chunk};
-  size_t num_read = flow_file_content_stream->read(buffer);
-  logger_->log_trace("read %zu bytes from flowfile", num_read);
-  asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-    handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-  });
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  last_used_ = std::chrono::steady_clock::now();
-  return {};
-}
+  while (stream_to_send->tell() < stream_to_send->size()) {
+    size_t num_read = stream_to_send->read(buffer);

Review Comment:
   `io::isError` seems to be used instead of direct comparison with the error value



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1066878540


##########
libminifi/src/utils/net/TcpServer.cpp:
##########
@@ -15,53 +15,76 @@
  * limitations under the License.
  */
 #include "utils/net/TcpServer.h"
+#include "utils/net/AsioCoro.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
-TcpSession::TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
-  : concurrent_queue_(concurrent_queue),
-    max_queue_size_(max_queue_size),
-    socket_(io_context),
-    logger_(std::move(logger)) {
+asio::awaitable<void> TcpServer::listen() {
+  asio::ip::tcp::acceptor acceptor(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port_));
+  if (port_ == 0)
+    port_ = acceptor.local_endpoint().port();
+  while (true) {
+    auto [accept_error, socket] = co_await acceptor.async_accept(use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Error during accepting new connection: %s", accept_error.message());
+      break;
+    }
+    if (ssl_data_)
+      co_spawn(io_context_, secureSession(std::move(socket)), asio::detached);
+    else
+      co_spawn(io_context_, insecureSession(std::move(socket)), asio::detached);
+  }
 }
 
-asio::ip::tcp::socket& TcpSession::getSocket() {
-  return socket_;
-}
+asio::awaitable<void> TcpServer::readLoop(auto& socket) {
+  std::string read_message;
+  while (true) {
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, asio::dynamic_buffer(read_message), '\n', use_nothrow_awaitable);  // NOLINT
+    if (read_error || bytes_read == 0)
+      co_return;
 
-void TcpSession::start() {
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+      concurrent_queue_.enqueue(Message(read_message.substr(0, bytes_read - 1), IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().local_endpoint().port()));
+    else
+      logger_->log_warn("Queue is full. TCP message ignored.");
+    read_message.erase(0, bytes_read);
+  }
 }
 
-void TcpSession::handleReadUntilNewLine(std::error_code error_code) {
-  if (error_code)
-    return;
-  std::istream is(&buffer_);
-  std::string message;
-  std::getline(is, message);
-  if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
-    concurrent_queue_.enqueue(Message(message, IpProtocol::TCP, socket_.remote_endpoint().address(), socket_.local_endpoint().port()));
-  else
-    logger_->log_warn("Queue is full. TCP message ignored.");
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
+asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket) {
+  co_return co_await readLoop(socket);  // NOLINT
 }
 
-TcpServer::TcpServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
-    : SessionHandlingServer<TcpSession>(max_queue_size, port, std::move(logger)) {
+namespace {
+asio::ssl::context setupSslContext(SslServerOptions& ssl_data) {
+  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
+  ssl_context.set_options(
+      asio::ssl::context::default_workarounds
+      | asio::ssl::context::no_sslv2
+      | asio::ssl::context::single_dh_use);

Review Comment:
   Sure, I've restricted to only allow TLS 1.2 or newer versions in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/653894f83c9464edda7d5238408937074f51c34c#diff-924bd95b72a2543c8a20ebd77132a2f64d563f851a318cd515091ee2fdf927b1R61 and also added a unit test that verifies this behavior 



##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -114,6 +114,21 @@ void PutTCP::initialize() {
 
 void PutTCP::notifyStop() {}
 
+namespace {
+asio::ssl::context getSslContext(const std::shared_ptr<controllers::SSLContextService>& ssl_context_service) {
+  gsl_Expects(ssl_context_service);
+  asio::ssl::context ssl_context(asio::ssl::context::sslv23);

Review Comment:
   Sure, I've restricted to only allow TLS 1.2 or newer versions in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/653894f83c9464edda7d5238408937074f51c34c#diff-a62ddb0368e1813dff2a90fe7433e148bfc7b21b7d7ad9c8a066fd040d209ac9R120



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1067173950


##########
libminifi/test/Utils.h:
##########
@@ -183,33 +188,57 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
   asio::error_code err;
   socket.lowest_layer().connect(remote_endpoint, err);
   if (err) {
-    return false;
+    return err;
   }
   socket.handshake(asio::ssl::stream_base::client, err);
   if (err) {
-    return false;
+    return err;
   }
   for (auto& content : contents) {
     std::string tcp_message(content);
     tcp_message += '\n';
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
     if (err) {
-      return false;
+      return err;
     }
   }
-  return true;
+  return std::error_code();
 }
 
 #ifdef WIN32
 inline std::error_code hide_file(const std::filesystem::path& file_name) {
-    const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
-    if (!success) {
-      // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
-      // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
-      return { static_cast<int>(GetLastError()), std::system_category() };
-    }
-    return {};
+  const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
+  if (!success) {
+    // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
+    // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
+    return { static_cast<int>(GetLastError()), std::system_category() };
   }
+  return {};
+}
 #endif /* WIN32 */
 
+template<typename T>
+concept DerivedFromProcessor = std::derived_from<T, minifi::core::Processor>;  // NOLINT(readability/braces)
+
+template<typename T>
+concept HasPortProperty = requires(T x) { {T::Port} -> std::convertible_to<core::Property>; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept HasGetPortFn = requires(T x) { {x.getPort()} -> std::convertible_to<uint16_t>; };  // NOLINT(readability/braces)
+
+template<class T>
+requires DerivedFromProcessor<T> && HasPortProperty<T> && HasGetPortFn<T>
+uint16_t scheduleProcessorOnRandomPort(const std::shared_ptr<TestPlan>& test_plan, const std::shared_ptr<T>& processor) {

Review Comment:
   I wouldn't extract DerivedFromProcessor, since it's not much shorter and saying the same thing. The other two are fine as implementation details, but I would hide them in a detail namespace.
   
   One useful concept could be the combination of these, as a way to describe networking processors.
   ```suggestion
   // NOLINTBEGIN(readability/braces): clang-tidy doesn't play well with concepts
   template<typename T>
   concept NetworkingProcessor = std::derived_from<T, minifi::core::Processor>
       && requires(T x) {
         {T::Port} -> std::convertible_to<core::Property>;
         {x.getPort()} -> std::convertible_to<uint16_t>;
       };
   // NOLINTEND(readability/braces)
   
   template<NetworkingProcessor T>
   uint16_t scheduleProcessorOnRandomPort(const std::shared_ptr<TestPlan>& test_plan, const std::shared_ptr<T>& processor) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm closed pull request #1457: MINIFICPP-1979 Use Coroutines with asio
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085691450


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -130,17 +146,19 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
     idle_connection_expiration_.reset();
 
   if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms)
-    timeout_ = timeout->getMilliseconds();
+    timeout_duration_ = timeout->getMilliseconds();
   else
-    timeout_ = 15s;
+    timeout_duration_ = 15s;
 
   std::string context_name;
-  ssl_context_service_.reset();
+  ssl_context_.reset();
   if (context->getProperty(SSLContextService.getName(), context_name) && !IsNullOrEmpty(context_name)) {
     if (auto controller_service = context->getControllerService(context_name)) {
-      ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
-      if (!ssl_context_service_)
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name))) {
+        ssl_context_ = getSslContext(ssl_context_service);
+      } else {
         logger_->log_error("%s is not a SSL Context Service", context_name);
+      }

Review Comment:
   Makes sense, I've changed it in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/0e86eb92dda9d7b65b77246d545be2d8348d878d



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1061447296


##########
extensions/standard-processors/tests/unit/ListenTcpTests.cpp:
##########
@@ -134,60 +147,64 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
 
   SECTION("Without client certificate verification") {
     SECTION("Client certificate not required, Client Auth set to NONE by default") {
+      ssl_context_service->enable();
+      port = scheduleProcessorOnRandomPort(controller, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
     SECTION("Client certificate not required, but validated if provided") {
       REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT"));
+      ssl_context_service->enable();
+      port = scheduleProcessorOnRandomPort(controller, listen_tcp);
       SECTION("sending through IPv4", "[IPv4]") {
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
       }
       SECTION("sending through IPv6", "[IPv6]") {
         if (utils::isIPv6Disabled())
           return;
-        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+        endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
       }
     }
-    ssl_context_service->enable();
-    controller.plan->scheduleProcessor(listen_tcp);
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message: expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));
+      check_no_error(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));

Review Comment:
   I'd prefer to keep it lowercase, since it's not a macro.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1048384528


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -160,339 +177,145 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
   ConnectionHandler(detail::ConnectionId connection_id,
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    std::optional<asio::ssl::context>& ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
-
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const {  return socket_ && socket_->lowest_layer().is_open(); }
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  std::optional<asio::ssl::context>& ssl_context_;
 };
 
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
-  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
-  if (socket_ && socket_->lowest_layer().is_open())
-    return socket_;
-  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
-  if (!new_socket)
-    return nonstd::make_unexpected(new_socket.error());
-  socket_ = std::move(*new_socket);
-  return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
-  if (error_code != asio::error::operation_aborted) {
-    deadline_.expires_at(asio::steady_timer::time_point::max());
-    last_error_ = asio::error::timed_out;
-    deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
-    socket->lowest_layer().close();
-  }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
-  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
-    logger_->log_trace("No more endpoints to try");
-    deadline_.cancel();
-    return;
-  }
-
-  last_error_.clear();
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
-      [&socket, endpoint_iter, this](std::error_code err) {
-        handleConnect(err, endpoint_iter, socket);
-      });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
-                                                  tcp::resolver::results_type::iterator endpoint_iter,
-                                                  const std::shared_ptr<SocketType>& socket) {
-  bool connection_failed_before_deadline = error.operator bool();
-  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (connection_failed_due_to_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (connection_failed_before_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
-    last_error_ = error;
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (max_size_of_socket_send_buffer_)
-    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
-  handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
-                                                    const tcp::resolver::results_type::iterator&,
-                                                    const std::shared_ptr<SocketType>&) {
-  throw std::invalid_argument("Handshake called without SSL");
-}
-
 template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
-                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                   const std::shared_ptr<SslSocket>& socket) {
-  if (!error) {
-    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
-    deadline_.cancel();
-    return;
-  }
-  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
-  last_error_ = error;
-  socket->lowest_layer().close();
-  startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(!ssl_context_);
+  return TcpSocket{io_context_};
 }
 
 template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<TcpSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->lowest_layer().non_blocking(true);
-  deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<SslSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
-    handleHandshake(handshake_error, endpoint_iter, socket);
-  });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(ssl_context_);
+  return {io_context_, *ssl_context_};
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
-                                                std::size_t bytes_written,
-                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                const std::vector<std::byte>& delimiter,
-                                                const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing flowfile to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
-  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
-    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
-      handleDelimiterWrite(error, bytes_written, socket);
-    });
-  } else {
-    std::vector<std::byte> data_chunk;
-    data_chunk.resize(chunk_size);
-    gsl::span<std::byte> buffer{data_chunk};
-    size_t num_read = flow_file_content_stream->read(buffer);
-    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-    });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context) {
+  auto socket = createNewSocket(io_context);
+  std::error_code last_error;
+  for (const auto& endpoint : resolved_query) {
+    auto [connection_error] = co_await asyncOperationWithTimeout(socket.lowest_layer().async_connect(endpoint, use_nothrow_awaitable), timeout_duration_);
+    if (connection_error) {
+      core::logging::LOG_DEBUG(logger_) << "Connecting to " << endpoint.endpoint() << " failed due to " << connection_error.message();
+      last_error = connection_error;
+      continue;
+    }
+    auto [handshake_error] = co_await handshake(socket, timeout_duration_);
+    if (handshake_error) {
+      core::logging::LOG_DEBUG(logger_) << "Handshake with " << endpoint.endpoint() << " failed due to " << handshake_error.message();
+      last_error = handshake_error;
+      continue;
+    }
+    if (max_size_of_socket_send_buffer_)
+      socket.lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+    socket_.emplace(std::move(socket));
+    co_return std::error_code();
   }
+  co_return last_error;
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing delimiter to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing delimiter to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", bytes_written);
-  deadline_.cancel();
-}
-
-
-template<>
-nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> ConnectionHandler<TcpSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto socket = std::make_shared<TcpSocket>(io_context_);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
-}
-
-asio::ssl::context getSslContext(const auto& ssl_context_service) {
-  gsl_Expects(ssl_context_service);
-  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
-  ssl_context.load_verify_file(ssl_context_service->getCACertificate());
-  ssl_context.set_verify_mode(asio::ssl::verify_peer);
-  if (auto cert_file = ssl_context_service->getCertificateFile(); !cert_file.empty())
-    ssl_context.use_certificate_file(cert_file, asio::ssl::context::pem);
-  if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); !private_key_file.empty())
-    ssl_context.use_private_key_file(private_key_file, asio::ssl::context::pem);
-  ssl_context.set_password_callback([password = ssl_context_service->getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
-  return ssl_context;
+[[nodiscard]] asio::awaitable<std::error_code> ConnectionHandler<SocketType>::setupUsableSocket(asio::io_context& io_context) {
+  if (hasUsableSocket())
+    co_return std::error_code();
+  tcp::resolver resolver(io_context);
+  auto [resolve_error, resolve_result] = co_await asyncOperationWithTimeout(resolver.async_resolve(connection_id_.getHostname(), connection_id_.getPort(), use_nothrow_awaitable), timeout_duration_);
+  if (resolve_error)
+    co_return resolve_error;
+  co_return co_await establishNewConnection(resolve_result, io_context);
 }
 
-template<>
-nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> ConnectionHandler<SslSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto ssl_context = getSslContext(ssl_context_service_);
-  auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
+template<class SocketType>
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+                                                                                        const std::vector<std::byte>& delimiter,
+                                                                                        asio::io_context& io_context) {
+  if (auto connection_error = co_await setupUsableSocket(io_context))  // NOLINT
+    co_return connection_error;
+  co_return co_await send(stream_to_send, delimiter);
 }
 
 template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                                                        const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                                                        const std::vector<std::byte>& delimiter) {
-  if (!socket || !socket->lowest_layer().is_open())
-    return nonstd::make_unexpected(asio::error::not_socket);
-
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.restart();
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::send(const std::shared_ptr<io::InputStream>& stream_to_send,
+                                                                     const std::vector<std::byte>& delimiter) {
+  gsl_Expects(hasUsableSocket());
 
   std::vector<std::byte> data_chunk;
   data_chunk.resize(chunk_size);
-
   gsl::span<std::byte> buffer{data_chunk};
-  size_t num_read = flow_file_content_stream->read(buffer);
-  logger_->log_trace("read %zu bytes from flowfile", num_read);
-  asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-    handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-  });
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  last_used_ = std::chrono::steady_clock::now();
-  return {};
-}
+  while (stream_to_send->tell() < stream_to_send->size()) {
+    size_t num_read = stream_to_send->read(buffer);

Review Comment:
   should we handle read error here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1048466865


##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -141,16 +120,16 @@ class PutTCPTestFixture {
   }
 
   size_t getNumberOfActiveSessions(std::optional<uint16_t> port = std::nullopt) {
-    if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(getListener(port))) {
-      return session_aware_listener->getNumberOfSessions() - 1;  // There is always one inactive session waiting for a new connection
+    if (auto session_aware_listener = dynamic_cast<CancellableTcpServer*>(getListener(port))) {
+      return session_aware_listener->getNumberOfSessions();  // There is always one inactive session waiting for a new connection

Review Comment:
   is the comment still relevant, why can we remove the `-1`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1066879820


##########
extensions/standard-processors/tests/unit/PutUDPTests.cpp:
##########
@@ -47,23 +47,26 @@ std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer&
 
 TEST_CASE("PutUDP", "[putudp]") {
   const auto put_udp = std::make_shared<PutUDP>("PutUDP");
-  auto random_engine = std::mt19937{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
-  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
-  const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
 
   test::SingleProcessorTestController controller{put_udp};
   LogTestController::getInstance().setTrace<PutUDP>();
   LogTestController::getInstance().setTrace<core::ProcessContext>();
   put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
-  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
-  utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
+  utils::net::UdpServer listener{std::nullopt, 0, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
 
   auto server_thread = std::thread([&listener]() { listener.run(); });
+  uint16_t port = listener.getPort();
+  auto deadline = std::chrono::steady_clock::now() + 200ms;
+  while (port == 0 && deadline > std::chrono::steady_clock::now()) {
+    std::this_thread::sleep_for(20ms);
+    port = listener.getPort();
+  }
   auto cleanup_server = gsl::finally([&]{
     listener.stop();
     server_thread.join();
   });
+  put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));

Review Comment:
   I've sidestepped the issue by creating a MatchesSuccess Matcher and use that. https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/fb2cba6cb1b8eab262e1d17b5d67d69ed0e1e9a3



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1066881433


##########
libminifi/test/Utils.h:
##########
@@ -166,10 +170,10 @@ struct FlowFileQueueTestAccessor {
   FIELD_ACCESSOR(queue_);
 };
 
-bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
-                        const asio::ip::tcp::endpoint& remote_endpoint,
-                        const std::filesystem::path& ca_cert_path,
-                        const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) {
+std::error_code sendMessagesViaSSL(const std::vector<std::string_view>& contents,
+                                   const asio::ip::tcp::endpoint& remote_endpoint,
+                                   const std::filesystem::path& ca_cert_path,
+                                   const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) {

Review Comment:
   I see your point, I squeezed in this change as well in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/653894f83c9464edda7d5238408937074f51c34c#diff-eb32155f3c36cff6b7fb634b81ae2288627fae11502496cee252b9493e45bb4aR174-R178



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1067159155


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -114,6 +114,21 @@ void PutTCP::initialize() {
 
 void PutTCP::notifyStop() {}
 
+namespace {
+asio::ssl::context getSslContext(const std::shared_ptr<controllers::SSLContextService>& ssl_context_service) {
+  gsl_Expects(ssl_context_service);
+  asio::ssl::context ssl_context(asio::ssl::context::sslv23);

Review Comment:
   https://stackoverflow.com/a/47097088



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1067899983


##########
libminifi/test/Utils.h:
##########
@@ -183,33 +188,57 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
   asio::error_code err;
   socket.lowest_layer().connect(remote_endpoint, err);
   if (err) {
-    return false;
+    return err;
   }
   socket.handshake(asio::ssl::stream_base::client, err);
   if (err) {
-    return false;
+    return err;
   }
   for (auto& content : contents) {
     std::string tcp_message(content);
     tcp_message += '\n';
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
     if (err) {
-      return false;
+      return err;
     }
   }
-  return true;
+  return std::error_code();
 }
 
 #ifdef WIN32
 inline std::error_code hide_file(const std::filesystem::path& file_name) {
-    const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
-    if (!success) {
-      // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
-      // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
-      return { static_cast<int>(GetLastError()), std::system_category() };
-    }
-    return {};
+  const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
+  if (!success) {
+    // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
+    // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
+    return { static_cast<int>(GetLastError()), std::system_category() };
   }
+  return {};
+}
 #endif /* WIN32 */
 
+template<typename T>
+concept DerivedFromProcessor = std::derived_from<T, minifi::core::Processor>;  // NOLINT(readability/braces)
+
+template<typename T>
+concept HasPortProperty = requires(T x) { {T::Port} -> std::convertible_to<core::Property>; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept HasGetPortFn = requires(T x) { {x.getPort()} -> std::convertible_to<uint16_t>; };  // NOLINT(readability/braces)
+
+template<class T>
+requires DerivedFromProcessor<T> && HasPortProperty<T> && HasGetPortFn<T>
+uint16_t scheduleProcessorOnRandomPort(const std::shared_ptr<TestPlan>& test_plan, const std::shared_ptr<T>& processor) {

Review Comment:
   Good idea, I've added your suggestion (slightly changed it because the NOLINTBEGIN NOLINTEND doesnt seemed to work, and also its not clang tidy but our cpplint.py that doesnt play well with concepts.
   I found the problem in the linter script so ill will create a PR for it in https://github.com/cpplint/cpplint/blob/develop/cpplint.py and/or update our version



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1066880579


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -160,339 +177,147 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
   ConnectionHandler(detail::ConnectionId connection_id,
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    std::optional<asio::ssl::context>& ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
-
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const {  return socket_ && socket_->lowest_layer().is_open(); }
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  std::optional<asio::ssl::context>& ssl_context_;
 };
 
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
-  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
-  if (socket_ && socket_->lowest_layer().is_open())
-    return socket_;
-  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
-  if (!new_socket)
-    return nonstd::make_unexpected(new_socket.error());
-  socket_ = std::move(*new_socket);
-  return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
-  if (error_code != asio::error::operation_aborted) {
-    deadline_.expires_at(asio::steady_timer::time_point::max());
-    last_error_ = asio::error::timed_out;
-    deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
-    socket->lowest_layer().close();
-  }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
-  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
-    logger_->log_trace("No more endpoints to try");
-    deadline_.cancel();
-    return;
-  }
-
-  last_error_.clear();
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
-      [&socket, endpoint_iter, this](std::error_code err) {
-        handleConnect(err, endpoint_iter, socket);
-      });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
-                                                  tcp::resolver::results_type::iterator endpoint_iter,
-                                                  const std::shared_ptr<SocketType>& socket) {
-  bool connection_failed_before_deadline = error.operator bool();
-  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (connection_failed_due_to_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (connection_failed_before_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
-    last_error_ = error;
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (max_size_of_socket_send_buffer_)
-    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
-  handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
-                                                    const tcp::resolver::results_type::iterator&,
-                                                    const std::shared_ptr<SocketType>&) {
-  throw std::invalid_argument("Handshake called without SSL");
-}
-
 template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
-                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                   const std::shared_ptr<SslSocket>& socket) {
-  if (!error) {
-    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
-    deadline_.cancel();
-    return;
-  }
-  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
-  last_error_ = error;
-  socket->lowest_layer().close();
-  startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(!ssl_context_);
+  return TcpSocket{io_context_};
 }
 
 template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<TcpSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->lowest_layer().non_blocking(true);
-  deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<SslSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
-    handleHandshake(handshake_error, endpoint_iter, socket);
-  });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(ssl_context_);
+  return {io_context_, *ssl_context_};
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
-                                                std::size_t bytes_written,
-                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                const std::vector<std::byte>& delimiter,
-                                                const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing flowfile to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
-  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
-    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
-      handleDelimiterWrite(error, bytes_written, socket);
-    });
-  } else {
-    std::vector<std::byte> data_chunk;
-    data_chunk.resize(chunk_size);
-    gsl::span<std::byte> buffer{data_chunk};
-    size_t num_read = flow_file_content_stream->read(buffer);
-    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-    });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context) {
+  auto socket = createNewSocket(io_context);
+  std::error_code last_error;
+  for (const auto& endpoint : resolved_query) {

Review Comment:
   done :+1:  https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/1a101d8494864d5a0e3c99ba0f985bb26fc08bcb



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1066877738


##########
libminifi/src/utils/net/UdpServer.cpp:
##########
@@ -15,32 +15,39 @@
  * limitations under the License.
  */
 #include "utils/net/UdpServer.h"
+#include "asio/use_awaitable.hpp"
+#include "asio/detached.hpp"
+#include "utils/net/AsioCoro.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
+constexpr size_t MAX_UDP_PACKET_SIZE = 65535;
+
 UdpServer::UdpServer(std::optional<size_t> max_queue_size,
                      uint16_t port,
                      std::shared_ptr<core::logging::Logger> logger)
-    : Server(max_queue_size, std::move(logger)),
-      socket_(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v6(), port)) {
-  doReceive();
+    : Server(max_queue_size, port, std::move(logger)) {
 }
 
+asio::awaitable<void> UdpServer::listen() {

Review Comment:
   You are right, I reverted back to doReceive() in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/ba31387396eace85fafa762252f174abf0adf04a, and I agree this UDP part could be much simpler (relying on the OS udp queue), but I feel like refactoring that is out of the scope of this PR. I created a jira issue to investigate this further. https://issues.apache.org/jira/browse/MINIFICPP-2031



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1067901128


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -160,339 +177,145 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
   ConnectionHandler(detail::ConnectionId connection_id,
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    std::optional<asio::ssl::context>& ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
-
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const {  return socket_ && socket_->lowest_layer().is_open(); }
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  std::optional<asio::ssl::context>& ssl_context_;
 };
 
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
-  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
-  if (socket_ && socket_->lowest_layer().is_open())
-    return socket_;
-  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
-  if (!new_socket)
-    return nonstd::make_unexpected(new_socket.error());
-  socket_ = std::move(*new_socket);
-  return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
-  if (error_code != asio::error::operation_aborted) {
-    deadline_.expires_at(asio::steady_timer::time_point::max());
-    last_error_ = asio::error::timed_out;
-    deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
-    socket->lowest_layer().close();
-  }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
-  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
-    logger_->log_trace("No more endpoints to try");
-    deadline_.cancel();
-    return;
-  }
-
-  last_error_.clear();
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
-      [&socket, endpoint_iter, this](std::error_code err) {
-        handleConnect(err, endpoint_iter, socket);
-      });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
-                                                  tcp::resolver::results_type::iterator endpoint_iter,
-                                                  const std::shared_ptr<SocketType>& socket) {
-  bool connection_failed_before_deadline = error.operator bool();
-  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (connection_failed_due_to_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (connection_failed_before_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
-    last_error_ = error;
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (max_size_of_socket_send_buffer_)
-    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
-  handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
-                                                    const tcp::resolver::results_type::iterator&,
-                                                    const std::shared_ptr<SocketType>&) {
-  throw std::invalid_argument("Handshake called without SSL");
-}
-
 template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
-                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                   const std::shared_ptr<SslSocket>& socket) {
-  if (!error) {
-    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
-    deadline_.cancel();
-    return;
-  }
-  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
-  last_error_ = error;
-  socket->lowest_layer().close();
-  startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(!ssl_context_);
+  return TcpSocket{io_context_};
 }
 
 template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<TcpSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->lowest_layer().non_blocking(true);
-  deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<SslSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
-    handleHandshake(handshake_error, endpoint_iter, socket);
-  });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(ssl_context_);
+  return {io_context_, *ssl_context_};
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
-                                                std::size_t bytes_written,
-                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                const std::vector<std::byte>& delimiter,
-                                                const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing flowfile to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
-  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
-    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
-      handleDelimiterWrite(error, bytes_written, socket);
-    });
-  } else {
-    std::vector<std::byte> data_chunk;
-    data_chunk.resize(chunk_size);
-    gsl::span<std::byte> buffer{data_chunk};
-    size_t num_read = flow_file_content_stream->read(buffer);
-    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-    });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& resolved_query, asio::io_context& io_context) {
+  auto socket = createNewSocket(io_context);
+  std::error_code last_error;
+  for (const auto& endpoint : resolved_query) {
+    auto [connection_error] = co_await asyncOperationWithTimeout(socket.lowest_layer().async_connect(endpoint, use_nothrow_awaitable), timeout_duration_);
+    if (connection_error) {
+      core::logging::LOG_DEBUG(logger_) << "Connecting to " << endpoint.endpoint() << " failed due to " << connection_error.message();
+      last_error = connection_error;
+      continue;
+    }
+    auto [handshake_error] = co_await handshake(socket, timeout_duration_);
+    if (handshake_error) {
+      core::logging::LOG_DEBUG(logger_) << "Handshake with " << endpoint.endpoint() << " failed due to " << handshake_error.message();
+      last_error = handshake_error;
+      continue;
+    }
+    if (max_size_of_socket_send_buffer_)
+      socket.lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+    socket_.emplace(std::move(socket));
+    co_return std::error_code();
   }
+  co_return last_error;
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing delimiter to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing delimiter to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", bytes_written);
-  deadline_.cancel();
-}
-
-
-template<>
-nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> ConnectionHandler<TcpSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto socket = std::make_shared<TcpSocket>(io_context_);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
-}
-
-asio::ssl::context getSslContext(const auto& ssl_context_service) {
-  gsl_Expects(ssl_context_service);
-  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
-  ssl_context.load_verify_file(ssl_context_service->getCACertificate());
-  ssl_context.set_verify_mode(asio::ssl::verify_peer);
-  if (auto cert_file = ssl_context_service->getCertificateFile(); !cert_file.empty())
-    ssl_context.use_certificate_file(cert_file, asio::ssl::context::pem);
-  if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); !private_key_file.empty())
-    ssl_context.use_private_key_file(private_key_file, asio::ssl::context::pem);
-  ssl_context.set_password_callback([password = ssl_context_service->getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
-  return ssl_context;
+[[nodiscard]] asio::awaitable<std::error_code> ConnectionHandler<SocketType>::setupUsableSocket(asio::io_context& io_context) {
+  if (hasUsableSocket())
+    co_return std::error_code();
+  tcp::resolver resolver(io_context);
+  auto [resolve_error, resolve_result] = co_await asyncOperationWithTimeout(resolver.async_resolve(connection_id_.getHostname(), connection_id_.getPort(), use_nothrow_awaitable), timeout_duration_);
+  if (resolve_error)
+    co_return resolve_error;
+  co_return co_await establishNewConnection(resolve_result, io_context);
 }
 
-template<>
-nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> ConnectionHandler<SslSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto ssl_context = getSslContext(ssl_context_service_);
-  auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
+template<class SocketType>
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+                                                                                        const std::vector<std::byte>& delimiter,
+                                                                                        asio::io_context& io_context) {
+  if (auto connection_error = co_await setupUsableSocket(io_context))  // NOLINT
+    co_return connection_error;
+  co_return co_await send(stream_to_send, delimiter);
 }
 
 template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                                                        const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                                                        const std::vector<std::byte>& delimiter) {
-  if (!socket || !socket->lowest_layer().is_open())
-    return nonstd::make_unexpected(asio::error::not_socket);
-
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.restart();
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::send(const std::shared_ptr<io::InputStream>& stream_to_send,
+                                                                     const std::vector<std::byte>& delimiter) {
+  gsl_Expects(hasUsableSocket());
 
   std::vector<std::byte> data_chunk;
   data_chunk.resize(chunk_size);
-
   gsl::span<std::byte> buffer{data_chunk};
-  size_t num_read = flow_file_content_stream->read(buffer);
-  logger_->log_trace("read %zu bytes from flowfile", num_read);
-  asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-    handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-  });
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  last_used_ = std::chrono::steady_clock::now();
-  return {};
-}
+  while (stream_to_send->tell() < stream_to_send->size()) {
+    size_t num_read = stream_to_send->read(buffer);

Review Comment:
   I've changed it in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/a76b49d2389cc2c51e1eb3aad513f9e638b44301



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1067152515


##########
extensions/standard-processors/processors/NetworkListenerProcessor.cpp:
##########
@@ -66,16 +66,16 @@ void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& contex
   auto options = readServerOptions(context);
 
   std::string ssl_value;
+  std::optional<utils::net::SslServerOptions> ssl_options;
   if (context.getProperty(ssl_context_property.getName(), ssl_value) && !ssl_value.empty()) {
     auto ssl_data = utils::net::getSslData(context, ssl_context_property, logger_);
     if (!ssl_data || !ssl_data->isValid()) {
       throw Exception(PROCESSOR_EXCEPTION, "SSL Context Service is set, but no valid SSL data was found!");
     }
-    auto client_auth = utils::parseEnumProperty<utils::net::SslServer::ClientAuthOption>(context, client_auth_property);
-    server_ = std::make_unique<utils::net::SslServer>(options.max_queue_size, options.port, logger_, *ssl_data, client_auth);
-  } else {
-    server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, options.port, logger_);
+    auto client_auth = utils::parseEnumProperty<utils::net::ClientAuthOption>(context, client_auth_property);
+    ssl_options.emplace(utils::net::SslServerOptions{std::move(*ssl_data), client_auth});

Review Comment:
   minor, but this constructs `SslServerOptions` first, then moves it; you can get rid of the move by changing it to
   ```suggestion
       ssl_options.emplace(std::move(*ssl_data), client_auth);
   ```



##########
extensions/standard-processors/tests/unit/ListenSyslogTests.cpp:
##########
@@ -249,132 +248,155 @@ void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164
   CHECK(original_message.msg_ == flow_file.getAttribute("syslog.msg"));
 }
 
-TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog][NetworkListenerProcessor]") {
+uint16_t schedule_on_random_port(SingleProcessorTestController& controller, const std::shared_ptr<ListenSyslog>& listen_syslog) {
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, "0"));
+  controller.plan->scheduleProcessor(listen_syslog);
+  uint16_t port = listen_syslog->getPort();
+  auto deadline = std::chrono::steady_clock::now() + 200ms;
+  while (port == 0 && deadline > std::chrono::steady_clock::now()) {
+    std::this_thread::sleep_for(20ms);
+    port = listen_syslog->getPort();
+  }
+  REQUIRE(port != 0);
+  return port;
+}

Review Comment:
   can we use `utils::scheduleProcessorOnRandomPort()` instead of this?



##########
libminifi/test/Utils.h:
##########
@@ -183,33 +188,54 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents,
   asio::error_code err;
   socket.lowest_layer().connect(remote_endpoint, err);
   if (err) {
-    return false;
+    return err;
   }
   socket.handshake(asio::ssl::stream_base::client, err);
   if (err) {
-    return false;
+    return err;
   }
   for (auto& content : contents) {
     std::string tcp_message(content);
     tcp_message += '\n';
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
     if (err) {
-      return false;
+      return err;
     }
   }
-  return true;
+  return std::error_code();
 }
 
 #ifdef WIN32
 inline std::error_code hide_file(const std::filesystem::path& file_name) {
-    const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
-    if (!success) {
-      // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
-      // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
-      return { static_cast<int>(GetLastError()), std::system_category() };
-    }
-    return {};
+  const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN);
+  if (!success) {
+    // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
+    // The below casting is safe in [0;std::numeric_limits<int>::max()], int max is guaranteed to be at least 32767
+    return { static_cast<int>(GetLastError()), std::system_category() };
   }
+  return {};
+}
 #endif /* WIN32 */
 
+template<typename T>
+concept NetworkingProcessor = std::derived_from<T, minifi::core::Processor>
+    && requires(T x) {
+      {T::Port} -> std::convertible_to<core::Property>;
+      {x.getPort()} -> std::convertible_to<uint16_t>;
+    };  // NOLINT(readability/braces)
+
+template<NetworkingProcessor T>
+uint16_t scheduleProcessorOnRandomPort(const std::shared_ptr<TestPlan>& test_plan, const std::shared_ptr<T>& processor) {
+  REQUIRE(processor->setProperty(T::Port, "0"));
+  test_plan->scheduleProcessor(processor);
+  uint16_t port = processor->getPort();
+  auto deadline = std::chrono::steady_clock::now() + 200ms;
+  while (port == 0 && deadline > std::chrono::steady_clock::now()) {
+    std::this_thread::sleep_for(20ms);
+    port = processor->getPort();
+  }
+  REQUIRE(port != 0);
+  return port;

Review Comment:
   this could be rewritten to use `verifyEventHappenedInPollTime`, too



##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -160,339 +178,147 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
   ConnectionHandler(detail::ConnectionId connection_id,
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    std::optional<asio::ssl::context>& ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const {  return socket_ && socket_->lowest_layer().is_open(); }
 
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
-
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  std::optional<asio::ssl::context>& ssl_context_;

Review Comment:
   Why is this a non-const reference-to-optional?  That is a bit of a mind-bending type.  Could it be a bare pointer to `asio::ssl::context`?



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -238,27 +220,22 @@ class PutTCPTestFixture {
   const std::shared_ptr<PutTCP> put_tcp_ = std::make_shared<PutTCP>("PutTCP");
   test::SingleProcessorTestController controller_{put_tcp_};
 
-  std::mt19937 random_engine_{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
-  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
-
   class Server {
    public:
     Server() = default;
 
-    void startTCPServer(uint16_t port) {
-      gsl_Expects(!listener_ && !server_thread_.joinable());
-      listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port, core::logging::LoggerFactory<utils::net::Server>::getLogger());
-      server_thread_ = std::thread([this]() { listener_->run(); });
-    }
-
-    void startSSLServer(uint16_t port) {
+    uint16_t startTCPServer(std::optional<utils::net::SslServerOptions> ssl_server_options) {
       gsl_Expects(!listener_ && !server_thread_.joinable());
-      listener_ = std::make_unique<SessionAwareSslServer>(std::nullopt,
-                                                          port,
-                                                          core::logging::LoggerFactory<utils::net::Server>::getLogger(),
-                                                          createSslDataForServer(),
-                                                          utils::net::SslServer::ClientAuthOption::REQUIRED);
+      listener_ = std::make_unique<CancellableTcpServer>(std::nullopt, 0, core::logging::LoggerFactory<utils::net::Server>::getLogger(), std::move(ssl_server_options));
       server_thread_ = std::thread([this]() { listener_->run(); });
+      uint16_t port = listener_->getPort();
+      auto deadline = std::chrono::steady_clock::now() + 200ms;
+      while (port == 0 && deadline > std::chrono::steady_clock::now()) {
+        std::this_thread::sleep_for(20ms);
+        port = listener_->getPort();
+      }
+      REQUIRE(port != 0);
+      return port;

Review Comment:
   I think
   ```suggestion
         REQUIRE(utils::verifyEventHappenedInPollTime(200ms, [this] { return listener_->getPort() != 0; }));
         return listener_->getPort();
   ```
   would be nicer



##########
extensions/standard-processors/tests/unit/ListenSyslogTests.cpp:
##########
@@ -480,41 +504,44 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog][Netw
 }
 
 TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog][NetworkListenerProcessor]") {
-  asio::ip::tcp::endpoint endpoint;
-  SECTION("sending through IPv4", "[IPv4]") {
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
-  }
-  SECTION("sending through IPv6", "[IPv6]") {
-    if (utils::isIPv6Disabled())
-      return;
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
-  }
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
-
   SingleProcessorTestController controller{listen_syslog};
+
   auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
   const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(), (executable_dir / "resources" / "ca_A.crt").string()));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
+  ssl_context_service->enable();
+
   LogTestController::getInstance().setTrace<ListenSyslog>();
-  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::SSLContextService, "SSLContextService"));
-  ssl_context_service->enable();
-  controller.plan->scheduleProcessor(listen_syslog);
-  REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, executable_dir / "resources" / "ca_A.crt"));
-  REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+
+  auto port = schedule_on_random_port(controller, listen_syslog);
+
+  asio::ip::tcp::endpoint endpoint;
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
+  }
+
+  CHECK_THAT(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, (executable_dir / "resources" / "ca_A.crt").string()), MatchesSuccess());
+  CHECK_THAT(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, (executable_dir / "resources" / "ca_A.crt").string()), MatchesSuccess());

Review Comment:
   the parameter type is `path`, so the `.string()`s are not needed



##########
libminifi/src/utils/net/TcpServer.cpp:
##########
@@ -15,53 +15,73 @@
  * limitations under the License.
  */
 #include "utils/net/TcpServer.h"
+#include "utils/net/AsioCoro.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
-TcpSession::TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
-  : concurrent_queue_(concurrent_queue),
-    max_queue_size_(max_queue_size),
-    socket_(io_context),
-    logger_(std::move(logger)) {
+asio::awaitable<void> TcpServer::doReceive() {
+  asio::ip::tcp::acceptor acceptor(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port_));
+  if (port_ == 0)
+    port_ = acceptor.local_endpoint().port();
+  while (true) {
+    auto [accept_error, socket] = co_await acceptor.async_accept(use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Error during accepting new connection: %s", accept_error.message());
+      break;
+    }
+    if (ssl_data_)
+      co_spawn(io_context_, secureSession(std::move(socket)), asio::detached);
+    else
+      co_spawn(io_context_, insecureSession(std::move(socket)), asio::detached);
+  }
 }
 
-asio::ip::tcp::socket& TcpSession::getSocket() {
-  return socket_;
-}
+asio::awaitable<void> TcpServer::readLoop(auto& socket) {
+  std::string read_message;
+  while (true) {
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, asio::dynamic_buffer(read_message), '\n', use_nothrow_awaitable);  // NOLINT
+    if (read_error || bytes_read == 0)
+      co_return;
 
-void TcpSession::start() {
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+      concurrent_queue_.enqueue(Message(read_message.substr(0, bytes_read - 1), IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().local_endpoint().port()));

Review Comment:
   Is it possible that `read_message[bytes_read - 1]` is not a `\n` (eg. if we got to the end of the stream)?  In that case, throwing away this character would not be the right thing to do.



##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -114,6 +114,21 @@ void PutTCP::initialize() {
 
 void PutTCP::notifyStop() {}
 
+namespace {
+asio::ssl::context getSslContext(const std::shared_ptr<controllers::SSLContextService>& ssl_context_service) {
+  gsl_Expects(ssl_context_service);

Review Comment:
   we could change the parameter type to `const SSLContextService&`, then this assertion would not be needed (since we already check for nullness at the calling site)



##########
extensions/standard-processors/tests/unit/ListenTcpTests.cpp:
##########
@@ -194,31 +194,115 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message : expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data));
+      CHECK_THAT(utils::sendMessagesViaSSL({message}, endpoint, executable_dir / "resources" / "ca_A.crt", ssl_data), MatchesSuccess());
     }
   }
 
   SECTION("Required certificate not provided") {
+    ssl_context_service->enable();
+    REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
+    port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_tcp);
     SECTION("sending through IPv4", "[IPv4]") {
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
     }
     SECTION("sending through IPv6", "[IPv6]") {
       if (utils::isIPv6Disabled())
         return;
-      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT);
+      endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
     }
-    REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED"));
-    ssl_context_service->enable();
-    controller.plan->scheduleProcessor(listen_tcp);
 
-    REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+    auto send_error = utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / "resources" / "ca_A.crt");
+    CHECK(send_error);

Review Comment:
   could this be changed to `CHECK_THAT(send_error, MatchesError())` (possibly with a specific error code)?



##########
libminifi/test/Catch.h:
##########
@@ -40,4 +40,46 @@ struct StringMaker<std::nullopt_t> {
     return "std::nullopt";
   }
 };
+
+template <>
+struct StringMaker<std::error_code> {
+  static std::string convert(const std::error_code& error_code) {
+    return fmt::format("std::error_code(value:{}, message:{})", error_code.value(), error_code.message());
+  }
+};
 }  // namespace Catch
+
+namespace org::apache::nifi::minifi::test {
+struct MatchesSuccess : Catch::MatcherBase<std::error_code> {
+  MatchesSuccess() = default;
+
+  bool match(const std::error_code& err) const override {
+    return err.value() == 0;
+  }
+
+  std::string describe() const override {
+    return fmt::format("== {}", Catch::StringMaker<std::error_code>::convert(std::error_code{}));
+  }
+};
+
+struct MatchesError : Catch::MatcherBase<std::error_code> {
+  explicit MatchesError(std::optional<std::error_code> expected_error = std::nullopt)
+      : Catch::MatcherBase<std::error_code>(),
+        expected_error_(expected_error) {
+  }
+
+  bool match(const std::error_code& err) const override {
+    if (expected_error_)
+      return err.value() == expected_error_->value();

Review Comment:
   I would do
   ```suggestion
         return err == *expected_error_;
   ```
   here, so the category gets compared, too



##########
libminifi/test/Catch.h:
##########
@@ -40,4 +40,46 @@ struct StringMaker<std::nullopt_t> {
     return "std::nullopt";
   }
 };
+
+template <>
+struct StringMaker<std::error_code> {
+  static std::string convert(const std::error_code& error_code) {
+    return fmt::format("std::error_code(value:{}, message:{})", error_code.value(), error_code.message());

Review Comment:
   it could be useful to include the category, as well, as in the `ostream <<` operator: https://en.cppreference.com/w/cpp/error/error_code/operator_ltlt



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -141,16 +120,16 @@ class PutTCPTestFixture {
   }
 
   size_t getNumberOfActiveSessions(std::optional<uint16_t> port = std::nullopt) {
-    if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(getListener(port))) {
-      return session_aware_listener->getNumberOfSessions() - 1;  // There is always one inactive session waiting for a new connection
+    if (auto session_aware_listener = dynamic_cast<CancellableTcpServer*>(getListener(port))) {
+      return session_aware_listener->getNumberOfSessions();
     }
     return -1;
   }
 
   void closeActiveConnections() {
     for (auto& [port, server] : listeners_) {
-      if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(server.listener_.get())) {
-        session_aware_listener->closeSessions();
+      if (auto session_aware_listener = dynamic_cast<CancellableTcpServer*>(getListener(port))) {
+        session_aware_listener->cancelEverything();

Review Comment:
   nitpicking, but "session_aware_" doesn't make much sense now; I would rename these `server` or `listener` or something like that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085084978


##########
extensions/standard-processors/tests/unit/ListenSyslogTests.cpp:
##########
@@ -480,41 +504,44 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog][Netw
 }
 
 TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog][NetworkListenerProcessor]") {
-  asio::ip::tcp::endpoint endpoint;
-  SECTION("sending through IPv4", "[IPv4]") {
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
-  }
-  SECTION("sending through IPv6", "[IPv6]") {
-    if (utils::isIPv6Disabled())
-      return;
-    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT);
-  }
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
-
   SingleProcessorTestController controller{listen_syslog};
+
   auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
   const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(), (executable_dir / "resources" / "ca_A.crt").string()));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), (executable_dir / "resources" / "localhost_by_A.pem").string()));
+  ssl_context_service->enable();
+
   LogTestController::getInstance().setTrace<ListenSyslog>();
-  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::SSLContextService, "SSLContextService"));
-  ssl_context_service->enable();
-  controller.plan->scheduleProcessor(listen_syslog);
-  REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, executable_dir / "resources" / "ca_A.crt"));
-  REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, executable_dir / "resources" / "ca_A.crt"));
+
+  auto port = schedule_on_random_port(controller, listen_syslog);
+
+  asio::ip::tcp::endpoint endpoint;
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;
+    endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), port);
+  }
+
+  CHECK_THAT(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, (executable_dir / "resources" / "ca_A.crt").string()), MatchesSuccess());
+  CHECK_THAT(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, (executable_dir / "resources" / "ca_A.crt").string()), MatchesSuccess());

Review Comment:
   good catch, I missed these during rebase. Fixed it in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/f37cffb3fe783683d38646b198dfc38f7a439ce4#diff-0a83894df6805a91d9989588f4b4191d89c6a9e7d0de0ef7535e24382281f866R522-R523



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085089692


##########
libminifi/test/Catch.h:
##########
@@ -40,4 +40,46 @@ struct StringMaker<std::nullopt_t> {
     return "std::nullopt";
   }
 };
+
+template <>
+struct StringMaker<std::error_code> {
+  static std::string convert(const std::error_code& error_code) {
+    return fmt::format("std::error_code(value:{}, message:{})", error_code.value(), error_code.message());
+  }
+};
 }  // namespace Catch
+
+namespace org::apache::nifi::minifi::test {
+struct MatchesSuccess : Catch::MatcherBase<std::error_code> {
+  MatchesSuccess() = default;
+
+  bool match(const std::error_code& err) const override {
+    return err.value() == 0;
+  }
+
+  std::string describe() const override {
+    return fmt::format("== {}", Catch::StringMaker<std::error_code>::convert(std::error_code{}));
+  }
+};
+
+struct MatchesError : Catch::MatcherBase<std::error_code> {
+  explicit MatchesError(std::optional<std::error_code> expected_error = std::nullopt)
+      : Catch::MatcherBase<std::error_code>(),
+        expected_error_(expected_error) {
+  }
+
+  bool match(const std::error_code& err) const override {
+    if (expected_error_)
+      return err.value() == expected_error_->value();

Review Comment:
   Good idea, :+1: changed it in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/f37cffb3fe783683d38646b198dfc38f7a439ce4#diff-2443fe4ee36121fb137afe4324fd0b376b91b1acdff7aa9e5cd28862f619a447R73



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1093236625


##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -130,369 +145,179 @@ void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessio
     idle_connection_expiration_.reset();
 
   if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms)
-    timeout_ = timeout->getMilliseconds();
+    timeout_duration_ = timeout->getMilliseconds();
   else
-    timeout_ = 15s;
+    timeout_duration_ = 15s;
+
+  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
 
   std::string context_name;
-  ssl_context_service_.reset();
+  ssl_context_.reset();
   if (context->getProperty(SSLContextService.getName(), context_name) && !IsNullOrEmpty(context_name)) {
     if (auto controller_service = context->getControllerService(context_name)) {
-      ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
-      if (!ssl_context_service_)
-        logger_->log_error("%s is not a SSL Context Service", context_name);
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name))) {
+        ssl_context_ = getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, context_name + " is not an SSL Context Service");
+      }
     } else {
-      logger_->log_error("Invalid controller service: %s", context_name);
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + context_name);
     }
   }
 
   delimiter_ = utils::span_to<std::vector>(gsl::make_span(context->getProperty(OutgoingMessageDelimiter).value_or(std::string{})).as_span<const std::byte>());
 
-  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
-    connections_.reset();
-  else
-    connections_.emplace();
-
   if (auto max_size_of_socket_send_buffer = context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer))
     max_size_of_socket_send_buffer_ = max_size_of_socket_send_buffer->getValue();
   else
     max_size_of_socket_send_buffer_.reset();
 }
 
 namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&, asio::steady_timer::duration) {
+  co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration timeout_duration) {
+  co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
+}
+
 template<class SocketType>
 class ConnectionHandler : public ConnectionHandlerBase {
  public:
   ConnectionHandler(detail::ConnectionId connection_id,
                     std::chrono::milliseconds timeout,
                     std::shared_ptr<core::logging::Logger> logger,
                     std::optional<size_t> max_size_of_socket_send_buffer,
-                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+                    asio::ssl::context* ssl_context)
       : connection_id_(std::move(connection_id)),
-        timeout_(timeout),
+        timeout_duration_(timeout),
         logger_(std::move(logger)),
         max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
-        ssl_context_service_(std::move(ssl_context_service)) {
+        ssl_context_(ssl_context) {
   }
 
   ~ConnectionHandler() override = default;
 
-  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+  asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, asio::io_context& io_context_) override;
 
  private:
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
   [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
-    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+    return last_used_ && *last_used_ >= (steady_clock::now() - dur);
   }
 
   void reset() override {
     last_used_.reset();
     socket_.reset();
-    io_context_.reset();
-    last_error_.clear();
-    deadline_.expires_at(asio::steady_timer::time_point::max());
   }
 
-  void checkDeadline(std::error_code error_code, SocketType* socket);
-  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
-
-  void handleConnect(std::error_code error,
-                     tcp::resolver::results_type::iterator endpoint_iter,
-                     const std::shared_ptr<SocketType>& socket);
-  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                               const std::shared_ptr<SocketType>& socket);
-  void handleHandshake(std::error_code error,
-                       const tcp::resolver::results_type::iterator& endpoint_iter,
-                       const std::shared_ptr<SocketType>& socket);
-
-  void handleWrite(std::error_code error,
-                   std::size_t bytes_written,
-                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                   const std::vector<std::byte>& delimiter,
-                   const std::shared_ptr<SocketType>& socket);
-
-  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context);
+  [[nodiscard]] bool hasUsableSocket() const {  return socket_ && socket_->lowest_layer().is_open(); }
 
-  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context_);
+  asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter);
 
-  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+  SocketType createNewSocket(asio::io_context& io_context_);
 
   detail::ConnectionId connection_id_;
-  std::optional<std::chrono::steady_clock::time_point> last_used_;
-  asio::io_context io_context_;
-  std::error_code last_error_;
-  asio::steady_timer deadline_{io_context_};
-  std::chrono::milliseconds timeout_;
-  std::shared_ptr<SocketType> socket_;
+  std::optional<SocketType> socket_;
+
+  std::optional<steady_clock::time_point> last_used_;
+  std::chrono::milliseconds timeout_duration_;
 
   std::shared_ptr<core::logging::Logger> logger_;
   std::optional<size_t> max_size_of_socket_send_buffer_;
 
-  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
-  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
-  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
-                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                           const std::vector<std::byte>& delimiter);
+  asio::ssl::context* ssl_context_;
 };
 
-template<class SocketType>
-nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
-  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); });;
-}
-
-template<class SocketType>
-nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
-  if (socket_ && socket_->lowest_layer().is_open())
-    return socket_;
-  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
-  if (!new_socket)
-    return nonstd::make_unexpected(new_socket.error());
-  socket_ = std::move(*new_socket);
-  return socket_;
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
-  if (error_code != asio::error::operation_aborted) {
-    deadline_.expires_at(asio::steady_timer::time_point::max());
-    last_error_ = asio::error::timed_out;
-    deadline_.async_wait([&](std::error_code error_code) { checkDeadline(error_code, socket); });
-    socket->lowest_layer().close();
-  }
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
-  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
-    logger_->log_trace("No more endpoints to try");
-    deadline_.cancel();
-    return;
-  }
-
-  last_error_.clear();
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
-      [&socket, endpoint_iter, this](std::error_code err) {
-        handleConnect(err, endpoint_iter, socket);
-      });
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
-                                                  tcp::resolver::results_type::iterator endpoint_iter,
-                                                  const std::shared_ptr<SocketType>& socket) {
-  bool connection_failed_before_deadline = error.operator bool();
-  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (connection_failed_due_to_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (connection_failed_before_deadline) {
-    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
-    last_error_ = error;
-    socket->lowest_layer().close();
-    return startConnect(++endpoint_iter, socket);
-  }
-
-  if (max_size_of_socket_send_buffer_)
-    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
-
-  handleConnectionSuccess(endpoint_iter, socket);
-}
-
-template<class SocketType>
-void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
-                                                    const tcp::resolver::results_type::iterator&,
-                                                    const std::shared_ptr<SocketType>&) {
-  throw std::invalid_argument("Handshake called without SSL");
-}
-
 template<>
-void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
-                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                   const std::shared_ptr<SslSocket>& socket) {
-  if (!error) {
-    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
-    deadline_.cancel();
-    return;
-  }
-  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
-  last_error_ = error;
-  socket->lowest_layer().close();
-  startConnect(std::next(endpoint_iter), socket);
+TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(!ssl_context_);
+  return TcpSocket{io_context_};
 }
 
 template<>
-void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<TcpSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->lowest_layer().non_blocking(true);
-  deadline_.cancel();
-}
-
-template<>
-void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
-                                                           const std::shared_ptr<SslSocket>& socket) {
-  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
-  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
-    handleHandshake(handshake_error, endpoint_iter, socket);
-  });
+SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) {
+  gsl_Expects(ssl_context_);
+  return {io_context_, *ssl_context_};
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
-                                                std::size_t bytes_written,
-                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
-                                                const std::vector<std::byte>& delimiter,
-                                                const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing flowfile to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
-  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
-    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
-      handleDelimiterWrite(error, bytes_written, socket);
-    });
-  } else {
-    std::vector<std::byte> data_chunk;
-    data_chunk.resize(chunk_size);
-    gsl::span<std::byte> buffer{data_chunk};
-    size_t num_read = flow_file_content_stream->read(buffer);
-    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
-      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
-    });
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context) {
+  auto socket = createNewSocket(io_context);
+  std::error_code last_error;
+  for (const auto& endpoint : endpoints) {
+    auto [connection_error] = co_await asyncOperationWithTimeout(socket.lowest_layer().async_connect(endpoint, use_nothrow_awaitable), timeout_duration_);
+    if (connection_error) {
+      core::logging::LOG_DEBUG(logger_) << "Connecting to " << endpoint.endpoint() << " failed due to " << connection_error.message();
+      last_error = connection_error;
+      continue;
+    }
+    auto [handshake_error] = co_await handshake(socket, timeout_duration_);
+    if (handshake_error) {
+      core::logging::LOG_DEBUG(logger_) << "Handshake with " << endpoint.endpoint() << " failed due to " << handshake_error.message();
+      last_error = handshake_error;
+      continue;
+    }
+    if (max_size_of_socket_send_buffer_)
+      socket.lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+    socket_.emplace(std::move(socket));
+    co_return std::error_code();
   }
+  co_return last_error;
 }
 
 template<class SocketType>
-void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
-  bool write_failed_before_deadline = error.operator bool();
-  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
-
-  if (write_failed_due_to_deadline) {
-    logger_->log_trace("Writing delimiter to socket timed out");
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  if (write_failed_before_deadline) {
-    last_error_ = error;
-    logger_->log_trace("Writing delimiter to socket failed due to %s", error.message());
-    socket->lowest_layer().close();
-    deadline_.cancel();
-    return;
-  }
-
-  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", bytes_written);
-  deadline_.cancel();
-}
-
-
-template<>
-nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> ConnectionHandler<TcpSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto socket = std::make_shared<TcpSocket>(io_context_);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.expires_after(timeout_);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
-}
-
-asio::ssl::context getSslContext(const auto& ssl_context_service) {
-  gsl_Expects(ssl_context_service);
-  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
-  ssl_context.load_verify_file(ssl_context_service->getCACertificate().string());
-  ssl_context.set_verify_mode(asio::ssl::verify_peer);
-  if (auto cert_file = ssl_context_service->getCertificateFile(); !cert_file.empty())
-    ssl_context.use_certificate_file(cert_file.string(), asio::ssl::context::pem);
-  if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); !private_key_file.empty())
-    ssl_context.use_private_key_file(private_key_file.string(), asio::ssl::context::pem);
-  ssl_context.set_password_callback([password = ssl_context_service->getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
-  return ssl_context;
+[[nodiscard]] asio::awaitable<std::error_code> ConnectionHandler<SocketType>::setupUsableSocket(asio::io_context& io_context) {
+  if (hasUsableSocket())
+    co_return std::error_code();
+  tcp::resolver resolver(io_context);
+  auto [resolve_error, resolve_result] = co_await asyncOperationWithTimeout(resolver.async_resolve(connection_id_.getHostname(), connection_id_.getPort(), use_nothrow_awaitable), timeout_duration_);
+  if (resolve_error)
+    co_return resolve_error;
+  co_return co_await establishNewConnection(resolve_result, io_context);
 }
 
-template<>
-nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> ConnectionHandler<SslSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
-  auto ssl_context = getSslContext(ssl_context_service_);
-  auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
-  startConnect(resolved_query.begin(), socket);
-  deadline_.async_wait([&](std::error_code error_code) -> void {
-    checkDeadline(error_code, socket.get());
-  });
-  io_context_.run();
-  if (last_error_)
-    return nonstd::make_unexpected(last_error_);
-  return socket;
+template<class SocketType>
+asio::awaitable<std::error_code> ConnectionHandler<SocketType>::sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send,
+                                                                                        const std::vector<std::byte>& delimiter,
+                                                                                        asio::io_context& io_context) {

Review Comment:
   👍 fixed it in https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/d6b898da263217c928e6290eb77ae09e6f36030a



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org