Posted to issues@nifi.apache.org by "martinzink (via GitHub)" <gi...@apache.org> on 2023/06/20 12:02:05 UTC

[GitHub] [nifi-minifi-cpp] martinzink opened a new pull request, #1592: MINIFICPP-2131 Refactored GetTCP

martinzink opened a new pull request, #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592

   **This breaks compatibility with old configs containing GetTCP**
   
   Reworked GetTCP because it was unsafe with multiple endpoints, didn't work like NiFi's implementation, used unnecessary resources, and lacked features that are present in similar processors (e.g. ListenTCP/ListenSyslog).
   
   ---
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1236810704


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in endpoint-list property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;

Review Comment:
   you are right, I've fixed this (unsuccessfully) in https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/9dce92909600a74ba320bd3c56612acd0c8ab5fa (and fixed the fix in https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/c824ab601e3bbdc01177a55496096ddc625356b1)





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "adamdebreceni (via GitHub)" <gi...@apache.org>.
adamdebreceni commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1236822566


##########
libminifi/test/resources/TestC2Metrics.yml:
##########
@@ -31,8 +31,8 @@ Processors:
       run duration nanos: 0
       auto-terminated relationships list:
       Properties:
-          endpoint-list: localhost:8776
-          end-of-message-byte: d
+          Endpoint List: localhost:8776
+          Message Delimiter: \r
           reconnect-interval: 100ms
           connection-attempt-timeout: 2000

Review Comment:
   should we update the properties to their new names?





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "adamdebreceni (via GitHub)" <gi...@apache.org>.
adamdebreceni commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1236720636


##########
extensions/standard-processors/tests/unit/GetTCPTests.cpp:
##########
@@ -15,391 +14,286 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <utility>
-#include <memory>
 #include <string>
-#include <vector>
-#include <set>
-#include "unit/ProvenanceTestHelper.h"
-#include "TestBase.h"
-#include "Catch.h"
-#include "RandomServerSocket.h"
-#include "Scheduling.h"
-#include "LogAttribute.h"
-#include "GetTCP.h"
-#include "core/Core.h"
-#include "core/FlowFile.h"
-#include "core/Processor.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
-#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
-
-TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") {
-  TestController testController;
-  std::vector<uint8_t> buffer;
-  for (auto c : "Hello World\nHello Warld\nGoodByte Cruel world") {
-    buffer.push_back(c);
-  }
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-
-  content_repo->initialize(std::make_shared<minifi::Configure>());
-
-  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
-  org::apache::nifi::minifi::io::RandomServerSocket server(org::apache::nifi::minifi::io::Socket::getMyHostName());
-
-  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
-  LogTestController::getInstance().setDebug<minifi::processors::GetTCP>();
-  LogTestController::getInstance().setTrace<minifi::io::Socket>();
-
-  std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
-
-  auto processor = std::make_unique<org::apache::nifi::minifi::processors::GetTCP>("gettcpexample");
-
-  auto logAttribute = std::make_unique<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
-  processor->setStreamFactory(stream_factory);
-  processor->initialize();
-
-  utils::Identifier processoruuid = processor->getUUID();
-  REQUIRE(processoruuid);
-
-  utils::Identifier logattribute_uuid = logAttribute->getUUID();
-  REQUIRE(logattribute_uuid);
-
-  REQUIRE(processoruuid.to_string() != logattribute_uuid.to_string());
-
-  auto connection = std::make_unique<minifi::Connection>(repo, content_repo, "gettcpexampleConnection");
-  connection->addRelationship(core::Relationship("success", "description"));
-
-  auto connection2 = std::make_unique<minifi::Connection>(repo, content_repo, "logattribute");
-  connection2->addRelationship(core::Relationship("success", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor.get());
-
-  // link the connections so that we can test results at the end for this
-  connection->setDestination(logAttribute.get());
 
-  connection2->setSource(logAttribute.get());
-
-  connection2->setSourceUUID(logattribute_uuid);
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(logattribute_uuid);
-
-  processor->addConnection(connection.get());
-  logAttribute->addConnection(connection.get());
-  logAttribute->addConnection(connection2.get());
-
-  auto node = std::make_shared<core::ProcessorNode>(processor.get());
-  auto node2 = std::make_shared<core::ProcessorNode>(logAttribute.get());
-  auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
-  auto context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, content_repo);
-  context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList, org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + std::to_string(server.getPort()));
-  context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval, "200 msec");
-  context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ConnectionAttemptLimit, "10");
-  auto session = std::make_shared<core::ProcessSession>(context);
-  auto session2 = std::make_shared<core::ProcessSession>(context2);
-
-  REQUIRE(processor->getName() == "gettcpexample");
-
-  std::shared_ptr<core::FlowFile> record;
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-
-  std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
-  processor->onSchedule(context, factory);
-  processor->onTrigger(context, session);
-  server.write(buffer, buffer.size());
-  std::this_thread::sleep_for(std::chrono::seconds(2));
-
-  logAttribute->initialize();
-  logAttribute->incrementActiveTasks();
-  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
-  std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
-  logAttribute->onSchedule(context2, factory2);
-  logAttribute->onTrigger(context2, session2);
+#include "Catch.h"
+#include "processors/GetTCP.h"
+#include "SingleProcessorTestController.h"
+#include "Utils.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+#include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
+#include "utils/gsl.h"
 
-  auto reporter = session->getProvenanceReporter();
-  auto records = reporter->getEvents();
-  record = session->get();
-  REQUIRE(record == nullptr);
-  REQUIRE(records.empty());
+using GetTCP = org::apache::nifi::minifi::processors::GetTCP;
 
-  processor->incrementActiveTasks();
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onTrigger(context, session);
-  reporter = session->getProvenanceReporter();
+using namespace std::literals::chrono_literals;
 
-  session->commit();
+namespace org::apache::nifi::minifi::test {
 
-  logAttribute->incrementActiveTasks();
-  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
-  logAttribute->onTrigger(context2, session2);
+void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
+  CHECK(std::to_string(port) == flow_file.getAttribute("tcp.port"));
+  const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
+  CHECK(ranges::contains(local_addresses, flow_file.getAttribute("tcp.sender")));
+}
 
-  REQUIRE(true == LogTestController::getInstance().contains("Reconnect interval is 200 ms"));
-  REQUIRE(true == LogTestController::getInstance().contains("Size:45 Offset:0"));
+minifi::utils::net::SslData createSslDataForServer() {
+  const std::filesystem::path executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
+  minifi::utils::net::SslData ssl_data;
+  ssl_data.ca_loc = (executable_dir / "resources" / "ca_A.crt").string();
+  ssl_data.cert_loc = (executable_dir / "resources" / "localhost_by_A.pem").string();
+  ssl_data.key_loc = (executable_dir / "resources" / "localhost_by_A.pem").string();
+  return ssl_data;
+}
 
-  LogTestController::getInstance().reset();
+void addSslContextServiceTo(SingleProcessorTestController& controller) {
+  auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
+  LogTestController::getInstance().setTrace<GetTCP>();
+  const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
+  REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(), (executable_dir / "resources" / "ca_A.crt").string()));
+  REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(), (executable_dir / "resources" / "alice_by_A.pem").string()));
+  REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(), (executable_dir / "resources" / "alice_by_A.pem").string()));
+  ssl_context_service->enable();
 }
 
-TEST_CASE("GetTCPWithOEM", "[GetTCP2]") {
-  std::vector<uint8_t> buffer;
-  for (auto c : "Hello World\nHello Warld\nGoodByte Cruel world") {
-    buffer.push_back(c);
+class TcpTestServer {
+ public:
+  void run() {
+    server_thread_ = std::thread([&]() {
+      asio::co_spawn(io_context_, listenAndSendMessages(), asio::detached);
+      io_context_.run();
+    });
   }
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-
-  content_repo->initialize(std::make_shared<minifi::Configure>());
-
-  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
-
-  TestController testController;
-
-  org::apache::nifi::minifi::io::RandomServerSocket server(org::apache::nifi::minifi::io::Socket::getMyHostName());
-
-  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
-  LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository >();
-  LogTestController::getInstance().setTrace<minifi::processors::GetTCP>();
-  LogTestController::getInstance().setTrace<core::ConfigurableComponent>();
-  LogTestController::getInstance().setTrace<minifi::io::Socket>();
-
-  std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
-
-  std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetTCP>("gettcpexample");
 
-  std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
-  processor->setStreamFactory(stream_factory);
-  processor->initialize();
-
-  utils::Identifier processoruuid = processor->getUUID();
-  REQUIRE(processoruuid);
-
-  utils::Identifier logattribute_uuid = logAttribute->getUUID();
-  REQUIRE(logattribute_uuid);
-
-  auto connection = std::make_unique<minifi::Connection>(repo, content_repo, "gettcpexampleConnection");
-  connection->addRelationship(core::Relationship("partial", "description"));
-
-  auto connection2 = std::make_unique<minifi::Connection>(repo, content_repo, "logattribute");
-  connection2->addRelationship(core::Relationship("partial", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor.get());
-
-  // link the connections so that we can test results at the end for this
-  connection->setDestination(logAttribute.get());
-
-  connection2->setSource(logAttribute.get());
-
-  connection2->setSourceUUID(logattribute_uuid);
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(logattribute_uuid);
-
-  processor->addConnection(connection.get());
-  logAttribute->addConnection(connection.get());
-  logAttribute->addConnection(connection2.get());
-
-  auto node = std::make_shared<core::ProcessorNode>(processor.get());
-  auto node2 = std::make_shared<core::ProcessorNode>(logAttribute.get());
-  auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
-  auto context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, content_repo);
-  context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList, org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + std::to_string(server.getPort()));
-  context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval, "200 msec");
-  context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ConnectionAttemptLimit, "10");
-  // we're using new lines above
-  context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndOfMessageByte, "10");
-  auto session = std::make_shared<core::ProcessSession>(context);
-  auto session2 = std::make_shared<core::ProcessSession>(context2);
-
-
-  REQUIRE(processor->getName() == "gettcpexample");
-
-  std::shared_ptr<core::FlowFile> record;
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-
-  std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
-  processor->onSchedule(context, factory);
-  processor->onTrigger(context, session);
-  server.write(buffer, buffer.size());
-  std::this_thread::sleep_for(std::chrono::seconds(2));
+  void queueMessage(std::string message) {
+    messages_to_send_.enqueue(std::move(message));
+  }
 
-  logAttribute->initialize();
-  logAttribute->incrementActiveTasks();
-  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
-  std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
-  logAttribute->onSchedule(context2, factory2);
-  logAttribute->onTrigger(context2, session2);
+  void enableSSL() {
+    const std::filesystem::path executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
 
-  auto reporter = session->getProvenanceReporter();
-  auto records = reporter->getEvents();
-  record = session->get();
-  REQUIRE(record == nullptr);
-  REQUIRE(records.empty());
+    asio::ssl::context ssl_context(asio::ssl::context::tls_server);
+    ssl_context.set_options(asio::ssl::context::default_workarounds | asio::ssl::context::single_dh_use | asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1);
+    ssl_context.set_password_callback([key_pw = "Password12"](std::size_t&, asio::ssl::context_base::password_purpose&) { return key_pw; });
+    ssl_context.use_certificate_file((executable_dir / "resources" / "localhost_by_A.pem").string(), asio::ssl::context::pem);
+    ssl_context.use_private_key_file((executable_dir / "resources" / "localhost_by_A.pem").string(), asio::ssl::context::pem);
+    ssl_context.load_verify_file((executable_dir / "resources" / "ca_A.crt").string());
+    ssl_context.set_verify_mode(asio::ssl::verify_peer);
 
-  processor->incrementActiveTasks();
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onTrigger(context, session);
-  reporter = session->getProvenanceReporter();
+    ssl_context_ = std::move(ssl_context);
+  }
 
-  session->commit();
+  uint16_t getPort() const {
+    return port_;
+  }
 
-  logAttribute->incrementActiveTasks();
-  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
-  logAttribute->onTrigger(context2, session2);
+  ~TcpTestServer() {
+    io_context_.stop();
+    if (server_thread_.joinable())
+      server_thread_.join();
+  }
 
-  logAttribute->incrementActiveTasks();
-  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
-  logAttribute->onTrigger(context2, session2);
+ private:
+  asio::awaitable<void> sendMessages(auto& socket) {
+    while (true) {
+      std::string message_to_send;
+      if (!messages_to_send_.tryDequeue(message_to_send)) {
+        co_await minifi::utils::net::async_wait(10ms);
+        continue;
+      }
+      co_await asio::async_write(socket, asio::buffer(message_to_send), minifi::utils::net::use_nothrow_awaitable);

Review Comment:
   how is this loop terminated?





[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "fgerlits (via GitHub)" <gi...@apache.org>.
fgerlits commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1243980137


##########
extensions/standard-processors/processors/TailFile.cpp:
##########
@@ -343,10 +331,11 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
     throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
   }
 
-  std::string value;
-
-  if (context->getProperty(Delimiter.getName(), value)) {
-    delimiter_ = parseDelimiter(value);
+  if (auto delimiter_str = context->getProperty(Delimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single character, whether escaped or not)", *delimiter_str));
+    delimiter_ = *parsed_delimiter;

Review Comment:
   This will invalidate some TailFile configs which were previously accepted (but probably shouldn't have been): if `Delimiter` contains more than one character, previously we used the first character; now we'll throw. I'm not sure if this is a problem, but it's something we should discuss.
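
A minimal sketch of the behavior change described above, for illustration only. The two functions are hypothetical stand-ins, not the actual `parseDelimiter` or `utils::StringUtils::parseCharacter` implementations: the "legacy" one is assumed to simply take the first character, and the "strict" one is assumed to accept exactly one literal character or one simple backslash escape.

```cpp
#include <cassert>
#include <optional>
#include <string_view>

// Hypothetical stand-in for the old behavior: anything after the first
// character is silently ignored.
std::optional<char> legacyFirstCharacter(std::string_view input) {
  if (input.empty()) return std::nullopt;
  return input.front();
}

// Hypothetical strict parser: accepts exactly one character, written either
// literally (";") or as a simple backslash escape ("\\n" in source form).
// The set of supported escapes here is an assumption for the sketch.
std::optional<char> strictSingleCharacter(std::string_view input) {
  if (input.size() == 1) return input.front();
  if (input.size() == 2 && input.front() == '\\') {
    switch (input[1]) {
      case 'n': return '\n';
      case 'r': return '\r';
      case 't': return '\t';
      case '0': return '\0';
      case '\\': return '\\';
      default: return std::nullopt;
    }
  }
  return std::nullopt;  // empty or multi-character input is rejected
}

// Shows where the two approaches disagree.
inline void demonstrateDifference() {
  assert(legacyFirstCharacter("ab") == 'a');          // old: first character wins
  assert(!strictSingleCharacter("ab").has_value());   // new: rejected
  assert(strictSingleCharacter("\\n") == '\n');       // escaped newline accepted
  assert(strictSingleCharacter(";") == ';');          // literal character accepted
}
```

Under these assumptions, a config with a multi-character delimiter such as `ab` would have been accepted before (as `a`) and would now fail at schedule time, since the caller throws when the strict parser returns no value.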



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");

Review Comment:
   It's unlikely we'll want to rename this property, but just in case, building the exception message from `EndpointList.getName()` instead of the hard-coded "Endpoint List" would be better.
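
   A minimal sketch of that suggestion, assuming the property exposes its name through `getName()` as the removed lines of the diff do. The `Property` class below is a hypothetical stand-in for `core::Property`, and `noValidEndpointMessage` is an illustrative helper that is not part of the PR:

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for core::Property; only getName() matters here.
class Property {
 public:
  explicit Property(std::string name) : name_(std::move(name)) {}
  const std::string& getName() const { return name_; }

 private:
  std::string name_;
};

// Builds the error text from the property's current name, so renaming the
// property cannot leave a stale name behind in the message.
std::string noValidEndpointMessage(const Property& endpoint_list) {
  return "No valid endpoint in " + endpoint_list.getName() + " property";
}
```

   In `onSchedule` the result would be passed to the existing `throw Exception(PROCESS_SCHEDULE_EXCEPTION, ...)` call in place of the string literal.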



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());

Review Comment:
   It could be useful to add `OutputAttribute` definitions for these, so we'll have them documented in `PROCESSORS.md`.
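
   A rough sketch of what such definitions might look like. The `OutputAttribute` struct here is a hypothetical, simplified stand-in and not the actual MiNiFi type, the description strings are placeholders, and only the attribute names `tcp.port` and `tcp.sender` are taken from the diff:

```cpp
#include <array>
#include <string_view>

// Hypothetical, simplified stand-in for an output attribute definition.
struct OutputAttribute {
  std::string_view name;
  std::string_view description;
};

// Attribute names match the setAttribute calls in transferAsFlowFile;
// the descriptions are placeholder wording.
inline constexpr OutputAttribute TcpPort{"tcp.port", "Placeholder: port associated with the received message"};
inline constexpr OutputAttribute TcpSender{"tcp.sender", "Placeholder: address of the message sender"};

// Collecting the definitions in one place lets documentation be generated from them.
inline constexpr std::array<OutputAttribute, 2> kOutputAttributes{TcpPort, TcpSender};
```

   With named constants like these, `transferAsFlowFile` could refer to `TcpPort.name` and `TcpSender.name` rather than repeating the string literals.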



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
+  if (message.is_partial)
+    session.transfer(flow_file, Partial);
+  else
+    session.transfer(flow_file, Success);
+}
+
+void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);
+  size_t logs_processed = 0;
+  while (!client_->queueEmpty() && logs_processed < max_batch_size_) {
+    utils::net::Message received_message;
+    if (!client_->tryDequeue(received_message))
+      break;
+    transferAsFlowFile(received_message, *session);
+    ++logs_processed;
   }
 }
-void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession>& /*session*/) {
-  // Perform directory list
-  std::lock_guard<std::mutex> lock(mutex_);
-  // check if the futures are valid. If they've terminated remove it from the map.
-
-  for (auto &initEndpoint : endpoints) {
-    std::vector<std::string> hostAndPort = utils::StringUtils::split(initEndpoint, ":");
-    auto realizedHost = hostAndPort.at(0);
-#ifdef WIN32
-    if ("localhost" == realizedHost) {
-      realizedHost = org::apache::nifi::minifi::io::Socket::getMyHostName();
+
+GetTCP::TcpClient::TcpClient(char delimiter,
+    asio::steady_timer::duration timeout_duration,
+    asio::steady_timer::duration reconnection_interval,
+    std::optional<asio::ssl::context> ssl_context,
+    std::optional<size_t> max_queue_size,
+    std::optional<size_t> max_message_size,
+    std::vector<utils::net::ConnectionId> connections,
+    std::shared_ptr<core::logging::Logger> logger)
+    : delimiter_(delimiter),
+    timeout_duration_(timeout_duration),
+    reconnection_interval_(reconnection_interval),
+    ssl_context_(std::move(ssl_context)),
+    max_queue_size_(max_queue_size),
+    max_message_size_(max_message_size),
+    connections_(std::move(connections)),
+    logger_(std::move(logger)) {
+}
+
+GetTCP::TcpClient::~TcpClient() {
+  stop();
+}
+
+
+void GetTCP::TcpClient::run() {
+  gsl_Expects(!connections_.empty());
+  for (const auto& connection_id : connections_) {
+    asio::co_spawn(io_context_, doReceiveFrom(connection_id), asio::detached);  // NOLINT
+  }
+  io_context_.run();
+}
+
+void GetTCP::TcpClient::stop() {
+  io_context_.stop();
+}
+
+bool GetTCP::TcpClient::queueEmpty() const {
+  return concurrent_queue_.empty();
+}
+
+bool GetTCP::TcpClient::tryDequeue(utils::net::Message& received_message) {
+  return concurrent_queue_.tryDequeue(received_message);
+}
+
+asio::awaitable<std::error_code> GetTCP::TcpClient::readLoop(auto& socket) {
+  std::string read_message;
+  bool last_was_partial = false;
+  bool current_is_partial = false;
+  while (true) {
+    {
+      last_was_partial = current_is_partial;
+      current_is_partial = false;
+    }
+    auto dynamic_buffer = max_message_size_ ? asio::dynamic_buffer(read_message, *max_message_size_) : asio::dynamic_buffer(read_message);
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, dynamic_buffer, delimiter_, utils::net::use_nothrow_awaitable);  // NOLINT
+
+    if (*max_message_size_ && read_error == asio::error::not_found) {
+      current_is_partial = true;
+      bytes_read = *max_message_size_;
+    } else if (read_error) {
+      logger_->log_error("Error during read %s", read_error.message());
+      co_return read_error;
     }
-#endif
-    if (hostAndPort.size() != 2) {
+
+    if (bytes_read == 0)
       continue;
+
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) {
+      utils::net::Message message{read_message.substr(0, bytes_read), utils::net::IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().remote_endpoint().port()};
+      if (last_was_partial || current_is_partial)
+        message.is_partial = true;

Review Comment:
   Why is this message marked partial if the previous message was partial?
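
   One possible reading of the flag logic in question, as a simplified model: when a message exceeds the maximum message size, it is cut into chunks, and the chunk that finally contains the delimiter is the tail of that same oversized message, so it is also flagged partial. The sketch below does not use asio and is not the PR's code; `Chunk` and `splitWithLimit` are hypothetical names introduced only for illustration, and the assumption that a bounded read returns exactly `max_size` bytes when no delimiter is found is a modelling choice.

```cpp
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical stand-in for a received message and its "partial" flag.
struct Chunk {
  std::string data;
  bool is_partial;
};

// Models a delimiter-terminated read bounded by max_size bytes.
// A chunk is flagged partial if it has no delimiter (current_is_partial)
// or if it continues a chunk that had none (last_was_partial).
std::vector<Chunk> splitWithLimit(std::string_view stream, char delimiter, std::size_t max_size) {
  std::vector<Chunk> chunks;
  if (max_size == 0)
    return chunks;  // nothing can be read into a zero-sized buffer

  bool last_was_partial = false;
  std::size_t pos = 0;
  while (pos < stream.size()) {
    const std::string_view window = stream.substr(pos, max_size);
    const std::size_t found = window.find(delimiter);
    bool current_is_partial = false;
    std::size_t length = 0;
    if (found != std::string_view::npos) {
      length = found + 1;  // include the delimiter in the chunk
    } else if (window.size() == max_size) {
      current_is_partial = true;  // buffer filled without seeing the delimiter
      length = max_size;
    } else {
      break;  // incomplete data: a real reader would wait for more bytes
    }
    chunks.push_back(Chunk{std::string(window.substr(0, length)), last_was_partial || current_is_partial});
    last_was_partial = current_is_partial;
    pos += length;
  }
  return chunks;
}
```

   With the input `"abcdef\nxy\n"`, delimiter `'\n'` and a limit of 4, this model yields `"abcd"` (partial), `"ef\n"` (partial, because it completes the oversized message) and `"xy\n"` (complete). Whether the tail of an oversized message should go to `Partial` or `Success` is the design decision the question is about.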



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());

Review Comment:
   I think a single `tcp.sender` output attribute containing `host:port` would be easier to understand. As it stands, it's not clear whether `tcp.port` is the server port or the client port. Alternatively, we could have `tcp.sender.host` and `tcp.sender.port`.
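
   A minimal sketch of the two alternatives, for illustration only: `formatSender` and `senderAttributes` are hypothetical helpers, not part of the PR or of MiNiFi, and the bracketing of IPv6 literals is an added assumption (it follows the usual `[host]:port` convention so the port separator stays unambiguous).

```cpp
#include <cstdint>
#include <map>
#include <string>

// Option 1: a single "tcp.sender" value in host:port form.
// Hosts containing ':' are treated as IPv6 literals and wrapped in brackets.
std::string formatSender(const std::string& host, std::uint16_t port) {
  const bool is_ipv6_literal = host.find(':') != std::string::npos;
  return (is_ipv6_literal ? "[" + host + "]" : host) + ":" + std::to_string(port);
}

// Option 2: two separate attributes, so neither value needs parsing downstream.
std::map<std::string, std::string> senderAttributes(const std::string& host, std::uint16_t port) {
  return {{"tcp.sender.host", host}, {"tcp.sender.port", std::to_string(port)}};
}
```

   The single attribute is more compact, while the split attributes avoid string parsing in later processors; either removes the ambiguity about which side the port belongs to.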



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
+  if (message.is_partial)
+    session.transfer(flow_file, Partial);
+  else
+    session.transfer(flow_file, Success);
+}
+
+void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);

Review Comment:
   If `max_batch_size_` is zero, it would be better to throw in `onSchedule()` than to let this `gsl_Expects` terminate MiNiFi here.
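
   A minimal sketch of that kind of check, under stated assumptions: `validatedMaxBatchSize` is a hypothetical helper that is not part of the PR, and `std::invalid_argument` stands in for `Exception(PROCESS_SCHEDULE_EXCEPTION, ...)` only so the snippet compiles on its own.

```cpp
#include <cstdint>
#include <optional>
#include <stdexcept>

// Hypothetical helper (not in the PR): resolves the configured batch size and
// rejects zero up front, so the failure surfaces at schedule time as an
// exception rather than as a failed precondition in onTrigger().
// std::invalid_argument is a stand-in for the processor's own exception type.
inline std::uint64_t validatedMaxBatchSize(std::optional<std::uint64_t> configured,
                                           std::uint64_t default_value) {
  const std::uint64_t value = configured.value_or(default_value);
  if (value == 0) {
    throw std::invalid_argument("Max Batch Size must be greater than zero");
  }
  return value;
}
```

   With something like this called from `onSchedule()` and its result assigned to `max_batch_size_`, the `gsl_Expects` in `onTrigger()` would remain only as a sanity check.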



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;

Review Comment:
   Nitpick: `ssl_context_` is a local variable, so its name shouldn't have a trailing underscore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1247805318


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");

Review Comment:
   Good idea, changed it in https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/54e037239cd49365a89976cc280c7bfcc0e198aa#diff-8d484e99475c978743af82b7ee8ba45954457ecbac4c033744d1cc95ad9f8240R112





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1247806174


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
+  if (message.is_partial)
+    session.transfer(flow_file, Partial);
+  else
+    session.transfer(flow_file, Success);
+}
+
+void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);

Review Comment:
   Good catch, I've added PROCESS_SCHEDULE_EXCEPTION in https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/54e037239cd49365a89976cc280c7bfcc0e198aa#diff-8d484e99475c978743af82b7ee8ba45954457ecbac4c033744d1cc95ad9f8240R139-R141
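
For readers following the thread, here is a minimal sketch of the kind of schedule-time check the reply seems to describe. It assumes the concern was that a Max Batch Size of 0 would otherwise only trip the `gsl_Expects` precondition in `onTrigger`. The names `ScheduleError` and `validatedBatchSize` are hypothetical stand-ins and are not taken from the linked commit. The fallback of 500 mirrors the property default shown in the diff.

```cpp
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for Exception(PROCESS_SCHEDULE_EXCEPTION, ...);
// the real MiNiFi exception type is not reproduced here.
struct ScheduleError : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// Hypothetical helper: resolves the configured batch size and rejects 0
// while scheduling, so onTrigger can rely on max_batch_size_ > 0.
inline uint64_t validatedBatchSize(std::optional<uint64_t> configured, uint64_t fallback = 500) {
  const uint64_t value = configured.value_or(fallback);
  if (value == 0) {
    throw ScheduleError("Max Batch Size must be greater than 0");
  }
  return value;
}
```

Failing in `onSchedule` reports the misconfiguration once, when the processor is started, instead of violating a precondition on every trigger.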





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1247804838


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());

Review Comment:
   You are right. Originally I mimicked how ListenTCP works, but this might be better. NiFi's GetTCP works the same way, so I went with `source.endpoint` in https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/54e037239cd49365a89976cc280c7bfcc0e198aa, similar to NiFi: https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCP.java#L61
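
   To illustrate the idea, here is a minimal standalone sketch of tagging a flow file with a single `source.endpoint` attribute instead of separate `tcp.sender` and `tcp.port` attributes. It is not the code from the commit: `FakeFlowFile`, `formatSourceEndpoint` and `tagWithSourceEndpoint` are hypothetical names, and the `<host>:<port>` value format is an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for a flow file: just a bag of string attributes.
struct FakeFlowFile {
  std::map<std::string, std::string> attributes;
};

// Hypothetical helper: formats the remote endpoint as "<host>:<port>".
// The exact value format used by the processor is an assumption here.
std::string formatSourceEndpoint(const std::string& host, uint16_t port) {
  assert(!host.empty());  // an endpoint without a host is not meaningful
  return host + ":" + std::to_string(port);
}

// Sets one combined attribute rather than separate sender and port attributes.
void tagWithSourceEndpoint(FakeFlowFile& flow_file, const std::string& host, uint16_t port) {
  flow_file.attributes["source.endpoint"] = formatSourceEndpoint(host, port);
}
```

   With a single attribute, downstream processors only need to look up one key to find out which configured endpoint a message came from.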



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());

Review Comment:
   Good idea, changed it in https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/54e037239cd49365a89976cc280c7bfcc0e198aa





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1258212778


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,290 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
-#include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
+const core::OutputAttribute GetTCP::SourceEndpoint{"source.endpoint", {Success, Partial}, "The address of the source endpoint the message came from"};
 
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("No valid endpoint in {} property", EndpointList.getName()));
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    if (*max_batch_size == 0) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("{} should be non-zero.", MaxBatchSize.getName()));
+    }
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute(GetTCP::SourceEndpoint.getName(), fmt::format("{}:{}", message.sender_address.to_string(), std::to_string(message.server_port)));
+  if (message.is_partial)
+    session.transfer(flow_file, Partial);
+  else
+    session.transfer(flow_file, Success);
+}
+
+void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);
+  size_t logs_processed = 0;
+  while (!client_->queueEmpty() && logs_processed < max_batch_size_) {
+    utils::net::Message received_message;
+    if (!client_->tryDequeue(received_message))
+      break;
+    transferAsFlowFile(received_message, *session);
+    ++logs_processed;
   }
 }
-void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession>& /*session*/) {
-  // Perform directory list
-  std::lock_guard<std::mutex> lock(mutex_);
-  // check if the futures are valid. If they've terminated remove it from the map.
-
-  for (auto &initEndpoint : endpoints) {
-    std::vector<std::string> hostAndPort = utils::StringUtils::split(initEndpoint, ":");
-    auto realizedHost = hostAndPort.at(0);
-#ifdef WIN32
-    if ("localhost" == realizedHost) {
-      realizedHost = org::apache::nifi::minifi::io::Socket::getMyHostName();
+
+GetTCP::TcpClient::TcpClient(char delimiter,
+    asio::steady_timer::duration timeout_duration,

Review Comment:
   Yeah, it looks strange, I agree. I've checked the [styleguide](https://google.github.io/styleguide/cppguide.html#Constructor_Initializer_Lists), and it looks like we should align the rest of the initializer list with the first initializer (i.e. 6 spaces). I also checked the codebase: both this style and your recommendation are used extensively. I went with the styleguide recommendation. https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/1c380c824e18c8c0ec2ae6eada4ba4644e444feb#diff-8d484e99475c978743af82b7ee8ba45954457ecbac4c033744d1cc95ad9f8240R218-R224
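
   For reference, the wrapped initializer-list layout described in the style guide can be sketched as below. `ExampleClient` and its members are hypothetical names chosen for illustration and are not taken from this PR; only the indentation (colon line at 4 spaces, following initializers at 6 spaces) is the point.

```cpp
#include <chrono>
#include <string>
#include <utility>

// Hypothetical class, used only to illustrate the formatting.
class ExampleClient {
 public:
  ExampleClient(char delimiter,
      std::chrono::milliseconds timeout,
      std::string name);

  char delimiter() const { return delimiter_; }
  std::chrono::milliseconds timeout() const { return timeout_; }
  const std::string& name() const { return name_; }

 private:
  char delimiter_;
  std::chrono::milliseconds timeout_;
  std::string name_;
};

ExampleClient::ExampleClient(char delimiter,
    std::chrono::milliseconds timeout,
    std::string name)
    : delimiter_(delimiter),    // colon line: indented 4 spaces
      timeout_(timeout),        // following initializers: 6 spaces,
      name_(std::move(name)) {  // lined up with the first initializer
}
```

   With this layout every initializer starts in the same column, which is what distinguishes it from the uniform 4-space indent shown in the diff above.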



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1248116652


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
+  if (message.is_partial)
+    session.transfer(flow_file, Partial);
+  else
+    session.transfer(flow_file, Success);
+}
+
+void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);
+  size_t logs_processed = 0;
+  while (!client_->queueEmpty() && logs_processed < max_batch_size_) {
+    utils::net::Message received_message;
+    if (!client_->tryDequeue(received_message))
+      break;
+    transferAsFlowFile(received_message, *session);
+    ++logs_processed;
   }
 }
-void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession>& /*session*/) {
-  // Perform directory list
-  std::lock_guard<std::mutex> lock(mutex_);
-  // check if the futures are valid. If they've terminated remove it from the map.
-
-  for (auto &initEndpoint : endpoints) {
-    std::vector<std::string> hostAndPort = utils::StringUtils::split(initEndpoint, ":");
-    auto realizedHost = hostAndPort.at(0);
-#ifdef WIN32
-    if ("localhost" == realizedHost) {
-      realizedHost = org::apache::nifi::minifi::io::Socket::getMyHostName();
+
+GetTCP::TcpClient::TcpClient(char delimiter,
+    asio::steady_timer::duration timeout_duration,
+    asio::steady_timer::duration reconnection_interval,
+    std::optional<asio::ssl::context> ssl_context,
+    std::optional<size_t> max_queue_size,
+    std::optional<size_t> max_message_size,
+    std::vector<utils::net::ConnectionId> connections,
+    std::shared_ptr<core::logging::Logger> logger)
+    : delimiter_(delimiter),
+    timeout_duration_(timeout_duration),
+    reconnection_interval_(reconnection_interval),
+    ssl_context_(std::move(ssl_context)),
+    max_queue_size_(max_queue_size),
+    max_message_size_(max_message_size),
+    connections_(std::move(connections)),
+    logger_(std::move(logger)) {
+}
+
+GetTCP::TcpClient::~TcpClient() {
+  stop();
+}
+
+
+void GetTCP::TcpClient::run() {
+  gsl_Expects(!connections_.empty());
+  for (const auto& connection_id : connections_) {
+    asio::co_spawn(io_context_, doReceiveFrom(connection_id), asio::detached);  // NOLINT
+  }
+  io_context_.run();
+}
+
+void GetTCP::TcpClient::stop() {
+  io_context_.stop();
+}
+
+bool GetTCP::TcpClient::queueEmpty() const {
+  return concurrent_queue_.empty();
+}
+
+bool GetTCP::TcpClient::tryDequeue(utils::net::Message& received_message) {
+  return concurrent_queue_.tryDequeue(received_message);
+}
+
+asio::awaitable<std::error_code> GetTCP::TcpClient::readLoop(auto& socket) {
+  std::string read_message;
+  bool last_was_partial = false;
+  bool current_is_partial = false;
+  while (true) {
+    {
+      last_was_partial = current_is_partial;
+      current_is_partial = false;
+    }
+    auto dynamic_buffer = max_message_size_ ? asio::dynamic_buffer(read_message, *max_message_size_) : asio::dynamic_buffer(read_message);
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, dynamic_buffer, delimiter_, utils::net::use_nothrow_awaitable);  // NOLINT
+
+    if (*max_message_size_ && read_error == asio::error::not_found) {
+      current_is_partial = true;
+      bytes_read = *max_message_size_;
+    } else if (read_error) {
+      logger_->log_error("Error during read %s", read_error.message());
+      co_return read_error;
     }
-#endif
-    if (hostAndPort.size() != 2) {
+
+    if (bytes_read == 0)
       continue;
+
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) {
+      utils::net::Message message{read_message.substr(0, bytes_read), utils::net::IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().remote_endpoint().port()};
+      if (last_was_partial || current_is_partial)
+        message.is_partial = true;

Review Comment:
   On second thought, it might be easier to understand if I simply rename these bools: https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/91d8d09ddf13e2abc56314cb99dbeeb0019a4107
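
   To make the intent of the two flags easier to follow, here is a minimal, synchronous sketch of the same idea, under stated assumptions: `Chunk`, `splitWithLimit`, and the flag names are hypothetical (they are not the names used in the linked commit), and a plain `std::string_view` stands in for the socket. A chunk is marked partial either when it hit the size limit before a delimiter was seen, or when it is the remainder of a message whose earlier part was already cut off.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical stand-in for utils::net::Message, reduced to the relevant fields.
struct Chunk {
  std::string data;
  bool is_partial;
};

// Splits `input` at `delimiter` (the delimiter stays at the end of its chunk),
// cutting any chunk that reaches `max_size` bytes without a delimiter.
// Simplification: trailing data without a delimiter is also emitted as partial,
// whereas a real socket read would wait for more data.
std::vector<Chunk> splitWithLimit(std::string_view input, char delimiter, std::size_t max_size) {
  std::vector<Chunk> chunks;
  if (max_size == 0)
    return chunks;  // a zero limit could never make progress

  bool previous_didnt_end_with_delimiter = false;
  while (!input.empty()) {
    const std::size_t window = std::min(max_size, input.size());
    const std::size_t pos = input.substr(0, window).find(delimiter);
    const bool current_doesnt_end_with_delimiter = (pos == std::string_view::npos);
    const std::size_t length = current_doesnt_end_with_delimiter ? window : pos + 1;

    // Partial if this chunk was cut off, or if it completes a message that was cut off earlier.
    chunks.push_back({std::string(input.substr(0, length)),
                      previous_didnt_end_with_delimiter || current_doesnt_end_with_delimiter});

    previous_didnt_end_with_delimiter = current_doesnt_end_with_delimiter;
    input.remove_prefix(length);
  }
  return chunks;
}
```

   For example, with a limit of 4 bytes the input `"abcdefgh\nxy\n"` yields `"abcd"`, `"efgh"`, and `"\n"` as partial chunks, followed by `"xy\n"` as a complete one.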





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "adamdebreceni (via GitHub)" <gi...@apache.org>.
adamdebreceni commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1236696758


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in endpoint-list property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;

Review Comment:
   Won't this only check that the returned value is the expected one (i.e. a `std::optional<char>`)? If so, the inner optional may still be empty, so dereferencing it twice is not yet safe.
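
To illustrate the concern, here is a minimal sketch that assumes the parser returns a two-level result: an outer wrapper for "parsing succeeded" and an inner `std::optional<char>` for "a character was produced". `parseCharacterSketch` and `safeDelimiter` are hypothetical stand-ins written for this example, and `std::optional<std::optional<char>>` stands in for the real return type of `utils::StringUtils::parseCharacter`, which is not shown in this thread.

```cpp
#include <cassert>
#include <optional>
#include <string_view>

// Assumption: two-level result. Outer level = "parsing succeeded",
// inner level = "exactly one character was produced".
using ParseResult = std::optional<std::optional<char>>;

// Hypothetical stand-in for utils::StringUtils::parseCharacter.
ParseResult parseCharacterSketch(std::string_view input) {
  if (input.empty()) return ParseResult{std::in_place, std::nullopt};  // valid, but no character
  if (input.size() == 1) return ParseResult{std::in_place, input[0]};
  if (input == "\\n") return ParseResult{std::in_place, '\n'};
  if (input == "\\r") return ParseResult{std::in_place, '\r'};
  return std::nullopt;  // not a single (escaped or not) character
}

// Checks both levels before the double dereference.
std::optional<char> safeDelimiter(std::string_view input) {
  auto parsed = parseCharacterSketch(input);
  if (!parsed || !parsed->has_value())
    return std::nullopt;
  return **parsed;
}
```

With only `if (!parsed)` as the guard, an empty input would pass the check in this sketch and `**parsed` would dereference an empty inner optional, which is undefined behavior.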



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits closed pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "fgerlits (via GitHub)" <gi...@apache.org>.
fgerlits closed pull request #1592: MINIFICPP-2131 Refactored GetTCP
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592




[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1247805565


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;

Review Comment:
   Good catch, changed it in https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/54e037239cd49365a89976cc280c7bfcc0e198aa#diff-8d484e99475c978743af82b7ee8ba45954457ecbac4c033744d1cc95ad9f8240R122





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1247722632


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
+  if (message.is_partial)
+    session.transfer(flow_file, Partial);
+  else
+    session.transfer(flow_file, Success);
+}
+
+void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);
+  size_t logs_processed = 0;
+  while (!client_->queueEmpty() && logs_processed < max_batch_size_) {
+    utils::net::Message received_message;
+    if (!client_->tryDequeue(received_message))
+      break;
+    transferAsFlowFile(received_message, *session);
+    ++logs_processed;
   }
 }
-void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession>& /*session*/) {
-  // Perform directory list
-  std::lock_guard<std::mutex> lock(mutex_);
-  // check if the futures are valid. If they've terminated remove it from the map.
-
-  for (auto &initEndpoint : endpoints) {
-    std::vector<std::string> hostAndPort = utils::StringUtils::split(initEndpoint, ":");
-    auto realizedHost = hostAndPort.at(0);
-#ifdef WIN32
-    if ("localhost" == realizedHost) {
-      realizedHost = org::apache::nifi::minifi::io::Socket::getMyHostName();
+
+GetTCP::TcpClient::TcpClient(char delimiter,
+    asio::steady_timer::duration timeout_duration,
+    asio::steady_timer::duration reconnection_interval,
+    std::optional<asio::ssl::context> ssl_context,
+    std::optional<size_t> max_queue_size,
+    std::optional<size_t> max_message_size,
+    std::vector<utils::net::ConnectionId> connections,
+    std::shared_ptr<core::logging::Logger> logger)
+    : delimiter_(delimiter),
+    timeout_duration_(timeout_duration),
+    reconnection_interval_(reconnection_interval),
+    ssl_context_(std::move(ssl_context)),
+    max_queue_size_(max_queue_size),
+    max_message_size_(max_message_size),
+    connections_(std::move(connections)),
+    logger_(std::move(logger)) {
+}
+
+GetTCP::TcpClient::~TcpClient() {
+  stop();
+}
+
+
+void GetTCP::TcpClient::run() {
+  gsl_Expects(!connections_.empty());
+  for (const auto& connection_id : connections_) {
+    asio::co_spawn(io_context_, doReceiveFrom(connection_id), asio::detached);  // NOLINT
+  }
+  io_context_.run();
+}
+
+void GetTCP::TcpClient::stop() {
+  io_context_.stop();
+}
+
+bool GetTCP::TcpClient::queueEmpty() const {
+  return concurrent_queue_.empty();
+}
+
+bool GetTCP::TcpClient::tryDequeue(utils::net::Message& received_message) {
+  return concurrent_queue_.tryDequeue(received_message);
+}
+
+asio::awaitable<std::error_code> GetTCP::TcpClient::readLoop(auto& socket) {
+  std::string read_message;
+  bool last_was_partial = false;
+  bool current_is_partial = false;
+  while (true) {
+    {
+      last_was_partial = current_is_partial;
+      current_is_partial = false;
+    }
+    auto dynamic_buffer = max_message_size_ ? asio::dynamic_buffer(read_message, *max_message_size_) : asio::dynamic_buffer(read_message);
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, dynamic_buffer, delimiter_, utils::net::use_nothrow_awaitable);  // NOLINT
+
+    if (*max_message_size_ && read_error == asio::error::not_found) {
+      current_is_partial = true;
+      bytes_read = *max_message_size_;
+    } else if (read_error) {
+      logger_->log_error("Error during read %s", read_error.message());
+      co_return read_error;
     }
-#endif
-    if (hostAndPort.size() != 2) {
+
+    if (bytes_read == 0)
       continue;
+
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) {
+      utils::net::Message message{read_message.substr(0, bytes_read), utils::net::IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().remote_endpoint().port()};
+      if (last_was_partial || current_is_partial)
+        message.is_partial = true;

Review Comment:
   Partial messages happen when we would read more than the configured max_size and the delimiter has not been found. In that case the message (larger than max_size) is split at max_size intervals, which makes every flow file that didn't end with the delimiter partial, and the next one as well (that is probably the last part of the message, ending with the delimiter).
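
To make the splitting rule above concrete, here is a minimal, self-contained sketch that applies the same `last_was_partial || current_is_partial` logic to an in-memory string instead of a socket. `Chunk` and `splitWithLimit` are hypothetical names used only for illustration; they are not part of the PR, and the sketch assumes the delimiter is kept at the end of each piece, as `asio::async_read_until` reports it.

```cpp
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical stand-in for utils::net::Message, reduced to the fields needed here.
struct Chunk {
  std::string data;
  bool is_partial = false;
};

// Cuts `stream` at the delimiter when it shows up within `max_size` bytes,
// otherwise cuts at `max_size`. A piece is flagged partial if it was cut at
// the size limit, or if the piece right before it was.
std::vector<Chunk> splitWithLimit(std::string_view stream, char delimiter, std::size_t max_size) {
  std::vector<Chunk> chunks;
  if (max_size == 0)
    return chunks;  // nothing sensible to do with a zero-sized buffer

  bool last_was_partial = false;
  while (!stream.empty()) {
    const std::string_view window = stream.substr(0, max_size);
    const std::size_t delimiter_pos = window.find(delimiter);
    const bool delimiter_found = delimiter_pos != std::string_view::npos;

    // Fewer than max_size bytes and no delimiter yet: a real reader would
    // wait for more data, so this sketch simply stops.
    if (!delimiter_found && window.size() < max_size)
      break;

    const bool current_is_partial = !delimiter_found;
    const std::size_t length = delimiter_found ? delimiter_pos + 1 : max_size;

    chunks.push_back(Chunk{std::string(stream.substr(0, length)), last_was_partial || current_is_partial});
    last_was_partial = current_is_partial;
    stream.remove_prefix(length);
  }
  return chunks;
}
```

With a limit of 4 bytes, the input `"abcdefgh\nxy\n"` is cut into `"abcd"`, `"efgh"`, `"\n"` and `"xy\n"`. The first three pieces are flagged partial (the third only because the piece before it was cut at the limit), and the last one is a complete message.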
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1238124712


##########
libminifi/test/resources/TestC2Metrics.yml:
##########
@@ -31,8 +31,8 @@ Processors:
       run duration nanos: 0
       auto-terminated relationships list:
       Properties:
-          endpoint-list: localhost:8776
-          end-of-message-byte: d
+          Endpoint List: localhost:8776
+          Message Delimiter: \r
           reconnect-interval: 100ms
           connection-attempt-timeout: 2000

Review Comment:
   Sure thing, best not to confuse anyone :+1: https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/ed21b198d8fb54a4f1c483b63cf9f1013ecee5f9





[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "fgerlits (via GitHub)" <gi...@apache.org>.
fgerlits commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1247960613


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint List property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context_;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context_), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
+  if (message.is_partial)
+    session.transfer(flow_file, Partial);
+  else
+    session.transfer(flow_file, Success);
+}
+
+void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);
+  size_t logs_processed = 0;
+  while (!client_->queueEmpty() && logs_processed < max_batch_size_) {
+    utils::net::Message received_message;
+    if (!client_->tryDequeue(received_message))
+      break;
+    transferAsFlowFile(received_message, *session);
+    ++logs_processed;
   }
 }
-void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession>& /*session*/) {
-  // Perform directory list
-  std::lock_guard<std::mutex> lock(mutex_);
-  // check if the futures are valid. If they've terminated remove it from the map.
-
-  for (auto &initEndpoint : endpoints) {
-    std::vector<std::string> hostAndPort = utils::StringUtils::split(initEndpoint, ":");
-    auto realizedHost = hostAndPort.at(0);
-#ifdef WIN32
-    if ("localhost" == realizedHost) {
-      realizedHost = org::apache::nifi::minifi::io::Socket::getMyHostName();
+
+GetTCP::TcpClient::TcpClient(char delimiter,
+    asio::steady_timer::duration timeout_duration,
+    asio::steady_timer::duration reconnection_interval,
+    std::optional<asio::ssl::context> ssl_context,
+    std::optional<size_t> max_queue_size,
+    std::optional<size_t> max_message_size,
+    std::vector<utils::net::ConnectionId> connections,
+    std::shared_ptr<core::logging::Logger> logger)
+    : delimiter_(delimiter),
+    timeout_duration_(timeout_duration),
+    reconnection_interval_(reconnection_interval),
+    ssl_context_(std::move(ssl_context)),
+    max_queue_size_(max_queue_size),
+    max_message_size_(max_message_size),
+    connections_(std::move(connections)),
+    logger_(std::move(logger)) {
+}
+
+GetTCP::TcpClient::~TcpClient() {
+  stop();
+}
+
+
+void GetTCP::TcpClient::run() {
+  gsl_Expects(!connections_.empty());
+  for (const auto& connection_id : connections_) {
+    asio::co_spawn(io_context_, doReceiveFrom(connection_id), asio::detached);  // NOLINT
+  }
+  io_context_.run();
+}
+
+void GetTCP::TcpClient::stop() {
+  io_context_.stop();
+}
+
+bool GetTCP::TcpClient::queueEmpty() const {
+  return concurrent_queue_.empty();
+}
+
+bool GetTCP::TcpClient::tryDequeue(utils::net::Message& received_message) {
+  return concurrent_queue_.tryDequeue(received_message);
+}
+
+asio::awaitable<std::error_code> GetTCP::TcpClient::readLoop(auto& socket) {
+  std::string read_message;
+  bool last_was_partial = false;
+  bool current_is_partial = false;
+  while (true) {
+    {
+      last_was_partial = current_is_partial;
+      current_is_partial = false;
+    }
+    auto dynamic_buffer = max_message_size_ ? asio::dynamic_buffer(read_message, *max_message_size_) : asio::dynamic_buffer(read_message);
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, dynamic_buffer, delimiter_, utils::net::use_nothrow_awaitable);  // NOLINT
+
+    if (*max_message_size_ && read_error == asio::error::not_found) {
+      current_is_partial = true;
+      bytes_read = *max_message_size_;
+    } else if (read_error) {
+      logger_->log_error("Error during read %s", read_error.message());
+      co_return read_error;
     }
-#endif
-    if (hostAndPort.size() != 2) {
+
+    if (bytes_read == 0)
       continue;
+
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) {
+      utils::net::Message message{read_message.substr(0, bytes_read), utils::net::IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().remote_endpoint().port()};
+      if (last_was_partial || current_is_partial)
+        message.is_partial = true;

Review Comment:
   Makes sense, thanks!





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1258206977


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,290 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
-#include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
+const core::OutputAttribute GetTCP::SourceEndpoint{"source.endpoint", {Success, Partial}, "The address of the source endpoint the message came from"};
 
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;

Review Comment:
   Good idea, extracted these in https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/1c380c824e18c8c0ec2ae6eada4ba4644e444feb





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1251900781


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,290 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
-#include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
+const core::OutputAttribute GetTCP::SourceEndpoint{"source.endpoint", {Success, Partial}, "The address of the source endpoint the message came from"};
 
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;

Review Comment:
   I would extract parsing the endpoints, parsing the delimiter and getting the ssl_context into separate functions.
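
A rough sketch of what the extracted endpoint parsing could look like, written against the standard library only so that it stands alone. `Endpoint`, `trim` and `parseEndpoints` are hypothetical names chosen for illustration; the PR itself uses `utils::net::ConnectionId` and `utils::StringUtils::splitAndTrim`, whose exact behaviour is not reproduced here.

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical stand-in for utils::net::ConnectionId: a host/service pair.
struct Endpoint {
  std::string host;
  std::string service;
};

// Trims ASCII whitespace from both ends of a string view.
inline std::string_view trim(std::string_view text) {
  constexpr std::string_view whitespace = " \t\r\n";
  const auto first = text.find_first_not_of(whitespace);
  if (first == std::string_view::npos) return {};
  const auto last = text.find_last_not_of(whitespace);
  assert(first <= last);  // a non-whitespace character exists, so this always holds
  return text.substr(first, last - first + 1);
}

// Parses a comma-delimited list of {host}:{service} entries.
// Entries that do not contain exactly one ':' with text on both sides are skipped.
inline std::vector<Endpoint> parseEndpoints(std::string_view endpoint_list) {
  std::vector<Endpoint> endpoints;
  while (!endpoint_list.empty()) {
    const auto comma = endpoint_list.find(',');
    const auto entry = trim(endpoint_list.substr(0, comma));
    // Advance past the current entry (or to the end if this was the last one).
    endpoint_list = (comma == std::string_view::npos) ? std::string_view{} : endpoint_list.substr(comma + 1);

    const auto colon = entry.find(':');
    if (colon == std::string_view::npos || entry.find(':', colon + 1) != std::string_view::npos) continue;
    const auto host = trim(entry.substr(0, colon));
    const auto service = trim(entry.substr(colon + 1));
    if (host.empty() || service.empty()) continue;
    endpoints.push_back({std::string{host}, std::string{service}});
  }
  return endpoints;
}
```

`onSchedule` would then call the helper and throw only when the returned vector is empty; for this sketch, `assert(parseEndpoints("a:1, bad, b:2").size() == 2);` holds. The delimiter and SSL context lookups could be pulled out in the same way.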



##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,290 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
-#include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
+const core::OutputAttribute GetTCP::SourceEndpoint{"source.endpoint", {Success, Partial}, "The address of the source endpoint the message came from"};
 
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("No valid endpoint in {} property", EndpointList.getName()));
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter || !parsed_delimiter->has_value())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single character, whether escaped or not)", *delimiter_str));
+    delimiter = **parsed_delimiter;
   }
 
-  int connects = 0;
-  if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
-    connection_attempt_limit_ = connects;
+  std::optional<asio::ssl::context> ssl_context;
+  if (auto context_name = context->getProperty(SSLContextService)) {
+    if (auto controller_service = context->getControllerService(*context_name)) {
+      if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
+        ssl_context = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
+    }
   }
-  context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
 
-  if (context->getProperty(EndOfMessageByte.getName(), value)) {
-    logger_->log_trace("EOM is passed in as %s", value);
-    int64_t byteValue = 0;
-    core::Property::StringToInt(value, byteValue);
-    endOfMessageByte = static_cast<std::byte>(byteValue & 0xFF);
-  }
+  std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
+  std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);
 
-  logger_->log_trace("EOM is defined as %i", static_cast<int>(endOfMessageByte));
+  if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
+    if (*max_batch_size == 0) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("{} should be non-zero.", MaxBatchSize.getName()));
+    }
+    max_batch_size_ = *max_batch_size;
+  }
 
-  if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
-    reconnect_interval_ = reconnect_interval->getMilliseconds();
-    logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
-  } else {
-    logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
+  asio::steady_timer::duration timeout_duration = 1s;
+  if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
+    timeout_duration = timeout_value->getMilliseconds();
   }
 
-  handler_ = std::make_unique<DataHandler>(sessionFactory);
-
-  f_ex = [&] {
-    std::unique_ptr<io::Socket> socket_ptr;
-    // reuse the byte buffer.
-      std::vector<std::byte> buffer;
-      int reconnects = 0;
-      do {
-        if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
-          buffer.resize(receive_buffer_size_);
-          const auto size_read = socket_ptr->read(buffer, false);
-          if (!io::isError(size_read)) {
-            if (size_read != 0) {
-              // determine cut location
-              size_t startLoc = 0;
-              for (size_t i = 0; i < size_read; i++) {
-                if (buffer.at(i) == endOfMessageByte && i > 0) {
-                  if (i-startLoc > 0) {
-                    handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (i-startLoc), true);
-                  }
-                  startLoc = i;
-                }
-              }
-              if (startLoc > 0) {
-                logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
-                if (size_read-startLoc > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data())+startLoc, (size_read-startLoc), true);
-                }
-              } else {
-                logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
-                if (size_read > 0) {
-                  handler_->handle(socket_ptr->getHostname(), reinterpret_cast<uint8_t*>(buffer.data()), size_read, false);
-                }
-              }
-              reconnects = 0;
-            }
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
-            if (++reconnects > connection_attempt_limit_) {
-              logger_->log_info("Too many reconnects, exiting thread");
-              socket_ptr->close();
-              return -1;
-            }
-            logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
-            std::this_thread::sleep_for(reconnect_interval_);
-            socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else {
-            socket_ptr->close();
-            std::this_thread::sleep_for(reconnect_interval_);
-            logger_->log_info("Read response returned a -1 from socket, exiting thread");
-            return -1;
-          }
-        } else {
-          std::this_thread::sleep_for(reconnect_interval_);
-          logger_->log_info("Could not use socket, exiting thread");
-          return -1;
-        }
-      }while (running_);
-      logger_->log_debug("Ending private thread");
-      return 0;
-    };
-
-  if (context->getProperty(SSLContextService.getName(), value)) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-    if (nullptr != service) {
-      ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-    }
+  asio::steady_timer::duration reconnection_interval = 1min;
+  if (auto reconnect_interval_value = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+    reconnection_interval = reconnect_interval_value->getMilliseconds();
   }
 
-  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
-  client_thread_pool_.start();
 
-  running_ = true;
+  client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
+  client_thread_ = std::thread([this]() { client_->run(); });  // NOLINT
 }
 
 void GetTCP::notifyStop() {
-  running_ = false;
-  // await threads to shutdown.
-  client_thread_pool_.shutdown();
-  std::unique_ptr<io::Socket> socket_ptr;
-  while (socket_ring_buffer_.size_approx() > 0) {
-    socket_ring_buffer_.try_dequeue(socket_ptr);
+  if (client_)
+    client_->stop();
+}
+
+void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute(GetTCP::SourceEndpoint.getName(), fmt::format("{}:{}", message.sender_address.to_string(), std::to_string(message.server_port)));
+  if (message.is_partial)
+    session.transfer(flow_file, Partial);
+  else
+    session.transfer(flow_file, Success);
+}
+
+void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);
+  size_t logs_processed = 0;
+  while (!client_->queueEmpty() && logs_processed < max_batch_size_) {
+    utils::net::Message received_message;
+    if (!client_->tryDequeue(received_message))
+      break;
+    transferAsFlowFile(received_message, *session);
+    ++logs_processed;
   }
 }
-void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession>& /*session*/) {
-  // Perform directory list
-  std::lock_guard<std::mutex> lock(mutex_);
-  // check if the futures are valid. If they've terminated remove it from the map.
-
-  for (auto &initEndpoint : endpoints) {
-    std::vector<std::string> hostAndPort = utils::StringUtils::split(initEndpoint, ":");
-    auto realizedHost = hostAndPort.at(0);
-#ifdef WIN32
-    if ("localhost" == realizedHost) {
-      realizedHost = org::apache::nifi::minifi::io::Socket::getMyHostName();
+
+GetTCP::TcpClient::TcpClient(char delimiter,
+    asio::steady_timer::duration timeout_duration,

Review Comment:
   I think the parameters should use only one indentation level, so that they are not on the same level as the initializer list.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1247906873


##########
extensions/standard-processors/processors/TailFile.cpp:
##########
@@ -343,10 +331,11 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
     throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
   }
 
-  std::string value;
-
-  if (context->getProperty(Delimiter.getName(), value)) {
-    delimiter_ = parseDelimiter(value);
+  if (auto delimiter_str = context->getProperty(Delimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single character, whether escaped or not)", *delimiter_str));
+    delimiter_ = *parsed_delimiter;

Review Comment:
   I've reverted to a less radical variant. We will log errors but continue like before. https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/dcabfe37f396f64ad15d292442ca30a8865b830c
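
A minimal sketch of the log-and-continue behaviour described above; it is not the code from the linked commit. `parseSingleCharacter` is a hypothetical stand-in for `utils::StringUtils::parseCharacter` (the set of escapes it accepts is an assumption), `delimiterOrFallback` is likewise an illustrative name, and logging goes to `std::cerr` instead of the processor's logger.

```cpp
#include <cassert>
#include <iostream>
#include <optional>
#include <string_view>

// Hypothetical stand-in for utils::StringUtils::parseCharacter: accepts either a
// single character or one of a few backslash escapes, and returns std::nullopt otherwise.
inline std::optional<char> parseSingleCharacter(std::string_view input) {
  if (input.size() == 1) return input[0];
  if (input.size() == 2 && input[0] == '\\') {
    switch (input[1]) {
      case 'n': return '\n';
      case 'r': return '\r';
      case 't': return '\t';
      case '\\': return '\\';
      default: return std::nullopt;
    }
  }
  return std::nullopt;
}

// Log-and-continue: an invalid value is reported, and the fallback delimiter is
// used instead of aborting scheduling with an exception.
inline char delimiterOrFallback(std::string_view configured, char fallback) {
  if (const auto parsed = parseSingleCharacter(configured)) return *parsed;
  std::cerr << "Invalid delimiter: " << configured
            << " (it must be a single character, whether escaped or not), keeping the fallback\n";
  return fallback;
}
```

With this shape an invalid value such as `ab` no longer prevents scheduling: `assert(delimiterOrFallback("ab", '\n') == '\n');` holds, and the error is only reported.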


