Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/10/25 15:43:12 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1419: MINIFICPP-1934 PutTCP processor

fgerlits commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1004155881


##########
PROCESSORS.md:
##########
@@ -2167,6 +2168,32 @@ In the list below, the names of required properties appear in bold. Any other pr
 | success | After a successful SQL update operation, the incoming FlowFile sent here |
 
 
+## PutTCP
+
+### Description
+The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP connection. To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specified to change the behaviour so that each FlowFiles content is transmitted over a single TCP connection which is closed after the FlowFile has been sent.
+
+### Properties
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                           | Default Value | Allowable Values | Description                                                                                                                                                                                                                                                                                                                                                                                                        |
+|--------------------------------|---------------|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Hostname**                   | localhost     |                  | The ip address or hostname of the destination.<br/>**Supports Expression Language: true**                                                                                                                                                                                                                                                                                                                          |
+| **Port**                       |               |                  | The port or service on the destination.<br/>**Supports Expression Language: true**                                                                                                                                                                                                                                                                                                                                 |
+| **Idle Connection Expiration** | 15 seconds    |                  | The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.<br/>**Supports Expression Language: true**                                                                                                                                                                                                                   |
+| **Timeout**                    | 15 seconds    |                  | The timeout for connecting to and communicating with the destination.<br/>**Supports Expression Language: true**                                                                                                                                                                                                                                                                                                   |
+| **Connection Per FlowFile**    | false         |                  | Specifies whether to send each FlowFile's content on an individual connection.                                                                                                                                                                                                                                                                                                                                     |
+| Outgoing Message Delimiter     |               |                  | Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors.<br/>**Supports Expression Language: true** |
+| SSL Context Service            |               |                  | The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.                                                                                                                                                                                                                                                                          |
+| Max Size of Socket Send Buffer |               |                  | The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be.                                                                                                                                                                                                                                                      |
+
+### Properties
+| Name    | Description                                                                |
+|---------|----------------------------------------------------------------------------|
+| success | FlowFiles that are sent to the destination are sent out this relationship. |
+| failure | FlowFiles that encountered IO errors are send out this relationship.       |

Review Comment:
   tiny typo:
   ```suggestion
   | failure | FlowFiles that encountered IO errors are sent out this relationship.       |
   ```



##########
libminifi/include/utils/StringUtils.h:
##########
@@ -37,6 +37,19 @@
 #include "utils/gsl.h"
 #include "utils/meta/detected.h"
 
+// libc++ doesn't define operator<=> on strings, and apparently the operator rewrite rules don't automagically make one
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+#include <compare>
+#endif
+
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+template<typename _CharT, typename _Traits, typename _Alloc>
+constexpr std::strong_ordering operator<=>(const std::basic_string<_CharT, _Traits, _Alloc>& __lhs,
+    const std::basic_string<_CharT, _Traits, _Alloc>& __rhs) noexcept {
+  return __lhs.compare(__rhs) <=> 0;
+}
+#endif

Review Comment:
   I would change the version check to `< 16000` because it looks like this is still not fixed in clang 15: https://godbolt.org/z/79W6jcEbe



##########
extensions/standard-processors/processors/PutTCP.h:
##########
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <utility>
+
+#include "Processor.h"
+#include "utils/Export.h"
+#include "controllers/SSLContextService.h"
+
+#include "utils/expected.h"
+#include "utils/StringUtils.h"  // for string <=> on libc++
+
+namespace org::apache::nifi::minifi::processors {
+
+class ConnectionId {
+ public:
+  ConnectionId(std::string hostname, std::string port) : hostname_(std::move(hostname)), port_(std::move(port)) {}
+
+  struct hash {
+    std::size_t operator () (const ConnectionId& connection_id) const {
+      return utils::hash_combine(std::hash<std::string>{}(connection_id.hostname_), std::hash<std::string>{}(connection_id.port_));
+    }
+  };
+
+  auto operator<=>(const ConnectionId&) const = default;
+
+  std::string& getHostname() { return hostname_; }
+  std::string& getPort() { return port_; }

Review Comment:
   could these return `const std::string&`?
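
A sketch of what that could look like, using a trimmed-down, hypothetical copy of `ConnectionId` (hashing and comparison are left out for brevity). The `[[nodiscard]]` attributes and the `const` qualifiers on the member functions are additions for illustration, not something the PR contains.

```cpp
#include <string>
#include <utility>

// Hypothetical, reduced version of ConnectionId from the hunk above.
class ConnectionId {
 public:
  ConnectionId(std::string hostname, std::string port) : hostname_(std::move(hostname)), port_(std::move(port)) {}

  // const member functions returning const references: callable on const objects,
  // no copies are made, and callers cannot modify the members through them
  [[nodiscard]] const std::string& getHostname() const { return hostname_; }
  [[nodiscard]] const std::string& getPort() const { return port_; }

 private:
  std::string hostname_;
  std::string port_;
};
```

If `ConnectionId` ends up as a key in an `unordered_map` (the nested `hash` struct hints at that, but it is an assumption), the keys are only reachable as `const`, so the non-const getters could not be called on them at all.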



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template<class SocketType>
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+    std::lock_guard lock_guard{mutex_};
+    return sessions_.size();
+  }
+
+  void closeSessions() override {
+    std::lock_guard lock_guard{mutex_};
+    for (const auto& session_weak : sessions_) {
+      if (auto session = session_weak.lock()) {
+        auto& socket = session->getSocket();
+        if (socket.is_open()) {
+          socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+          session->getSocket().close();
+        }
+      }
+    }
+  }
+
+  mutable std::mutex mutex_;
+  std::vector<std::weak_ptr<SocketType>> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public SessionAwareServer<TcpSession> {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr<TcpSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = TcpServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public SessionAwareServer<SslSession> {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr<SslSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = SslServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get());

Review Comment:
   typo:
   ```suggestion
       logger_->log_trace("SessionAwareSslServer::createSession %p", session.get());
   ```



##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<tcp::socket>;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = core::PropertyBuilder::createProperty("Hostname")
+    ->withDescription("The ip address or hostname of the destination.")
+    ->withDefaultValue("localhost")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Port = core::PropertyBuilder::createProperty("Port")
+    ->withDescription("The port or service on the destination.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = core::PropertyBuilder::createProperty("Idle Connection Expiration")
+    ->withDescription("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = core::PropertyBuilder::createProperty("Connection Per FlowFile")
+    ->withDescription("Specifies whether to send each FlowFile's content on an individual connection.")
+    ->withDefaultValue(false)
+    ->isRequired(true)
+    ->supportsExpressionLanguage(false)
+    ->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+    ->withDescription("Specifies the delimiter to use when sending messages out over the same TCP stream. "
+                      "The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. "
+                      "Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::SSLContextService = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()
+    ->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+    ->withDescription("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be.")
+    ->isRequired(false)
+    ->asType<core::DataSizeValue>()
+    ->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(const std::string& name, const utils::Identifier& uuid)
+    : Processor(name, uuid) {}
+
+PutTCP::~PutTCP() = default;
+
+void PutTCP::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+void PutTCP::notifyStop() {}
+
+void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessionFactory*) {
+  gsl_Expects(context);
+
+  // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files
+  if (context->getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
+  }
+  if (context->getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
+  }
+  if (auto idle_connection_expiration = context->getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms)
+    idle_connection_expiration_ = idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms)
+    timeout_ = timeout->getMilliseconds();
+  else
+    timeout_ = 15s;
+
+  std::string context_name;
+  if (context->getProperty(SSLContextService.getName(), context_name) && !IsNullOrEmpty(context_name))
+    ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
+
+  delimiter_.clear();
+  if (auto delimiter_str = context->getProperty(OutgoingMessageDelimiter)) {
+    std::transform(std::begin(*delimiter_str), std::end(*delimiter_str), std::back_inserter(delimiter_), [](char c) {
+      return std::byte(c);
+    });
+  }
+
+  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
+
+  if (auto max_size_of_socket_send_buffer = context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer))
+    max_size_of_socket_send_buffer_ = max_size_of_socket_send_buffer->getValue();
+  else
+    max_size_of_socket_send_buffer_.reset();
+}
+
+namespace {
+template<class SocketType>
+class ConnectionHandler : public IConnectionHandler {
+ public:
+  ConnectionHandler(ConnectionId connection_id,
+                    std::chrono::milliseconds timeout,
+                    std::shared_ptr<core::logging::Logger> logger,
+                    std::optional<size_t> max_size_of_socket_send_buffer,
+                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+      : connection_id_(std::move(connection_id)),
+        timeout_(timeout),
+        logger_(std::move(logger)),
+        max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
+        ssl_context_service_(std::move(ssl_context_service)) {
+  }
+  ~ConnectionHandler() override = default;
+
+  nonstd::expected<void, std::error_code> sendData(const std::vector<std::byte>& data, const std::vector<std::byte>& delimiter) override;
+
+ private:
+  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
+
+  [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
+    if (!last_used_)
+      return false;
+    return *last_used_ >= (std::chrono::steady_clock::now() - dur);
+  }
+
+  void reset() override {
+    last_used_.reset();
+    socket_.reset();
+    io_context_.reset();
+    last_error_.clear();
+    deadline_.expires_at(asio::steady_timer::time_point::max());
+  }
+
+  void checkDeadline(const std::error_code& error_code, const std::shared_ptr<SocketType>& socket);
+  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
+
+  void handleConnect(const std::error_code& error,
+                     tcp::resolver::results_type::iterator endpoint_iter,
+                     const std::shared_ptr<SocketType>& socket);
+  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
+                               const std::shared_ptr<SocketType>& socket);
+  void handleHandshake(const std::error_code& error,
+                       const tcp::resolver::results_type::iterator& endpoint_iter,
+                       const std::shared_ptr<SocketType>& socket);
+
+  void handleWrite(const std::error_code& error,
+                   std::size_t bytes_written,
+                   const std::vector<std::byte>& delimiter,
+                   const std::shared_ptr<SocketType>& socket);
+
+  void handleDelimiterWrite(const std::error_code& error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+
+  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+
+  ConnectionId connection_id_;
+  std::optional<std::chrono::steady_clock::time_point> last_used_;
+  asio::io_context io_context_;
+  std::error_code last_error_;
+  asio::steady_timer deadline_{io_context_};
+  std::chrono::milliseconds timeout_;
+  std::shared_ptr<SocketType> socket_;
+
+  std::shared_ptr<core::logging::Logger> logger_;
+  std::optional<size_t> max_size_of_socket_send_buffer_;
+
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+
+  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
+  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket, const std::vector<std::byte>& data, const std::vector<std::byte>& delimiter);
+};
+
+template<class SocketType>
+nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::vector<std::byte>& data, const std::vector<std::byte>& delimiter) {
+  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, data, delimiter); });;
+}
+
+template<class SocketType>
+nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
+  if (socket_ && socket_->lowest_layer().is_open())
+    return socket_;
+  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
+  if (!new_socket)
+    return nonstd::make_unexpected(new_socket.error());
+  socket_ = std::move(*new_socket);
+  return socket_;
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::checkDeadline(const std::error_code& error_code, const std::shared_ptr<SocketType>& socket) {
+  if (error_code != asio::error::operation_aborted) {
+    deadline_.expires_at(asio::steady_timer::time_point::max());
+    last_error_ = asio::error::timed_out;
+    deadline_.async_wait([&](const std::error_code& error_code) { checkDeadline(error_code, socket); });

Review Comment:
   why is this new `async_wait` needed?



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template<class SocketType>
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+    std::lock_guard lock_guard{mutex_};
+    return sessions_.size();
+  }
+
+  void closeSessions() override {
+    std::lock_guard lock_guard{mutex_};
+    for (const auto& session_weak : sessions_) {
+      if (auto session = session_weak.lock()) {
+        auto& socket = session->getSocket();
+        if (socket.is_open()) {
+          socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+          session->getSocket().close();
+        }
+      }
+    }
+  }
+
+  mutable std::mutex mutex_;
+  std::vector<std::weak_ptr<SocketType>> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public SessionAwareServer<TcpSession> {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr<TcpSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = TcpServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public SessionAwareServer<SslSession> {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr<SslSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = SslServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+utils::net::SslData createSslDataForServer() {
+  const std::filesystem::path executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
+  utils::net::SslData ssl_data;
+  ssl_data.ca_loc = (executable_dir / "resources/ca_A.crt").string();
+  ssl_data.cert_loc = (executable_dir / "resources/localhost_by_A.pem").string();
+  ssl_data.key_loc = (executable_dir / "resources/localhost_by_A.pem").string();

Review Comment:
   it would be better to use the `/` operator in both places
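
One possible reading of this, as a sketch: join every path component with `operator/` of `std::filesystem::path` instead of embedding a separator in the string literal. The `resourcePath` helper is hypothetical and only illustrates the idea.

```cpp
#include <filesystem>
#include <string>

// Hypothetical helper: builds <executable_dir>/resources/<file_name> using operator/ for every
// component, so no directory separator is hard-coded in a string literal
inline std::string resourcePath(const std::filesystem::path& executable_dir, const std::string& file_name) {
  return (executable_dir / "resources" / file_name).string();
}
```

The three assignments would then read along the lines of `ssl_data.ca_loc = resourcePath(executable_dir, "ca_A.crt");`.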



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template<class SocketType>

Review Comment:
   this should be `SessionType` instead of `SocketType`, as it can either be `TcpSession` or `SslSession`



##########
extensions/standard-processors/processors/PutTCP.h:
##########
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <utility>
+
+#include "Processor.h"
+#include "utils/Export.h"
+#include "controllers/SSLContextService.h"
+
+#include "utils/expected.h"
+#include "utils/StringUtils.h"  // for string <=> on libc++
+
+namespace org::apache::nifi::minifi::processors {
+
+class ConnectionId {
+ public:
+  ConnectionId(std::string hostname, std::string port) : hostname_(std::move(hostname)), port_(std::move(port)) {}
+
+  struct hash {
+    std::size_t operator () (const ConnectionId& connection_id) const {
+      return utils::hash_combine(std::hash<std::string>{}(connection_id.hostname_), std::hash<std::string>{}(connection_id.port_));
+    }
+  };
+
+  auto operator<=>(const ConnectionId&) const = default;
+
+  std::string& getHostname() { return hostname_; }
+  std::string& getPort() { return port_; }
+
+ private:
+  std::string hostname_;
+  std::string port_;
+};
+
+class IConnectionHandler {
+ public:
+  virtual ~IConnectionHandler() = default;
+
+  [[nodiscard]] virtual bool hasBeenUsed() const = 0;
+  [[nodiscard]] virtual bool hasBeenUsedIn(std::chrono::milliseconds dur) const = 0;
+  virtual nonstd::expected<void, std::error_code> sendData(const std::vector<std::byte>& data, const std::vector<std::byte>& delimiter) = 0;
+  virtual void reset() = 0;
+};
+
+class PutTCP final : public core::Processor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. "
+      "By default, the FlowFiles are transmitted over the same TCP connection. To assist the TCP server with determining message boundaries, "
+      "an optional \"Outgoing Message Delimiter\" string can be configured which is appended to the end of each FlowFile's content when it is transmitted over the TCP connection. "
+      "An optional \"Connection Per FlowFile\" parameter can be specified to change the behaviour so that each FlowFile's content is transmitted over a single TCP connection "
+      "which is closed after the FlowFile has been sent.";
+  EXTENSIONAPI static const core::Property Hostname;
+  EXTENSIONAPI static const core::Property Port;
+  EXTENSIONAPI static const core::Property IdleConnectionExpiration;
+  EXTENSIONAPI static const core::Property Timeout;
+  EXTENSIONAPI static const core::Property ConnectionPerFlowFile;
+  EXTENSIONAPI static const core::Property OutgoingMessageDelimiter;
+  EXTENSIONAPI static const core::Property SSLContextService;
+  EXTENSIONAPI static const core::Property MaxSizeOfSocketSendBuffer;
+
+  static auto properties() { return std::array{Hostname, Port, IdleConnectionExpiration, Timeout, ConnectionPerFlowFile, OutgoingMessageDelimiter, SSLContextService, MaxSizeOfSocketSendBuffer}; }
+
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+  static auto relationships() { return std::array{Success, Failure}; }
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  explicit PutTCP(const std::string& name, const utils::Identifier& uuid = {});
+  PutTCP(const PutTCP&) = delete;
+  PutTCP& operator=(const PutTCP&) = delete;
+  ~PutTCP() final;
+
+  void initialize() final;
+  void notifyStop() final;
+  void onSchedule(core::ProcessContext*, core::ProcessSessionFactory *) final;
+  void onTrigger(core::ProcessContext*, core::ProcessSession*) final;
+
+ private:
+  void removeExpiredConnections();
+  void processFlowFile(std::shared_ptr<IConnectionHandler>& connection_handler,
+                       const std::vector<std::byte>& data,
+                       core::ProcessSession& session,
+                       const std::shared_ptr<core::FlowFile>& flow_file);
+
+  std::vector<std::byte> delimiter_;
+  std::optional<std::unordered_map<ConnectionId, std::shared_ptr<IConnectionHandler>, ConnectionId::hash>> connections_;
+  std::optional<std::chrono::milliseconds> idle_connection_expiration_;
+  std::optional<size_t> max_size_of_socket_send_buffer_;
+  std::chrono::milliseconds timeout_;

Review Comment:
   `timeout_` should be initialized
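
   One possible way to do this is a default member initializer. The sketch below uses a hypothetical reduced class (`TimeoutHolder`) rather than the real `PutTCP`, and assumes 15 seconds as the initial value because that is the fallback `onSchedule` already uses:

   ```cpp
   #include <cassert>
   #include <chrono>

   // Hypothetical reduced class; only the member under discussion is shown.
   class TimeoutHolder {
    public:
     std::chrono::milliseconds timeout() const { return timeout_; }
     void setTimeout(std::chrono::milliseconds timeout) { timeout_ = timeout; }

    private:
     // Default member initializer: the value is well-defined even if it is read
     // before any configuration step has assigned it.
     std::chrono::milliseconds timeout_{std::chrono::seconds{15}};
   };
   ```

   Without the initializer, a default-initialized `std::chrono::milliseconds` member holds an indeterminate value until it is first assigned.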



##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<tcp::socket>;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = core::PropertyBuilder::createProperty("Hostname")
+    ->withDescription("The IP address or hostname of the destination.")
+    ->withDefaultValue("localhost")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Port = core::PropertyBuilder::createProperty("Port")
+    ->withDescription("The port or service on the destination.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = core::PropertyBuilder::createProperty("Idle Connection Expiration")
+    ->withDescription("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = core::PropertyBuilder::createProperty("Connection Per FlowFile")
+    ->withDescription("Specifies whether to send each FlowFile's content on an individual connection.")
+    ->withDefaultValue(false)
+    ->isRequired(true)
+    ->supportsExpressionLanguage(false)
+    ->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+    ->withDescription("Specifies the delimiter to use when sending messages out over the same TCP stream. "
+                      "The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. "
+                      "Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::SSLContextService = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()
+    ->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+    ->withDescription("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be.")
+    ->isRequired(false)
+    ->asType<core::DataSizeValue>()
+    ->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that encountered IO errors are sent out this relationship."};
+
+PutTCP::PutTCP(const std::string& name, const utils::Identifier& uuid)
+    : Processor(name, uuid) {}
+
+PutTCP::~PutTCP() = default;
+
+void PutTCP::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+void PutTCP::notifyStop() {}
+
+void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessionFactory*) {
+  gsl_Expects(context);
+
+  // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files
+  if (context->getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
+  }
+  if (context->getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
+  }
+  if (auto idle_connection_expiration = context->getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms)
+    idle_connection_expiration_ = idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms)
+    timeout_ = timeout->getMilliseconds();
+  else
+    timeout_ = 15s;
+
+  std::string context_name;
+  if (context->getProperty(SSLContextService.getName(), context_name) && !IsNullOrEmpty(context_name))
+    ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
+
+  delimiter_.clear();
+  if (auto delimiter_str = context->getProperty(OutgoingMessageDelimiter)) {
+    std::transform(std::begin(*delimiter_str), std::end(*delimiter_str), std::back_inserter(delimiter_), [](char c) {
+      return std::byte(c);
+    });

Review Comment:
   I think `delimiter_ = ranges::views::transform(*delimiter_str, [](char c) { return std::byte(c); }) | ranges::to<std::vector>();` would be nicer (also maybe use `static_cast<std::byte>(c)` instead of the function-style cast, which behaves like a C-style cast)
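
   For illustration, a dependency-free sketch of the same conversion with `static_cast`, written against the standard library only (it does not use range-v3); `toBytes` is a hypothetical helper name, not something in the PR:

   ```cpp
   #include <algorithm>
   #include <cassert>
   #include <cstddef>
   #include <iterator>
   #include <string_view>
   #include <vector>

   // Hypothetical helper: converts each character of `text` to a std::byte.
   // Returning a fresh vector lets the caller assign the result directly,
   // so no separate clear() of the destination is needed.
   std::vector<std::byte> toBytes(std::string_view text) {
     std::vector<std::byte> bytes;
     bytes.reserve(text.size());
     std::transform(text.begin(), text.end(), std::back_inserter(bytes),
                    [](char c) { return static_cast<std::byte>(c); });
     return bytes;
   }
   ```

   A caller could then write something like `delimiter_ = toBytes(*delimiter_str);`, which has the same shape as the range-based one-liner.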



##########
extensions/standard-processors/tests/unit/resources/bob_by_A.pem:
##########
@@ -0,0 +1,46 @@
+-----BEGIN RSA PRIVATE KEY-----

Review Comment:
   it doesn't look like we use these two `bob_by_...` certificates; can we remove them?



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template<class SocketType>
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+    std::lock_guard lock_guard{mutex_};
+    return sessions_.size();
+  }
+
+  void closeSessions() override {
+    std::lock_guard lock_guard{mutex_};
+    for (const auto& session_weak : sessions_) {
+      if (auto session = session_weak.lock()) {
+        auto& socket = session->getSocket();
+        if (socket.is_open()) {
+          socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+          session->getSocket().close();
+        }
+      }
+    }
+  }
+
+  mutable std::mutex mutex_;
+  std::vector<std::weak_ptr<SocketType>> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public SessionAwareServer<TcpSession> {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr<TcpSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = TcpServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public SessionAwareServer<SslSession> {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr<SslSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = SslServer::createSession();
+    logger_->log_trace("SessionAwareSslServer::createSession %p", session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+utils::net::SslData createSslDataForServer() {
+  const std::filesystem::path executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
+  utils::net::SslData ssl_data;
+  ssl_data.ca_loc = (executable_dir / "resources/ca_A.crt").string();
+  ssl_data.cert_loc = (executable_dir / "resources/localhost_by_A.pem").string();
+  ssl_data.key_loc = (executable_dir / "resources/localhost_by_A.pem").string();
+  return ssl_data;
+}
+}  // namespace
+
+class PutTCPTestFixture {
+ public:
+  PutTCPTestFixture() {
+    LogTestController::getInstance().setTrace<PutTCP>();
+    LogTestController::getInstance().setInfo<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<utils::net::Server>();
+    put_tcp_->setProperty(PutTCP::Hostname, "${literal('localhost')}");
+    put_tcp_->setProperty(PutTCP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port_), "')}"));
+    put_tcp_->setProperty(PutTCP::Timeout, "200 ms");
+    put_tcp_->setProperty(PutTCP::OutgoingMessageDelimiter, "\n");
+  }
+
+  ~PutTCPTestFixture() {
+    stopServer();
+  }
+
+  void startTCPServer() {
+    gsl_Expects(!listener_ && !server_thread_.joinable());
+    listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port_, core::logging::LoggerFactory<utils::net::Server>().getLogger());
+    server_thread_ = std::thread([this]() { listener_->run(); });
+  }
+
+  void startSSLServer() {
+    gsl_Expects(!listener_ && !server_thread_.joinable());
+    listener_ = std::make_unique<SessionAwareSslServer>(std::nullopt,
+        port_,
+        core::logging::LoggerFactory<utils::net::Server>().getLogger(),
+        createSslDataForServer(),
+        utils::net::SslServer::ClientAuthOption::REQUIRED);
+    server_thread_ = std::thread([this]() { listener_->run(); });
+  }
+
+  void stopServer() {
+    if (listener_)
+      listener_->stop();
+    if (server_thread_.joinable())
+      server_thread_.join();
+    listener_.reset();
+  }
+
+  size_t getNumberOfActiveSessions() {
+    if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(listener_.get())) {
+      return session_aware_listener->getNumberOfSessions() - 1;  // There is always one inactive session waiting for a new connection
+    }
+    return -1;
+  }
+
+  void closeActiveConnections() {
+    if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(listener_.get())) {
+      session_aware_listener->closeSessions();
+    }
+    std::this_thread::sleep_for(200ms);
+  }
+
+  auto trigger(const std::string_view& message) {
+    return controller_.trigger(message);
+  }
+
+  auto getContent(const auto& flow_file) {
+    return controller_.plan->getContent(flow_file);
+  }
+
+  std::optional<utils::net::Message> tryDequeueReceivedMessage() {
+    auto timeout = 200ms;
+    auto interval = 10ms;
+
+    auto start_time = std::chrono::system_clock::now();
+    utils::net::Message result;
+    while (start_time + timeout > std::chrono::system_clock::now()) {
+      if (listener_->tryDequeue(result))
+        return result;
+      std::this_thread::sleep_for(interval);
+    }
+    return std::nullopt;
+  }
+
+  void addSSLContextToPutTCP(const std::filesystem::path& ca_cert, std::optional<std::filesystem::path> client_cert_key) {

Review Comment:
   the second parameter should also be `const &`
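
   A small sketch of the suggested signature shape, using a hypothetical free function (`describeCerts`) instead of the fixture method; the body is invented purely to make the example compile and is not what the test does:

   ```cpp
   #include <cassert>
   #include <filesystem>
   #include <optional>
   #include <string>

   // Hypothetical function mirroring the parameter list under review.
   // Both parameters are taken by const reference, so a caller that already
   // holds a std::optional<std::filesystem::path> does not pay for a copy.
   std::string describeCerts(const std::filesystem::path& ca_cert,
                             const std::optional<std::filesystem::path>& client_cert_key) {
     std::string result = "ca=" + ca_cert.string();
     if (client_cert_key) {
       result += " client=" + client_cert_key->string();
     }
     return result;
   }
   ```

   Note that passing a plain path or `std::nullopt` still constructs a temporary `std::optional` at the call site; the saving applies when the argument is already an optional.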



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template<class SocketType>
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+    std::lock_guard lock_guard{mutex_};
+    return sessions_.size();
+  }
+
+  void closeSessions() override {
+    std::lock_guard lock_guard{mutex_};
+    for (const auto& session_weak : sessions_) {
+      if (auto session = session_weak.lock()) {
+        auto& socket = session->getSocket();
+        if (socket.is_open()) {
+          socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+          session->getSocket().close();
+        }
+      }
+    }
+  }
+
+  mutable std::mutex mutex_;
+  std::vector<std::weak_ptr<SocketType>> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public SessionAwareServer<TcpSession> {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr<TcpSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = TcpServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public SessionAwareServer<SslSession> {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr<SslSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = SslServer::createSession();
+    logger_->log_trace("SessionAwareSslServer::createSession %p", session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+utils::net::SslData createSslDataForServer() {
+  const std::filesystem::path executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
+  utils::net::SslData ssl_data;
+  ssl_data.ca_loc = (executable_dir / "resources/ca_A.crt").string();
+  ssl_data.cert_loc = (executable_dir / "resources/localhost_by_A.pem").string();
+  ssl_data.key_loc = (executable_dir / "resources/localhost_by_A.pem").string();
+  return ssl_data;
+}
+}  // namespace
+
+class PutTCPTestFixture {
+ public:
+  PutTCPTestFixture() {
+    LogTestController::getInstance().setTrace<PutTCP>();
+    LogTestController::getInstance().setInfo<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<utils::net::Server>();
+    put_tcp_->setProperty(PutTCP::Hostname, "${literal('localhost')}");
+    put_tcp_->setProperty(PutTCP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port_), "')}"));
+    put_tcp_->setProperty(PutTCP::Timeout, "200 ms");
+    put_tcp_->setProperty(PutTCP::OutgoingMessageDelimiter, "\n");
+  }
+
+  ~PutTCPTestFixture() {
+    stopServer();
+  }
+
+  void startTCPServer() {
+    gsl_Expects(!listener_ && !server_thread_.joinable());
+    listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port_, core::logging::LoggerFactory<utils::net::Server>().getLogger());

Review Comment:
   `getLogger` is static, so the constructor call is not needed:
   ```suggestion
       listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port_, core::logging::LoggerFactory<utils::net::Server>::getLogger());
   ```
   
   (also in line 141)
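
   To show the difference in isolation, here is a sketch with hypothetical stand-ins (`FakeLogger`, `FakeLoggerFactory`, `Server`) rather than the real `core::logging` classes; whether the real factory caches loggers the way this stand-in does is an assumption:

   ```cpp
   #include <cassert>
   #include <memory>
   #include <string>

   // Hypothetical stand-in for core::logging::Logger.
   struct FakeLogger {
     std::string name;
   };

   // Hypothetical stand-in for core::logging::LoggerFactory<T>.
   template<class T>
   struct FakeLoggerFactory {
     // Static member function: callable on the type itself, no object required.
     static std::shared_ptr<FakeLogger> getLogger() {
       static const auto logger = std::make_shared<FakeLogger>(FakeLogger{"fake"});
       return logger;
     }
   };

   // Hypothetical tag type used only as the template argument.
   struct Server {};
   ```

   Both `FakeLoggerFactory<Server>::getLogger()` and `FakeLoggerFactory<Server>().getLogger()` compile and call the same function; the second form just constructs a throwaway factory object first, which is why the `::` form is preferred.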



##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<tcp::socket>;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = core::PropertyBuilder::createProperty("Hostname")
+    ->withDescription("The IP address or hostname of the destination.")
+    ->withDefaultValue("localhost")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Port = core::PropertyBuilder::createProperty("Port")
+    ->withDescription("The port or service on the destination.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = core::PropertyBuilder::createProperty("Idle Connection Expiration")
+    ->withDescription("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = core::PropertyBuilder::createProperty("Connection Per FlowFile")
+    ->withDescription("Specifies whether to send each FlowFile's content on an individual connection.")
+    ->withDefaultValue(false)
+    ->isRequired(true)
+    ->supportsExpressionLanguage(false)
+    ->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+    ->withDescription("Specifies the delimiter to use when sending messages out over the same TCP stream. "
+                      "The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. "
+                      "Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::SSLContextService = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()
+    ->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+    ->withDescription("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be.")
+    ->isRequired(false)
+    ->asType<core::DataSizeValue>()
+    ->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that encountered IO errors are sent out this relationship."};
+
+PutTCP::PutTCP(const std::string& name, const utils::Identifier& uuid)
+    : Processor(name, uuid) {}
+
+PutTCP::~PutTCP() = default;
+
+void PutTCP::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+void PutTCP::notifyStop() {}
+
+void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessionFactory*) {
+  gsl_Expects(context);
+
+  // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files
+  if (context->getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
+  }
+  if (context->getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
+  }
+  if (auto idle_connection_expiration = context->getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms)
+    idle_connection_expiration_ = idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms)
+    timeout_ = timeout->getMilliseconds();
+  else
+    timeout_ = 15s;
+
+  std::string context_name;
+  if (context->getProperty(SSLContextService.getName(), context_name) && !IsNullOrEmpty(context_name))
+    ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
+
+  delimiter_.clear();
+  if (auto delimiter_str = context->getProperty(OutgoingMessageDelimiter)) {
+    std::transform(std::begin(*delimiter_str), std::end(*delimiter_str), std::back_inserter(delimiter_), [](char c) {
+      return std::byte(c);
+    });
+  }
+
+  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
+
+  if (auto max_size_of_socket_send_buffer = context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer))
+    max_size_of_socket_send_buffer_ = max_size_of_socket_send_buffer->getValue();
+  else
+    max_size_of_socket_send_buffer_.reset();
+}
+
+namespace {
+template<class SocketType>
+class ConnectionHandler : public IConnectionHandler {
+ public:
+  ConnectionHandler(ConnectionId connection_id,
+                    std::chrono::milliseconds timeout,
+                    std::shared_ptr<core::logging::Logger> logger,
+                    std::optional<size_t> max_size_of_socket_send_buffer,
+                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+      : connection_id_(std::move(connection_id)),
+        timeout_(timeout),
+        logger_(std::move(logger)),
+        max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
+        ssl_context_service_(std::move(ssl_context_service)) {
+  }
+  ~ConnectionHandler() override = default;
+
+  nonstd::expected<void, std::error_code> sendData(const std::vector<std::byte>& data, const std::vector<std::byte>& delimiter) override;
+
+ private:
+  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
+
+  [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
+    if (!last_used_)
+      return false;
+    return *last_used_ >= (std::chrono::steady_clock::now() - dur);
+  }
+
+  void reset() override {
+    last_used_.reset();
+    socket_.reset();
+    io_context_.reset();
+    last_error_.clear();
+    deadline_.expires_at(asio::steady_timer::time_point::max());
+  }
+
+  void checkDeadline(const std::error_code& error_code, const std::shared_ptr<SocketType>& socket);
+  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
+
+  void handleConnect(const std::error_code& error,
+                     tcp::resolver::results_type::iterator endpoint_iter,
+                     const std::shared_ptr<SocketType>& socket);
+  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
+                               const std::shared_ptr<SocketType>& socket);
+  void handleHandshake(const std::error_code& error,
+                       const tcp::resolver::results_type::iterator& endpoint_iter,
+                       const std::shared_ptr<SocketType>& socket);
+
+  void handleWrite(const std::error_code& error,
+                   std::size_t bytes_written,
+                   const std::vector<std::byte>& delimiter,
+                   const std::shared_ptr<SocketType>& socket);
+
+  void handleDelimiterWrite(const std::error_code& error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+
+  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+
+  ConnectionId connection_id_;
+  std::optional<std::chrono::steady_clock::time_point> last_used_;
+  asio::io_context io_context_;
+  std::error_code last_error_;
+  asio::steady_timer deadline_{io_context_};
+  std::chrono::milliseconds timeout_;
+  std::shared_ptr<SocketType> socket_;
+
+  std::shared_ptr<core::logging::Logger> logger_;
+  std::optional<size_t> max_size_of_socket_send_buffer_;
+
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+
+  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
+  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket, const std::vector<std::byte>& data, const std::vector<std::byte>& delimiter);
+};
+
+template<class SocketType>
+nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::vector<std::byte>& data, const std::vector<std::byte>& delimiter) {
+  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& socket) { return sendDataToSocket(socket, data, delimiter); });
+}
+
+template<class SocketType>
+nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
+  if (socket_ && socket_->lowest_layer().is_open())
+    return socket_;
+  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& resolved_query) { return establishConnection(resolved_query); });
+  if (!new_socket)
+    return nonstd::make_unexpected(new_socket.error());
+  socket_ = std::move(*new_socket);
+  return socket_;
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::checkDeadline(const std::error_code& error_code, const std::shared_ptr<SocketType>& socket) {
+  if (error_code != asio::error::operation_aborted) {
+    deadline_.expires_at(asio::steady_timer::time_point::max());
+    last_error_ = asio::error::timed_out;
+    deadline_.async_wait([&](const std::error_code& error_code) { checkDeadline(error_code, socket); });
+    socket->lowest_layer().close();
+  }
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
+  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
+    logger_->log_trace("No more endpoints to try");
+    deadline_.cancel();
+    return;
+  }
+
+  last_error_.clear();
+  deadline_.expires_after(timeout_);
+  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
+                                       [&socket, endpoint_iter, this](std::error_code err) {
+                                         handleConnect(err, endpoint_iter, socket);
+                                       });
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::handleConnect(const std::error_code& error,
+                                                   tcp::resolver::results_type::iterator endpoint_iter,
+                                                   const std::shared_ptr<SocketType>& socket) {

Review Comment:
   The parameters of this function, and of several others later in this file, are not aligned correctly.
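
A minimal sketch of the alignment meant here, assuming the usual convention that continuation lines start in the column directly after the opening parenthesis. `EndpointIterator` is a hypothetical stand-in for `tcp::resolver::results_type::iterator`, and the `bool` return type is an illustration-only change so the snippet can be checked on its own; the function in the PR returns `void`.

```cpp
#include <memory>
#include <system_error>

// Hypothetical stand-in so the snippet compiles without asio; the PR uses
// tcp::resolver::results_type::iterator here.
struct EndpointIterator {};

template<class SocketType>
class ConnectionHandler {
 public:
  // Declaration: continuation lines line up under the first parameter.
  bool handleConnect(const std::error_code& error,
                     EndpointIterator endpoint_iter,
                     const std::shared_ptr<SocketType>& socket);
};

// Out-of-class definition: the qualified name is longer, so the continuation
// lines are indented further to stay under the first parameter.
template<class SocketType>
bool ConnectionHandler<SocketType>::handleConnect(const std::error_code& error,
                                                  EndpointIterator /*endpoint_iter*/,
                                                  const std::shared_ptr<SocketType>& socket) {
  // Illustration only: report success when there is no error and a socket exists.
  return !error && socket != nullptr;
}
```

The same rule would apply to `handleConnectionSuccess`, `handleHandshake` and `handleWrite`.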



##########
extensions/standard-processors/tests/unit/resources/ca_B.crt:
##########
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----

Review Comment:
   From the unit test, it looks like this is an invalid certificate. Is that true? If so, could you please put "invalid" in the file name?


