Posted to issues@nifi.apache.org by "lordgamez (via GitHub)" <gi...@apache.org> on 2023/06/26 10:28:33 UTC

[GitHub] [nifi-minifi-cpp] lordgamez opened a new pull request, #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

lordgamez opened a new pull request, #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595

   https://issues.apache.org/jira/browse/MINIFICPP-2137
   
   Note: this PR depends on https://github.com/apache/nifi-minifi-cpp/pull/1583, because there was a compile issue with LibreSSL on Windows that does not appear with OpenSSL. It seems to be caused by a Windows-specific macro definition. If we plan to move to OpenSSL anyway, I don't think it's worth debugging.
   
   ```
   2023-06-23T15:41:12.8423242Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(194): error C2143: syntax error: missing ')' before 'constant'
   2023-06-23T15:41:12.8558554Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(194): error C2143: syntax error: missing ';' before 'constant'
   2023-06-23T15:41:12.8582850Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(194): error C2059: syntax error: ')'
   2023-06-23T15:41:12.8727617Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(194): error C2238: unexpected token(s) preceding ';'
   2023-06-23T15:41:12.8746940Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(202): error C2143: syntax error: missing ')' before 'constant'
   2023-06-23T15:41:12.8752091Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(202): error C2143: syntax error: missing ';' before 'constant'
   2023-06-23T15:41:12.8754426Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(202): error C2059: syntax error: ')'
   2023-06-23T15:41:12.8876950Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(202): error C2238: unexpected token(s) preceding ';'
   2023-06-23T15:41:12.9189256Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(231): error C2143: syntax error: missing ')' before 'constant'
   2023-06-23T15:41:12.9220842Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(231): error C2143: syntax error: missing ';' before 'constant'
   2023-06-23T15:41:12.9268061Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(231): error C2059: syntax error: ')'
   2023-06-23T15:41:12.9314784Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(231): error C2238: unexpected token(s) preceding ';'
   2023-06-23T15:41:12.9378837Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(657): error C2059: syntax error: '('
   2023-06-23T15:41:12.9710560Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(657): error C2143: syntax error: missing ',' before '*'
   2023-06-23T15:41:12.9786697Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(657): error C2143: syntax error: missing ';' before ')'
   2023-06-23T15:41:12.9789902Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(657): error C2059: syntax error: ')'
   2023-06-23T15:41:12.9918377Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(837): error C2065: 'nm': undeclared identifier
   2023-06-23T15:41:12.9962902Z D:\a\nifi-minifi-cpp\b\thirdparty\libressl-install\include\openssl/x509v3.h(837): error C2226: syntax error: unexpected type 'stack_st_CONF_VALUE'
   ```
   
   ----------------------
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1271906190


##########
controller/MiNiFiController.cpp:
##########
@@ -104,19 +104,18 @@ int main(int argc, char **argv) {
   log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
   minifi::core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
 
-  std::shared_ptr<minifi::controllers::SSLContextService> secure_context;
+  minifi::controller::ControllerSocketData socket_data;
   try {
-    secure_context = getSSLContextService(configuration);
+    socket_data.ssl_context_service = getSSLContextService(configuration);
   } catch(const minifi::Exception& ex) {
     logger->log_error(ex.what());
     exit(1);
   }
   auto stream_factory_ = minifi::io::StreamFactory::getInstance(configuration);
 
-  std::string host = "localhost";
+
   std::string port_str;
   std::string ca_cert;

Review Comment:
   Updated in 45c677990eeb0cb223814b3c55d37e84711a9b03



##########
controller/MiNiFiController.cpp:
##########
@@ -171,84 +168,72 @@ int main(int argc, char **argv) {
     if (result.count("stop") > 0) {
       auto& components = result["stop"].as<std::vector<std::string>>();
       for (const auto& component : components) {
-        auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
-        if (minifi::controller::stopComponent(std::move(socket), component))
+        if (minifi::controller::stopComponent(socket_data, component))
           std::cout << component << " requested to stop" << std::endl;
         else
-          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+          std::cout << "Could not connect to remote host " << socket_data.host << ":" << socket_data.port << std::endl;
       }
     }
 
     if (result.count("start") > 0) {
       auto& components = result["start"].as<std::vector<std::string>>();
       for (const auto& component : components) {
-        auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
-        if (minifi::controller::startComponent(std::move(socket), component))
+        if (minifi::controller::startComponent(socket_data, component))
           std::cout << component << " requested to start" << std::endl;
         else
-          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+          std::cout << "Could not connect to remote host " << socket_data.host << ":" << socket_data.port << std::endl;
       }
     }
 
     if (result.count("c") > 0) {
       auto& components = result["c"].as<std::vector<std::string>>();
       for (const auto& connection : components) {
-        auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context)
-                                                : stream_factory_->createSocket(host, port);
-        if (minifi::controller::clearConnection(std::move(socket), connection)) {
+        if (minifi::controller::clearConnection(socket_data, connection)) {
           std::cout << "Sent clear command to " << connection << ". Size before clear operation sent: " << std::endl;
-          socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context)
-                                             : stream_factory_->createSocket(host, port);
-          if (minifi::controller::getConnectionSize(std::move(socket), std::cout, connection) < 0)
-            std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+          if (!minifi::controller::getConnectionSize(socket_data, std::cout, connection))
+            std::cout << "Could not connect to remote host " << socket_data.host << ":" << socket_data.port << std::endl;

Review Comment:
   Good point. I think it was supposed to report the connection size before clearing it, but the size is queried after the clear command has been sent and processed, so it will always show 0. I think it's unnecessary, so I removed it in 45c677990eeb0cb223814b3c55d37e84711a9b03



##########
libminifi/include/io/AsioStream.h:
##########
@@ -0,0 +1,92 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <memory>
+#include <utility>
+
+#include "BaseStream.h"
+#include "core/logging/LoggerFactory.h"
+#include "asio/ts/internet.hpp"
+#include "asio/read.hpp"
+#include "asio/write.hpp"
+#include "io/validation.h"
+
+namespace org::apache::nifi::minifi::io {
+
+template<typename AsioSocketStreamType>
+class AsioStream : public io::BaseStream {
+ public:
+  explicit AsioStream(AsioSocketStreamType&& stream) : stream_(std::move(stream)) {}
+
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */

Review Comment:
   Removed comment in 45c677990eeb0cb223814b3c55d37e84711a9b03



##########
libminifi/include/io/AsioStream.h:
##########
@@ -0,0 +1,92 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <memory>
+#include <utility>
+
+#include "BaseStream.h"
+#include "core/logging/LoggerFactory.h"
+#include "asio/ts/internet.hpp"
+#include "asio/read.hpp"
+#include "asio/write.hpp"
+#include "io/validation.h"
+
+namespace org::apache::nifi::minifi::io {
+
+template<typename AsioSocketStreamType>
+class AsioStream : public io::BaseStream {
+ public:
+  explicit AsioStream(AsioSocketStreamType&& stream) : stream_(std::move(stream)) {}
+
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  size_t read(std::span<std::byte> buf) override;
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */

Review Comment:
   Updated parameter names in 45c677990eeb0cb223814b3c55d37e84711a9b03





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1283170700


##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -95,40 +151,29 @@ void ControllerSocketProtocol::initialize() {
   configuration_->get(Configuration::controller_socket_host, host);
 
   std::string port;
+  stopListener();
   if (configuration_->get(Configuration::controller_socket_port, port)) {
-    if (nullptr != secure_context) {
-#ifdef OPENSSL_SUPPORT
-      // if there is no openssl support we won't be using SSL
-      auto tls_context = std::make_shared<io::TLSContext>(configuration_, secure_context);
-      server_socket_ = std::unique_ptr<io::BaseServerSocket>(new io::TLSServerSocket(tls_context, host, std::stoi(port), 2));
-#else
-      server_socket_ = std::unique_ptr<io::BaseServerSocket>(new io::ServerSocket(nullptr, host, std::stoi(port), 2));
-#endif
-    } else {
-      server_socket_ = std::unique_ptr<io::BaseServerSocket>(new io::ServerSocket(nullptr, host, std::stoi(port), 2));
-    }
-    // if we have a localhost hostname and we did not manually specify any.interface we will
-    // bind only to the loopback adapter
+    // if we have a localhost hostname and we did not manually specify any.interface we will bind only to the loopback adapter
     if ((host == "localhost" || host == "127.0.0.1" || host == "::") && !any_interface) {
-      server_socket_->initialize(true);
+      acceptor_ = std::make_unique<asio::ip::tcp::acceptor>(io_context_, asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), std::stoi(port)));
     } else {
-      server_socket_->initialize(false);
+      acceptor_ = std::make_unique<asio::ip::tcp::acceptor>(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), std::stoi(port)));
     }
 
-    auto check = [this]() -> bool {
-      return update_sink_.isRunning();
-    };
-
-    auto handler = [this](io::BaseStream *stream) {
-      handleCommand(stream);
-    };
-    server_socket_->registerCallback(check, handler);
+    if (secure_context) {
+      co_spawn(io_context_, startAcceptSsl(std::move(secure_context)), asio::detached);
+    } else {
+      co_spawn(io_context_, startAccept(), asio::detached);
+    }
+    server_thread_ = std::thread([this] {
+      io_context_.run();
+    });

Review Comment:
   Even though both the C2 agent and the controller socket use the C2 protocol for communication, they are configured and run as separate entities, and each can be disabled independently of the other, so I don't think it's a good idea to share an io_context between them.
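
To illustrate the argument for separate contexts, here is a minimal sketch. It does not use asio: `EventLoop` is a hypothetical, single-threaded stand-in for `asio::io_context`, and `Service` is a hypothetical stand-in for a subsystem such as the C2 agent or the controller socket. The assumption is that disabling a subsystem stops its loop, as `stopListener()` does with `io_context_.stop()` in this PR; with a shared loop that stop would presumably halt the other subsystem's handlers too.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical, single-threaded stand-in for an event loop such as asio::io_context.
class EventLoop {
 public:
  void post(std::function<void()> task) { tasks_.push_back(std::move(task)); }
  void stop() { stopped_ = true; }
  void restart() { stopped_ = false; }

  // Runs queued tasks until the queue is empty or the loop is stopped.
  // Returns the number of tasks that were executed.
  int run() {
    int executed = 0;
    while (!stopped_ && !tasks_.empty()) {
      auto task = std::move(tasks_.front());
      tasks_.pop_front();
      task();
      ++executed;
    }
    return executed;
  }

 private:
  std::deque<std::function<void()>> tasks_;
  bool stopped_ = false;
};

// Hypothetical subsystem that owns its own loop, so it can be disabled
// without touching any other subsystem.
struct Service {
  EventLoop loop;
  int handled = 0;

  void submit() { loop.post([this] { ++handled; }); }
  void disable() { loop.stop(); }
};
```

With two `Service` instances, calling `disable()` on one leaves the other's queued work runnable, which is the independence described above.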



##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -325,33 +370,33 @@ void ControllerSocketProtocol::handleDescribe(io::BaseStream *stream) {
   }
 }
 
-void ControllerSocketProtocol::handleCommand(io::BaseStream *stream) {
+asio::awaitable<void> ControllerSocketProtocol::handleCommand(std::unique_ptr<io::BaseStream>&& stream) {

Review Comment:
   We spawn a separate coroutine for each handled command, so `handleCommand` itself needs to be a coroutine.





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1251010460


##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -86,32 +87,37 @@ void ControllerSocketProtocol::stopListener() {
   io_context_.restart();
 }
 
-void ControllerSocketProtocol::startAccept() {
-  acceptor_->async_accept([this](const asio::error_code& error, asio::ip::tcp::socket socket) {
-    if (!error) {
-      io::AsioStream<asio::ip::tcp::socket> stream(std::move(socket));
-      handleCommand(stream);
+asio::awaitable<void> ControllerSocketProtocol::startAccept() {
+  while (true) {
+    auto [accept_error, socket] = co_await acceptor_->async_accept(utils::net::use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Controller socket accept failed with the following message: '%s'", accept_error.message());
+      co_return;

Review Comment:
   Good point, updated in 3c4c32b943e927e77d2c4f9d49a52d5405b47f75





[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "fgerlits (via GitHub)" <gi...@apache.org>.
fgerlits commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1270866502


##########
libminifi/include/io/AsioStream.h:
##########
@@ -0,0 +1,92 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <memory>
+#include <utility>
+
+#include "BaseStream.h"
+#include "core/logging/LoggerFactory.h"
+#include "asio/ts/internet.hpp"
+#include "asio/read.hpp"
+#include "asio/write.hpp"
+#include "io/validation.h"
+
+namespace org::apache::nifi::minifi::io {
+
+template<typename AsioSocketStreamType>
+class AsioStream : public io::BaseStream {
+ public:
+  explicit AsioStream(AsioSocketStreamType&& stream) : stream_(std::move(stream)) {}
+
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */

Review Comment:
   There is no `buflen` parameter.  I would either remove the whole comment, or keep the first line only.



##########
libminifi/include/io/AsioStream.h:
##########
@@ -0,0 +1,92 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <memory>
+#include <utility>
+
+#include "BaseStream.h"
+#include "core/logging/LoggerFactory.h"
+#include "asio/ts/internet.hpp"
+#include "asio/read.hpp"
+#include "asio/write.hpp"
+#include "io/validation.h"
+
+namespace org::apache::nifi::minifi::io {
+
+template<typename AsioSocketStreamType>
+class AsioStream : public io::BaseStream {
+ public:
+  explicit AsioStream(AsioSocketStreamType&& stream) : stream_(std::move(stream)) {}
+
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  size_t read(std::span<std::byte> buf) override;
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */

Review Comment:
   This comment is misleading; I would remove it and also change the name of the first parameter, since it points to the start of a buffer, not to a single value.
   
   The parameter names could e.g. be `source_buffer` here, and `target_buffer` in `read()`.



##########
controller/MiNiFiController.cpp:
##########
@@ -171,84 +168,72 @@ int main(int argc, char **argv) {
     if (result.count("stop") > 0) {
       auto& components = result["stop"].as<std::vector<std::string>>();
       for (const auto& component : components) {
-        auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
-        if (minifi::controller::stopComponent(std::move(socket), component))
+        if (minifi::controller::stopComponent(socket_data, component))
           std::cout << component << " requested to stop" << std::endl;
         else
-          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+          std::cout << "Could not connect to remote host " << socket_data.host << ":" << socket_data.port << std::endl;
       }
     }
 
     if (result.count("start") > 0) {
       auto& components = result["start"].as<std::vector<std::string>>();
       for (const auto& component : components) {
-        auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
-        if (minifi::controller::startComponent(std::move(socket), component))
+        if (minifi::controller::startComponent(socket_data, component))
           std::cout << component << " requested to start" << std::endl;
         else
-          std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+          std::cout << "Could not connect to remote host " << socket_data.host << ":" << socket_data.port << std::endl;
       }
     }
 
     if (result.count("c") > 0) {
       auto& components = result["c"].as<std::vector<std::string>>();
       for (const auto& connection : components) {
-        auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context)
-                                                : stream_factory_->createSocket(host, port);
-        if (minifi::controller::clearConnection(std::move(socket), connection)) {
+        if (minifi::controller::clearConnection(socket_data, connection)) {
           std::cout << "Sent clear command to " << connection << ". Size before clear operation sent: " << std::endl;
-          socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context)
-                                             : stream_factory_->createSocket(host, port);
-          if (minifi::controller::getConnectionSize(std::move(socket), std::cout, connection) < 0)
-            std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+          if (!minifi::controller::getConnectionSize(socket_data, std::cout, connection))
+            std::cout << "Could not connect to remote host " << socket_data.host << ":" << socket_data.port << std::endl;

Review Comment:
   I can see this was here before this PR, too, but do you understand why?  All other commands do one thing, only `clear` does both a `clearConnection` and a `getConnectionSize`.



##########
controller/MiNiFiController.cpp:
##########
@@ -104,19 +104,18 @@ int main(int argc, char **argv) {
   log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
   minifi::core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
 
-  std::shared_ptr<minifi::controllers::SSLContextService> secure_context;
+  minifi::controller::ControllerSocketData socket_data;
   try {
-    secure_context = getSSLContextService(configuration);
+    socket_data.ssl_context_service = getSSLContextService(configuration);
   } catch(const minifi::Exception& ex) {
     logger->log_error(ex.what());
     exit(1);
   }
   auto stream_factory_ = minifi::io::StreamFactory::getInstance(configuration);
 
-  std::string host = "localhost";
+
   std::string port_str;
   std::string ca_cert;

Review Comment:
   `ca_cert` is unused, and `port_str` could be moved closer to where it is used





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1283171762


##########
libminifi/src/io/InputStream.cpp:
##########
@@ -71,18 +67,20 @@ size_t InputStream::read(std::string &str, bool widen) {
     return length_return;
   }
 
-  std::vector<std::byte> buffer(string_length);
-  const auto read_return = read(buffer);
-  if (read_return != string_length) {
-    return read_return;
+  str.clear();
+  str.reserve(string_length);
+
+  auto bytes_to_read = string_length;
+  while (bytes_to_read > 0) {
+    std::vector<std::byte> buffer(bytes_to_read);
+    const auto read_return = read(buffer);
+    if (io::isError(read_return))
+      return read_return;
+    bytes_to_read -= read_return;
+    str.append(std::string(reinterpret_cast<const char*>(buffer.data()), read_return));

Review Comment:
   Added retry counter in 0fef6eb9ae735800ca069c868f1e9eca4930d10c





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1280707520


##########
libminifi/src/io/InputStream.cpp:
##########
@@ -71,18 +67,20 @@ size_t InputStream::read(std::string &str, bool widen) {
     return length_return;
   }
 
-  std::vector<std::byte> buffer(string_length);
-  const auto read_return = read(buffer);
-  if (read_return != string_length) {
-    return read_return;
+  str.clear();
+  str.reserve(string_length);
+
+  auto bytes_to_read = string_length;
+  while (bytes_to_read > 0) {
+    std::vector<std::byte> buffer(bytes_to_read);
+    const auto read_return = read(buffer);
+    if (io::isError(read_return))
+      return read_return;
+    bytes_to_read -= read_return;
+    str.append(std::string(reinterpret_cast<const char*>(buffer.data()), read_return));

Review Comment:
   With `AsioStream` there were two candidate read operations for replacing the C sockets' read operation: `asio::read` and `asio::basic_stream_socket::read_some`. The problem with `asio::read` was discovered with `CSite2SiteTests`: `asio::read` blocked until it had received as many bytes as the buffer could hold (1000 in that scenario), even though fewer bytes were available to read:
   
   "When calling an overload that does not accept a CompletionCondition, it is the equivalent to calling its associated overload with a CompletionCondition of boost::asio::transfer_all(), causing the operation to read streambuf.max_size() bytes."
   
   So `asio::basic_stream_socket::read_some` was used instead, which is said to be equivalent to `::read()` on Linux. The problem with this function was that it returned after reading only as many bytes as fit in its buffer, so it failed on larger inputs because of this code in the original `InputStream::read(std::string &str, bool widen)`:
   
   ```
   const auto read_return = read(buffer);
   if (read_return != string_length) {
     return read_return;
   ```
   
   This is why `InputStream::read` was changed: we cannot change asio's read function.
   
   ------------
   
   You are right, I think we should return with a STREAM_ERROR if the read returns 0. Updated in c8d0aed1f0eb559c1edc2beda019f4a11228ae26
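
The read loop discussed above can be sketched in isolation as follows. This is not the project's code: `ChunkedSource`, `readSome`, `readExact`, and `kStreamError` are hypothetical names, with `ChunkedSource` imitating a `read_some`-style call that may deliver fewer bytes than requested and `kStreamError` standing in for STREAM_ERROR. The sketch assumes a zero-byte read means no more data will arrive.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <limits>
#include <string>
#include <utility>
#include <vector>

// Hypothetical error sentinel, standing in for the project's STREAM_ERROR.
constexpr std::size_t kStreamError = std::numeric_limits<std::size_t>::max();

// Hypothetical source that, like read_some, may return fewer bytes than requested.
class ChunkedSource {
 public:
  ChunkedSource(std::string data, std::size_t max_chunk)
      : data_(std::move(data)), max_chunk_(max_chunk) {}

  // Copies at most max_chunk_ bytes into out and returns the count.
  // A return value of 0 means the source has no more data.
  std::size_t readSome(char* out, std::size_t len) {
    const std::size_t n = std::min({len, max_chunk_, data_.size() - pos_});
    std::copy_n(data_.data() + pos_, n, out);
    pos_ += n;
    return n;
  }

 private:
  std::string data_;
  std::size_t max_chunk_;
  std::size_t pos_ = 0;
};

// Reads exactly `length` bytes into `out`, looping over short reads.
// Returns `length` on success, or kStreamError if the source runs dry first.
std::size_t readExact(ChunkedSource& source, std::size_t length, std::string& out) {
  out.clear();
  out.reserve(length);
  std::vector<char> buffer(length);
  std::size_t remaining = length;
  while (remaining > 0) {
    const std::size_t n = source.readSome(buffer.data(), remaining);
    if (n == 0) {
      return kStreamError;  // without this check a zero-byte read would loop forever
    }
    out.append(buffer.data(), n);
    remaining -= n;
  }
  return length;
}
```

For example, reading 11 bytes from a source that hands out at most 4 bytes per call takes three iterations, while asking for more bytes than the source holds ends with the error sentinel instead of spinning.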





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1251853900


##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -66,7 +70,54 @@ ControllerSocketProtocol::ControllerSocketProtocol(core::controller::ControllerS
         configuration_(std::move(configuration)),
         socket_restart_processor_(update_sink_) {
   gsl_Expects(configuration_);
-  stream_factory_ = minifi::io::StreamFactory::getInstance(configuration_);
+}
+
+ControllerSocketProtocol::~ControllerSocketProtocol() {
+  stopListener();
+}
+
+void ControllerSocketProtocol::stopListener() {
+  io_context_.stop();
+  if (acceptor_) {
+    acceptor_->close();
+  }
+  if (server_thread_.joinable()) {
+    server_thread_.join();
+  }
+  io_context_.restart();
+}
+
+asio::awaitable<void> ControllerSocketProtocol::startAccept() {
+  while (true) {
+    auto [accept_error, socket] = co_await acceptor_->async_accept(utils::net::use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Controller socket accept failed with the following message: '%s'", accept_error.message());
+      continue;
+    }
+    auto stream = std::make_unique<io::AsioStream<asio::ip::tcp::socket>>(std::move(socket));
+    co_spawn(io_context_, handleCommand(std::move(stream)), asio::detached);
+  }
+}
+
+asio::awaitable<void> ControllerSocketProtocol::startAcceptSsl(std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service) {
+  while (true) {  // NOLINT(clang-analyzer-core.NullDereference) suppressing asio library linter warning
+    auto [accept_error, socket] = co_await acceptor_->async_accept(utils::net::use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Controller socket accept failed with the following message: '%s'", accept_error.message());
+      continue;
+    }
+    asio::ssl::context ssl_context = utils::net::getSslContext(*ssl_context_service, asio::ssl::context::tls_server);
+    asio::ssl::stream<asio::ip::tcp::socket> ssl_socket(std::move(socket), ssl_context);
+
+    auto [handshake_error] = co_await ssl_socket.async_handshake(utils::net::HandshakeType::server, utils::net::use_nothrow_awaitable);

Review Comment:
   I think you're right, I moved it to a separate co_spawned executor with the `handleCommand` in b0301bcc074899dc342ca39188d7f76fca4d65dd





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "adamdebreceni (via GitHub)" <gi...@apache.org>.
adamdebreceni commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1281752297


##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -325,33 +370,33 @@ void ControllerSocketProtocol::handleDescribe(io::BaseStream *stream) {
   }
 }
 
-void ControllerSocketProtocol::handleCommand(io::BaseStream *stream) {
+asio::awaitable<void> ControllerSocketProtocol::handleCommand(std::unique_ptr<io::BaseStream>&& stream) {

Review Comment:
   what is the purpose of `&&`?
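
   As background for the question, a small sketch of what the two signatures mean for ownership in plain C++. `Stream`, `inspectOnly` and `consume` are made-up names for this illustration, not code from the PR. An rvalue-reference parameter only binds to the caller's object, while a by-value parameter always takes ownership. As a general C++ property, a coroutine copies by-value parameters into its frame, whereas a reference parameter stays a reference to the caller's object, so by-value is usually the safer choice there too.

   ```cpp
   #include <cassert>
   #include <memory>
   #include <utility>

   struct Stream { int id; };  // hypothetical stand-in for a stream type

   // Takes an rvalue reference: binds to the caller's pointer, but ownership only
   // moves if the function actually moves from the parameter (this one does not).
   bool inspectOnly(std::unique_ptr<Stream>&& stream) {
     return stream != nullptr;
   }

   // Takes by value: the parameter is a new unique_ptr, so the call itself transfers ownership.
   int consume(std::unique_ptr<Stream> stream) {
     assert(stream);
     return stream->id;
   }  // the Stream is destroyed here
   ```

   After `inspectOnly(std::move(s))` the caller's `s` is still non-null; after `consume(std::move(s))` it is null.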





[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1280578871


##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -95,40 +151,29 @@ void ControllerSocketProtocol::initialize() {
   configuration_->get(Configuration::controller_socket_host, host);
 
   std::string port;
+  stopListener();
   if (configuration_->get(Configuration::controller_socket_port, port)) {
-    if (nullptr != secure_context) {
-#ifdef OPENSSL_SUPPORT
-      // if there is no openssl support we won't be using SSL
-      auto tls_context = std::make_shared<io::TLSContext>(configuration_, secure_context);
-      server_socket_ = std::unique_ptr<io::BaseServerSocket>(new io::TLSServerSocket(tls_context, host, std::stoi(port), 2));
-#else
-      server_socket_ = std::unique_ptr<io::BaseServerSocket>(new io::ServerSocket(nullptr, host, std::stoi(port), 2));
-#endif
-    } else {
-      server_socket_ = std::unique_ptr<io::BaseServerSocket>(new io::ServerSocket(nullptr, host, std::stoi(port), 2));
-    }
-    // if we have a localhost hostname and we did not manually specify any.interface we will
-    // bind only to the loopback adapter
+    // if we have a localhost hostname and we did not manually specify any.interface we will bind only to the loopback adapter
     if ((host == "localhost" || host == "127.0.0.1" || host == "::") && !any_interface) {
-      server_socket_->initialize(true);
+      acceptor_ = std::make_unique<asio::ip::tcp::acceptor>(io_context_, asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), std::stoi(port)));
     } else {
-      server_socket_->initialize(false);
+      acceptor_ = std::make_unique<asio::ip::tcp::acceptor>(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), std::stoi(port)));
     }
 
-    auto check = [this]() -> bool {
-      return update_sink_.isRunning();
-    };
-
-    auto handler = [this](io::BaseStream *stream) {
-      handleCommand(stream);
-    };
-    server_socket_->registerCallback(check, handler);
+    if (secure_context) {
+      co_spawn(io_context_, startAcceptSsl(std::move(secure_context)), asio::detached);
+    } else {
+      co_spawn(io_context_, startAccept(), asio::detached);
+    }
+    server_thread_ = std::thread([this] {
+      io_context_.run();
+    });

Review Comment:
   Can we share the io_context with c2?



##########
libminifi/src/io/InputStream.cpp:
##########
@@ -71,18 +67,20 @@ size_t InputStream::read(std::string &str, bool widen) {
     return length_return;
   }
 
-  std::vector<std::byte> buffer(string_length);
-  const auto read_return = read(buffer);
-  if (read_return != string_length) {
-    return read_return;
+  str.clear();
+  str.reserve(string_length);
+
+  auto bytes_to_read = string_length;
+  while (bytes_to_read > 0) {
+    std::vector<std::byte> buffer(bytes_to_read);
+    const auto read_return = read(buffer);
+    if (io::isError(read_return))
+      return read_return;
+    bytes_to_read -= read_return;
+    str.append(std::string(reinterpret_cast<const char*>(buffer.data()), read_return));

Review Comment:
   We could retry a couple of times on read_return=0 before bailing. Not sure if it changes anything in practice.
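
   A rough sketch of what such a bounded retry could look like, independent of the actual stream classes. `readWithRetry` and its parameters are hypothetical names, and whether retrying helps in practice is exactly the open question above.

   ```cpp
   #include <cassert>
   #include <cstddef>
   #include <functional>

   // Calls `read_once` until it returns a non-zero value or `max_attempts` calls have been made.
   // Returns the last result, so 0 means every attempt came back empty.
   std::size_t readWithRetry(const std::function<std::size_t()>& read_once, int max_attempts) {
     assert(max_attempts > 0);
     std::size_t result = 0;
     for (int attempt = 0; attempt < max_attempts && result == 0; ++attempt) {
       result = read_once();
     }
     return result;
   }
   ```

   Any non-zero value, including an error marker, ends the retries immediately, so only empty reads are repeated.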



##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -325,33 +370,33 @@ void ControllerSocketProtocol::handleDescribe(io::BaseStream *stream) {
   }
 }
 
-void ControllerSocketProtocol::handleCommand(io::BaseStream *stream) {
+asio::awaitable<void> ControllerSocketProtocol::handleCommand(std::unique_ptr<io::BaseStream>&& stream) {

Review Comment:
   What do we gain by making this a coroutine?





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1277150303


##########
libminifi/src/utils/net/AsioSocketUtils.cpp:
##########
@@ -30,9 +30,9 @@ asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::
   co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
 }
 
-asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service) {
-  asio::ssl::context ssl_context(asio::ssl::context::tls_client);
-  ssl_context.set_options(asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);
+asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service, asio::ssl::context::method ssl_context_method) {
+  asio::ssl::context ssl_context(ssl_context_method);
+  ssl_context.set_options(asio::ssl::context::default_workarounds | asio::ssl::context::single_dh_use | asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);

Review Comment:
   I don't think so. I tried to be consistent with the options used in `TcpServer`, but I'm not sure why only these are disabled; maybe @martinzink could elaborate on this.





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1271985655


##########
libminifi/include/io/AsioStream.h:
##########
@@ -0,0 +1,92 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <memory>
+#include <utility>
+
+#include "BaseStream.h"
+#include "core/logging/LoggerFactory.h"
+#include "asio/ts/internet.hpp"
+#include "asio/read.hpp"
+#include "asio/write.hpp"
+#include "io/validation.h"
+
+namespace org::apache::nifi::minifi::io {
+
+template<typename AsioSocketStreamType>
+class AsioStream : public io::BaseStream {
+ public:
+  explicit AsioStream(AsioSocketStreamType&& stream) : stream_(std::move(stream)) {}
+
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  size_t read(std::span<std::byte> buf) override;
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */

Review Comment:
   Yes, my mistake, fixed in 055547275fb03b502d81d1f505c49656eb250bc0





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1280658231


##########
libminifi/src/utils/net/AsioSocketUtils.cpp:
##########
@@ -30,9 +30,9 @@ asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::
   co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
 }
 
-asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service) {
-  asio::ssl::context ssl_context(asio::ssl::context::tls_client);
-  ssl_context.set_options(asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);
+asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service, asio::ssl::context::method ssl_context_method) {
+  asio::ssl::context ssl_context(ssl_context_method);
+  ssl_context.set_options(asio::ssl::context::default_workarounds | asio::ssl::context::single_dh_use | asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);

Review Comment:
   It does disable them: at least in the case of `TcpServer`, we confirm it in [Test ListenTCP SSL/TLS compatibility](https://github.com/apache/nifi-minifi-cpp/blob/main/extensions/standard-processors/tests/unit/ListenTcpTests.cpp#L240). Still, I see no reason why we don't explicitly disable sslv2 and sslv3 with the appropriate options, so I'm in favour of adding them (here and everywhere else).
   
   Also, since OpenSSL 3.0 supports it, we should try to add tlsv1_3 support (in a new PR).





[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm closed pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595




[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1247524864


##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -66,7 +69,49 @@ ControllerSocketProtocol::ControllerSocketProtocol(core::controller::ControllerS
         configuration_(std::move(configuration)),
         socket_restart_processor_(update_sink_) {
   gsl_Expects(configuration_);
-  stream_factory_ = minifi::io::StreamFactory::getInstance(configuration_);
+}
+
+ControllerSocketProtocol::~ControllerSocketProtocol() {
+  stopListener();
+}
+
+void ControllerSocketProtocol::stopListener() {
+  io_context_.stop();
+  if (acceptor_) {
+    acceptor_->close();
+  }
+  if (server_thread_.joinable()) {
+    server_thread_.join();
+  }
+  io_context_.restart();
+}
+
+void ControllerSocketProtocol::startAccept() {
+  acceptor_->async_accept([this](const asio::error_code& error, asio::ip::tcp::socket socket) {

Review Comment:
   I think you should consider using coroutines instead.
   That would remove the need for callbacks and, as far as I know, the performance is much better as well.
   
   It should be fairly easy to rewrite these using coro. There is a great tutorial by the author of asio:
   https://www.youtube.com/watch?v=icgnqFM-aY4





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1250445155


##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -66,7 +69,49 @@ ControllerSocketProtocol::ControllerSocketProtocol(core::controller::ControllerS
         configuration_(std::move(configuration)),
         socket_restart_processor_(update_sink_) {
   gsl_Expects(configuration_);
-  stream_factory_ = minifi::io::StreamFactory::getInstance(configuration_);
+}
+
+ControllerSocketProtocol::~ControllerSocketProtocol() {
+  stopListener();
+}
+
+void ControllerSocketProtocol::stopListener() {
+  io_context_.stop();
+  if (acceptor_) {
+    acceptor_->close();
+  }
+  if (server_thread_.joinable()) {
+    server_thread_.join();
+  }
+  io_context_.restart();
+}
+
+void ControllerSocketProtocol::startAccept() {
+  acceptor_->async_accept([this](const asio::error_code& error, asio::ip::tcp::socket socket) {

Review Comment:
   Thanks! Rewritten in da7844db28b0ee3a782ca2c2d2e1ffe4dfbebc73





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1280688745


##########
libminifi/src/utils/net/AsioSocketUtils.cpp:
##########
@@ -30,9 +30,9 @@ asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::
   co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
 }
 
-asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service) {
-  asio::ssl::context ssl_context(asio::ssl::context::tls_client);
-  ssl_context.set_options(asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);
+asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service, asio::ssl::context::method ssl_context_method) {
+  asio::ssl::context ssl_context(ssl_context_method);
+  ssl_context.set_options(asio::ssl::context::default_workarounds | asio::ssl::context::single_dh_use | asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);

Review Comment:
   I did a little more research, and it turns out that the method tlsv12_server, which we pass in the ctor, is not the maximum, minimum, or recommended supported version, but rather the exact, exclusive protocol to be used.
   
   (Back when asio was first introduced to our codebase, we only wanted to enable tlsv12 because we didn't have tlsv13, so it worked, and the test_case was skipped.)
   
   Moreover, during the OpenSSL 3.0 refactor the test was modified, so the expected support for TLSv1.3 was changed (prior to this PR, this test_case was skipped due to the missing tlsv13 implementation): https://github.com/apache/nifi-minifi-cpp/commit/1f93c33b68203bee44a29e02951ed01ffc78bdda#diff-cba941a459893c41b8743d9b8423acf61afa575e3252a61773f8cb7949dc6626L290
   
   To enable both TLS 1.2 and 1.3, we would need something like this:
   
   ```
     asio::ssl::context ssl_context(asio::ssl::context::tls_server);
     auto tls_v12_or_v13 = asio::ssl::context::no_sslv2 | asio::ssl::context::no_sslv3 | asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1;
     ssl_context.set_options(asio::ssl::context::default_workarounds | asio::ssl::context::single_dh_use | tls_v12_or_v13);
   
   ```
   
   This resolves the explicit unsupported error from asio, but the TLSv1.3 tests still fail at https://github.com/apache/nifi-minifi-cpp/blob/main/extensions/standard-processors/tests/unit/ListenTcpTests.cpp#L300 (1.2 works as intended and the previous versions fail as expected).
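
   To illustrate the reasoning about disable flags, here is a toy model, not asio code. The `Disable` constants and `enabledProtocols` are hypothetical and only mirror the idea of the `no_*` options: with a generic, version-negotiating context, a protocol version stays enabled unless its own flag is set.

   ```cpp
   #include <cassert>
   #include <cstdint>
   #include <string>
   #include <vector>

   // Hypothetical flags for this sketch only; they mirror the idea of the no_* options, not their values.
   enum Disable : std::uint32_t {
     kNoSslv2 = 1u << 0, kNoSslv3 = 1u << 1, kNoTlsv1 = 1u << 2,
     kNoTlsv1_1 = 1u << 3, kNoTlsv1_2 = 1u << 4, kNoTlsv1_3 = 1u << 5,
   };

   // Lists the protocol versions that remain enabled for a given set of disable flags.
   std::vector<std::string> enabledProtocols(std::uint32_t disabled) {
     assert(disabled < (1u << 6));  // only the six flags above are modelled
     static const char* const names[] = {"SSLv2", "SSLv3", "TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3"};
     std::vector<std::string> enabled;
     for (unsigned bit = 0; bit < 6; ++bit) {
       if ((disabled & (1u << bit)) == 0) {
         enabled.emplace_back(names[bit]);
       }
     }
     return enabled;
   }
   ```

   In this model, only a mask that sets all four older flags leaves exactly TLSv1.2 and TLSv1.3; leaving any one of them out keeps that version negotiable.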





[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1279657260


##########
libminifi/src/io/InputStream.cpp:
##########
@@ -71,18 +67,20 @@ size_t InputStream::read(std::string &str, bool widen) {
     return length_return;
   }
 
-  std::vector<std::byte> buffer(string_length);
-  const auto read_return = read(buffer);
-  if (read_return != string_length) {
-    return read_return;
+  str.clear();
+  str.reserve(string_length);
+
+  auto bytes_to_read = string_length;
+  while (bytes_to_read > 0) {
+    std::vector<std::byte> buffer(bytes_to_read);
+    const auto read_return = read(buffer);
+    if (io::isError(read_return))
+      return read_return;
+    bytes_to_read -= read_return;
+    str.append(std::string(reinterpret_cast<const char*>(buffer.data()), read_return));

Review Comment:
   What made this loop necessary? Aren't partial reads handled inside each stream implementation? We should probably fix the problematic stream class instead.
   
   By the way, what if the read syscall keeps returning 0, and it's directly returned by the `InputStream::read` implementation of the stream class?





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1280692000


##########
libminifi/src/utils/net/AsioSocketUtils.cpp:
##########
@@ -30,9 +30,9 @@ asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::
   co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
 }
 
-asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service) {
-  asio::ssl::context ssl_context(asio::ssl::context::tls_client);
-  ssl_context.set_options(asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);
+asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service, asio::ssl::context::method ssl_context_method) {
+  asio::ssl::context ssl_context(ssl_context_method);
+  ssl_context.set_options(asio::ssl::context::default_workarounds | asio::ssl::context::single_dh_use | asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);

Review Comment:
   If we don't care about tls_v13 support just yet, we could keep the explicit tlsv12 in the ctor and even remove the asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1 part from the options.
   I wouldn't mind a test similar to the [Test ListenTCP SSL/TLS compatibility](https://github.com/apache/nifi-minifi-cpp/blob/main/extensions/standard-processors/tests/unit/ListenTcpTests.cpp#L240) that checks this function as well.





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1250977536


##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -86,32 +87,37 @@ void ControllerSocketProtocol::stopListener() {
   io_context_.restart();
 }
 
-void ControllerSocketProtocol::startAccept() {
-  acceptor_->async_accept([this](const asio::error_code& error, asio::ip::tcp::socket socket) {
-    if (!error) {
-      io::AsioStream<asio::ip::tcp::socket> stream(std::move(socket));
-      handleCommand(stream);
+asio::awaitable<void> ControllerSocketProtocol::startAccept() {
+  while (true) {
+    auto [accept_error, socket] = co_await acceptor_->async_accept(utils::net::use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Controller socket accept failed with the following message: '%s'", accept_error.message());
+      co_return;

Review Comment:
   I think this should be a continue instead of co_return, similar to how errors were handled before the coro rewrite (if we encounter an error, we don't open a socket but carry on).
   We have this problem in TCPServer as well. I've fixed that one in https://github.com/apache/nifi-minifi-cpp/pull/1592/files#diff-924bd95b72a2543c8a20ebd77132a2f64d563f851a318cd515091ee2fdf927b1R33-R34
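
   The difference can be shown with a synchronous toy model that has nothing asio-specific in it. `AcceptResult` and `serveAll` are hypothetical names, and the finite list of results stands in for the endless accept loop.

   ```cpp
   #include <string>
   #include <vector>

   // Hypothetical outcome of one accept attempt: an error message, or a connection id when `error` is empty.
   struct AcceptResult {
     std::string error;
     int connection_id;
   };

   // Processes accept results in order; a failed accept is logged and skipped, later ones are still served.
   std::vector<int> serveAll(const std::vector<AcceptResult>& results, std::vector<std::string>& error_log) {
     std::vector<int> served;
     for (const auto& result : results) {
       if (!result.error.empty()) {
         error_log.push_back(result.error);
         continue;  // returning here instead would stop accepting after the first failure
       }
       served.push_back(result.connection_id);
     }
     return served;
   }
   ```

   With one failure between two successful accepts, both connections are served and a single error is logged; with an early return, the second connection would never be handled.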





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1251801428


##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -66,7 +70,54 @@ ControllerSocketProtocol::ControllerSocketProtocol(core::controller::ControllerS
         configuration_(std::move(configuration)),
         socket_restart_processor_(update_sink_) {
   gsl_Expects(configuration_);
-  stream_factory_ = minifi::io::StreamFactory::getInstance(configuration_);
+}
+
+ControllerSocketProtocol::~ControllerSocketProtocol() {
+  stopListener();
+}
+
+void ControllerSocketProtocol::stopListener() {
+  io_context_.stop();
+  if (acceptor_) {
+    acceptor_->close();
+  }
+  if (server_thread_.joinable()) {
+    server_thread_.join();
+  }
+  io_context_.restart();
+}
+
+asio::awaitable<void> ControllerSocketProtocol::startAccept() {
+  while (true) {
+    auto [accept_error, socket] = co_await acceptor_->async_accept(utils::net::use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Controller socket accept failed with the following message: '%s'", accept_error.message());
+      continue;
+    }
+    auto stream = std::make_unique<io::AsioStream<asio::ip::tcp::socket>>(std::move(socket));
+    co_spawn(io_context_, handleCommand(std::move(stream)), asio::detached);
+  }
+}
+
+asio::awaitable<void> ControllerSocketProtocol::startAcceptSsl(std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service) {
+  while (true) {  // NOLINT(clang-analyzer-core.NullDereference) suppressing asio library linter warning
+    auto [accept_error, socket] = co_await acceptor_->async_accept(utils::net::use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Controller socket accept failed with the following message: '%s'", accept_error.message());
+      continue;
+    }
+    asio::ssl::context ssl_context = utils::net::getSslContext(*ssl_context_service, asio::ssl::context::tls_server);
+    asio::ssl::stream<asio::ip::tcp::socket> ssl_socket(std::move(socket), ssl_context);
+
+    auto [handshake_error] = co_await ssl_socket.async_handshake(utils::net::HandshakeType::server, utils::net::use_nothrow_awaitable);

Review Comment:
   I think it might be better to do the handshake inside the co_spawned executor, because then we are still ready to accept the next connection even if the handshake hangs or is just slow. (Similar to how the handshake was inside the callback before the coro rewrite.)





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1247496446


##########
controller/Controller.cpp:
##########
@@ -20,195 +20,286 @@
 
 #include "io/BufferStream.h"
 #include "c2/C2Payload.h"
+#include "io/AsioStream.h"
+#include "asio/ssl/context.hpp"
+#include "asio/ssl/stream.hpp"
+#include "asio/connect.hpp"
+#include "core/logging/Logger.h"
+#include "utils/net/AsioSocketUtils.h"
 
 namespace org::apache::nifi::minifi::controller {
 
-bool sendSingleCommand(std::unique_ptr<io::Socket> socket, uint8_t op, const std::string& value) {
-  if (socket->initialize() < 0) {
+namespace {
+
+class ClientConnection {
+ public:
+  explicit ClientConnection(const ControllerSocketData& socket_data) {
+    if (socket_data.ssl_context_service) {
+      connectTcpSocketOverSsl(socket_data);
+    } else {
+      connectTcpSocket(socket_data);
+    }
+  }
+
+  [[nodiscard]] io::BaseStream* getStream() const {
+    return stream_.get();
+  }
+
+ private:
+  void connectTcpSocketOverSsl(const ControllerSocketData& socket_data) {
+    auto ssl_context = utils::net::getSslContext(*socket_data.ssl_context_service);
+    asio::ssl::stream<asio::ip::tcp::socket> socket(io_context_, ssl_context);
+
+    asio::ip::tcp::resolver resolver(io_context_);
+    asio::error_code err;
+    asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(socket_data.host, std::to_string(socket_data.port), err);
+    if (err) {
+      logger_->log_error("Resolving host '%s' on port '%s' failed with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
+      return;
+    }
+
+    asio::connect(socket.lowest_layer(), endpoints, err);
+    if (err) {
+      logger_->log_error("Connecting to host '%s' on port '%s' failed with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
+      return;
+    }
+    socket.handshake(asio::ssl::stream_base::client, err);
+    if (err) {
+      logger_->log_error("SSL handshake failed while connecting to host '%s' on port '%s' with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
+      return;
+    }
+    stream_ = std::make_unique<io::AsioStream<asio::ssl::stream<asio::ip::tcp::socket>>>(std::move(socket));
+  }
+
+  void connectTcpSocket(const ControllerSocketData& socket_data) {
+    asio::ip::tcp::socket socket(io_context_);
+
+    asio::ip::tcp::resolver resolver(io_context_);
+    asio::error_code err;
+    asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(socket_data.host, std::to_string(socket_data.port));

Review Comment:
   I think it might be better to use the async variant, since this might take a long time before it fails, and there is no way to set a timeout for the synchronous variant.
   
   The resolve step is also synchronous in PutUDP; we should reconsider that one as well.





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1247522551


##########
controller/Controller.cpp:
##########
@@ -20,195 +20,286 @@
 
 #include "io/BufferStream.h"
 #include "c2/C2Payload.h"
+#include "io/AsioStream.h"
+#include "asio/ssl/context.hpp"
+#include "asio/ssl/stream.hpp"
+#include "asio/connect.hpp"
+#include "core/logging/Logger.h"
+#include "utils/net/AsioSocketUtils.h"
 
 namespace org::apache::nifi::minifi::controller {
 
-bool sendSingleCommand(std::unique_ptr<io::Socket> socket, uint8_t op, const std::string& value) {
-  if (socket->initialize() < 0) {
+namespace {
+
+class ClientConnection {
+ public:
+  explicit ClientConnection(const ControllerSocketData& socket_data) {
+    if (socket_data.ssl_context_service) {
+      connectTcpSocketOverSsl(socket_data);
+    } else {
+      connectTcpSocket(socket_data);
+    }
+  }
+
+  [[nodiscard]] io::BaseStream* getStream() const {
+    return stream_.get();
+  }
+
+ private:
+  void connectTcpSocketOverSsl(const ControllerSocketData& socket_data) {
+    auto ssl_context = utils::net::getSslContext(*socket_data.ssl_context_service);
+    asio::ssl::stream<asio::ip::tcp::socket> socket(io_context_, ssl_context);
+
+    asio::ip::tcp::resolver resolver(io_context_);
+    asio::error_code err;
+    asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(socket_data.host, std::to_string(socket_data.port), err);
+    if (err) {
+      logger_->log_error("Resolving host '%s' on port '%s' failed with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
+      return;
+    }
+
+    asio::connect(socket.lowest_layer(), endpoints, err);
+    if (err) {
+      logger_->log_error("Connecting to host '%s' on port '%s' failed with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
+      return;
+    }
+    socket.handshake(asio::ssl::stream_base::client, err);
+    if (err) {
+      logger_->log_error("SSL handshake failed while connecting to host '%s' on port '%s' with the following message: '%s'", socket_data.host, std::to_string(socket_data.port), err.message());
+      return;
+    }
+    stream_ = std::make_unique<io::AsioStream<asio::ssl::stream<asio::ip::tcp::socket>>>(std::move(socket));
+  }
+
+  void connectTcpSocket(const ControllerSocketData& socket_data) {
+    asio::ip::tcp::socket socket(io_context_);
+
+    asio::ip::tcp::resolver resolver(io_context_);
+    asio::error_code err;
+    asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(socket_data.host, std::to_string(socket_data.port));

Review Comment:
   Looking a bit closer, we didn't have a timeout previously, so I'm fine with the sync call here :)





[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1273669111


##########
libminifi/src/utils/net/AsioSocketUtils.cpp:
##########
@@ -30,9 +30,9 @@ asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::
   co_return co_await asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client, use_nothrow_awaitable), timeout_duration);  // NOLINT
 }
 
-asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service) {
-  asio::ssl::context ssl_context(asio::ssl::context::tls_client);
-  ssl_context.set_options(asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);
+asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service, asio::ssl::context::method ssl_context_method) {
+  asio::ssl::context ssl_context(ssl_context_method);
+  ssl_context.set_options(asio::ssl::context::default_workarounds | asio::ssl::context::single_dh_use | asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);

Review Comment:
   Does this also disable SSLv2 and SSLv3?





[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1595: MINIFICPP-2137 Rewrite MiNiFi Controller to use asio

Posted by "fgerlits (via GitHub)" <gi...@apache.org>.
fgerlits commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1271974609


##########
libminifi/include/io/AsioStream.h:
##########
@@ -0,0 +1,92 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <memory>
+#include <utility>
+
+#include "BaseStream.h"
+#include "core/logging/LoggerFactory.h"
+#include "asio/ts/internet.hpp"
+#include "asio/read.hpp"
+#include "asio/write.hpp"
+#include "io/validation.h"
+
+namespace org::apache::nifi::minifi::io {
+
+template<typename AsioSocketStreamType>
+class AsioStream : public io::BaseStream {
+ public:
+  explicit AsioStream(AsioSocketStreamType&& stream) : stream_(std::move(stream)) {}
+
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  size_t read(std::span<std::byte> buf) override;
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */

Review Comment:
   It's the other way round: `read()` reads from the stream into the target buffer, and `write()` writes the contents of the source buffer to the stream.


