You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2022/04/19 12:45:35 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1771: Reworked ListenSyslog

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a1d35b2c MINIFICPP-1771: Reworked ListenSyslog
4a1d35b2c is described below

commit 4a1d35b2cc927be464031afbed32509013e0874d
Author: Martin Zink <ma...@apache.org>
AuthorDate: Thu Mar 10 09:27:58 2022 +0100

    MINIFICPP-1771: Reworked ListenSyslog
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #1294
---
 LICENSE                                            |  28 ++
 NOTICE                                             |   1 +
 PROCESSORS.md                                      |  57 ++-
 .../CMakeLists.txt => cmake/Asio.cmake             |  27 +-
 .../integration/features/syslog_listener.feature   |  26 ++
 .../minifi/core/SingleNodeDockerCluster.py         |   6 +
 .../minifi/core/SyslogTcpClientContainer.py        |  23 +
 .../minifi/core/SyslogUdpClientContainer.py        |  23 +
 .../integration/minifi/processors/ListenSyslog.py  |  12 +
 docker/test/integration/steps/steps.py             |  10 +
 extensions/gcp/tests/PutGCSObjectTests.cpp         |  40 +-
 extensions/standard-processors/CMakeLists.txt      |   3 +-
 .../processors/ListenSyslog.cpp                    | 511 +++++++++------------
 .../standard-processors/processors/ListenSyslog.h  | 326 ++++++-------
 .../tests/unit/FetchFileTests.cpp                  |  51 +-
 .../tests/unit/ListenSyslogTests.cpp               | 488 ++++++++++++++++++++
 .../standard-processors/tests/unit/PutUDPTests.cpp |  10 +-
 libminifi/include/core/PropertyValidation.h        |  14 +-
 ...ontroller.h => SingleProcessorTestController.h} |  14 +-
 libminifi/test/TestBase.cpp                        |   2 +
 20 files changed, 1118 insertions(+), 554 deletions(-)

diff --git a/LICENSE b/LICENSE
index 8c3a9a21d..836615ad9 100644
--- a/LICENSE
+++ b/LICENSE
@@ -3181,6 +3181,34 @@ FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 DEALINGS IN THE SOFTWARE.
 
+--------------------------------------------------------------------------
+
+This project bundles 'asio' under the Boost Software License 1.0
+
+Boost Software License - Version 1.0 - August 17th, 2003
+
+Permission is hereby granted, free of charge, to any person or organization
+obtaining a copy of the software and accompanying documentation covered by
+this license (the "Software") to use, reproduce, display, distribute,
+execute, and transmit the Software, and to prepare derivative works of the
+Software, and to permit third-parties to whom the Software is furnished to
+do so, all subject to the following:
+
+The copyright notices in the Software and this entire statement, including
+the above license grant, this restriction and the following disclaimer,
+must be included in all copies of the Software, in whole or in part, and
+all derivative works of the Software, unless such copies or derivative
+works are solely in the form of machine-executable object code generated by
+a source language processor.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
+SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
+FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
+
 ---------------------------------------------------------------------------
 
 This project reuses test code from TartanLlama/expected from the Public Domain or under CC0
diff --git a/NOTICE b/NOTICE
index ce26b3153..0e66d48fc 100644
--- a/NOTICE
+++ b/NOTICE
@@ -59,6 +59,7 @@ This software includes third party software subject to the following copyrights:
 - date (HowardHinnant/date) - notices below
 - range-v3 - Eric Niebler and other contributors
 - expected-lite - Copyright (C) 2016-2020 Martin Moene.
+- asio - Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
 - TartanLlama/expected - public domain, thanks to Sy Brand
 - libyaml - Copyright (c) 2006-2016 Kirill Simonov, Copyright (c) 2017-2020 Ingy döt Net
 - libwebsockets - Copyright (C) 2010 - 2020 Andy Green <an...@warmcat.com>
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 4290a8b25..dbe84270a 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -1,4 +1,4 @@
-<!--Licensed to the Apache Software Foundation (ASF) under one or morecontributor license agreements.  See the NOTICE file distributed withthis work for additional information regarding copyright ownership.The ASF licenses this file to You under the Apache License, Version 2.0(the "License"); you may not use this file except in compliance withthe License.  You may obtain a copy of the License at    http://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to i [...]
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed withthis work for additional information regarding copyright ownership.The ASF licenses this file to You under the Apache License, Version 2.0(the "License"); you may not use this file except in compliance withthe License.  You may obtain a copy of the License at    http://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to  [...]
 # Processors
 
 ## Table of Contents
@@ -1002,27 +1002,52 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ### Description
 
-Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional. The timestamp can be an RFC5424 timestamp with a format of "yyyy-MM-dd'T'HH:mm:ss.SZ" or "yyyy-MM-dd'T'HH:mm:ss.S+hh:mm", or it can be an RFC3164 timestamp with a format of "MMM d HH:mm:ss". If an incoming me [...]
+Listens for Syslog messages being sent to a given port over TCP or UDP.
+Incoming messages are optionally checked against regular expressions for RFC5424 and RFC3164 formatted messages.
+With parsing enabled the individual parts of the message will be placed as FlowFile attributes and valid messages will be transferred to success relationship, while invalid messages will be transferred to invalid relationship.
+With parsing disabled all message will be routed to the success relationship, but they will only contain the sender, protocol, and port attributes.
+
+
 ### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description |
-| - | - | - | - |
-|Max Batch Size|1||The maximum number of Syslog events to add to a single FlowFile.|
-|Max Number of TCP Connections|2||The maximum number of concurrent connections to accept Syslog messages in TCP mode.|
-|Max Size of Socket Buffer|1 MB||The maximum size of the socket buffer that should be used.|
-|Message Delimiter|\n||Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> core::Property).|
-|Parse Messages|false||Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.|
-|Port|514||The port for Syslog communication|
-|Protocol|UDP|UDP<br>TCP<br>|The protocol for Syslog communication.|
-|Receive Buffer Size|65507 B||The size of each buffer used to receive Syslog messages.|
+| Name                      | Default Value | Allowable Values | Description                                                                                                                                                                                          |
+|---------------------------|---------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Listening Port            | 514           |                  | The port for Syslog communication. (Well-known ports (0-1023) require root access)                                                                                                                   |
+| Protocol                  | UDP           | UDP<br>TCP<br>   | The protocol for Syslog communication.                                                                                                                                                               |
+| Parse Messages            | false         | false<br>true    | Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only contain the sender, protocol, and port, and no additional attributes.                 |
+| Max Batch Size            | 500           |                  | The maximum number of Syslog events to process at a time.                                                                                                                                            |
+| Max Size of Message Queue | 0             |                  | Maximum number of Syslog messages allowed to be buffered before processing them when the processor is triggered. If the buffer full, the message is ignored. If set to zero the buffer is unlimited. |
+
 ### Relationships
 
-| Name | Description |
-| - | - |
-|invalid|SysLog message format invalid|
-|success|All files are routed to success|
+| Name    | Description                                                                                                                                                                                   |
+|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| invalid | Incoming messages that do not match the expected format when parsing will be sent to this relationship.                                                                                       |
+| success | Incoming messages that match the expected format when parsing will be sent to this relationship. When Parse Messages is set to false, all incoming message will be sent to this relationship. |
+
+
+### Output Attributes
+
+| Attribute                | Description                                                        | Requirements           |
+|--------------------------|--------------------------------------------------------------------|------------------------|
+| _syslog.protocol_        | The protocol over which the Syslog message was received.           | -                      |
+| _syslog.port_            | The port over which the Syslog message was received.               | -                      |
+| _syslog.sender_          | The hostname of the Syslog server that sent the message.           | -                      |
+| _syslog.valid_           | An indicator of whether this message matched the expected formats. | Parsing enabled        |
+| _syslog.priority_        | The priority of the Syslog message.                                | Parsed RFC5424/RFC3164 |
+| _syslog.severity_        | The severity of the Syslog message.                                | Parsed RFC5424/RFC3164 |
+| _syslog.facility_        | The facility of the Syslog message.                                | Parsed RFC5424/RFC3164 |
+| _syslog.timestamp_       | The timestamp of the Syslog message.                               | Parsed RFC5424/RFC3164 |
+| _syslog.hostname_        | The hostname of the Syslog message.                                | Parsed RFC5424/RFC3164 |
+| _syslog.msg_             | The free-form message of the Syslog message.                       | Parsed RFC5424/RFC3164 |
+| _syslog.version_         | The version of the Syslog message.                                 | Parsed RFC5424         |
+| _syslog.app_name_        | The app name of the Syslog message.                                | Parsed RFC5424         |
+| _syslog.proc_id_         | The proc id of the Syslog message.                                 | Parsed RFC5424         |
+| _syslog.msg_id_          | The message id of the Syslog message.                              | Parsed RFC5424         |
+| _syslog.structured_data_ | The structured data of the Syslog message.                         | Parsed RFC5424         |
+
 
 
 ## ListS3
diff --git a/extensions/standard-processors/CMakeLists.txt b/cmake/Asio.cmake
similarity index 60%
copy from extensions/standard-processors/CMakeLists.txt
copy to cmake/Asio.cmake
index 19fe86a92..fd5862eff 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/cmake/Asio.cmake
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,20 +14,18 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-
-
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
-
-file(GLOB SOURCES  "processors/*.cpp" "controllers/*.cpp" )
-
-add_library(minifi-standard-processors SHARED ${SOURCES})
-
-include(RangeV3)
-target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3)
 
-SET (STANDARD-PROCESSORS minifi-standard-processors PARENT_SCOPE)
-register_extension(minifi-standard-processors)
+include(FetchContent)
 
+FetchContent_Declare(asio
+        URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-22-1.tar.gz
+        URL_HASH SHA256=30cb54a5de5e465d10ec0c2026d6b5917f5e89fffabdbabeb1475846fc9a2cf0)
 
-register_extension_linter(minifi-standard-processors-linter)
+FetchContent_GetProperties(asio)
+if(NOT asio_POPULATED)
+    FetchContent_Populate(asio)
+    add_library(asio INTERFACE)
+    target_include_directories(asio INTERFACE ${asio_SOURCE_DIR}/asio/include)
+    find_package(Threads)
+    target_link_libraries(asio INTERFACE Threads::Threads)
+endif()
diff --git a/docker/test/integration/features/syslog_listener.feature b/docker/test/integration/features/syslog_listener.feature
new file mode 100644
index 000000000..5683728f6
--- /dev/null
+++ b/docker/test/integration/features/syslog_listener.feature
@@ -0,0 +1,26 @@
+Feature: Minifi C++ can act as a syslog listener
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A syslog client can send messages to Minifi over UDP
+    Given a ListenSyslog processor
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "Protocol" property of the ListenSyslog processor is set to "UDP"
+    And the "Parse Messages" property of the ListenSyslog processor is set to "yes"
+    And a Syslog client with UDP protocol is setup to send logs to minifi
+    And the "success" relationship of the ListenSyslog processor is connected to the PutFile
+
+    When both instances start up
+    Then at least one flowfile is placed in the monitored directory in less than 10 seconds
+
+  Scenario: A syslog client can send messages to Minifi over TCP
+    Given a ListenSyslog processor
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "Protocol" property of the ListenSyslog processor is set to "TCP"
+    And the "Parse Messages" property of the ListenSyslog processor is set to "yes"
+    And a Syslog client with TCP protocol is setup to send logs to minifi
+    And the "success" relationship of the ListenSyslog processor is connected to the PutFile
+
+    When both instances start up
+    Then at least one flowfile is placed in the monitored directory in less than 10 seconds
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 17dd50bc5..50dc9f3eb 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -33,6 +33,8 @@ from .PostgreSQLServerContainer import PostgreSQLServerContainer
 from .MqttBrokerContainer import MqttBrokerContainer
 from .OPCUAServerContainer import OPCUAServerContainer
 from .SplunkContainer import SplunkContainer
+from .SyslogUdpClientContainer import SyslogUdpClientContainer
+from .SyslogTcpClientContainer import SyslogTcpClientContainer
 from .MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster
 
 
@@ -114,6 +116,10 @@ class SingleNodeDockerCluster(Cluster):
             return self.containers.setdefault(name, OPCUAServerContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == 'splunk':
             return self.containers.setdefault(name, SplunkContainer(name, self.vols, self.network, self.image_store, command))
+        elif engine == "syslog-udp-client":
+            return self.containers.setdefault(name, SyslogUdpClientContainer(name, self.vols, self.network, self.image_store, command))
+        elif engine == "syslog-tcp-client":
+            return self.containers.setdefault(name, SyslogTcpClientContainer(name, self.vols, self.network, self.image_store, command))
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)
 
diff --git a/docker/test/integration/minifi/core/SyslogTcpClientContainer.py b/docker/test/integration/minifi/core/SyslogTcpClientContainer.py
new file mode 100644
index 000000000..648cc7f66
--- /dev/null
+++ b/docker/test/integration/minifi/core/SyslogTcpClientContainer.py
@@ -0,0 +1,23 @@
+import logging
+from .Container import Container
+
+
+class SyslogTcpClientContainer(Container):
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'syslog-tcp-client', vols, network, image_store, command)
+
+    def get_startup_finished_log_entry(self):
+        return "Syslog TCP client started"
+
+    def deploy(self):
+        if not self.set_deployed():
+            return
+
+        logging.info('Creating and running a Syslog tcp client docker container...')
+        self.client.containers.run(
+            "ubuntu:20.04",
+            detach=True,
+            name=self.name,
+            network=self.network.name,
+            entrypoint='/bin/bash -c "echo Syslog TCP client started; while true; do logger --tcp -n minifi-cpp-flow -P 514 sample_log; sleep 1; done"')
+        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/SyslogUdpClientContainer.py b/docker/test/integration/minifi/core/SyslogUdpClientContainer.py
new file mode 100644
index 000000000..f765dfeda
--- /dev/null
+++ b/docker/test/integration/minifi/core/SyslogUdpClientContainer.py
@@ -0,0 +1,23 @@
+import logging
+from .Container import Container
+
+
+class SyslogUdpClientContainer(Container):
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'syslog-udp-client', vols, network, image_store, command)
+
+    def get_startup_finished_log_entry(self):
+        return "Syslog UDP client started"
+
+    def deploy(self):
+        if not self.set_deployed():
+            return
+
+        logging.info('Creating and running a Syslog udp client docker container...')
+        self.client.containers.run(
+            "ubuntu:20.04",
+            detach=True,
+            name=self.name,
+            network=self.network.name,
+            entrypoint='/bin/bash -c "echo Syslog UDP client started; while true; do logger --udp -n minifi-cpp-flow -P 514 sample_log; sleep 1; done"')
+        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/processors/ListenSyslog.py b/docker/test/integration/minifi/processors/ListenSyslog.py
new file mode 100644
index 000000000..b590ca402
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ListenSyslog.py
@@ -0,0 +1,12 @@
+from ..core.Processor import Processor
+
+
+class ListenSyslog(Processor):
+    def __init__(self, schedule=None):
+        properties = {}
+
+        super(ListenSyslog, self).__init__(
+            'ListenSyslog',
+            properties=properties,
+            auto_terminate=['success'],
+            schedule=schedule)
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 389241dc8..96cfcf1a4 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -394,6 +394,11 @@ def step_impl(context):
 def step_impl(context):
     context.test.acquire_container("azure-storage-server", "azure-storage-server")
 
+# syslog client
+@given(u'a Syslog client with {protocol} protocol is setup to send logs to minifi')
+def step_impl(context, protocol):
+    client_name = "syslog-" + protocol.lower() + "-client"
+    context.test.acquire_container(client_name, client_name)
 
 # google cloud storage setup
 @given("a Google Cloud storage server is set up with some test data")
@@ -626,6 +631,11 @@ def step_impl(context, num_flowfiles, duration):
     context.test.check_for_num_files_generated(int(num_flowfiles), timeparse(duration))
 
 
+@then("at least one flowfile is placed in the monitored directory in less than {duration}")
+def step_impl(context, duration):
+    context.test.check_for_num_file_range_generated(1, float('inf'), timeparse(duration))
+
+
 @then("one flowfile with the contents \"{content}\" is placed in the monitored directory in less than {duration}")
 def step_impl(context, content, duration):
     context.test.check_for_multiple_files_generated(1, timeparse(duration), [content])
diff --git a/extensions/gcp/tests/PutGCSObjectTests.cpp b/extensions/gcp/tests/PutGCSObjectTests.cpp
index 97773f79a..0c2a3ba50 100644
--- a/extensions/gcp/tests/PutGCSObjectTests.cpp
+++ b/extensions/gcp/tests/PutGCSObjectTests.cpp
@@ -17,7 +17,7 @@
 #include "../processors/PutGCSObject.h"
 #include "GCPAttributes.h"
 #include "core/Resource.h"
-#include "SingleInputTestController.h"
+#include "SingleProcessorTestController.h"
 #include "ProcessContextExpr.h"
 #include "google/cloud/storage/testing/mock_client.h"
 #include "google/cloud/storage/internal/object_metadata_parser.h"
@@ -59,7 +59,7 @@ class PutGCSObjectTests : public ::testing::Test {
                                        "gcp_credentials_controller_service");
   }
   std::shared_ptr<PutGCSObjectMocked> put_gcs_object_ = std::make_shared<PutGCSObjectMocked>("PutGCSObjectMocked");
-  org::apache::nifi::minifi::test::SingleInputTestController test_controller_{put_gcs_object_};
+  org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{put_gcs_object_};
   std::shared_ptr<minifi::core::controller::ControllerServiceNode>  gcp_credentials_node_;
 
   static auto return_upload_in_progress() {
@@ -85,7 +85,8 @@ class PutGCSObjectTests : public ::testing::Test {
 TEST_F(PutGCSObjectTests, MissingBucket) {
   EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession).Times(0);
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), ""));
-  const auto& result = test_controller_.trigger("hello world");
+  test_controller_.enqueueFlowFile("hello world");
+  const auto& result = test_controller_.trigger();
   EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
   ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
   EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
@@ -106,7 +107,8 @@ TEST_F(PutGCSObjectTests, BucketFromAttribute) {
         return google::cloud::make_status_or(std::unique_ptr<gcs::internal::ResumableUploadSession>(std::move(mock_upload_session)));
       });
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "${gcs.bucket}"));
-  const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
+  test_controller_.enqueueFlowFile("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
+  const auto& result = test_controller_.trigger();
   ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
   EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
@@ -125,7 +127,8 @@ TEST_F(PutGCSObjectTests, ServerGivesTransientErrors) {
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::NumberOfRetries.getName(), "2"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  const auto& result = test_controller_.trigger("hello world");
+  test_controller_.enqueueFlowFile("hello world");
+  const auto& result = test_controller_.trigger();
   EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
   ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
   EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
@@ -143,7 +146,8 @@ TEST_F(PutGCSObjectTests, ServerGivesPermaError) {
       .WillOnce(return_permanent_error);
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  const auto& result = test_controller_.trigger("hello world");
+  test_controller_.enqueueFlowFile("hello world");
+  const auto& result = test_controller_.trigger();
   EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
   ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
   EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
@@ -167,7 +171,8 @@ TEST_F(PutGCSObjectTests, NonRequiredPropertiesAreMissing) {
       });
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  const auto& result = test_controller_.trigger("hello world");
+  test_controller_.enqueueFlowFile("hello world");
+  const auto& result = test_controller_.trigger();
   EXPECT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
 }
@@ -190,7 +195,8 @@ TEST_F(PutGCSObjectTests, Crc32cMD5LocationTest) {
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Crc32cChecksum.getName(), "${crc32c}"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  const auto& result = test_controller_.trigger("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
+  test_controller_.enqueueFlowFile("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
+  const auto& result = test_controller_.trigger();
   EXPECT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
 }
@@ -209,7 +215,8 @@ TEST_F(PutGCSObjectTests, DontOverwriteTest) {
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::OverwriteObject.getName(), "false"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  const auto& result = test_controller_.trigger("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
+  test_controller_.enqueueFlowFile("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
+  const auto& result = test_controller_.trigger();
   ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
   EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
@@ -229,7 +236,8 @@ TEST_F(PutGCSObjectTests, ValidServerSideEncryptionTest) {
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::EncryptionKey.getName(), "ZW5jcnlwdGlvbl9rZXk="));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  const auto& result = test_controller_.trigger("hello world");
+  test_controller_.enqueueFlowFile("hello world");
+  const auto& result = test_controller_.trigger();
   ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
   EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(minifi_gcp::GCS_ENCRYPTION_SHA256_ATTR));
@@ -242,7 +250,8 @@ TEST_F(PutGCSObjectTests, InvalidServerSideEncryptionTest) {
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::EncryptionKey.getName(), "not_base64_key"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  EXPECT_THROW(test_controller_.trigger("hello world"), minifi::Exception);
+  test_controller_.enqueueFlowFile("hello world");
+  EXPECT_THROW(test_controller_.trigger(), minifi::Exception);
 }
 
 TEST_F(PutGCSObjectTests, NoContentType) {
@@ -258,7 +267,8 @@ TEST_F(PutGCSObjectTests, NoContentType) {
       });
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  const auto& result = test_controller_.trigger("hello world");
+  test_controller_.enqueueFlowFile("hello world");
+  const auto& result = test_controller_.trigger();
   ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
   EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
@@ -278,7 +288,8 @@ TEST_F(PutGCSObjectTests, ContentTypeFromAttribute) {
       });
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  const auto& result = test_controller_.trigger("hello world", {{"mime.type", "text/attribute"}});
+  test_controller_.enqueueFlowFile("hello world", {{"mime.type", "text/attribute"}});
+  const auto& result = test_controller_.trigger();
   ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
   EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
@@ -299,7 +310,8 @@ TEST_F(PutGCSObjectTests, ObjectACLTest) {
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::ObjectACL.getName(), toString(PutGCSObject::PredefinedAcl::AUTHENTICATED_READ)));
-  const auto& result = test_controller_.trigger("hello world");
+  test_controller_.enqueueFlowFile("hello world");
+  const auto& result = test_controller_.trigger();
   ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
   EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
diff --git a/extensions/standard-processors/CMakeLists.txt b/extensions/standard-processors/CMakeLists.txt
index 19fe86a92..f82efa802 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/extensions/standard-processors/CMakeLists.txt
@@ -25,7 +25,8 @@ file(GLOB SOURCES  "processors/*.cpp" "controllers/*.cpp" )
 add_library(minifi-standard-processors SHARED ${SOURCES})
 
 include(RangeV3)
-target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3)
+include(Asio)
+target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio)
 
 SET (STANDARD-PROCESSORS minifi-standard-processors PARENT_SCOPE)
 register_extension(minifi-standard-processors)
diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp b/extensions/standard-processors/processors/ListenSyslog.cpp
index aff4ef54a..61316b25c 100644
--- a/extensions/standard-processors/processors/ListenSyslog.cpp
+++ b/extensions/standard-processors/processors/ListenSyslog.cpp
@@ -1,7 +1,4 @@
 /**
- * @file ListenSyslog.cpp
- * ListenSyslog class implementation
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,318 +14,268 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+
 #include "ListenSyslog.h"
-#include <stdio.h>
-#include <memory>
-#include <string>
-#include <vector>
-#include <set>
-#include <queue>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
-#include "core/TypedValues.h"
 #include "core/Resource.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-#ifndef WIN32
-core::Property ListenSyslog::RecvBufSize(
-    core::PropertyBuilder::createProperty("Receive Buffer Size")->withDescription("The size of each buffer used to receive Syslog messages.")->
-    withDefaultValue<core::DataSizeValue>("65507 B")->build());
-
-core::Property ListenSyslog::MaxSocketBufSize(
-    core::PropertyBuilder::createProperty("Max Size of Socket Buffer")->withDescription("The maximum size of the socket buffer that should be used.")->withDefaultValue<core::DataSizeValue>("1 MB")
-        ->build());
+namespace org::apache::nifi::minifi::processors {
 
-core::Property ListenSyslog::MaxConnections(
-    core::PropertyBuilder::createProperty("Max Number of TCP Connections")->withDescription("The maximum number of concurrent connections to accept Syslog messages in TCP mode.")
-        ->withDefaultValue<int>(2)->build());
+const core::Property ListenSyslog::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port for Syslog communication. (Well-known ports (0-1023) require root access)")
+        ->isRequired(true)
+        ->withDefaultValue<int>(514, core::StandardValidators::get().LISTEN_PORT_VALIDATOR)->build());
 
-core::Property ListenSyslog::MaxBatchSize(
-    core::PropertyBuilder::createProperty("Max Batch Size")->withDescription("The maximum number of Syslog events to add to a single FlowFile.")->withDefaultValue<int>(1)->build());
+const core::Property ListenSyslog::ProtocolProperty(
+    core::PropertyBuilder::createProperty("Protocol")
+        ->withDescription("The protocol for Syslog communication.")
+        ->isRequired(true)
+        ->withAllowableValues(Protocol::values())
+        ->withDefaultValue(toString(Protocol::UDP))
+        ->build());
 
-core::Property ListenSyslog::MessageDelimiter(
-    core::PropertyBuilder::createProperty("Message Delimiter")->withDescription("Specifies the delimiter to place between Syslog messages when multiple "
-                                                                                "messages are bundled together (see <Max Batch Size> core::Property).")->withDefaultValue("\n")->build());
+const core::Property ListenSyslog::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of Syslog events to process at a time.")
+        ->withDefaultValue<uint64_t>(500, std::make_shared<core::UnsignedLongValidator>("Greater or equal than 1 validator", 1))
+        ->build());
 
-core::Property ListenSyslog::ParseMessages(
-    core::PropertyBuilder::createProperty("Parse Messages")->withDescription("Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.")
+const core::Property ListenSyslog::ParseMessages(
+    core::PropertyBuilder::createProperty("Parse Messages")
+        ->withDescription("Indicates if the processor should parse the Syslog messages. "
+                          "If set to false, each outgoing FlowFile will only contain the sender, protocol, and port, and no additional attributes.")
         ->withDefaultValue<bool>(false)->build());
 
-core::Property ListenSyslog::Protocol(
-    core::PropertyBuilder::createProperty("Protocol")->withDescription("The protocol for Syslog communication.")->withAllowableValue<std::string>("UDP")->withAllowableValue("TCP")->withDefaultValue(
-        "UDP")->build());
+const core::Property ListenSyslog::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of Syslog messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(0)->build());
+
+const core::Relationship ListenSyslog::Success("success", "Incoming messages that match the expected format when parsing will be sent to this relationship. "
+                                                          "When Parse Messages is set to false, all incoming message will be sent to this relationship.");
+const core::Relationship ListenSyslog::Invalid("invalid", "Incoming messages that do not match the expected format when parsing will be sent to this relationship.");
 
-core::Property ListenSyslog::Port(
-    core::PropertyBuilder::createProperty("Port")->withDescription("The port for Syslog communication")->withDefaultValue<int64_t>(514, core::StandardValidators::get().PORT_VALIDATOR)->build());
 
-core::Relationship ListenSyslog::Success("success", "All files are routed to success");
-core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid");
+const std::regex ListenSyslog::SyslogMessage::rfc5424_pattern_(
+    R"(^<(?:(\d|\d{2}|1[1-8]\d|19[01]))>)"                                                                    // priority
+    R"((?:(\d{1,2}))\s)"                                                                                      // version
+    R"((?:(\d{4}[-]\d{2}[-]\d{2}[T]\d{2}[:]\d{2}[:]\d{2}(?:\.\d{1,6})?(?:[+-]\d{2}[:]\d{2}|Z)?)|-)\s)"        // timestamp
+    R"((?:([\S]{1,255}))\s)"                                                                                  // hostname
+    R"((?:([\S]{1,48}))\s)"                                                                                   // app_name
+    R"((?:([\S]{1,128}))\s)"                                                                                  // proc_id
+    R"((?:([\S]{1,32}))\s)"                                                                                   // msg_id
+    R"((?:(-|(?:\[.+?\])+))\s?)"                                                                              // structured_data
+    R"((?:((?:.+)))?$)", std::regex::ECMAScript);                                                             // msg
+
+const std::regex ListenSyslog::SyslogMessage::rfc3164_pattern_(
+    R"((?:\<(\d{1,3})\>))"                                                                                    // priority
+    R"(([A-Z][a-z][a-z]\s{1,2}\d{1,2}\s\d{2}[:]\d{2}[:]\d{2})\s)"                                             // timestamp
+    R"(([\w][\w\d(\.|\:)@-]*)\s)"                                                                             // hostname
+    R"((.*)$)", std::regex::ECMAScript);                                                                      // msg
 
 void ListenSyslog::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(RecvBufSize);
-  properties.insert(MaxSocketBufSize);
-  properties.insert(MaxConnections);
-  properties.insert(MaxBatchSize);
-  properties.insert(MessageDelimiter);
-  properties.insert(ParseMessages);
-  properties.insert(Protocol);
-  properties.insert(Port);
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Invalid);
-  setSupportedRelationships(relationships);
+  setSupportedProperties({Port, ProtocolProperty, MaxBatchSize, ParseMessages, MaxQueueSize});
+  setSupportedRelationships({Success, Invalid});
 }
 
-void ListenSyslog::startSocketThread() {
-  if (_thread != NULL)
-    return;
+void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context && !server_thread_.joinable() && !server_);
 
-  logger_->log_trace("ListenSysLog Socket Thread Start");
-  _serverTheadRunning = true;
-  _thread = new std::thread(run, this);
-  _thread->detach();
-}
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  context->getProperty(ParseMessages.getName(), parse_messages_);
 
-void ListenSyslog::run(ListenSyslog *process) {
-  process->runThread();
-}
+  uint64_t max_queue_size = 0;
+  context->getProperty(MaxQueueSize.getName(), max_queue_size);
+  max_queue_size_ = max_queue_size > 0 ? std::optional<uint64_t>(max_queue_size) : std::nullopt;
 
-void ListenSyslog::runThread() {
-  while (_serverTheadRunning) {
-    if (_resetServerSocket) {
-      _resetServerSocket = false;
-      // need to reset the socket
-      std::vector<int>::iterator it;
-      for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
-        int clientSocket = *it;
-        close(clientSocket);
-      }
-      _clientSockets.clear();
-      if (_serverSocket > 0) {
-        close(_serverSocket);
-        _serverSocket = 0;
-      }
-    }
+  Protocol protocol;
+  context->getProperty(ProtocolProperty.getName(), protocol);
 
-    if (_serverSocket <= 0) {
-      uint16_t portno = _port;
-      struct sockaddr_in serv_addr;
-      int sockfd;
-      if (_protocol == "TCP")
-        sockfd = socket(AF_INET, SOCK_STREAM, 0);
-      else
-        sockfd = socket(AF_INET, SOCK_DGRAM, 0);
-      if (sockfd < 0) {
-        logger_->log_error("ListenSysLog Server socket creation failed");
-        break;
-      }
-      bzero(reinterpret_cast<char *>(&serv_addr), sizeof(serv_addr));
-      serv_addr.sin_family = AF_INET;
-      serv_addr.sin_addr.s_addr = INADDR_ANY;
-      serv_addr.sin_port = htons(portno);
-      if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
-        logger_->log_error("ListenSysLog Server socket bind failed");
-        break;
-      }
-      if (_protocol == "TCP")
-        listen(sockfd, 5);
-      _serverSocket = sockfd;
-      logger_->log_info("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno);
-    }
-    FD_ZERO(&_readfds);
-    FD_SET(_serverSocket, &_readfds);
-    _maxFds = _serverSocket;
-    std::vector<int>::iterator it;
-    for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
-      int clientSocket = *it;
-      if (clientSocket >= _maxFds)
-        _maxFds = clientSocket;
-      FD_SET(clientSocket, &_readfds);
-    }
-    fd_set fds;
-    struct timeval tv;
-    int retval;
-    fds = _readfds;
-    tv.tv_sec = 0;
-    // 100 msec
-    tv.tv_usec = 100000;
-    retval = select(_maxFds + 1, &fds, NULL, NULL, &tv);
-    if (retval < 0)
+  int port;
+  context->getProperty(Port.getName(), port);
+
+  if (protocol == Protocol::UDP) {
+    server_ = std::make_unique<UdpServer>(io_context_, queue_, max_queue_size_, port);
+  } else if (protocol == Protocol::TCP) {
+    server_ = std::make_unique<TcpServer>(io_context_, queue_, max_queue_size_, port);
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid protocol");
+  }
+
+  server_thread_ = std::thread([this]() { io_context_.run(); });
+  logger_->log_debug("Started %s syslog server on port %d with %s max queue size and %zu max batch size",
+                     protocol.toString(),
+                     port,
+                     max_queue_size ? std::to_string(*max_queue_size_) : "no",
+                     max_batch_size_);
+}
+
+void ListenSyslog::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);
+  size_t logs_processed = 0;
+  while (!queue_.empty() && logs_processed < max_batch_size_) {
+    SyslogMessage received_message;
+    if (!queue_.tryDequeue(received_message))
       break;
-    if (retval == 0)
-      continue;
-    if (FD_ISSET(_serverSocket, &fds)) {
-      // server socket, either we have UDP datagram or TCP connection request
-      if (_protocol == "TCP") {
-        socklen_t clilen;
-        struct sockaddr_in cli_addr;
-        clilen = sizeof(cli_addr);
-        int newsockfd = accept(_serverSocket, reinterpret_cast<struct sockaddr *>(&cli_addr), &clilen);
-        if (newsockfd > 0) {
-          if (_clientSockets.size() < (uint64_t) _maxConnections) {
-            _clientSockets.push_back(newsockfd);
-            logger_->log_info("ListenSysLog new client socket %d connection", newsockfd);
-            continue;
-          } else {
-            close(newsockfd);
-          }
-        }
-      } else {
-        socklen_t clilen;
-        struct sockaddr_in cli_addr;
-        clilen = sizeof(cli_addr);
-        int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, (struct sockaddr *) &cli_addr, &clilen);
-        if (recvlen > 0 && (uint64_t) (recvlen + getEventQueueByteSize()) <= static_cast<uint64_t>(_recvBufSize)) {
-          uint8_t *payload = new uint8_t[recvlen];
-          memcpy(payload, _buffer, recvlen);
-          putEvent(payload, recvlen);
-        }
-      }
-    }
-    it = _clientSockets.begin();
-    while (it != _clientSockets.end()) {
-      int clientSocket = *it;
-      if (FD_ISSET(clientSocket, &fds)) {
-        int recvlen = readline(clientSocket, _buffer, sizeof(_buffer));
-        if (recvlen <= 0) {
-          close(clientSocket);
-          logger_->log_debug("ListenSysLog client socket %d close", clientSocket);
-          it = _clientSockets.erase(it);
-        } else {
-          if ((uint64_t) (recvlen + getEventQueueByteSize()) <= static_cast<uint64_t>(_recvBufSize)) {
-            uint8_t *payload = new uint8_t[recvlen];
-            memcpy(payload, _buffer, recvlen);
-            putEvent(payload, recvlen);
-          }
-          ++it;
-        }
-      }
-    }
+    received_message.transferAsFlowFile(*session, parse_messages_);
+    ++logs_processed;
   }
-  return;
 }
 
-int ListenSyslog::readline(int fd, char *bufptr, size_t len) {
-  char *bufx = bufptr;
-  static char *bp;
-  static int cnt = 0;
-  static char b[2048];
-  char c;
-
-  while (--len > 0) {
-    if (--cnt <= 0) {
-      cnt = recv(fd, b, sizeof(b), 0);
-      if (cnt < 0) {
-        if (errno == EINTR) {
-          len++; /* the while will decrement */
-          continue;
-        }
-        return -1;
-      }
-      if (cnt == 0)
-        return 0;
-      bp = b;
-    }
-    c = *bp++;
-    *bufptr++ = c;
-    if (c == '\n') {
-      *bufptr = '\n';
-      return bufptr - bufx + 1;
+void ListenSyslog::stopServer() {
+  io_context_.stop();
+  if (server_thread_.joinable())
+    server_thread_.join();
+  server_.reset();
+  io_context_.reset();
+  logger_->log_debug("Stopped syslog server");
+}
+
+ListenSyslog::SyslogMessage::SyslogMessage(std::string message, Protocol protocol, asio::ip::address sender_address, asio::ip::port_type server_port)
+    : message_(std::move(message)),
+      protocol_(protocol),
+      server_port_(server_port),
+      sender_address_(std::move(sender_address)) {
+}
+
+void ListenSyslog::SyslogMessage::transferAsFlowFile(core::ProcessSession& session, bool should_parse) {
+  std::shared_ptr<core::FlowFile> flow_file = session.create();
+  bool valid = true;
+  if (should_parse) {
+    std::smatch syslog_match;
+    if (std::regex_search(message_, syslog_match, rfc5424_pattern_)) {
+      uint64_t priority = std::stoull(syslog_match[1]);
+      flow_file->setAttribute("syslog.priority", std::to_string(priority));
+      flow_file->setAttribute("syslog.severity", std::to_string(priority % 8));
+      flow_file->setAttribute("syslog.facility", std::to_string(priority / 8));
+      flow_file->setAttribute("syslog.version", syslog_match[2]);
+      flow_file->setAttribute("syslog.timestamp", syslog_match[3]);
+      flow_file->setAttribute("syslog.hostname", syslog_match[4]);
+      flow_file->setAttribute("syslog.app_name", syslog_match[5]);
+      flow_file->setAttribute("syslog.proc_id", syslog_match[6]);
+      flow_file->setAttribute("syslog.msg_id", syslog_match[7]);
+      flow_file->setAttribute("syslog.structured_data", syslog_match[8]);
+      flow_file->setAttribute("syslog.msg", syslog_match[9]);
+      flow_file->setAttribute("syslog.valid", "true");
+    } else if (std::regex_search(message_, syslog_match, rfc3164_pattern_)) {
+      uint64_t priority = std::stoull(syslog_match[1]);
+      flow_file->setAttribute("syslog.priority", std::to_string(priority));
+      flow_file->setAttribute("syslog.severity", std::to_string(priority % 8));
+      flow_file->setAttribute("syslog.facility", std::to_string(priority / 8));
+      flow_file->setAttribute("syslog.timestamp", syslog_match[2]);
+      flow_file->setAttribute("syslog.hostname", syslog_match[3]);
+      flow_file->setAttribute("syslog.msg", syslog_match[4]);
+      flow_file->setAttribute("syslog.valid", "true");
+    } else {
+      flow_file->setAttribute("syslog.valid", "false");
+      valid = false;
     }
   }
-  return -1;
+
+  session.writeBuffer(flow_file, message_);
+  flow_file->setAttribute("syslog.protocol", protocol_.toString());
+  flow_file->setAttribute("syslog.port", std::to_string(server_port_));
+  flow_file->setAttribute("syslog.sender", sender_address_.to_string());
+  session.transfer(flow_file, valid ? Success : Invalid);
 }
 
-void ListenSyslog::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  std::string value;
-  bool needResetServerSocket = false;
-  if (context->getProperty(Protocol.getName(), value)) {
-    if (_protocol != value)
-      needResetServerSocket = true;
-    _protocol = value;
-  }
-  if (context->getProperty(RecvBufSize.getName(), value)) {
-    core::Property::StringToInt(value, _recvBufSize);
-  }
-  if (context->getProperty(MaxSocketBufSize.getName(), value)) {
-    core::Property::StringToInt(value, _maxSocketBufSize);
-  }
-  if (context->getProperty(MaxConnections.getName(), value)) {
-    core::Property::StringToInt(value, _maxConnections);
-  }
-  if (context->getProperty(MessageDelimiter.getName(), value)) {
-    _messageDelimiter = value;
-  }
-  if (context->getProperty(ParseMessages.getName(), value)) {
-    _parseMessages = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
-  }
-  if (context->getProperty(Port.getName(), value)) {
-    int64_t oldPort = _port;
-    core::Property::StringToInt(value, _port);
-    if (_port != oldPort)
-      needResetServerSocket = true;
-  }
-  if (context->getProperty(MaxBatchSize.getName(), value)) {
-    core::Property::StringToInt(value, _maxBatchSize);
-  }
+ListenSyslog::TcpSession::TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<SyslogMessage>& concurrent_queue, std::optional<size_t> max_queue_size)
+    : concurrent_queue_(concurrent_queue),
+      max_queue_size_(max_queue_size),
+      socket_(io_context) {
+}
 
-  if (needResetServerSocket)
-    _resetServerSocket = true;
+asio::ip::tcp::socket& ListenSyslog::TcpSession::getSocket() {
+  return socket_;
+}
 
-  startSocketThread();
+void ListenSyslog::TcpSession::start() {
+  asio::async_read_until(socket_,
+                         buffer_,
+                         '\n',
+                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
+                           self->handleReadUntilNewLine(error_code);
+                         });
+}
 
-  // read from the event queue
-  if (isEventQueueEmpty()) {
-    context->yield();
+void ListenSyslog::TcpSession::handleReadUntilNewLine(std::error_code error_code) {
+  if (error_code)
     return;
-  }
+  std::istream is(&buffer_);
+  std::string message;
+  std::getline(is, message);
+  if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+    concurrent_queue_.enqueue(SyslogMessage(message, Protocol::TCP, socket_.remote_endpoint().address(), socket_.local_endpoint().port()));
+  else
+    logger_->log_warn("Queue is full. Syslog message ignored.");
+  asio::async_read_until(socket_,
+                         buffer_,
+                         '\n',
+                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
+                           self->handleReadUntilNewLine(error_code);
+                         });
+}
 
-  std::queue<SysLogEvent> eventQueue;
-  pollEvent(eventQueue, _maxBatchSize);
-  bool firstEvent = true;
-  std::shared_ptr<core::FlowFile> flowFile = NULL;
-  while (!eventQueue.empty()) {
-    SysLogEvent event = eventQueue.front();
-    eventQueue.pop();
-    if (firstEvent) {
-      flowFile = session->create();
-      if (!flowFile)
-        return;
-      ListenSyslog::WriteCallback callback(event.payload, event.len);
-      session->write(flowFile, &callback);
-      delete[] event.payload;
-      firstEvent = false;
-    } else {
-      ListenSyslog::WriteCallback callback(event.payload, event.len);
-      session->append(flowFile, &callback);
-      delete[] event.payload;
-    }
-  }
-  flowFile->addAttribute("syslog.protocol", _protocol);
-  flowFile->addAttribute("syslog.port", std::to_string(_port));
-  session->transfer(flowFile, Success);
+ListenSyslog::TcpServer::TcpServer(asio::io_context& io_context, utils::ConcurrentQueue<SyslogMessage>& concurrent_queue, std::optional<size_t> max_queue_size, uint16_t port)
+    : Server(io_context, concurrent_queue, max_queue_size),
+      acceptor_(io_context, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) {
+  startAccept();
+}
+
+void ListenSyslog::TcpServer::startAccept() {
+  auto new_session = std::make_shared<TcpSession>(io_context_, concurrent_queue_, max_queue_size_);
+  acceptor_.async_accept(new_session->getSocket(),
+                         [this, new_session](const auto& error_code) -> void {
+                           handleAccept(new_session, error_code);
+                         });
+}
+
+void ListenSyslog::TcpServer::handleAccept(const std::shared_ptr<TcpSession>& session, const std::error_code& error) {
+  if (error)
+    return;
+
+  session->start();
+  auto new_session = std::make_shared<TcpSession>(io_context_, concurrent_queue_, max_queue_size_);
+  acceptor_.async_accept(new_session->getSocket(),
+                         [this, new_session](const auto& error_code) -> void {
+                           handleAccept(new_session, error_code);
+                         });
+}
+
+ListenSyslog::UdpServer::UdpServer(asio::io_context& io_context,
+                                   utils::ConcurrentQueue<SyslogMessage>& concurrent_queue,
+                                   std::optional<size_t> max_queue_size,
+                                   uint16_t port)
+    : Server(io_context, concurrent_queue, max_queue_size),
+      socket_(io_context, asio::ip::udp::endpoint(asio::ip::udp::v4(), port)) {
+  doReceive();
 }
 
-REGISTER_RESOURCE(ListenSyslog, "Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular expressions for RFC5424 and RFC3164 formatted messages. " // NOLINT
-                  "The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional. The timestamp can be an RFC5424 timestamp with a format of "
-                  "\"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If an incoming messages matches "
-                  "one of these patterns, the message will be parsed and the individual pieces will be placed in FlowFile attributes, with the original message in the content of the FlowFile. "
-                  "If an incoming message does not match one of these patterns it will not be parsed and the syslog.valid attribute will be set to false with the original message in the content "
-                  "of the FlowFile. Valid messages will be transferred on the success relationship, and invalid messages will be transferred on the invalid relationship.");
-
-#endif
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+
+void ListenSyslog::UdpServer::doReceive() {
+  buffer_.resize(MAX_UDP_PACKET_SIZE);
+  socket_.async_receive_from(asio::buffer(buffer_, MAX_UDP_PACKET_SIZE),
+                             sender_endpoint_,
+                             [this](std::error_code ec, std::size_t bytes_received) {
+                               if (!ec && bytes_received > 0) {
+                                 buffer_.resize(bytes_received);
+                                 if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+                                   concurrent_queue_.enqueue(SyslogMessage(std::move(buffer_), Protocol::UDP, sender_endpoint_.address(), socket_.local_endpoint().port()));
+                                 else
+                                   logger_->log_warn("Queue is full. Syslog message ignored.");
+                               }
+                               doReceive();
+                             });
+}
+
+REGISTER_RESOURCE(ListenSyslog, "Listens for Syslog messages being sent to a given port over TCP or UDP. "
+                                "Incoming messages are optionally checked against regular expressions for RFC5424 and RFC3164 formatted messages. "
+                                "With parsing enabled the individual parts of the message will be placed as FlowFile attributes and "
+                                "valid messages will be transferred to success relationship, while invalid messages will be transferred to invalid relationship. "
+                                "With parsing disabled all message will be routed to the success relationship, but it will only contain the sender, protocol, and port attributes");
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/ListenSyslog.h b/extensions/standard-processors/processors/ListenSyslog.h
index afdd774e3..d8c92423b 100644
--- a/extensions/standard-processors/processors/ListenSyslog.h
+++ b/extensions/standard-processors/processors/ListenSyslog.h
@@ -1,7 +1,4 @@
 /**
- * @file ListenSyslog.h
- * ListenSyslog class declaration
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,217 +14,158 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_LISTENSYSLOG_H_
-#define EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_LISTENSYSLOG_H_
-
-#include <stdio.h>
-#include <sys/types.h>
-
-#include <memory>
-#include <queue>
-#include <string>
-#include <vector>
-
-#ifndef WIN32
-#include <arpa/inet.h>
-#include <netinet/in.h>
-#include <sys/select.h>
-#include <sys/socket.h>
-#include <sys/time.h>
 
-#else
-#include <WinSock2.h>
 
-#endif
-#include <errno.h>
+#pragma once
 
-#include <chrono>
-#include <thread>
+#include <utility>
+#include <string>
+#include <memory>
 
-#include "core/Core.h"
-#include "core/logging/LoggerConfiguration.h"
 #include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "FlowFileRecord.h"
-#include "utils/gsl.h"
-
-#ifndef WIN32
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Enum.h"
 
+#include "asio/ts/buffer.hpp"
+#include "asio/ts/internet.hpp"
+#include "asio/streambuf.hpp"
 
-// SyslogEvent
-typedef struct {
-  char *payload;
-  uint64_t len;
-} SysLogEvent;
+namespace org::apache::nifi::minifi::processors {
 
-// ListenSyslog Class
 class ListenSyslog : public core::Processor {
  public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-  ListenSyslog(const std::string& name,  const utils::Identifier& uuid = {}) // NOLINT
-      : Processor(name, uuid) {
-    _eventQueueByteSize = 0;
-    _serverSocket = 0;
-    _recvBufSize = 65507;
-    _maxSocketBufSize = 1024 * 1024;
-    _maxConnections = 2;
-    _maxBatchSize = 1;
-    _messageDelimiter = "\n";
-    _protocol = "UDP";
-    _port = 514;
-    _parseMessages = false;
-    _serverSocket = 0;
-    _maxFds = 0;
-    FD_ZERO(&_readfds);
-    _thread = nullptr;
-    _resetServerSocket = false;
-    _serverTheadRunning = false;
+  explicit ListenSyslog(const std::string& name, const utils::Identifier& uuid = {})
+      : core::Processor(name, uuid) {
   }
-  // Destructor
+  ListenSyslog(const ListenSyslog&) = delete;
+  ListenSyslog(ListenSyslog&&) = delete;
+  ListenSyslog& operator=(const ListenSyslog&) = delete;
+  ListenSyslog& operator=(ListenSyslog&&) = delete;
   ~ListenSyslog() override {
-    _serverTheadRunning = false;
-    delete this->_thread;
-    // need to reset the socket
-    std::vector<int>::iterator it;
-    for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
-      int clientSocket = *it;
-      close(clientSocket);
-    }
-    _clientSockets.clear();
-    if (_serverSocket > 0) {
-      logger_->log_debug("ListenSysLog Server socket %d close", _serverSocket);
-      close(_serverSocket);
-      _serverSocket = 0;
-    }
+    stopServer();
   }
-  // Processor Name
-  static constexpr char const *ProcessorName = "ListenSyslog";
-  // Supported Properties
-  static core::Property RecvBufSize;
-  static core::Property MaxSocketBufSize;
-  static core::Property MaxConnections;
-  static core::Property MaxBatchSize;
-  static core::Property MessageDelimiter;
-  static core::Property ParseMessages;
-  static core::Property Protocol;
-  static core::Property Port;
-  // Supported Relationships
-  static core::Relationship Success;
-  static core::Relationship Invalid;
-  // Nest Callback Class for write stream
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(char *data, uint64_t size)
-        : _data(reinterpret_cast<uint8_t*>(data)),
-          _dataSize(size) {
-    }
-    uint8_t *_data;
-    uint64_t _dataSize;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      if (!_data || _dataSize <= 0) return 0;
-      const auto write_ret = stream->write(_data, _dataSize);
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-  };
 
- public:
-  // OnTrigger method, implemented by NiFi ListenSyslog
-  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
-  // Initialize, over write by NiFi ListenSyslog
+  EXTENSIONAPI static const core::Property Port;
+  EXTENSIONAPI static const core::Property ProtocolProperty;
+  EXTENSIONAPI static const core::Property MaxBatchSize;
+  EXTENSIONAPI static const core::Property ParseMessages;
+  EXTENSIONAPI static const core::Property MaxQueueSize;
+
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Invalid;
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
   void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override;
 
- private:
-  core::annotation::Input getInputRequirement() const override {
-    return core::annotation::Input::INPUT_FORBIDDEN;
+  bool isSingleThreaded() const override {
+    return false;
   }
 
-  // Logger
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ListenSyslog>::getLogger();
-  // Run function for the thread
-  static void run(ListenSyslog *process);
-  // Run Thread
-  void runThread();
-  // Queue for store syslog event
-  std::queue<SysLogEvent> _eventQueue;
-  // Size of Event queue in bytes
-  uint64_t _eventQueueByteSize;
-  // Get event queue size
-  uint64_t getEventQueueSize() {
-    std::lock_guard<std::mutex> lock(mutex_);
-    return _eventQueue.size();
-  }
-  // Get event queue byte size
-  uint64_t getEventQueueByteSize() {
-    std::lock_guard<std::mutex> lock(mutex_);
-    return _eventQueueByteSize;
+  void notifyStop() override {
+    stopServer();
   }
-  // Whether the event queue  is empty
-  bool isEventQueueEmpty() {
-    std::lock_guard<std::mutex> lock(mutex_);
-    return _eventQueue.empty();
-  }
-  // Put event into directory listing
-  void putEvent(uint8_t *payload, uint64_t len) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    SysLogEvent event;
-    event.payload = reinterpret_cast<char*>(payload);
-    event.len = len;
-    _eventQueue.push(event);
-    _eventQueueByteSize += len;
-  }
-  // Read \n terminated line from TCP socket
-  int readline(int fd, char *bufptr, size_t len);
-  // start server socket and handling client socket
-  void startSocketThread();
-  // Poll event
-  void pollEvent(std::queue<SysLogEvent> &list, int maxSize) {
-    std::lock_guard<std::mutex> lock(mutex_);
-
-    while (!_eventQueue.empty() && (maxSize == 0 || list.size() < static_cast<uint32_t>(maxSize))) {
-      SysLogEvent event = _eventQueue.front();
-      _eventQueue.pop();
-      _eventQueueByteSize -= event.len;
-      list.push(event);
-    }
-    return;
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_FORBIDDEN;
   }
-  // Mutex for protection of the directory listing
-  std::mutex mutex_;
-  int64_t _recvBufSize;
-  int64_t _maxSocketBufSize;
-  int64_t _maxConnections;
-  int64_t _maxBatchSize;
-  std::string _messageDelimiter;
-  std::string _protocol;
-  int64_t _port;
-  bool _parseMessages;
-  int _serverSocket;
-  std::vector<int> _clientSockets;
-  int _maxFds;
-  fd_set _readfds;
-  // thread
-  std::thread *_thread;
-  // whether to reset the server socket
-  bool _resetServerSocket; bool _serverTheadRunning;
-  // buffer for read socket
-  char _buffer[2048];
-};
 
-}  // namespace processors
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+ private:
+  SMART_ENUM(Protocol,
+             (TCP, "TCP"),
+             (UDP, "UDP")
+  )
+
+  void stopServer();
+
+  class SyslogMessage {
+   public:
+    SyslogMessage() = default;
+    SyslogMessage(std::string message, Protocol protocol, asio::ip::address sender_address, asio::ip::port_type server_port);
+
+    void transferAsFlowFile(core::ProcessSession& session, bool should_parse);
 
-#endif
+   private:
+    std::string message_;
+    Protocol protocol_;
+    asio::ip::port_type server_port_{514};
+    asio::ip::address sender_address_;
+
+    static const std::regex rfc5424_pattern_;
+    static const std::regex rfc3164_pattern_;
+  };
+
+  class Server {
+   public:
+    virtual ~Server() = default;
 
-#endif  // EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_LISTENSYSLOG_H_
+   protected:
+    Server(asio::io_context& io_context, utils::ConcurrentQueue<SyslogMessage>& concurrent_queue, std::optional<size_t> max_queue_size)
+        : concurrent_queue_(concurrent_queue), io_context_(io_context), max_queue_size_(max_queue_size) {}
+
+    utils::ConcurrentQueue<SyslogMessage>& concurrent_queue_;
+    asio::io_context& io_context_;
+    std::optional<size_t> max_queue_size_;
+  };
+
+  class TcpSession : public std::enable_shared_from_this<TcpSession> {
+   public:
+    TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<SyslogMessage>& concurrent_queue, std::optional<size_t> max_queue_size);
+
+    asio::ip::tcp::socket& getSocket();
+    void start();
+    void handleReadUntilNewLine(std::error_code error_code);
+
+   private:
+    utils::ConcurrentQueue<SyslogMessage>& concurrent_queue_;
+    std::optional<size_t> max_queue_size_;
+    asio::ip::tcp::socket socket_;
+    asio::basic_streambuf<std::allocator<char>> buffer_;
+    std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ListenSyslog>::getLogger();
+  };
+
+  class TcpServer : public Server {
+   public:
+    TcpServer(asio::io_context& io_context,
+              utils::ConcurrentQueue<SyslogMessage>& concurrent_queue,
+              std::optional<size_t> max_queue_size,
+              uint16_t port);
+
+   private:
+    void startAccept();
+    void handleAccept(const std::shared_ptr<TcpSession>& session, const std::error_code& error);
+
+    asio::ip::tcp::acceptor acceptor_;
+  };
+
+  class UdpServer : public Server {
+   public:
+    UdpServer(asio::io_context& io_context,
+              utils::ConcurrentQueue<SyslogMessage>& concurrent_queue,
+              std::optional<size_t> max_queue_size,
+              uint16_t port);
+
+   private:
+    void doReceive();
+
+    asio::ip::udp::socket socket_;
+    asio::ip::udp::endpoint sender_endpoint_;
+    std::string buffer_;
+
+    std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ListenSyslog>::getLogger();
+    static inline constexpr size_t MAX_UDP_PACKET_SIZE = 65535;
+  };
+
+  utils::ConcurrentQueue<SyslogMessage> queue_;
+  std::unique_ptr<Server> server_;
+  asio::io_context io_context_;
+  std::thread server_thread_;
+
+  uint64_t max_batch_size_ = 500;
+  std::optional<uint64_t> max_queue_size_;
+  bool parse_messages_ = false;
+
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ListenSyslog>::getLogger();
+};
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/FetchFileTests.cpp b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
index 853ecf369..a6cd1d061 100644
--- a/extensions/standard-processors/tests/unit/FetchFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
@@ -28,7 +28,7 @@
 #include "processors/FetchFile.h"
 #include "utils/TestUtils.h"
 #include "utils/IntegrationTestUtils.h"
-#include "SingleInputTestController.h"
+#include "SingleProcessorTestController.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -43,7 +43,7 @@ class FetchFileTestFixture {
   std::unordered_multiset<std::string> getDirContents(const std::string& dir_path) const;
 
   std::shared_ptr<minifi::processors::FetchFile> fetch_file_processor_;
-  std::shared_ptr<minifi::test::SingleInputTestController> test_controller_;
+  std::shared_ptr<minifi::test::SingleProcessorTestController> test_controller_;
   const std::string input_dir_;
   const std::string permission_denied_file_name_;
   const std::string input_file_name_;
@@ -53,7 +53,7 @@ class FetchFileTestFixture {
 
 FetchFileTestFixture::FetchFileTestFixture()
   : fetch_file_processor_(std::make_shared<minifi::processors::FetchFile>("FetchFile")),
-    test_controller_(std::make_shared<minifi::test::SingleInputTestController>(fetch_file_processor_)),
+    test_controller_(std::make_shared<minifi::test::SingleProcessorTestController>(fetch_file_processor_)),
     input_dir_(test_controller_->createTempDirectory()),
     permission_denied_file_name_("permission_denied.txt"),
     input_file_name_("test.txt"),
@@ -87,7 +87,8 @@ std::unordered_multiset<std::string> FetchFileTestFixture::getDirContents(const
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default but non-existent file path", "[testFetchFile]") {
   attributes_["filename"] = "non_existent.file";
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::NotFound);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
@@ -98,7 +99,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default but non-
 TEST_CASE_METHOD(FetchFileTestFixture, "FileToFetch property set to a non-existent file path", "[testFetchFile]") {
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch, "/tmp/non_existent.file");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenFileNotFound, "INFO");
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::NotFound);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
@@ -111,7 +113,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Permission denied to read file", "[testF
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch,
     input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenPermissionDenied, "WARN");
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::PermissionDenied);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
@@ -121,7 +124,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Permission denied to read file", "[testF
 #endif
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default file path", "[testFetchFile]") {
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -133,7 +137,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file from a custom path",
   utils::putFileToDir(input_dir_ + utils::file::FileUtils::get_separator() + "sub", input_file_name_, file_content_);
   auto file_path = input_dir_ + utils::file::FileUtils::get_separator() + "sub" + utils::file::FileUtils::get_separator() + input_file_name_;
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch, file_path);
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -142,7 +147,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file from a custom path",
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Flow scheduling fails due to missing move destination directory when completion strategy is set to move file", "[testFetchFile]") {
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
-  REQUIRE_THROWS_AS(test_controller_->trigger("", attributes_), minifi::Exception);
+  test_controller_->enqueueFlowFile("", attributes_);
+  REQUIRE_THROWS_AS(test_controller_->trigger(), minifi::Exception);
 }
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Flow fails due to move conflict", "[testFetchFile]") {
@@ -151,7 +157,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Flow fails due to move conflict", "[test
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Fail");
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::Failure);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
@@ -166,7 +173,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Move specific properties are ignored whe
   utils::putFileToDir(move_dir, input_file_name_, "old content");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Fail");
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -178,7 +186,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved wi
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Replace File");
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -194,7 +203,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved wi
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Rename");
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -211,7 +221,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved wi
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Keep Existing");
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -225,7 +236,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is moved to a new directory
   auto move_dir = test_controller_->createTempDirectory();
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -240,7 +252,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "After flow completion the fetched file i
   move_dir = move_dir + utils::file::FileUtils::get_separator() + "temp";
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -256,7 +269,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Move completion strategy failure due to
   utils::file::FileUtils::set_permissions(move_dir, 0);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -269,7 +283,8 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Move completion strategy failure due to
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is deleted after flow completion", "[testFetchFile]") {
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Delete File");
-  const auto result = test_controller_->trigger("", attributes_);
+  test_controller_->enqueueFlowFile("", attributes_);
+  const auto result = test_controller_->trigger();
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
new file mode 100644
index 000000000..27b38551f
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -0,0 +1,488 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "Catch.h"
+#include "ListenSyslog.h"
+#include "SingleProcessorTestController.h"
+#include "asio.hpp"
+
+using ListenSyslog = org::apache::nifi::minifi::processors::ListenSyslog;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors::testing {
+
+constexpr uint64_t SYSLOG_PORT = 10255;
+
+struct ValidRFC5424Message {
+  constexpr ValidRFC5424Message(std::string_view message,
+                                std::string_view priority,
+                                std::string_view severity,
+                                std::string_view facility,
+                                std::string_view version,
+                                std::string_view timestamp,
+                                std::string_view hostname,
+                                std::string_view app_name,
+                                std::string_view proc_id,
+                                std::string_view msg_id,
+                                std::string_view structured_data,
+                                std::string_view msg)
+      : unparsed_(message),
+        priority_(priority),
+        severity_(severity),
+        facility_(facility),
+        version_(version),
+        timestamp_(timestamp),
+        hostname_(hostname),
+        app_name_(app_name),
+        proc_id_(proc_id),
+        msg_id_(msg_id),
+        structured_data_(structured_data),
+        msg_(msg) {}
+
+  const std::string_view unparsed_;
+  const std::string_view priority_;
+  const std::string_view severity_;
+  const std::string_view facility_;
+  const std::string_view version_;
+  const std::string_view timestamp_;
+  const std::string_view hostname_;
+  const std::string_view app_name_;
+  const std::string_view proc_id_;
+  const std::string_view msg_id_;
+  const std::string_view structured_data_;
+  const std::string_view msg_;
+};
+
+struct ValidRFC3164Message {
+  constexpr ValidRFC3164Message(std::string_view message,
+                                std::string_view priority,
+                                std::string_view severity,
+                                std::string_view facility,
+                                std::string_view timestamp,
+                                std::string_view hostname,
+                                std::string_view msg)
+      : unparsed_(message),
+        priority_(priority),
+        severity_(severity),
+        facility_(facility),
+        timestamp_(timestamp),
+        hostname_(hostname),
+        msg_(msg) {}
+
+  const std::string_view unparsed_;
+  const std::string_view priority_;
+  const std::string_view severity_;
+  const std::string_view facility_;
+  const std::string_view timestamp_;
+  const std::string_view hostname_;
+  const std::string_view msg_;
+};
+
+// These examples are from the Syslog Protocol RFC5424 documentation
+// https://datatracker.ietf.org/doc/html/rfc5424#section-6.5
+
+constexpr ValidRFC5424Message rfc5424_doc_example_1 = ValidRFC5424Message(
+    R"(<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - 'su root' failed for lonvick on /dev/pts/8)",
+    "34",
+    "2",
+    "4",
+    "1",
+    "2003-10-11T22:14:15.003Z",
+    "mymachine.example.com",
+    "su",
+    "-",
+    "ID47",
+    "-",
+    "'su root' failed for lonvick on /dev/pts/8");
+
+constexpr ValidRFC5424Message rfc5424_doc_example_2 = ValidRFC5424Message(
+    R"(<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc 8710 - - %% It's time to make the do-nuts.)",
+    "165",
+    "5",
+    "20",
+    "1",
+    "2003-08-24T05:14:15.000003-07:00",
+    "192.0.2.1",
+    "myproc",
+    "8710",
+    "-",
+    "-",
+    "%% It's time to make the do-nuts.");
+
+constexpr ValidRFC5424Message rfc5424_doc_example_3 = ValidRFC5424Message(
+    R"(<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] An application event log entry...)",
+    "165",
+    "5",
+    "20",
+    "1",
+    "2003-10-11T22:14:15.003Z",
+    "mymachine.example.com",
+    "evntslog",
+    "-",
+    "ID47",
+    R"([exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"])",
+    "An application event log entry...");
+
+constexpr ValidRFC5424Message rfc5424_doc_example_4 = ValidRFC5424Message(
+    R"(<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][examplePriority@32473class="high"])",
+    "165",
+    "5",
+    "20",
+    "1",
+    "2003-10-11T22:14:15.003Z",
+    "mymachine.example.com",
+    "evntslog",
+    "-",
+    "ID47",
+    R"([exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][examplePriority@32473class="high"])",
+    "");
+
+// These examples are from the Syslog Protocol documentation
+// https://datatracker.ietf.org/doc/html/rfc3164#section-5.4
+constexpr ValidRFC3164Message rfc3164_doc_example_1 = ValidRFC3164Message(
+    R"(<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8)",
+    "34",
+    "2",
+    "4",
+    "Oct 11 22:14:15",
+    "mymachine",
+    "su: 'su root' failed for lonvick on /dev/pts/8");
+
+constexpr ValidRFC3164Message rfc3164_doc_example_2 = ValidRFC3164Message(
+    R"(<13>Feb 5 17:32:18 10.0.0.99 Use the BFG!)",
+    "13",
+    "5",
+    "1",
+    "Feb 5 17:32:18",
+    "10.0.0.99",
+    "Use the BFG!");
+
+constexpr ValidRFC3164Message rfc3164_doc_example_3 = ValidRFC3164Message(
+    R"(<165>Aug 24 05:34:00 mymachine myproc[10]: %% It's time to make the do-nuts.  %%  Ingredients: Mix=OK, Jelly=OK # Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: Conveyer1=OK, Conveyer2=OK # %%)",
+    "165",
+    "5",
+    "20",
+    "Aug 24 05:34:00",
+    "mymachine",
+    R"(myproc[10]: %% It's time to make the do-nuts.  %%  Ingredients: Mix=OK, Jelly=OK # Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: Conveyer1=OK, Conveyer2=OK # %%)");
+
+constexpr ValidRFC3164Message rfc3164_doc_example_4 = ValidRFC3164Message(
+    R"(<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!)",
+    "0",
+    "0",
+    "0",
+    "Oct 22 10:52:12",
+    "scapegoat",
+    R"(1990 Oct 22 10:52:01 TZ-6 scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!)");
+
+// These examples are generated by https://man7.org/linux/man-pages/man1/logger.1.html
+constexpr std::string_view rfc5424_logger_example_1 = R"(<13>1 2022-03-17T10:10:42.846095+01:00 host-name user - - [timeQuality tzKnown="1" isSynced="0"] log_line")";
+
+constexpr std::string_view invalid_syslog = "not syslog";
+
+void sendUDPPacket(const std::string_view content, uint64_t port) {
+  asio::io_context io_context;
+  asio::ip::udp::socket socket(io_context);
+  asio::ip::udp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port);
+  socket.open(asio::ip::udp::v4());
+  std::error_code err;
+  socket.send_to(asio::buffer(content, content.size()), remote_endpoint, 0, err);
+  REQUIRE(!err);
+  socket.close();
+}
+
+void sendMessagesViaTCP(const std::vector<std::string_view>& contents, uint64_t port) {
+  asio::io_context io_context;
+  asio::ip::tcp::socket socket(io_context);
+  asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port);
+  socket.connect(remote_endpoint);
+  std::error_code err;
+  for (auto& content : contents) {
+    std::string tcp_message(content);
+    tcp_message += '\n';
+    socket.send(asio::buffer(tcp_message, tcp_message.size()), 0, err);
+  }
+  REQUIRE(!err);
+  socket.close();
+}
+
+void check_for_only_basic_attributes(core::FlowFile& flow_file, uint16_t port, std::string_view protocol) {
+  CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port"));
+  CHECK(protocol == flow_file.getAttribute("syslog.protocol"));
+  CHECK("127.0.0.1" == flow_file.getAttribute("syslog.sender"));
+
+  CHECK(std::nullopt == flow_file.getAttribute("syslog.valid"));
+  CHECK(std::nullopt == flow_file.getAttribute("syslog.priority"));
+  CHECK(std::nullopt == flow_file.getAttribute("syslog.version"));
+  CHECK(std::nullopt == flow_file.getAttribute("syslog.timestamp"));
+  CHECK(std::nullopt == flow_file.getAttribute("syslog.hostname"));
+  CHECK(std::nullopt == flow_file.getAttribute("syslog.app_name"));
+  CHECK(std::nullopt == flow_file.getAttribute("syslog.proc_id"));
+  CHECK(std::nullopt == flow_file.getAttribute("syslog.msg_id"));
+  CHECK(std::nullopt == flow_file.getAttribute("syslog.structured_data"));
+  CHECK(std::nullopt == flow_file.getAttribute("syslog.msg"));
+}
+
+void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC5424Message& original_message, uint16_t port, std::string_view protocol) {
+  CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port"));
+  CHECK(protocol == flow_file.getAttribute("syslog.protocol"));
+  CHECK("127.0.0.1" == flow_file.getAttribute("syslog.sender"));
+
+  CHECK("true" == flow_file.getAttribute("syslog.valid"));
+  CHECK(original_message.priority_ == flow_file.getAttribute("syslog.priority"));
+  CHECK(original_message.facility_ == flow_file.getAttribute("syslog.facility"));
+  CHECK(original_message.severity_ == flow_file.getAttribute("syslog.severity"));
+  CHECK(original_message.version_ == flow_file.getAttribute("syslog.version"));
+  CHECK(original_message.timestamp_ == flow_file.getAttribute("syslog.timestamp"));
+  CHECK(original_message.hostname_ == flow_file.getAttribute("syslog.hostname"));
+  CHECK(original_message.app_name_ == flow_file.getAttribute("syslog.app_name"));
+  CHECK(original_message.proc_id_ == flow_file.getAttribute("syslog.proc_id"));
+  CHECK(original_message.msg_id_ == flow_file.getAttribute("syslog.msg_id"));
+  CHECK(original_message.structured_data_ == flow_file.getAttribute("syslog.structured_data"));
+  CHECK(original_message.msg_ == flow_file.getAttribute("syslog.msg"));
+}
+
+void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164Message& original_message, uint16_t port, std::string_view protocol) {
+  CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port"));
+  CHECK(protocol == flow_file.getAttribute("syslog.protocol"));
+  CHECK("127.0.0.1" == flow_file.getAttribute("syslog.sender"));
+
+  CHECK("true" == flow_file.getAttribute("syslog.valid"));
+  CHECK(original_message.priority_ == flow_file.getAttribute("syslog.priority"));
+  CHECK(original_message.facility_ == flow_file.getAttribute("syslog.facility"));
+  CHECK(original_message.severity_ == flow_file.getAttribute("syslog.severity"));
+  CHECK(original_message.timestamp_ == flow_file.getAttribute("syslog.timestamp"));
+  CHECK(original_message.hostname_ == flow_file.getAttribute("syslog.hostname"));
+  CHECK(original_message.msg_ == flow_file.getAttribute("syslog.msg"));
+}
+
+bool triggerUntil(test::SingleProcessorTestController& controller,
+                  const std::unordered_map<core::Relationship, size_t>& expected_quantities,
+                  std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>>& result,
+                  const std::chrono::milliseconds max_duration,
+                  const std::chrono::milliseconds wait_time = 50ms) {
+  auto start_time = std::chrono::steady_clock::now();
+  while (std::chrono::steady_clock::now() < start_time + max_duration) {
+    for (auto& [relationship, flow_files] : controller.trigger()) {
+      result[relationship].insert(result[relationship].end(), flow_files.begin(), flow_files.end());
+    }
+    bool expected_quantities_met = true;
+    for (const auto& [relationship, expected_quantity] : expected_quantities) {
+      if (result[relationship].size() < expected_quantity) {
+        expected_quantities_met = false;
+        break;
+      }
+    }
+    if (expected_quantities_met)
+      return true;
+    std::this_thread::sleep_for(wait_time);
+  }
+  return false;
+}
+
+bool countLogOccurrencesUntil(const std::string& pattern,
+                              const int occurrences,
+                              const std::chrono::milliseconds max_duration,
+                              const std::chrono::milliseconds wait_time = 50ms) {
+  auto start_time = std::chrono::steady_clock::now();
+  while (std::chrono::steady_clock::now() < start_time + max_duration) {
+    if (LogTestController::getInstance().countOccurrences(pattern) == occurrences)
+      return true;
+    std::this_thread::sleep_for(wait_time);
+  }
+  return false;
+}
+
+TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") {
+  const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
+
+  test::SingleProcessorTestController controller{listen_syslog};
+  LogTestController::getInstance().setTrace<ListenSyslog>();
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2"));
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
+  std::string protocol;
+
+  SECTION("UDP") {
+    protocol = "UDP";
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+    controller.plan->scheduleProcessor(listen_syslog);
+    sendUDPPacket(rfc5424_logger_example_1, SYSLOG_PORT);
+    sendUDPPacket(invalid_syslog, SYSLOG_PORT);
+  }
+
+  SECTION("TCP") {
+    protocol = "TCP";
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+    controller.plan->scheduleProcessor(listen_syslog);
+    sendMessagesViaTCP({rfc5424_logger_example_1}, SYSLOG_PORT);
+    sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT);
+  }
+  std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
+  REQUIRE(triggerUntil(controller, {{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
+  CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1);
+  CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[1]) == invalid_syslog);
+
+  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[0], SYSLOG_PORT, protocol);
+  check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], SYSLOG_PORT, protocol);
+}
+
+TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") {
+  const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
+
+  test::SingleProcessorTestController controller{listen_syslog};
+  LogTestController::getInstance().setTrace<ListenSyslog>();
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "100"));
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "true"));
+
+  std::string protocol;
+  SECTION("UDP") {
+    protocol = "UDP";
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+    controller.plan->scheduleProcessor(listen_syslog);
+    std::this_thread::sleep_for(100ms);
+    sendUDPPacket(rfc5424_doc_example_1.unparsed_, SYSLOG_PORT);
+    sendUDPPacket(rfc5424_doc_example_2.unparsed_, SYSLOG_PORT);
+    sendUDPPacket(rfc5424_doc_example_3.unparsed_, SYSLOG_PORT);
+    sendUDPPacket(rfc5424_doc_example_4.unparsed_, SYSLOG_PORT);
+
+    sendUDPPacket(rfc3164_doc_example_1.unparsed_, SYSLOG_PORT);
+    sendUDPPacket(rfc3164_doc_example_2.unparsed_, SYSLOG_PORT);
+    sendUDPPacket(rfc3164_doc_example_3.unparsed_, SYSLOG_PORT);
+    sendUDPPacket(rfc3164_doc_example_4.unparsed_, SYSLOG_PORT);
+
+    sendUDPPacket(rfc5424_logger_example_1, SYSLOG_PORT);
+    sendUDPPacket(invalid_syslog, SYSLOG_PORT);
+  }
+
+  SECTION("TCP") {
+    protocol = "TCP";
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+    controller.plan->scheduleProcessor(listen_syslog);
+    std::this_thread::sleep_for(100ms);
+    sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_,
+                        rfc5424_doc_example_2.unparsed_,
+                        rfc5424_doc_example_3.unparsed_,
+                        rfc5424_doc_example_4.unparsed_}, SYSLOG_PORT);
+
+    sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_,
+                        rfc3164_doc_example_2.unparsed_,
+                        rfc3164_doc_example_3.unparsed_,
+                        rfc3164_doc_example_4.unparsed_}, SYSLOG_PORT);
+
+
+    sendMessagesViaTCP({rfc5424_logger_example_1}, SYSLOG_PORT);
+    sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT);
+  }
+
+  std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
+  REQUIRE(triggerUntil(controller, {{ListenSyslog::Success, 9}, {ListenSyslog::Invalid, 1}}, result, 300ms, 50ms));
+  REQUIRE(result.at(ListenSyslog::Success).size() == 9);
+  REQUIRE(result.at(ListenSyslog::Invalid).size() == 1);
+
+  std::unordered_map<std::string, core::FlowFile&> success_flow_files;
+
+  for (auto& flow_file : result.at(ListenSyslog::Success)) {
+    success_flow_files.insert({controller.plan->getContent(flow_file), *flow_file});
+  }
+
+  REQUIRE(success_flow_files.contains(std::string(rfc5424_doc_example_1.unparsed_)));
+  REQUIRE(success_flow_files.contains(std::string(rfc5424_doc_example_2.unparsed_)));
+  REQUIRE(success_flow_files.contains(std::string(rfc5424_doc_example_3.unparsed_)));
+  REQUIRE(success_flow_files.contains(std::string(rfc5424_doc_example_4.unparsed_)));
+
+  REQUIRE(success_flow_files.contains(std::string(rfc3164_doc_example_1.unparsed_)));
+  REQUIRE(success_flow_files.contains(std::string(rfc3164_doc_example_2.unparsed_)));
+  REQUIRE(success_flow_files.contains(std::string(rfc3164_doc_example_3.unparsed_)));
+  REQUIRE(success_flow_files.contains(std::string(rfc3164_doc_example_4.unparsed_)));
+
+  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_1.unparsed_)), rfc5424_doc_example_1, SYSLOG_PORT, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_2.unparsed_)), rfc5424_doc_example_2, SYSLOG_PORT, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_3.unparsed_)), rfc5424_doc_example_3, SYSLOG_PORT, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc5424_doc_example_4.unparsed_)), rfc5424_doc_example_4, SYSLOG_PORT, protocol);
+
+  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_1.unparsed_)), rfc3164_doc_example_1, SYSLOG_PORT, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_2.unparsed_)), rfc3164_doc_example_2, SYSLOG_PORT, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_3.unparsed_)), rfc3164_doc_example_3, SYSLOG_PORT, protocol);
+  check_parsed_attributes(success_flow_files.at(std::string(rfc3164_doc_example_4.unparsed_)), rfc3164_doc_example_4, SYSLOG_PORT, protocol);
+
+  REQUIRE(success_flow_files.contains(std::string(rfc5424_logger_example_1)));
+  CHECK(controller.plan->getContent(result.at(ListenSyslog::Invalid)[0]) == invalid_syslog);
+}
+
+
+TEST_CASE("ListenSyslog can be rescheduled", "[ListenSyslog]") {
+  const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
+  test::SingleProcessorTestController controller{listen_syslog};
+  LogTestController::getInstance().setTrace<ListenSyslog>();
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "100"));
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "true"));
+  SECTION("UDP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+    REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_syslog));
+    REQUIRE_NOTHROW(controller.plan->reset(true));
+    REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_syslog));
+  }
+
+  SECTION("TCP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+    REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_syslog));
+    REQUIRE_NOTHROW(controller.plan->reset(true));
+    REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_syslog));
+  }
+}
+
+TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") {
+  const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
+
+  test::SingleProcessorTestController controller{listen_syslog};
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "10"));
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::ParseMessages, "false"));
+  REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxQueueSize, "50"));
+
+  LogTestController::getInstance().setWarn<ListenSyslog>();
+
+  SECTION("UDP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP"));
+    controller.plan->scheduleProcessor(listen_syslog);
+    for (auto i = 0; i < 100; ++i) {
+      sendUDPPacket(rfc5424_doc_example_1.unparsed_, SYSLOG_PORT);
+    }
+  }
+
+  SECTION("TCP") {
+    REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP"));
+    controller.plan->scheduleProcessor(listen_syslog);
+    for (auto i = 0; i < 100; ++i) {
+      sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, SYSLOG_PORT);
+    }
+  }
+  CHECK(countLogOccurrencesUntil("Queue is full. Syslog message ignored.", 50, 300ms, 50ms));
+  CHECK(controller.trigger().at(ListenSyslog::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenSyslog::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenSyslog::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenSyslog::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenSyslog::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenSyslog::Success).empty());
+}
+
+}  // namespace org::apache::nifi::minifi::processors::testing
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
index 52defa300..6095d72bd 100644
--- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -20,7 +20,7 @@
 #include <new>
 #include <random>
 #include <string>
-#include "SingleInputTestController.h"
+#include "SingleProcessorTestController.h"
 #include "Catch.h"
 #include "PutUDP.h"
 #include "core/ProcessContext.h"
@@ -77,7 +77,7 @@ TEST_CASE("PutUDP", "[putudp]") {
   const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
   const auto port_str = std::to_string(port);
 
-  test::SingleInputTestController controller{putudp};
+  test::SingleProcessorTestController controller{putudp};
   LogTestController::getInstance().setTrace<PutUDP>();
   LogTestController::getInstance().setTrace<core::ProcessContext>();
   LogTestController::getInstance().setLevelByClassName(spdlog::level::trace, "org::apache::nifi::minifi::core::ProcessContextExpr");
@@ -88,7 +88,8 @@ TEST_CASE("PutUDP", "[putudp]") {
 
   {
     const char* const message = "first message: hello";
-    const auto result = controller.trigger(message);
+    controller.enqueueFlowFile(message);
+    const auto result = controller.trigger();
     const auto& success_flow_files = result.at(PutUDP::Success);
     REQUIRE(success_flow_files.size() == 1);
     REQUIRE(result.at(PutUDP::Failure).empty());
@@ -100,7 +101,8 @@ TEST_CASE("PutUDP", "[putudp]") {
 
   {
     const char* const message = "longer message AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA [...]
-    const auto result = controller.trigger(message);
+    controller.enqueueFlowFile(message);
+    const auto result = controller.trigger();
     const auto& success_flow_files = result.at(PutUDP::Success);
     REQUIRE(success_flow_files.size() == 1);
     REQUIRE(result.at(PutUDP::Failure).empty());
diff --git a/libminifi/include/core/PropertyValidation.h b/libminifi/include/core/PropertyValidation.h
index 68b9eb6cf..66ae49314 100644
--- a/libminifi/include/core/PropertyValidation.h
+++ b/libminifi/include/core/PropertyValidation.h
@@ -236,8 +236,10 @@ class LongValidator : public PropertyValidator {
 
 class UnsignedLongValidator : public PropertyValidator {
  public:
-  explicit UnsignedLongValidator(const std::string &name)
-      : PropertyValidator(name) {
+  explicit UnsignedLongValidator(const std::string &name, uint64_t min = std::numeric_limits<uint64_t>::min(), uint64_t max = std::numeric_limits<uint64_t>::max())
+      : PropertyValidator(name),
+        min_(min),
+        max_(max) {
   }
   ~UnsignedLongValidator() override = default;
 
@@ -251,12 +253,16 @@ class UnsignedLongValidator : public PropertyValidator {
       if (negative) {
         throw std::out_of_range("non negative expected");
       }
-      std::stoull(input);
-      return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(true).build();
+      auto res = std::stoull(input);
+      return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(res >= min_ && res <= max_).build();
     } catch (...) {
     }
     return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(false).build();
   }
+
+ private:
+  uint64_t min_;
+  uint64_t max_;
 };
 
 class NonBlankValidator : public PropertyValidator {
diff --git a/libminifi/test/SingleInputTestController.h b/libminifi/test/SingleProcessorTestController.h
similarity index 91%
rename from libminifi/test/SingleInputTestController.h
rename to libminifi/test/SingleProcessorTestController.h
index 98380c142..1e4707cf4 100644
--- a/libminifi/test/SingleInputTestController.h
+++ b/libminifi/test/SingleProcessorTestController.h
@@ -28,16 +28,13 @@
 #include "core/Processor.h"
 
 namespace org::apache::nifi::minifi::test {
-class SingleInputTestController : public TestController {
+class SingleProcessorTestController : public TestController {
  public:
-  explicit SingleInputTestController(const std::shared_ptr<core::Processor>& processor)
+  explicit SingleProcessorTestController(const std::shared_ptr<core::Processor>& processor)
       : processor_{plan->addProcessor(processor, processor->getName())}
   {}
 
-  std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>>
-  trigger(const std::string_view input_flow_file_content, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
-    const auto new_flow_file = createFlowFile(input_flow_file_content, std::move(input_flow_file_attributes));
-    input_->put(new_flow_file);
+  auto trigger() {
     plan->runProcessor(processor_);
     std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
     for (const auto& [relationship, connection]: outgoing_connections_) {
@@ -54,6 +51,11 @@ class SingleInputTestController : public TestController {
     return result;
   }
 
+  void enqueueFlowFile(const std::string_view input_flow_file_content, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
+    const auto new_flow_file = createFlowFile(input_flow_file_content, std::move(input_flow_file_attributes));
+    input_->put(new_flow_file);
+  }
+
   core::Relationship addDynamicRelationship(std::string name) {
     auto relationship = core::Relationship{std::move(name), ""};
     outgoing_connections_.insert_or_assign(relationship, plan->addConnection(processor_, relationship, nullptr));
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 620a69484..efa20c362 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -392,6 +392,8 @@ void TestPlan::reset(bool reschedule) {
     while (proc->getActiveTasks() > 0) {
       proc->decrementActiveTask();
     }
+    if (reschedule)
+      proc->onUnSchedule();
   }
 }