You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "martinzink (via GitHub)" <gi...@apache.org> on 2023/06/21 11:08:57 UTC

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1592: MINIFICPP-2131 Refactored GetTCP

martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1236810704


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16384 (16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer within the valid byte range  (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in endpoint-list property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
+    delimiter = **parsed_delimiter;

Review Comment:
   You are right, I've fixed this (unsuccessfully) in https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/9dce92909600a74ba320bd3c56612acd0c8ab5fa (and fixed the fix in https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/c824ab601e3bbdc01177a55496096ddc625356b1)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org