You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/11/28 13:34:25 UTC

[nifi-minifi-cpp] 02/02: MINIFICPP-1922 Implement ListenUDP processor

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 20afc8447a812e99338beec386b7e02d5516b556
Author: Martin Zink <ma...@apache.org>
AuthorDate: Mon Nov 28 12:29:37 2022 +0100

    MINIFICPP-1922 Implement ListenUDP processor
    
    Closes #1430
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |  41 +++++++-
 README.md                                          |   6 +-
 .../processors/ListenSyslog.cpp                    |  10 +-
 .../standard-processors/processors/ListenSyslog.h  |   2 -
 .../standard-processors/processors/ListenTCP.cpp   |  10 +-
 .../standard-processors/processors/ListenTCP.h     |   2 -
 .../processors/{ListenTCP.cpp => ListenUDP.cpp}    |  50 +++------
 .../processors/{ListenTCP.h => ListenUDP.h}        |  17 +--
 .../processors/NetworkListenerProcessor.cpp        |   9 +-
 .../processors/NetworkListenerProcessor.h          |  11 +-
 .../tests/unit/ListenSyslogTests.cpp               |  10 +-
 .../tests/unit/ListenTcpTests.cpp                  |   8 +-
 .../tests/unit/ListenUDPTests.cpp                  | 115 +++++++++++++++++++++
 .../standard-processors/tests/unit/PutTCPTests.cpp |  11 +-
 14 files changed, 200 insertions(+), 102 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 3144e8390..c8cee2b17 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -46,6 +46,7 @@
 - [ListenHTTP](#listenhttp)
 - [ListenSyslog](#listensyslog)
 - [ListenTCP](#listentcp)
+- [ListenUDP](#listenudp)
 - [ListFile](#listfile)
 - [ListS3](#lists3)
 - [ListSFTP](#listsftp)
@@ -1331,10 +1332,42 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ### Output Attributes
 
-| Attribute                | Description                                   | Requirements           |
-|--------------------------|-----------------------------------------------|------------------------|
-| _tcp.port_               | The sending port the messages were received.  | -                      |
-| _tcp.sender_             | The sending host of the messages.             | -                      |
+| Attribute    | Description                                  | Requirements |
+|--------------|----------------------------------------------|--------------|
+| _tcp.port_   | The port on which messages were received.     | -            |
+| _tcp.sender_ | The sending host of the messages.            | -            |
+
+
+
+
+## ListenUDP
+
+### Description
+
+Listens for incoming UDP datagrams. For each datagram the processor produces a single FlowFile.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                          | Default Value | Allowable Values           | Description                                                                                                                                                                                      |
+|-------------------------------|---------------|----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Listening Port**            |               |                            | The port to listen on for communication.                                                                                                                                                         |
+| **Max Batch Size**            | 500           |                            | The maximum number of messages to process at a time.                                                                                                                                             |
+| **Max Size of Message Queue** | 10000         |                            | Maximum number of messages allowed to be buffered before processing them when the processor is triggered. If the buffer is full, the message is ignored. If set to zero the buffer is unlimited. |
+
+### Relationships
+
+| Name    | Description                                                        |
+|---------|--------------------------------------------------------------------|
+| success | Messages received successfully will be sent out this relationship. |
+
+### Output Attributes
+
+| Attribute    | Description                                   | Requirements |
+|--------------|-----------------------------------------------|--------------|
+| _udp.port_   | The port on which messages were received.      | -            |
+| _udp.sender_ | The sending host of the messages.             | -            |
 
 
 ## ListFile
diff --git a/README.md b/README.md
index dc56009b3..096cea87d 100644
--- a/README.md
+++ b/README.md
@@ -63,9 +63,9 @@ MiNiFi - C++ supports the following C++ processors:
 
 The following table lists the base set of processors.
 
-| Extension Set | Processors                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 [...]
-|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
-| **Base**      | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[ListenTCP](PROC [...]
+| Extension Set | Processors                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 [...]
+|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
+| **Base**      | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[ListenTCP](PROC [...]
 
 The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as CURL ), can be disabled with the respective CMAKE flag on the command line.
 
diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp b/extensions/standard-processors/processors/ListenSyslog.cpp
index e08d0cb15..41518a1d1 100644
--- a/extensions/standard-processors/processors/ListenSyslog.cpp
+++ b/extensions/standard-processors/processors/ListenSyslog.cpp
@@ -107,7 +107,7 @@ void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& conte
   context->getProperty(ProtocolProperty.getName(), protocol);
 
   if (protocol == utils::net::IpProtocol::TCP) {
-    startTcpServer(*context);
+    startTcpServer(*context, SSLContextService, ClientAuth);
   } else if (protocol == utils::net::IpProtocol::UDP) {
     startUdpServer(*context);
   } else {
@@ -168,14 +168,6 @@ const core::Property& ListenSyslog::getPortProperty() {
   return Port;
 }
 
-const core::Property& ListenSyslog::getSslContextProperty() {
-  return SSLContextService;
-}
-
-const core::Property& ListenSyslog::getClientAuthProperty() {
-  return ClientAuth;
-}
-
 REGISTER_RESOURCE(ListenSyslog, Processor);
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/ListenSyslog.h b/extensions/standard-processors/processors/ListenSyslog.h
index ebdb16d2c..9844c0c1b 100644
--- a/extensions/standard-processors/processors/ListenSyslog.h
+++ b/extensions/standard-processors/processors/ListenSyslog.h
@@ -70,8 +70,6 @@ class ListenSyslog : public NetworkListenerProcessor {
   const core::Property& getMaxBatchSizeProperty() override;
   const core::Property& getMaxQueueSizeProperty() override;
   const core::Property& getPortProperty() override;
-  const core::Property& getSslContextProperty() override;
-  const core::Property& getClientAuthProperty() override;
 
  private:
   void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) override;
diff --git a/extensions/standard-processors/processors/ListenTCP.cpp b/extensions/standard-processors/processors/ListenTCP.cpp
index 20d856182..94c2d9884 100644
--- a/extensions/standard-processors/processors/ListenTCP.cpp
+++ b/extensions/standard-processors/processors/ListenTCP.cpp
@@ -67,7 +67,7 @@ void ListenTCP::initialize() {
 
 void ListenTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
   gsl_Expects(context);
-  startTcpServer(*context);
+  startTcpServer(*context, SSLContextService, ClientAuth);
 }
 
 void ListenTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
@@ -90,14 +90,6 @@ const core::Property& ListenTCP::getPortProperty() {
   return Port;
 }
 
-const core::Property& ListenTCP::getSslContextProperty() {
-  return SSLContextService;
-}
-
-const core::Property& ListenTCP::getClientAuthProperty() {
-  return ClientAuth;
-}
-
 REGISTER_RESOURCE(ListenTCP, Processor);
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/ListenTCP.h b/extensions/standard-processors/processors/ListenTCP.h
index ed41eba80..567865346 100644
--- a/extensions/standard-processors/processors/ListenTCP.h
+++ b/extensions/standard-processors/processors/ListenTCP.h
@@ -60,8 +60,6 @@ class ListenTCP : public NetworkListenerProcessor {
   const core::Property& getMaxBatchSizeProperty() override;
   const core::Property& getMaxQueueSizeProperty() override;
   const core::Property& getPortProperty() override;
-  const core::Property& getSslContextProperty() override;
-  const core::Property& getClientAuthProperty() override;
 
  private:
   void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) override;
diff --git a/extensions/standard-processors/processors/ListenTCP.cpp b/extensions/standard-processors/processors/ListenUDP.cpp
similarity index 58%
copy from extensions/standard-processors/processors/ListenTCP.cpp
copy to extensions/standard-processors/processors/ListenUDP.cpp
index 20d856182..819919e91 100644
--- a/extensions/standard-processors/processors/ListenTCP.cpp
+++ b/extensions/standard-processors/processors/ListenUDP.cpp
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "ListenTCP.h"
+#include "ListenUDP.h"
 
 #include "core/Resource.h"
 #include "core/PropertyBuilder.h"
@@ -23,14 +23,14 @@
 
 namespace org::apache::nifi::minifi::processors {
 
-const core::Property ListenTCP::Port(
+const core::Property ListenUDP::Port(
     core::PropertyBuilder::createProperty("Listening Port")
         ->withDescription("The port to listen on for communication.")
         ->withType(core::StandardValidators::get().LISTEN_PORT_VALIDATOR)
         ->isRequired(true)
         ->build());
 
-const core::Property ListenTCP::MaxQueueSize(
+const core::Property ListenUDP::MaxQueueSize(
     core::PropertyBuilder::createProperty("Max Size of Message Queue")
         ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
                           "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
@@ -38,66 +38,46 @@ const core::Property ListenTCP::MaxQueueSize(
         ->isRequired(true)
         ->build());
 
-const core::Property ListenTCP::MaxBatchSize(
+const core::Property ListenUDP::MaxBatchSize(
     core::PropertyBuilder::createProperty("Max Batch Size")
         ->withDescription("The maximum number of messages to process at a time.")
         ->withDefaultValue<uint64_t>(500)
         ->isRequired(true)
         ->build());
 
-const core::Property ListenTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")
-        ->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.")
-        ->asType<minifi::controllers::SSLContextService>()
-        ->build());
-
-const core::Property ListenTCP::ClientAuth(
-    core::PropertyBuilder::createProperty("Client Auth")
-      ->withDescription("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
-      ->withDefaultValue<std::string>(toString(utils::net::SslServer::ClientAuthOption::NONE))
-      ->withAllowableValues<std::string>(utils::net::SslServer::ClientAuthOption::values())
-      ->build());
 
-const core::Relationship ListenTCP::Success("success", "Messages received successfully will be sent out this relationship.");
+const core::Relationship ListenUDP::Success("success", "Messages received successfully will be sent out this relationship.");
 
-void ListenTCP::initialize() {
+void ListenUDP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void ListenTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+void ListenUDP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
   gsl_Expects(context);
-  startTcpServer(*context);
+  startUdpServer(*context);
 }
 
-void ListenTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+void ListenUDP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
   auto flow_file = session.create();
   session.writeBuffer(flow_file, message.message_data);
-  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
-  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
+  flow_file->setAttribute("udp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("udp.sender", message.sender_address.to_string());
   session.transfer(flow_file, Success);
 }
 
-const core::Property& ListenTCP::getMaxBatchSizeProperty() {
+const core::Property& ListenUDP::getMaxBatchSizeProperty() {
   return MaxBatchSize;
 }
 
-const core::Property& ListenTCP::getMaxQueueSizeProperty() {
+const core::Property& ListenUDP::getMaxQueueSizeProperty() {
   return MaxQueueSize;
 }
 
-const core::Property& ListenTCP::getPortProperty() {
+const core::Property& ListenUDP::getPortProperty() {
   return Port;
 }
 
-const core::Property& ListenTCP::getSslContextProperty() {
-  return SSLContextService;
-}
-
-const core::Property& ListenTCP::getClientAuthProperty() {
-  return ClientAuth;
-}
-
-REGISTER_RESOURCE(ListenTCP, Processor);
+REGISTER_RESOURCE(ListenUDP, Processor);
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/ListenTCP.h b/extensions/standard-processors/processors/ListenUDP.h
similarity index 71%
copy from extensions/standard-processors/processors/ListenTCP.h
copy to extensions/standard-processors/processors/ListenUDP.h
index ed41eba80..1eb4cfd96 100644
--- a/extensions/standard-processors/processors/ListenTCP.h
+++ b/extensions/standard-processors/processors/ListenUDP.h
@@ -18,35 +18,28 @@
 
 #include <memory>
 #include <string>
-#include <utility>
 
 #include "NetworkListenerProcessor.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "utils/Enum.h"
 
 namespace org::apache::nifi::minifi::processors {
 
-class ListenTCP : public NetworkListenerProcessor {
+class ListenUDP : public NetworkListenerProcessor {
  public:
-  explicit ListenTCP(std::string name, const utils::Identifier& uuid = {})
-    : NetworkListenerProcessor(std::move(name), uuid, core::logging::LoggerFactory<ListenTCP>::getLogger()) {
+  explicit ListenUDP(const std::string& name, const utils::Identifier& uuid = {})
+    : NetworkListenerProcessor(name, uuid, core::logging::LoggerFactory<ListenUDP>::getLogger()) {
   }
 
-  EXTENSIONAPI static constexpr const char* Description = "Listens for incoming TCP connections and reads data from each connection using a line separator as the message demarcator. "
-                                                          "For each message the processor produces a single FlowFile.";
+  EXTENSIONAPI static constexpr const char* Description = "Listens for incoming UDP datagrams. For each datagram the processor produces a single FlowFile.";
 
   EXTENSIONAPI static const core::Property Port;
   EXTENSIONAPI static const core::Property MaxBatchSize;
   EXTENSIONAPI static const core::Property MaxQueueSize;
-  EXTENSIONAPI static const core::Property SSLContextService;
-  EXTENSIONAPI static const core::Property ClientAuth;
   static auto properties() {
     return std::array{
       Port,
       MaxBatchSize,
       MaxQueueSize,
-      SSLContextService,
-      ClientAuth
     };
   }
 
@@ -60,8 +53,6 @@ class ListenTCP : public NetworkListenerProcessor {
   const core::Property& getMaxBatchSizeProperty() override;
   const core::Property& getMaxQueueSizeProperty() override;
   const core::Property& getPortProperty() override;
-  const core::Property& getSslContextProperty() override;
-  const core::Property& getClientAuthProperty() override;
 
  private:
   void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) override;
diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
index fa22d9096..21b74f1ef 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
@@ -61,18 +61,17 @@ void NetworkListenerProcessor::startServer(const ServerOptions& options, utils::
                      max_batch_size_);
 }
 
-void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& context) {
+void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& context, const core::Property& ssl_context_property, const core::Property& client_auth_property) {
   gsl_Expects(!server_thread_.joinable() && !server_);
   auto options = readServerOptions(context);
 
   std::string ssl_value;
-  auto& ssl_prop = getSslContextProperty();
-  if (context.getProperty(ssl_prop.getName(), ssl_value) && !ssl_value.empty()) {
-    auto ssl_data = utils::net::getSslData(context, ssl_prop, logger_);
+  if (context.getProperty(ssl_context_property.getName(), ssl_value) && !ssl_value.empty()) {
+    auto ssl_data = utils::net::getSslData(context, ssl_context_property, logger_);
     if (!ssl_data || !ssl_data->isValid()) {
       throw Exception(PROCESSOR_EXCEPTION, "SSL Context Service is set, but no valid SSL data was found!");
     }
-    auto client_auth = utils::parseEnumProperty<utils::net::SslServer::ClientAuthOption>(context, getClientAuthProperty());
+    auto client_auth = utils::parseEnumProperty<utils::net::SslServer::ClientAuthOption>(context, client_auth_property);
     server_ = std::make_unique<utils::net::SslServer>(options.max_queue_size, options.port, logger_, *ssl_data, client_auth);
   } else {
     server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, options.port, logger_);
diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.h b/extensions/standard-processors/processors/NetworkListenerProcessor.h
index c06de77dd..1799a3fcb 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.h
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.h
@@ -52,22 +52,23 @@ class NetworkListenerProcessor : public core::Processor {
   }
 
  protected:
+  void startTcpServer(const core::ProcessContext& context, const core::Property& ssl_context_property, const core::Property& client_auth_property);
+  void startUdpServer(const core::ProcessContext& context);
+
+ private:
   struct ServerOptions {
     std::optional<uint64_t> max_queue_size;
     int port = 0;
   };
 
   void stopServer();
-  void startTcpServer(const core::ProcessContext& context);
-  void startUdpServer(const core::ProcessContext& context);
-  ServerOptions readServerOptions(const core::ProcessContext& context);
   void startServer(const ServerOptions& options, utils::net::IpProtocol protocol);
+  ServerOptions readServerOptions(const core::ProcessContext& context);
+
   virtual void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) = 0;
   virtual const core::Property& getMaxBatchSizeProperty() = 0;
   virtual const core::Property& getMaxQueueSizeProperty() = 0;
   virtual const core::Property& getPortProperty() = 0;
-  virtual const core::Property& getSslContextProperty() = 0;
-  virtual const core::Property& getClientAuthProperty() = 0;
 
   uint64_t max_batch_size_{500};
   std::unique_ptr<utils::net::Server> server_;
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
index 1de8165c0..3c98aec6e 100644
--- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -249,7 +249,7 @@ void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164
   CHECK(original_message.msg_ == flow_file.getAttribute("syslog.msg"));
 }
 
-TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") {
+TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog][NetworkListenerProcessor]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
 
   SingleProcessorTestController controller{listen_syslog};
@@ -301,7 +301,7 @@ TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") {
   check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], SYSLOG_PORT, protocol);
 }
 
-TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") {
+TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog][NetworkListenerProcessor]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
 
   SingleProcessorTestController controller{listen_syslog};
@@ -403,7 +403,7 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") {
 }
 
 
-TEST_CASE("ListenSyslog can be rescheduled", "[ListenSyslog]") {
+TEST_CASE("ListenSyslog can be rescheduled", "[ListenSyslog][NetworkListenerProcessor]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
   SingleProcessorTestController controller{listen_syslog};
   LogTestController::getInstance().setTrace<ListenSyslog>();
@@ -425,7 +425,7 @@ TEST_CASE("ListenSyslog can be rescheduled", "[ListenSyslog]") {
   }
 }
 
-TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") {
+TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog][NetworkListenerProcessor]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
 
   SingleProcessorTestController controller{listen_syslog};
@@ -479,7 +479,7 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") {
   CHECK(controller.trigger().at(ListenSyslog::Success).empty());
 }
 
-TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog]") {
+TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog][NetworkListenerProcessor]") {
   asio::ip::tcp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
     endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
index e8c1f9286..9009383b5 100644
--- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
@@ -37,7 +37,7 @@ void check_for_attributes(core::FlowFile& flow_file) {
   CHECK(ranges::contains(local_addresses, flow_file.getAttribute("tcp.sender")));
 }
 
-TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") {
+TEST_CASE("ListenTCP test multiple messages", "[ListenTCP][NetworkListenerProcessor]") {
   asio::ip::tcp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
     endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
@@ -66,7 +66,7 @@ TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") {
   check_for_attributes(*result.at(ListenTCP::Success)[1]);
 }
 
-TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP]") {
+TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP][NetworkListenerProcessor]") {
   const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
   SingleProcessorTestController controller{listen_tcp};
   LogTestController::getInstance().setTrace<ListenTCP>();
@@ -78,7 +78,7 @@ TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP]") {
   REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_tcp));
 }
 
-TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") {
+TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP][NetworkListenerProcessor]") {
   asio::ip::tcp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
     endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
@@ -111,7 +111,7 @@ TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") {
   CHECK(controller.trigger().at(ListenTCP::Success).empty());
 }
 
-TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") {
+TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProcessor]") {
   const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
 
   SingleProcessorTestController controller{listen_tcp};
diff --git a/extensions/standard-processors/tests/unit/ListenUDPTests.cpp b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
new file mode 100644
index 000000000..ecce9120d
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ListenUDP.h"
+#include "SingleProcessorTestController.h"
+#include "Utils.h"
+#include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
+
+using ListenUDP = org::apache::nifi::minifi::processors::ListenUDP;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+constexpr uint64_t PORT = 10256;
+
+void check_for_attributes(core::FlowFile& flow_file) {
+  const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
+  CHECK(std::to_string(PORT) == flow_file.getAttribute("udp.port"));
+  CHECK(ranges::contains(local_addresses, flow_file.getAttribute("udp.sender")));
+}
+
+TEST_CASE("ListenUDP test multiple messages", "[ListenUDP][NetworkListenerProcessor]") {
+  asio::ip::udp::endpoint endpoint;
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), PORT);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), PORT);
+  }
+  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+
+  SingleProcessorTestController controller{listen_udp};
+  LogTestController::getInstance().setTrace<ListenUDP>();
+  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "2"));
+
+  controller.plan->scheduleProcessor(listen_udp);
+  REQUIRE(utils::sendUdpDatagram({"test_message_1"}, endpoint));
+  REQUIRE(utils::sendUdpDatagram({"another_message"}, endpoint));
+  ProcessorTriggerResult result;
+  REQUIRE(controller.triggerUntil({{ListenUDP::Success, 2}}, result, 300ms, 50ms));
+  CHECK(result.at(ListenUDP::Success).size() == 2);
+  CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[0]) == "test_message_1");
+  CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[1]) == "another_message");
+
+  check_for_attributes(*result.at(ListenUDP::Success)[0]);
+  check_for_attributes(*result.at(ListenUDP::Success)[1]);
+}
+
+TEST_CASE("ListenUDP can be rescheduled", "[ListenUDP][NetworkListenerProcessor]") {
+  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+  SingleProcessorTestController controller{listen_udp};
+  LogTestController::getInstance().setTrace<ListenUDP>();
+  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "100"));
+
+  REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_udp));
+  REQUIRE_NOTHROW(controller.plan->reset(true));
+  REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_udp));
+}
+
+TEST_CASE("ListenUDP max queue and max batch size test", "[ListenUDP][NetworkListenerProcessor]") {
+  asio::ip::udp::endpoint endpoint;
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), PORT);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), PORT);
+  }
+  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+
+  SingleProcessorTestController controller{listen_udp};
+  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "10"));
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxQueueSize, "50"));
+
+  LogTestController::getInstance().setWarn<ListenUDP>();
+
+  controller.plan->scheduleProcessor(listen_udp);
+  for (auto i = 0; i < 100; ++i) {
+    REQUIRE(utils::sendUdpDatagram({"test_message"}, endpoint));
+  }
+
+  CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms));
+  CHECK(controller.trigger().at(ListenUDP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenUDP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenUDP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenUDP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenUDP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenUDP::Success).empty());
+}
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
index 6398d2bc0..f44e0586a 100644
--- a/extensions/standard-processors/tests/unit/PutTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
@@ -413,8 +413,8 @@ TEST_CASE("PutTCP test non-routable server", "[PutTCP]") {
   trigger_expect_failure(test_fixture, "message for non-routable server");
 
   CHECK((LogTestController::getInstance().contains("Connection timed out", 0ms)
-    || LogTestController::getInstance().contains("Operation timed out", 0ms)
-    || LogTestController::getInstance().contains("host has failed to respond", 0ms)));
+      || LogTestController::getInstance().contains("Operation timed out", 0ms)
+      || LogTestController::getInstance().contains("host has failed to respond", 0ms)));
 }
 
 TEST_CASE("PutTCP test invalid server cert", "[PutTCP]") {
@@ -427,8 +427,7 @@ TEST_CASE("PutTCP test invalid server cert", "[PutTCP]") {
 
   trigger_expect_failure(test_fixture, "message for invalid-cert server");
 
-  CHECK((LogTestController::getInstance().contains("certificate verify failed", 0ms)
-      || LogTestController::getInstance().contains("asio.ssl error", 0ms)));
+  CHECK(LogTestController::getInstance().matchesRegex("Handshake with .* failed", 0ms));
 }
 
 TEST_CASE("PutTCP test missing client cert", "[PutTCP]") {
@@ -441,8 +440,8 @@ TEST_CASE("PutTCP test missing client cert", "[PutTCP]") {
 
   trigger_expect_failure(test_fixture, "message for invalid-cert server");
 
-  CHECK((LogTestController::getInstance().contains("sslv3 alert handshake failure", 0ms)
-      || LogTestController::getInstance().contains("asio.ssl error", 0ms)));}
+  CHECK(LogTestController::getInstance().matchesRegex("Handshake with .* failed", 0ms));
+}
 
 TEST_CASE("PutTCP test idle connection expiration", "[PutTCP]") {
   PutTCPTestFixture test_fixture;