You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "fgerlits (via GitHub)" <gi...@apache.org> on 2023/06/29 16:38:58 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

fgerlits commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1243980137


##########
extensions/standard-processors/processors/TailFile.cpp:
##########
@@ -343,10 +331,11 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
     throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
   }
 
-  std::string value;
-
-  if (context->getProperty(Delimiter.getName(), value)) {
-    delimiter_ = parseDelimiter(value);
+  if (auto delimiter_str = context->getProperty(Delimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single character, whether escaped or not)", *delimiter_str));
+    delimiter_ = *parsed_delimiter;

Review Comment:
   This will invalidate some TailFile configs that were previously accepted (but probably shouldn't have been): if `Delimiter` contains more than one character, we previously used the first character, whereas now we throw. I'm not sure whether this is a problem, but it's something we should discuss.



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");

Review Comment:
   It's unlikely we'll want to rename this property, but just in case, building the error message with `EndpointList.getName()` would be better than hard-coding "Endpoint List".



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());

Review Comment:
   It could be useful to add `OutputAttribute` definitions for these attributes (`tcp.port` and `tcp.sender`), so that they are documented in `PROCESSORS.md`.



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
+  if (message.is_partial)
+    session.transfer(flow_file, Partial);
+  else
+    session.transfer(flow_file, Success);
+}
+
+void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);
+  size_t logs_processed = 0;
+  while (!client_->queueEmpty() && logs_processed < max_batch_size_) {
+    utils::net::Message received_message;
+    if (!client_->tryDequeue(received_message))
+      break;
+    transferAsFlowFile(received_message, *session);
+    ++logs_processed;
   }
 }
-void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession>& /*session*/) {
-  // Perform directory list
-  std::lock_guard<std::mutex> lock(mutex_);
-  // check if the futures are valid. If they've terminated remove it from the map.
-
-  for (auto &initEndpoint : endpoints) {
-    std::vector<std::string> hostAndPort = utils::StringUtils::split(initEndpoint, ":");
-    auto realizedHost = hostAndPort.at(0);
-#ifdef WIN32
-    if ("localhost" == realizedHost) {
-      realizedHost = org::apache::nifi::minifi::io::Socket::getMyHostName();
+
+GetTCP::TcpClient::TcpClient(char delimiter,
+    asio::steady_timer::duration timeout_duration,
+    asio::steady_timer::duration reconnection_interval,
+    std::optional<asio::ssl::context> ssl_context,
+    std::optional<size_t> max_queue_size,
+    std::optional<size_t> max_message_size,
+    std::vector<utils::net::ConnectionId> connections,
+    std::shared_ptr<core::logging::Logger> logger)
+    : delimiter_(delimiter),
+    timeout_duration_(timeout_duration),
+    reconnection_interval_(reconnection_interval),
+    ssl_context_(std::move(ssl_context)),
+    max_queue_size_(max_queue_size),
+    max_message_size_(max_message_size),
+    connections_(std::move(connections)),
+    logger_(std::move(logger)) {
+}
+
+GetTCP::TcpClient::~TcpClient() {
+  stop();
+}
+
+
+void GetTCP::TcpClient::run() {
+  gsl_Expects(!connections_.empty());
+  for (const auto& connection_id : connections_) {
+    asio::co_spawn(io_context_, doReceiveFrom(connection_id), asio::detached);  // NOLINT
+  }
+  io_context_.run();
+}
+
+void GetTCP::TcpClient::stop() {
+  io_context_.stop();
+}
+
+bool GetTCP::TcpClient::queueEmpty() const {
+  return concurrent_queue_.empty();
+}
+
+bool GetTCP::TcpClient::tryDequeue(utils::net::Message& received_message) {
+  return concurrent_queue_.tryDequeue(received_message);
+}
+
+asio::awaitable<std::error_code> GetTCP::TcpClient::readLoop(auto& socket) {
+  std::string read_message;
+  bool last_was_partial = false;
+  bool current_is_partial = false;
+  while (true) {
+    {
+      last_was_partial = current_is_partial;
+      current_is_partial = false;
+    }
+    auto dynamic_buffer = max_message_size_ ? asio::dynamic_buffer(read_message, *max_message_size_) : asio::dynamic_buffer(read_message);
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, dynamic_buffer, delimiter_, utils::net::use_nothrow_awaitable);  // NOLINT
+
+    if (*max_message_size_ && read_error == asio::error::not_found) {
+      current_is_partial = true;
+      bytes_read = *max_message_size_;
+    } else if (read_error) {
+      logger_->log_error("Error during read %s", read_error.message());
+      co_return read_error;
     }
-#endif
-    if (hostAndPort.size() != 2) {
+
+    if (bytes_read == 0)
       continue;
+
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) {
+      utils::net::Message message{read_message.substr(0, bytes_read), utils::net::IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().remote_endpoint().port()};
+      if (last_was_partial || current_is_partial)
+        message.is_partial = true;

Review Comment:
   Why is this message marked as partial if the previous message was partial?



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());

Review Comment:
   I think a single `tcp.sender` output attribute containing `host:port` would be easier to understand. As it is now, it's not clear whether `tcp.port` is the server port or the client port. Alternatively, we could have `tcp.sender.host` and `tcp.sender.port` attributes.



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
+  if (message.is_partial)
+    session.transfer(flow_file, Partial);
+  else
+    session.transfer(flow_file, Success);
+}
+
+void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);

Review Comment:
   If `max_batch_size_` is zero, it would be better to throw an exception in `onSchedule()` than to terminate MiNiFi through the `gsl_Expects` precondition here.



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;

Review Comment:
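   Nitpicking, but the name of a local variable shouldn't have a trailing underscore: in this codebase that suffix marks member variables (e.g. `logger_`, `client_`), so `ssl_context_` reads like a member.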



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org