Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/06/24 12:57:59 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1350: MINIFICPP-1857 Create ListenTCP processor

fgerlits commented on code in PR #1350:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1350#discussion_r905172649


##########
PROCESSORS.md:
##########
@@ -1279,6 +1280,24 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 
 
+## ListenTCP
+
+### Description
+
+Listens for incoming TCP connections and reads data from each connection using a line separator as the message demarcator. For each message the processor produces a single FlowFile.
+
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                          | Default Value | Allowable Values | Description                                                                                                                                                                                   |
+|-------------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Listening Port**            |               |                  | The port to listen on for communication.                                                                                                                                                      |
+| **Max Batch Size**            | 500           |                  | The maximum number of messages to process at a time.                                                                                                                                          |
+| **Max Size of Message Queue** | 0             |                  | Maximum number of messages allowed to be buffered before processing them when the processor is triggered. If the buffer full, the message is ignored. If set to zero the buffer is unlimited. |

Review Comment:
   Are we sure we want the default buffer size to be unlimited?  I understand that dropping messages is bad, but unlimited memory consumption can be even worse.
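
The trade-off raised here can be sketched with a small bounded queue. This is only an illustration, not MiNiFi's actual queue class: `BoundedMessageQueue`, `tryEnqueue`, `tryDequeue`, and `droppedCount` are hypothetical names. It assumes that an empty optional means "unlimited", as the PR's `max_queue_size` parameter does, and that a message arriving at a full queue is dropped, as the property description states.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical illustration; not the queue class used by MiNiFi.
class BoundedMessageQueue {
 public:
  // std::nullopt means "unlimited"; any other value caps the number of buffered messages.
  explicit BoundedMessageQueue(std::optional<std::size_t> max_size) : max_size_(max_size) {}

  // Returns false and drops the message when the queue is already full.
  bool tryEnqueue(std::string message) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (max_size_ && queue_.size() >= *max_size_) {
      ++dropped_;
      return false;
    }
    queue_.push_back(std::move(message));
    return true;
  }

  // Returns the oldest message, or std::nullopt when the queue is empty.
  std::optional<std::string> tryDequeue() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return std::nullopt;
    std::string front = std::move(queue_.front());
    queue_.pop_front();
    return front;
  }

  // Number of messages rejected because the queue was full.
  std::size_t droppedCount() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return dropped_;
  }

 private:
  mutable std::mutex mutex_;
  std::deque<std::string> queue_;
  std::optional<std::size_t> max_size_;
  std::size_t dropped_ = 0;
};
```

With a finite default, memory use is capped at the cost of dropped messages; counting the drops at least makes that loss visible.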



##########
docker/test/integration/steps/steps.py:
##########
@@ -416,6 +416,12 @@ def step_impl(context):
     context.test.start_splunk()
 
 
+# TCP client
+@given('a TCP client is setup to send logs to minifi')

Review Comment:
   "set up" should be two words here because it is used as a verb; "setup" is the noun



##########
PROCESSORS.md:
##########
@@ -1279,6 +1280,24 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 
 
+## ListenTCP
+
+### Description
+
+Listens for incoming TCP connections and reads data from each connection using a line separator as the message demarcator. For each message the processor produces a single FlowFile.
+
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                          | Default Value | Allowable Values | Description                                                                                                                                                                                   |
+|-------------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Listening Port**            |               |                  | The port to listen on for communication.                                                                                                                                                      |
+| **Max Batch Size**            | 500           |                  | The maximum number of messages to process at a time.                                                                                                                                          |
+| **Max Size of Message Queue** | 0             |                  | Maximum number of messages allowed to be buffered before processing them when the processor is triggered. If the buffer full, the message is ignored. If set to zero the buffer is unlimited. |

Review Comment:
   typo:
   ```suggestion
   | **Max Size of Message Queue** | 0             |                  | Maximum number of messages allowed to be buffered before processing them when the processor is triggered. If the buffer is full, the message is ignored. If set to zero the buffer is unlimited. |
   ```



##########
extensions/standard-processors/processors/ListenSyslog.cpp:
##########
@@ -86,72 +86,20 @@ void ListenSyslog::initialize() {
 void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
   gsl_Expects(context && !server_thread_.joinable() && !server_);

Review Comment:
   Since `server_thread_` and `server_` have been moved to `NetworkListenerProcessor`, I would move these two expectation checks to `NetworkListenerProcessor::startServer()`.
   
   (The same checks should also be moved out of `ListenTCP::onSchedule()`.)



##########
libminifi/include/utils/net/UdpServer.h:
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <optional>
+#include <memory>
+#include <string>
+
+#include "Server.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "asio/ts/buffer.hpp"
+#include "asio/ts/internet.hpp"
+#include "asio/streambuf.hpp"
+
+namespace org::apache::nifi::minifi::utils::net {
+
+class UdpServer : public Server {
+ public:
+  UdpServer(std::optional<size_t> max_queue_size,
+            uint16_t port,
+            std::shared_ptr<core::logging::Logger> logger);
+
+ private:
+  void doReceive();
+
+  asio::ip::udp::socket socket_;
+  asio::ip::udp::endpoint sender_endpoint_;
+  std::string buffer_;
+
+  static inline constexpr size_t MAX_UDP_PACKET_SIZE = 65535;

Review Comment:
   Is this `inline` necessary?  `static constexpr` data members have been implicitly `inline` since C++17.
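
A minimal sketch of the point, assuming the code is compiled as C++17 or later. `UdpLimits` and `clampToUdpPacket` are hypothetical names used only for this illustration.

```cpp
#include <algorithm>
#include <cstddef>

struct UdpLimits {
  // Since C++17 a static constexpr data member is implicitly inline,
  // so this in-class declaration is also its definition.
  static constexpr std::size_t MAX_UDP_PACKET_SIZE = 65535;
};

// std::min takes its arguments by const reference, which odr-uses the member.
// Before C++17 this could require a separate out-of-class definition to link.
inline std::size_t clampToUdpPacket(std::size_t requested) {
  return std::min(requested, UdpLimits::MAX_UDP_PACKET_SIZE);
}
```

Writing `static inline constexpr` is harmless, but the extra `inline` adds nothing in C++17.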



##########
extensions/standard-processors/tests/unit/ListenTcpTests.cpp:
##########
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ListenTCP.h"
+#include "SingleProcessorTestController.h"
+#include "asio.hpp"
+
+using ListenTCP = org::apache::nifi::minifi::processors::ListenTCP;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+constexpr uint64_t PORT = 10254;
+using ProcessorTriggerResult = std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>>;
+
+void sendMessagesViaTCP(const std::vector<std::string_view>& contents) {
+  asio::io_context io_context;
+  asio::ip::tcp::socket socket(io_context);
+  asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), PORT);
+  socket.connect(remote_endpoint);
+  std::error_code err;
+  for (auto& content : contents) {
+    std::string tcp_message(content);
+    tcp_message += '\n';
+    socket.send(asio::buffer(tcp_message, tcp_message.size()), 0, err);
+  }
+  REQUIRE(!err);
+  socket.close();
+}
+
+void check_for_attributes(core::FlowFile& flow_file) {
+  CHECK(std::to_string(PORT) == flow_file.getAttribute("tcp.port"));
+  CHECK("127.0.0.1" == flow_file.getAttribute("tcp.sender"));
+}
+
+bool triggerUntil(test::SingleProcessorTestController& controller,
+                  const std::unordered_map<core::Relationship, size_t>& expected_quantities,
+                  ProcessorTriggerResult& result,
+                  const std::chrono::milliseconds max_duration,
+                  const std::chrono::milliseconds wait_time = 50ms) {
+  auto start_time = std::chrono::steady_clock::now();
+  while (std::chrono::steady_clock::now() < start_time + max_duration) {
+    for (auto& [relationship, flow_files] : controller.trigger()) {
+      result[relationship].insert(result[relationship].end(), flow_files.begin(), flow_files.end());
+    }
+    bool expected_quantities_met = true;
+    for (const auto& [relationship, expected_quantity] : expected_quantities) {
+      if (result[relationship].size() < expected_quantity) {
+        expected_quantities_met = false;
+        break;
+      }
+    }
+    if (expected_quantities_met)
+      return true;

Review Comment:
   just a suggestion, feel free to ignore, but I think this would be nicer:
   ```suggestion
       if (ranges::all_of(expected_quantities, [&result](const auto& kv) {
         const auto& [relationship, expected_quantity] = kv;
         return result[relationship].size() >= expected_quantity;
       })) {
         return true;
       }
   ```
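
For reference, the same check can be written as a standalone function. This sketch swaps in `std::all_of` from the standard library for the `ranges::all_of` used in the suggestion, and uses hypothetical stand-in types (`std::string` for `core::Relationship`, `int` for the flow file pointers). It also uses `find` instead of `operator[]`, so `result` can be passed by const reference and is not modified by the check.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-ins for the types used in the test.
using Results = std::unordered_map<std::string, std::vector<int>>;
using Expected = std::unordered_map<std::string, std::size_t>;

// True when every relationship has received at least its expected number of items.
bool expectedQuantitiesMet(const Expected& expected, const Results& result) {
  return std::all_of(expected.begin(), expected.end(), [&result](const auto& kv) {
    // The pair is destructured inside the lambda body.
    const auto& [relationship, expected_quantity] = kv;
    const auto it = result.find(relationship);
    const std::size_t actual = (it == result.end()) ? 0 : it->second.size();
    return actual >= expected_quantity;
  });
}
```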



##########
extensions/standard-processors/tests/unit/ListenTcpTests.cpp:
##########
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ListenTCP.h"
+#include "SingleProcessorTestController.h"
+#include "asio.hpp"
+
+using ListenTCP = org::apache::nifi::minifi::processors::ListenTCP;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+constexpr uint64_t PORT = 10254;
+using ProcessorTriggerResult = std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>>;
+
+void sendMessagesViaTCP(const std::vector<std::string_view>& contents) {
+  asio::io_context io_context;
+  asio::ip::tcp::socket socket(io_context);
+  asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), PORT);
+  socket.connect(remote_endpoint);
+  std::error_code err;
+  for (auto& content : contents) {
+    std::string tcp_message(content);
+    tcp_message += '\n';
+    socket.send(asio::buffer(tcp_message, tcp_message.size()), 0, err);
+  }
+  REQUIRE(!err);
+  socket.close();
+}
+
+void check_for_attributes(core::FlowFile& flow_file) {
+  CHECK(std::to_string(PORT) == flow_file.getAttribute("tcp.port"));
+  CHECK("127.0.0.1" == flow_file.getAttribute("tcp.sender"));
+}
+
+bool triggerUntil(test::SingleProcessorTestController& controller,
+                  const std::unordered_map<core::Relationship, size_t>& expected_quantities,
+                  ProcessorTriggerResult& result,
+                  const std::chrono::milliseconds max_duration,
+                  const std::chrono::milliseconds wait_time = 50ms) {
+  auto start_time = std::chrono::steady_clock::now();
+  while (std::chrono::steady_clock::now() < start_time + max_duration) {
+    for (auto& [relationship, flow_files] : controller.trigger()) {
+      result[relationship].insert(result[relationship].end(), flow_files.begin(), flow_files.end());
+    }
+    bool expected_quantities_met = true;
+    for (const auto& [relationship, expected_quantity] : expected_quantities) {
+      if (result[relationship].size() < expected_quantity) {
+        expected_quantities_met = false;
+        break;
+      }
+    }
+    if (expected_quantities_met)
+      return true;
+    std::this_thread::sleep_for(wait_time);
+  }
+  return false;
+}
+
+bool countLogOccurrencesUntil(const std::string& pattern,
+                              const int occurrences,
+                              const std::chrono::milliseconds max_duration,
+                              const std::chrono::milliseconds wait_time = 50ms) {
+  auto start_time = std::chrono::steady_clock::now();
+  while (std::chrono::steady_clock::now() < start_time + max_duration) {
+    if (LogTestController::getInstance().countOccurrences(pattern) == occurrences)
+      return true;
+    std::this_thread::sleep_for(wait_time);
+  }
+  return false;
+}

Review Comment:
   This is quite generic, and it's also used in `ListenSyslogTests`, so it may be worth moving it to `TestUtils` or `IntegrationTestUtils`.
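
One possible shape for such a shared helper is a polling function that takes the condition as a callable. This is a sketch only: `pollUntil` is a hypothetical name, and the existing utilities in `TestUtils` or `IntegrationTestUtils` may already offer something similar.

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper: calls `condition` repeatedly until it returns true
// or `max_duration` has elapsed, sleeping `wait_time` between attempts.
template <typename Condition>
bool pollUntil(Condition&& condition,
               std::chrono::milliseconds max_duration,
               std::chrono::milliseconds wait_time = std::chrono::milliseconds{50}) {
  const auto deadline = std::chrono::steady_clock::now() + max_duration;
  while (std::chrono::steady_clock::now() < deadline) {
    if (condition()) return true;
    std::this_thread::sleep_for(wait_time);
  }
  return false;
}
```

Both `triggerUntil` and `countLogOccurrencesUntil` could then pass their specific check as a lambda instead of repeating the timing loop.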



##########
docker/test/integration/steps/steps.py:
##########
@@ -416,6 +416,12 @@ def step_impl(context):
     context.test.start_splunk()
 
 
+# TCP client
+@given('a TCP client is setup to send logs to minifi')

Review Comment:
   Why "logs"?  It sends a test message.
   ```suggestion
   @given('a TCP client is set up to send a test TCP message to minifi')
   ```



##########
extensions/standard-processors/processors/NetworkListenerProcessor.cpp:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "NetworkListenerProcessor.h"
+#include "utils/net/UdpServer.h"
+#include "utils/net/TcpServer.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+NetworkListenerProcessor::~NetworkListenerProcessor() {
+  stopServer();
+}
+
+void NetworkListenerProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);
+  size_t logs_processed = 0;
+  while (!server_->queueEmpty() && logs_processed < max_batch_size_) {
+    utils::net::Message received_message;
+    if (!server_->tryDequeue(received_message))
+      break;
+    transferAsFlowFile(received_message, *session);
+    ++logs_processed;
+  }
+}
+
+void NetworkListenerProcessor::startServer(
+    const core::ProcessContext& context, const core::Property& max_batch_size_prop, const core::Property& max_queue_size_prop, const core::Property& port_prop, utils::net::Protocol protocol) {
+  context.getProperty(max_batch_size_prop.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is invalid");
+
+  uint64_t max_queue_size = 0;
+  context.getProperty(max_queue_size_prop.getName(), max_queue_size);
+  auto max_queue_size_opt = max_queue_size > 0 ? std::optional<uint64_t>(max_queue_size) : std::nullopt;
+
+  int port;
+  context.getProperty(port_prop.getName(), port);
+
+  if (protocol == utils::net::Protocol::UDP) {
+    server_ = std::make_unique<utils::net::UdpServer>(max_queue_size_opt, port, logger_);
+  } else if (protocol == utils::net::Protocol::TCP) {
+    server_ = std::make_unique<utils::net::TcpServer>(max_queue_size_opt, port, logger_);
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid protocol");
+  }
+
+  server_thread_ = std::thread([this]() { server_->run(); });
+  logger_->log_debug("Started %s server on port %d with %s max queue size and %zu max batch size",
+                     protocol.toString(),
+                     port,
+                     max_queue_size_opt ? std::to_string(*max_queue_size_opt) : "no",

Review Comment:
   Nitpick: I would log "unlimited" instead of "no" when the queue size is not limited.
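
A small sketch of the suggested wording, with the formatting pulled into a helper. `describeQueueLimit` is a hypothetical name, not a function from the PR.

```cpp
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical helper: renders the optional queue limit for a log message.
std::string describeQueueLimit(const std::optional<std::uint64_t>& max_queue_size) {
  return max_queue_size ? std::to_string(*max_queue_size) : std::string{"unlimited"};
}
```

The log line would then read "with unlimited max queue size" rather than "with no max queue size".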


