You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/06/27 15:11:05 UTC

[nifi-minifi-cpp] branch main updated (19deb1e81 -> 77ec00b67)

This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from 19deb1e81 MINIFICPP-1868 Exclude deleted files from clang-tidy checks
     new 3425bcbeb MINIFICPP-1857 Create ListenTCP class
     new f1e1b4866 MINIFICPP-1870 Replace IgnoreCaptureGroupZero with IncludeCaptureGroupZero
     new 77ec00b67 MINIFICPP-1873 - Fix state management path config typo in minifi.properties file

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CONFIGURE.md                                       |   4 +-
 PROCESSORS.md                                      |  21 ++-
 README.md                                          |   2 +-
 conf/minifi.properties                             |   2 +-
 .../integration/features/network_listener.feature  |  14 ++
 docker/test/integration/minifi/core/ImageStore.py  |  11 ++
 .../minifi/core/SingleNodeDockerCluster.py         |   3 +
 .../integration/minifi/core/TcpClientContainer.py  |  23 +++
 .../processors/{ListenSyslog.py => ListenTCP.py}   |   6 +-
 docker/test/integration/steps/steps.py             |   6 +
 .../standard-processors/processors/ExtractText.cpp |  22 +--
 .../standard-processors/processors/ExtractText.h   |  21 +--
 .../processors/ListenSyslog.cpp                    | 175 ++-------------------
 .../standard-processors/processors/ListenSyslog.h  | 126 +--------------
 .../standard-processors/processors/ListenTCP.cpp   |  68 ++++++++
 .../processors/ListenTCP.h}                        |  46 +++---
 .../processors/NetworkListenerProcessor.cpp        |  79 ++++++++++
 .../processors/NetworkListenerProcessor.h}         |  49 +++---
 .../standard-processors/processors/PutUDP.cpp      |   2 +-
 .../tests/unit/ExtractTextTests.cpp                |  38 +++--
 .../tests/unit/ListenSyslogTests.cpp               |  61 +------
 .../tests/unit/ListenTcpTests.cpp                  |  93 +++++++++++
 .../standard-processors/tests/unit/PutUDPTests.cpp |   2 +-
 libminifi/CMakeLists.txt                           |   3 +-
 libminifi/include/core/PropertyValidation.h        |   8 +-
 libminifi/include/utils/net/DNS.h                  |  14 +-
 .../include/utils/net/IpProtocol.h                 |  20 +--
 libminifi/include/utils/net/Server.h               |  81 ++++++++++
 libminifi/include/utils/net/TcpServer.h            |  62 ++++++++
 .../include/utils/net/UdpServer.h                  |  29 ++--
 libminifi/src/io/ClientSocket.cpp                  |  14 +-
 libminifi/src/utils/net/DNS.cpp                    |   8 +-
 libminifi/src/utils/net/TcpServer.cpp              |  85 ++++++++++
 libminifi/src/utils/net/UdpServer.cpp              |  46 ++++++
 libminifi/test/SingleProcessorTestController.h     |  26 +++
 libminifi/test/Utils.h                             |  31 ++++
 libminifi/test/unit/PropertyValidationTests.cpp    |  10 ++
 37 files changed, 823 insertions(+), 488 deletions(-)
 create mode 100644 docker/test/integration/features/network_listener.feature
 create mode 100644 docker/test/integration/minifi/core/TcpClientContainer.py
 copy docker/test/integration/minifi/processors/{ListenSyslog.py => ListenTCP.py} (67%)
 create mode 100644 extensions/standard-processors/processors/ListenTCP.cpp
 copy extensions/{mqtt/processors/ConvertBase.h => standard-processors/processors/ListenTCP.h} (52%)
 create mode 100644 extensions/standard-processors/processors/NetworkListenerProcessor.cpp
 copy extensions/{bustache/ApplyTemplate.h => standard-processors/processors/NetworkListenerProcessor.h} (53%)
 create mode 100644 extensions/standard-processors/tests/unit/ListenTcpTests.cpp
 copy extensions/http-curl/tests/EmptyFlow.h => libminifi/include/utils/net/IpProtocol.h (81%)
 create mode 100644 libminifi/include/utils/net/Server.h
 create mode 100644 libminifi/include/utils/net/TcpServer.h
 copy extensions/azure/utils/AzureSdkLogger.h => libminifi/include/utils/net/UdpServer.h (59%)
 create mode 100644 libminifi/src/utils/net/TcpServer.cpp
 create mode 100644 libminifi/src/utils/net/UdpServer.cpp


[nifi-minifi-cpp] 02/03: MINIFICPP-1870 Replace IgnoreCaptureGroupZero with IncludeCaptureGroupZero

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit f1e1b486663d4f6de03dc8aa79670b8da17f7eb3
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Jun 27 16:18:21 2022 +0200

    MINIFICPP-1870 Replace IgnoreCaptureGroupZero with IncludeCaptureGroupZero
    
    Closes #1359
    
    Signed-off-by: Martin Zink <ma...@apache.org>
---
 .../standard-processors/processors/ExtractText.cpp | 22 +++----------
 .../standard-processors/processors/ExtractText.h   | 21 +++---------
 .../tests/unit/ExtractTextTests.cpp                | 38 +++++++++++++++-------
 3 files changed, 37 insertions(+), 44 deletions(-)

diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 9a0d86661..dc7be63e8 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -22,7 +22,6 @@
 #include <string>
 #include <memory>
 #include <map>
-#include <iostream>
 #include <sstream>
 #include <utility>
 
@@ -35,11 +34,7 @@
 #include "utils/gsl.h"
 #include "utils/RegexUtils.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
 constexpr size_t MAX_BUFFER_SIZE = 4096;
 constexpr int MAX_CAPTURE_GROUP_SIZE = 1024;
@@ -57,7 +52,7 @@ core::Property ExtractText::RegexMode(
     ->withDescription("Set this to extract parts of flowfile content using regular experssions in dynamic properties")
     ->withDefaultValue<bool>(false)->build());
 
-core::Property ExtractText::IgnoreCaptureGroupZero(
+core::Property ExtractText::IncludeCaptureGroupZero(
     core::PropertyBuilder::createProperty("Include Capture Group 0")
     ->withDescription("Indicates that Capture Group 0 should be included as an attribute. "
                       "Capture Group 0 represents the entirety of the regular expression match, is typically not used, and could have considerable length.")
@@ -143,8 +138,7 @@ int64_t ExtractText::ReadCallback::operator()(const std::shared_ptr<io::BaseStre
       regex_flags.push_back(utils::Regex::Mode::ICASE);
     }
 
-    bool ignoregroupzero;
-    ctx_->getProperty(IgnoreCaptureGroupZero.getName(), ignoregroupzero);
+    const bool include_capture_group_zero = ctx_->getProperty<bool>(IncludeCaptureGroupZero).value_or(true);
 
     bool repeatingcapture;
     ctx_->getProperty(EnableRepeatingCaptureGroup.getName(), repeatingcapture);
@@ -171,9 +165,7 @@ int64_t ExtractText::ReadCallback::operator()(const std::shared_ptr<io::BaseStre
         utils::Regex rgx(value, regex_flags);
         utils::SMatch matches;
         while (utils::regexSearch(workStr, matches, rgx)) {
-          size_t i = ignoregroupzero ? 1 : 0;
-
-          for (; i < matches.size(); ++i, ++matchcount) {
+          for (std::size_t i = (include_capture_group_zero ? 0 : 1); i < matches.size(); ++i, ++matchcount) {
             std::string attributeValue = matches[i];
             if (attributeValue.length() > maxCaptureSize) {
               attributeValue = attributeValue.substr(0, maxCaptureSize);
@@ -212,8 +204,4 @@ ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile
 
 REGISTER_RESOURCE(ExtractText, Processor);
 
-}  // namespace processors
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/ExtractText.h b/extensions/standard-processors/processors/ExtractText.h
index 49abbd56c..0d06a3c53 100644
--- a/extensions/standard-processors/processors/ExtractText.h
+++ b/extensions/standard-processors/processors/ExtractText.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_EXTRACTTEXT_H_
-#define EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_EXTRACTTEXT_H_
+#pragma once
 
 #include <memory>
 #include <string>
@@ -29,11 +28,7 @@
 #include "FlowFileRecord.h"
 #include "utils/Export.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
 class ExtractText : public core::Processor {
  public:
@@ -46,7 +41,7 @@ class ExtractText : public core::Processor {
   EXTENSIONAPI static core::Property Attribute;
   EXTENSIONAPI static core::Property SizeLimit;
   EXTENSIONAPI static core::Property RegexMode;
-  EXTENSIONAPI static core::Property IgnoreCaptureGroupZero;
+  EXTENSIONAPI static core::Property IncludeCaptureGroupZero;
   EXTENSIONAPI static core::Property InsensitiveMatch;
   EXTENSIONAPI static core::Property MaxCaptureGroupLen;
   EXTENSIONAPI static core::Property EnableRepeatingCaptureGroup;
@@ -55,7 +50,7 @@ class ExtractText : public core::Processor {
       Attribute,
       SizeLimit,
       RegexMode,
-      IgnoreCaptureGroupZero,
+      IncludeCaptureGroupZero,
       InsensitiveMatch,
       MaxCaptureGroupLen,
       EnableRepeatingCaptureGroup
@@ -92,10 +87,4 @@ class ExtractText : public core::Processor {
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ExtractText>::getLogger();
 };
 
-}  // namespace processors
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_EXTRACTTEXT_H_
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/ExtractTextTests.cpp b/extensions/standard-processors/tests/unit/ExtractTextTests.cpp
index f41f74e7a..e4796a661 100644
--- a/extensions/standard-processors/tests/unit/ExtractTextTests.cpp
+++ b/extensions/standard-processors/tests/unit/ExtractTextTests.cpp
@@ -17,7 +17,6 @@
  */
 #include <list>
 #include <fstream>
-#include <map>
 #include <memory>
 #include <utility>
 #include <string>
@@ -144,7 +143,6 @@ TEST_CASE("Test usage of ExtractText in regex mode", "[extracttextRegexTest]") {
 
   std::shared_ptr<core::Processor> maprocessor = plan->addProcessor("ExtractText", "testExtractText", core::Relationship("success", "description"), true);
   plan->setProperty(maprocessor, org::apache::nifi::minifi::processors::ExtractText::RegexMode.getName(), "true");
-  plan->setProperty(maprocessor, org::apache::nifi::minifi::processors::ExtractText::IgnoreCaptureGroupZero.getName(), "true");
   plan->setProperty(maprocessor, org::apache::nifi::minifi::processors::ExtractText::EnableRepeatingCaptureGroup.getName(), "true");
   plan->setProperty(maprocessor, "RegexAttr", "Speed limit ([0-9]+)", true);
   plan->setProperty(maprocessor, "InvalidRegex", "[Invalid)A(F)", true);
@@ -162,17 +160,34 @@ TEST_CASE("Test usage of ExtractText in regex mode", "[extracttextRegexTest]") {
     test_file.close();
   }
 
-  plan->runNextProcessor();  // GetFile
-  plan->runNextProcessor();  // ExtractText
-  plan->runNextProcessor();  // LogAttribute
+  std::list<std::string> expected_logs;
+
+  SECTION("Do not include capture group 0") {
+    plan->setProperty(maprocessor, org::apache::nifi::minifi::processors::ExtractText::IncludeCaptureGroupZero.getName(), "false");
+
+    testController.runSession(plan);
 
-  std::list<std::string> suffixes = { "", ".0", ".1" };
+    expected_logs = {
+      "key:RegexAttr value:130",
+      "key:RegexAttr.0 value:130",
+      "key:RegexAttr.1 value:80"
+    };
+  }
+
+  SECTION("Include capture group 0") {
+    testController.runSession(plan);
+
+    expected_logs = {
+      "key:RegexAttr value:Speed limit 130",
+      "key:RegexAttr.0 value:Speed limit 130",
+      "key:RegexAttr.1 value:130",
+      "key:RegexAttr.2 value:Speed limit 80",
+      "key:RegexAttr.3 value:80"
+    };
+  }
 
-  for (const auto& suffix : suffixes) {
-    ss.str("");
-    ss << "key:" << "RegexAttr" << suffix << " value:" << ((suffix == ".1") ? "80" : "130");
-    std::string log_check = ss.str();
-    REQUIRE(LogTestController::getInstance().contains(log_check));
+  for (const auto& log : expected_logs) {
+    REQUIRE(LogTestController::getInstance().contains(log));
   }
 
   std::string error_str = "error encountered when trying to construct regular expression from property (key: InvalidRegex)";
@@ -199,6 +214,7 @@ TEST_CASE("Test usage of ExtractText in regex mode with large regex matches", "[
 
   auto extract_text_processor = plan->addProcessor("ExtractText", "ExtractText", core::Relationship("success", "description"), true);
   plan->setProperty(extract_text_processor, org::apache::nifi::minifi::processors::ExtractText::RegexMode.getName(), "true");
+  plan->setProperty(extract_text_processor, org::apache::nifi::minifi::processors::ExtractText::IncludeCaptureGroupZero.getName(), "false");
   plan->setProperty(extract_text_processor, "RegexAttr", "Speed limit (.*)", true);
 
   auto log_attribute_processor = plan->addProcessor("LogAttribute", "outputLogAttribute", core::Relationship("success", "description"), true);


[nifi-minifi-cpp] 01/03: MINIFICPP-1857 Create ListenTCP class

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 3425bcbebd04970759e8608429c72f626071d3e3
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Jun 27 16:16:34 2022 +0200

    MINIFICPP-1857 Create ListenTCP class
    
    Closes #1350
    Signed-off-by: Martin Zink <ma...@apache.org>
---
 PROCESSORS.md                                      |  21 ++-
 README.md                                          |   2 +-
 .../integration/features/network_listener.feature  |  14 ++
 docker/test/integration/minifi/core/ImageStore.py  |  11 ++
 .../minifi/core/SingleNodeDockerCluster.py         |   3 +
 .../integration/minifi/core/TcpClientContainer.py  |  23 +++
 .../integration/minifi/processors/ListenTCP.py     |  12 ++
 docker/test/integration/steps/steps.py             |   6 +
 .../processors/ListenSyslog.cpp                    | 175 ++-------------------
 .../standard-processors/processors/ListenSyslog.h  | 126 +--------------
 .../standard-processors/processors/ListenTCP.cpp   |  68 ++++++++
 .../standard-processors/processors/ListenTCP.h     |  57 +++++++
 .../processors/NetworkListenerProcessor.cpp        |  79 ++++++++++
 .../processors/NetworkListenerProcessor.h          |  65 ++++++++
 .../standard-processors/processors/PutUDP.cpp      |   2 +-
 .../tests/unit/ListenSyslogTests.cpp               |  61 +------
 .../tests/unit/ListenTcpTests.cpp                  |  93 +++++++++++
 .../standard-processors/tests/unit/PutUDPTests.cpp |   2 +-
 libminifi/CMakeLists.txt                           |   3 +-
 libminifi/include/core/PropertyValidation.h        |   8 +-
 libminifi/include/utils/net/DNS.h                  |  14 +-
 libminifi/include/utils/net/IpProtocol.h           |  28 ++++
 libminifi/include/utils/net/Server.h               |  81 ++++++++++
 libminifi/include/utils/net/TcpServer.h            |  62 ++++++++
 libminifi/include/utils/net/UdpServer.h            |  49 ++++++
 libminifi/src/io/ClientSocket.cpp                  |  14 +-
 libminifi/src/utils/net/DNS.cpp                    |   8 +-
 libminifi/src/utils/net/TcpServer.cpp              |  85 ++++++++++
 libminifi/src/utils/net/UdpServer.cpp              |  46 ++++++
 libminifi/test/SingleProcessorTestController.h     |  26 +++
 libminifi/test/Utils.h                             |  31 ++++
 libminifi/test/unit/PropertyValidationTests.cpp    |  10 ++
 32 files changed, 918 insertions(+), 367 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index f1b4bd7ba..e71eb88a4 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -45,6 +45,7 @@
 - [ListGCSBucket](#listgcsbucket)
 - [ListenHTTP](#listenhttp)
 - [ListenSyslog](#listensyslog)
+- [ListenTCP](#listentcp)
 - [ListFile](#listfile)
 - [ListS3](#lists3)
 - [ListSFTP](#listsftp)
@@ -1247,7 +1248,7 @@ In the list below, the names of required properties appear in bold. Any other pr
 | Protocol                  | UDP           | UDP<br>TCP<br>   | The protocol for Syslog communication.                                                                                                                                                               |
 | Parse Messages            | false         | false<br>true    | Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only contain the sender, protocol, and port, and no additional attributes.                 |
 | Max Batch Size            | 500           |                  | The maximum number of Syslog events to process at a time.                                                                                                                                            |
-| Max Size of Message Queue | 0             |                  | Maximum number of Syslog messages allowed to be buffered before processing them when the processor is triggered. If the buffer full, the message is ignored. If set to zero the buffer is unlimited. |
+| Max Size of Message Queue | 10000         |                  | Maximum number of Syslog messages allowed to be buffered before processing them when the processor is triggered. If the buffer is full, the message is ignored. If set to zero the buffer is unlimited. |
 
 ### Relationships
 
@@ -1279,6 +1280,24 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 
 
+## ListenTCP
+
+### Description
+
+Listens for incoming TCP connections and reads data from each connection using a line separator as the message demarcator. For each message the processor produces a single FlowFile.
+
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                          | Default Value | Allowable Values | Description                                                                                                                                                                                   |
+|-------------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Listening Port**            |               |                  | The port to listen on for communication.                                                                                                                                                      |
+| **Max Batch Size**            | 500           |                  | The maximum number of messages to process at a time.                                                                                                                                          |
+| **Max Size of Message Queue** | 10000         |                  | Maximum number of messages allowed to be buffered before processing them when the processor is triggered. If the buffer is full, the message is ignored. If set to zero the buffer is unlimited. |
+
+
 ## ListFile
 
 ### Description
diff --git a/README.md b/README.md
index 36b171586..d0a98b99f 100644
--- a/README.md
+++ b/README.md
@@ -65,7 +65,7 @@ The following table lists the base set of processors.
 
 | Extension Set | Processors                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 [...]
 |---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
-| **Base**      | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[ListFile](PROCE [...]
+| **Base**      | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[ListenTCP](PROC [...]
 
 The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as CURL ), can be disabled with the respective CMAKE flag on the command line.
 
diff --git a/docker/test/integration/features/network_listener.feature b/docker/test/integration/features/network_listener.feature
new file mode 100644
index 000000000..63de76f98
--- /dev/null
+++ b/docker/test/integration/features/network_listener.feature
@@ -0,0 +1,14 @@
+Feature: Minifi C++ can act as a network listener
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A TCP client can send messages to Minifi
+    Given a ListenTCP processor
+    And the "Listening Port" property of the ListenTCP processor is set to "10254"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And a TCP client is set up to send a test TCP message to minifi
+    And the "success" relationship of the ListenTCP processor is connected to the PutFile
+
+    When both instances start up
+    Then at least one flowfile with the content "test_tcp_message" is placed in the monitored directory in less than 60 seconds
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index d9f850e61..1d2def6a4 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -56,6 +56,8 @@ class ImageStore:
             image = self.__build_mqtt_broker_image()
         elif container_engine == "splunk":
             image = self.__build_splunk_image()
+        elif container_engine == "tcp-client":
+            image = self.__build_tcp_client_image()
         else:
             raise Exception("There is no associated image for " + container_engine)
 
@@ -165,6 +167,15 @@ class ImageStore:
     def __build_splunk_image(self):
         return self.__build_image_by_path(self.test_dir + "/resources/splunk-hec", 'minifi-splunk')
 
+    def __build_tcp_client_image(self):
+        dockerfile = dedent("""\
+            FROM {base_image}
+            RUN apk add netcat-openbsd
+            CMD ["/bin/sh", "-c", "echo TCP client container started; while true; do echo test_tcp_message | nc minifi-cpp-flow 10254; sleep 1; done"]
+            """.format(base_image='alpine:3.13'))
+
+        return self.__build_image(dockerfile)
+
     def __build_image(self, dockerfile, context_files=[]):
         conf_dockerfile_buffer = BytesIO()
         docker_context_buffer = BytesIO()
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index b622685b0..d98d4dfc5 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -36,6 +36,7 @@ from .SplunkContainer import SplunkContainer
 from .SyslogUdpClientContainer import SyslogUdpClientContainer
 from .SyslogTcpClientContainer import SyslogTcpClientContainer
 from .MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster
+from .TcpClientContainer import TcpClientContainer
 
 
 class SingleNodeDockerCluster(Cluster):
@@ -121,6 +122,8 @@ class SingleNodeDockerCluster(Cluster):
             return self.containers.setdefault(name, SyslogUdpClientContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == "syslog-tcp-client":
             return self.containers.setdefault(name, SyslogTcpClientContainer(name, self.vols, self.network, self.image_store, command))
+        elif engine == "tcp-client":
+            return self.containers.setdefault(name, TcpClientContainer(name, self.vols, self.network, self.image_store, command))
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)
 
diff --git a/docker/test/integration/minifi/core/TcpClientContainer.py b/docker/test/integration/minifi/core/TcpClientContainer.py
new file mode 100644
index 000000000..0287263af
--- /dev/null
+++ b/docker/test/integration/minifi/core/TcpClientContainer.py
@@ -0,0 +1,23 @@
+import logging
+from .Container import Container
+
+
+class TcpClientContainer(Container):
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'tcp-client', vols, network, image_store, command)
+
+    def get_startup_finished_log_entry(self):
+        return "TCP client container started"
+
+    def deploy(self):
+        if not self.set_deployed():
+            return
+
+        logging.info('Creating and running a tcp client docker container...')
+        self.client.containers.run(
+            self.image_store.get_image(self.get_engine()),
+            detach=True,
+            name=self.name,
+            network=self.network.name,
+            entrypoint=self.command)
+        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/processors/ListenTCP.py b/docker/test/integration/minifi/processors/ListenTCP.py
new file mode 100644
index 000000000..4781ca9ee
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ListenTCP.py
@@ -0,0 +1,12 @@
+from ..core.Processor import Processor
+
+
+class ListenTCP(Processor):
+    def __init__(self, schedule=None):
+        properties = {}
+
+        super(ListenTCP, self).__init__(
+            'ListenTCP',
+            properties=properties,
+            auto_terminate=['success'],
+            schedule=schedule)
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 8769ed6fb..db4d826a5 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -416,6 +416,12 @@ def step_impl(context):
     context.test.start_splunk()
 
 
+# TCP client
+@given('a TCP client is set up to send a test TCP message to minifi')
+def step_impl(context):
+    context.test.acquire_container("tcp-client", "tcp-client")
+
+
 @given("SSL is enabled for the Splunk HEC and the SSL context service is set up for PutSplunkHTTP and QuerySplunkIndexingStatus")
 def step_impl(context):
     root_ca_cert, root_ca_key = make_ca("root CA")
diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp b/extensions/standard-processors/processors/ListenSyslog.cpp
index 5a938ef45..d9db46e90 100644
--- a/extensions/standard-processors/processors/ListenSyslog.cpp
+++ b/extensions/standard-processors/processors/ListenSyslog.cpp
@@ -34,8 +34,8 @@ const core::Property ListenSyslog::ProtocolProperty(
     core::PropertyBuilder::createProperty("Protocol")
         ->withDescription("The protocol for Syslog communication.")
         ->isRequired(true)
-        ->withAllowableValues(Protocol::values())
-        ->withDefaultValue(toString(Protocol::UDP))
+        ->withAllowableValues(utils::net::IpProtocol::values())
+        ->withDefaultValue(toString(utils::net::IpProtocol::UDP))
         ->build());
 
 const core::Property ListenSyslog::MaxBatchSize(
@@ -53,15 +53,15 @@ const core::Property ListenSyslog::ParseMessages(
 const core::Property ListenSyslog::MaxQueueSize(
     core::PropertyBuilder::createProperty("Max Size of Message Queue")
         ->withDescription("Maximum number of Syslog messages allowed to be buffered before processing them when the processor is triggered. "
-                          "If the buffer full, the message is ignored. If set to zero the buffer is unlimited.")
-        ->withDefaultValue<uint64_t>(0)->build());
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)->build());
 
 const core::Relationship ListenSyslog::Success("success", "Incoming messages that match the expected format when parsing will be sent to this relationship. "
                                                           "When Parse Messages is set to false, all incoming message will be sent to this relationship.");
 const core::Relationship ListenSyslog::Invalid("invalid", "Incoming messages that do not match the expected format when parsing will be sent to this relationship.");
 
 
-const std::regex ListenSyslog::SyslogMessage::rfc5424_pattern_(
+const std::regex ListenSyslog::rfc5424_pattern_(
     R"(^<(?:(\d|\d{2}|1[1-8]\d|19[01]))>)"                                                                    // priority
     R"((?:(\d{1,2}))\s)"                                                                                      // version
     R"((?:(\d{4}[-]\d{2}[-]\d{2}[T]\d{2}[:]\d{2}[:]\d{2}(?:\.\d{1,6})?(?:[+-]\d{2}[:]\d{2}|Z)?)|-)\s)"        // timestamp
@@ -72,7 +72,7 @@ const std::regex ListenSyslog::SyslogMessage::rfc5424_pattern_(
     R"((?:(-|(?:\[.+?\])+))\s?)"                                                                              // structured_data
     R"((?:((?:.+)))?$)", std::regex::ECMAScript);                                                             // msg
 
-const std::regex ListenSyslog::SyslogMessage::rfc3164_pattern_(
+const std::regex ListenSyslog::rfc3164_pattern_(
     R"((?:\<(\d{1,3})\>))"                                                                                    // priority
     R"(([A-Z][a-z][a-z]\s{1,2}\d{1,2}\s\d{2}[:]\d{2}[:]\d{2})\s)"                                             // timestamp
     R"(([\w][\w\d(\.|\:)@-]*)\s)"                                                                             // hostname
@@ -84,74 +84,22 @@ void ListenSyslog::initialize() {
 }
 
 void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
-  gsl_Expects(context && !server_thread_.joinable() && !server_);
-
-  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
-  if (max_batch_size_ < 1)
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is invalid");
+  gsl_Expects(context);
 
   context->getProperty(ParseMessages.getName(), parse_messages_);
 
-  uint64_t max_queue_size = 0;
-  context->getProperty(MaxQueueSize.getName(), max_queue_size);
-  max_queue_size_ = max_queue_size > 0 ? std::optional<uint64_t>(max_queue_size) : std::nullopt;
-
-  Protocol protocol;
+  utils::net::IpProtocol protocol;
   context->getProperty(ProtocolProperty.getName(), protocol);
 
-  int port;
-  context->getProperty(Port.getName(), port);
-
-  if (protocol == Protocol::UDP) {
-    server_ = std::make_unique<UdpServer>(io_context_, queue_, max_queue_size_, port);
-  } else if (protocol == Protocol::TCP) {
-    server_ = std::make_unique<TcpServer>(io_context_, queue_, max_queue_size_, port);
-  } else {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid protocol");
-  }
-
-  server_thread_ = std::thread([this]() { io_context_.run(); });
-  logger_->log_debug("Started %s syslog server on port %d with %s max queue size and %zu max batch size",
-                     protocol.toString(),
-                     port,
-                     max_queue_size ? std::to_string(*max_queue_size_) : "no",
-                     max_batch_size_);
-}
-
-void ListenSyslog::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
-  gsl_Expects(session && max_batch_size_ > 0);
-  size_t logs_processed = 0;
-  while (!queue_.empty() && logs_processed < max_batch_size_) {
-    SyslogMessage received_message;
-    if (!queue_.tryDequeue(received_message))
-      break;
-    received_message.transferAsFlowFile(*session, parse_messages_);
-    ++logs_processed;
-  }
-}
-
-void ListenSyslog::stopServer() {
-  io_context_.stop();
-  if (server_thread_.joinable())
-    server_thread_.join();
-  server_.reset();
-  io_context_.reset();
-  logger_->log_debug("Stopped syslog server");
-}
-
-ListenSyslog::SyslogMessage::SyslogMessage(std::string message, Protocol protocol, asio::ip::address sender_address, asio::ip::port_type server_port)
-    : message_(std::move(message)),
-      protocol_(protocol),
-      server_port_(server_port),
-      sender_address_(std::move(sender_address)) {
+  startServer(*context, MaxBatchSize, MaxQueueSize, Port, protocol);
 }
 
-void ListenSyslog::SyslogMessage::transferAsFlowFile(core::ProcessSession& session, bool should_parse) {
+void ListenSyslog::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
   std::shared_ptr<core::FlowFile> flow_file = session.create();
   bool valid = true;
-  if (should_parse) {
+  if (parse_messages_) {
     std::smatch syslog_match;
-    if (std::regex_search(message_, syslog_match, rfc5424_pattern_)) {
+    if (std::regex_search(message.message_data, syslog_match, rfc5424_pattern_)) {
       uint64_t priority = std::stoull(syslog_match[1]);
       flow_file->setAttribute("syslog.priority", std::to_string(priority));
       flow_file->setAttribute("syslog.severity", std::to_string(priority % 8));
@@ -165,7 +113,7 @@ void ListenSyslog::SyslogMessage::transferAsFlowFile(core::ProcessSession& sessi
       flow_file->setAttribute("syslog.structured_data", syslog_match[8]);
       flow_file->setAttribute("syslog.msg", syslog_match[9]);
       flow_file->setAttribute("syslog.valid", "true");
-    } else if (std::regex_search(message_, syslog_match, rfc3164_pattern_)) {
+    } else if (std::regex_search(message.message_data, syslog_match, rfc3164_pattern_)) {
       uint64_t priority = std::stoull(syslog_match[1]);
       flow_file->setAttribute("syslog.priority", std::to_string(priority));
       flow_file->setAttribute("syslog.severity", std::to_string(priority % 8));
@@ -180,102 +128,13 @@ void ListenSyslog::SyslogMessage::transferAsFlowFile(core::ProcessSession& sessi
     }
   }
 
-  session.writeBuffer(flow_file, message_);
-  flow_file->setAttribute("syslog.protocol", protocol_.toString());
-  flow_file->setAttribute("syslog.port", std::to_string(server_port_));
-  flow_file->setAttribute("syslog.sender", sender_address_.to_string());
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("syslog.protocol", message.protocol.toString());
+  flow_file->setAttribute("syslog.port", std::to_string(message.server_port));
+  flow_file->setAttribute("syslog.sender", message.sender_address.to_string());
   session.transfer(flow_file, valid ? Success : Invalid);
 }
 
-ListenSyslog::TcpSession::TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<SyslogMessage>& concurrent_queue, std::optional<size_t> max_queue_size)
-    : concurrent_queue_(concurrent_queue),
-      max_queue_size_(max_queue_size),
-      socket_(io_context) {
-}
-
-asio::ip::tcp::socket& ListenSyslog::TcpSession::getSocket() {
-  return socket_;
-}
-
-void ListenSyslog::TcpSession::start() {
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
-}
-
-void ListenSyslog::TcpSession::handleReadUntilNewLine(std::error_code error_code) {
-  if (error_code)
-    return;
-  std::istream is(&buffer_);
-  std::string message;
-  std::getline(is, message);
-  if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
-    concurrent_queue_.enqueue(SyslogMessage(message, Protocol::TCP, socket_.remote_endpoint().address(), socket_.local_endpoint().port()));
-  else
-    logger_->log_warn("Queue is full. Syslog message ignored.");
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
-}
-
-ListenSyslog::TcpServer::TcpServer(asio::io_context& io_context, utils::ConcurrentQueue<SyslogMessage>& concurrent_queue, std::optional<size_t> max_queue_size, uint16_t port)
-    : Server(io_context, concurrent_queue, max_queue_size),
-      acceptor_(io_context, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) {
-  startAccept();
-}
-
-void ListenSyslog::TcpServer::startAccept() {
-  auto new_session = std::make_shared<TcpSession>(io_context_, concurrent_queue_, max_queue_size_);
-  acceptor_.async_accept(new_session->getSocket(),
-                         [this, new_session](const auto& error_code) -> void {
-                           handleAccept(new_session, error_code);
-                         });
-}
-
-void ListenSyslog::TcpServer::handleAccept(const std::shared_ptr<TcpSession>& session, const std::error_code& error) {
-  if (error)
-    return;
-
-  session->start();
-  auto new_session = std::make_shared<TcpSession>(io_context_, concurrent_queue_, max_queue_size_);
-  acceptor_.async_accept(new_session->getSocket(),
-                         [this, new_session](const auto& error_code) -> void {
-                           handleAccept(new_session, error_code);
-                         });
-}
-
-ListenSyslog::UdpServer::UdpServer(asio::io_context& io_context,
-                                   utils::ConcurrentQueue<SyslogMessage>& concurrent_queue,
-                                   std::optional<size_t> max_queue_size,
-                                   uint16_t port)
-    : Server(io_context, concurrent_queue, max_queue_size),
-      socket_(io_context, asio::ip::udp::endpoint(asio::ip::udp::v4(), port)) {
-  doReceive();
-}
-
-
-void ListenSyslog::UdpServer::doReceive() {
-  buffer_.resize(MAX_UDP_PACKET_SIZE);
-  socket_.async_receive_from(asio::buffer(buffer_, MAX_UDP_PACKET_SIZE),
-                             sender_endpoint_,
-                             [this](std::error_code ec, std::size_t bytes_received) {
-                               if (!ec && bytes_received > 0) {
-                                 buffer_.resize(bytes_received);
-                                 if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
-                                   concurrent_queue_.enqueue(SyslogMessage(std::move(buffer_), Protocol::UDP, sender_endpoint_.address(), socket_.local_endpoint().port()));
-                                 else
-                                   logger_->log_warn("Queue is full. Syslog message ignored.");
-                               }
-                               doReceive();
-                             });
-}
-
 REGISTER_RESOURCE(ListenSyslog, Processor);
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/ListenSyslog.h b/extensions/standard-processors/processors/ListenSyslog.h
index 33f83eb55..44445da9b 100644
--- a/extensions/standard-processors/processors/ListenSyslog.h
+++ b/extensions/standard-processors/processors/ListenSyslog.h
@@ -23,28 +23,15 @@
 #include <memory>
 #include <regex>
 
-#include "core/Processor.h"
-#include "core/logging/Logger.h"
+#include "NetworkListenerProcessor.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "utils/Enum.h"
-
-#include "asio/ts/buffer.hpp"
-#include "asio/ts/internet.hpp"
-#include "asio/streambuf.hpp"
 
 namespace org::apache::nifi::minifi::processors {
 
-class ListenSyslog : public core::Processor {
+class ListenSyslog : public NetworkListenerProcessor {
  public:
   explicit ListenSyslog(const std::string& name, const utils::Identifier& uuid = {})
-      : core::Processor(name, uuid) {
-  }
-  ListenSyslog(const ListenSyslog&) = delete;
-  ListenSyslog(ListenSyslog&&) = delete;
-  ListenSyslog& operator=(const ListenSyslog&) = delete;
-  ListenSyslog& operator=(ListenSyslog&&) = delete;
-  ~ListenSyslog() override {
-    stopServer();
+      : NetworkListenerProcessor(name, uuid, core::logging::LoggerFactory<ListenSyslog>::getLogger()) {
   }
 
   EXTENSIONAPI static constexpr const char* Description = "Listens for Syslog messages being sent to a given port over TCP or UDP. "
@@ -72,116 +59,15 @@ class ListenSyslog : public core::Processor {
   EXTENSIONAPI static const core::Relationship Invalid;
   static auto relationships() { return std::array{Success, Invalid}; }
 
-  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
-  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
-  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
-
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-
-  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
   void initialize() override;
   void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override;
 
-  void notifyStop() override {
-    stopServer();
-  }
-
  private:
-  SMART_ENUM(Protocol,
-             (TCP, "TCP"),
-             (UDP, "UDP")
-  )
-
-  void stopServer();
-
-  class SyslogMessage {
-   public:
-    SyslogMessage() = default;
-    SyslogMessage(std::string message, Protocol protocol, asio::ip::address sender_address, asio::ip::port_type server_port);
-
-    void transferAsFlowFile(core::ProcessSession& session, bool should_parse);
-
-   private:
-    std::string message_;
-    Protocol protocol_;
-    asio::ip::port_type server_port_{514};
-    asio::ip::address sender_address_;
-
-    static const std::regex rfc5424_pattern_;
-    static const std::regex rfc3164_pattern_;
-  };
-
-  class Server {
-   public:
-    virtual ~Server() = default;
-
-   protected:
-    Server(asio::io_context& io_context, utils::ConcurrentQueue<SyslogMessage>& concurrent_queue, std::optional<size_t> max_queue_size)
-        : concurrent_queue_(concurrent_queue), io_context_(io_context), max_queue_size_(max_queue_size) {}
+  void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) override;
 
-    utils::ConcurrentQueue<SyslogMessage>& concurrent_queue_;
-    asio::io_context& io_context_;
-    std::optional<size_t> max_queue_size_;
-  };
+  static const std::regex rfc5424_pattern_;
+  static const std::regex rfc3164_pattern_;
 
-  class TcpSession : public std::enable_shared_from_this<TcpSession> {
-   public:
-    TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<SyslogMessage>& concurrent_queue, std::optional<size_t> max_queue_size);
-
-    asio::ip::tcp::socket& getSocket();
-    void start();
-    void handleReadUntilNewLine(std::error_code error_code);
-
-   private:
-    utils::ConcurrentQueue<SyslogMessage>& concurrent_queue_;
-    std::optional<size_t> max_queue_size_;
-    asio::ip::tcp::socket socket_;
-    asio::basic_streambuf<std::allocator<char>> buffer_;
-    std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ListenSyslog>::getLogger();
-  };
-
-  class TcpServer : public Server {
-   public:
-    TcpServer(asio::io_context& io_context,
-              utils::ConcurrentQueue<SyslogMessage>& concurrent_queue,
-              std::optional<size_t> max_queue_size,
-              uint16_t port);
-
-   private:
-    void startAccept();
-    void handleAccept(const std::shared_ptr<TcpSession>& session, const std::error_code& error);
-
-    asio::ip::tcp::acceptor acceptor_;
-  };
-
-  class UdpServer : public Server {
-   public:
-    UdpServer(asio::io_context& io_context,
-              utils::ConcurrentQueue<SyslogMessage>& concurrent_queue,
-              std::optional<size_t> max_queue_size,
-              uint16_t port);
-
-   private:
-    void doReceive();
-
-    asio::ip::udp::socket socket_;
-    asio::ip::udp::endpoint sender_endpoint_;
-    std::string buffer_;
-
-    std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ListenSyslog>::getLogger();
-    static inline constexpr size_t MAX_UDP_PACKET_SIZE = 65535;
-  };
-
-  utils::ConcurrentQueue<SyslogMessage> queue_;
-  std::unique_ptr<Server> server_;
-  asio::io_context io_context_;
-  std::thread server_thread_;
-
-  uint64_t max_batch_size_ = 500;
-  std::optional<uint64_t> max_queue_size_;
   bool parse_messages_ = false;
-
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ListenSyslog>::getLogger();
 };
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/ListenTCP.cpp b/extensions/standard-processors/processors/ListenTCP.cpp
new file mode 100644
index 000000000..c7d1be6d0
--- /dev/null
+++ b/extensions/standard-processors/processors/ListenTCP.cpp
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ListenTCP.h"
+
+#include "core/Resource.h"
+#include "core/PropertyBuilder.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ListenTCP::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port to listen on for communication.")
+        ->withType(core::StandardValidators::get().LISTEN_PORT_VALIDATOR)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
+        ->build());
+
+const core::Property ListenTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
+
+const core::Relationship ListenTCP::Success("success", "Messages received successfully will be sent out this relationship.");
+
+void ListenTCP::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+void ListenTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+  startServer(*context, MaxBatchSize, MaxQueueSize, Port, utils::net::IpProtocol::TCP);
+}
+
+void ListenTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+  auto flow_file = session.create();
+  session.writeBuffer(flow_file, message.message_data);
+  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
+  session.transfer(flow_file, Success);
+}
+
+REGISTER_RESOURCE(ListenTCP, Processor);
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/ListenTCP.h b/extensions/standard-processors/processors/ListenTCP.h
new file mode 100644
index 000000000..17f0aafc5
--- /dev/null
+++ b/extensions/standard-processors/processors/ListenTCP.h
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "NetworkListenerProcessor.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+class ListenTCP : public NetworkListenerProcessor {
+ public:
+  explicit ListenTCP(const std::string& name, const utils::Identifier& uuid = {})
+    : NetworkListenerProcessor(name, uuid, core::logging::LoggerFactory<ListenTCP>::getLogger()) {
+  }
+
+  EXTENSIONAPI static constexpr const char* Description = "Listens for incoming TCP connections and reads data from each connection using a line separator as the message demarcator. "
+                                                          "For each message the processor produces a single FlowFile.";
+
+  EXTENSIONAPI static const core::Property Port;
+  EXTENSIONAPI static const core::Property MaxBatchSize;
+  EXTENSIONAPI static const core::Property MaxQueueSize;
+  static auto properties() {
+    return std::array{
+      Port,
+      MaxBatchSize,
+      MaxQueueSize
+    };
+  }
+
+  EXTENSIONAPI static const core::Relationship Success;
+  static auto relationships() { return std::array{Success}; }
+
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override;
+
+ private:
+  void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) override;
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
new file mode 100644
index 000000000..81442ec11
--- /dev/null
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "NetworkListenerProcessor.h"
+#include "utils/net/UdpServer.h"
+#include "utils/net/TcpServer.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+NetworkListenerProcessor::~NetworkListenerProcessor() {
+  stopServer();
+}
+
+void NetworkListenerProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);
+  size_t logs_processed = 0;
+  while (!server_->queueEmpty() && logs_processed < max_batch_size_) {
+    utils::net::Message received_message;
+    if (!server_->tryDequeue(received_message))
+      break;
+    transferAsFlowFile(received_message, *session);
+    ++logs_processed;
+  }
+}
+
+void NetworkListenerProcessor::startServer(
+    const core::ProcessContext& context, const core::Property& max_batch_size_prop, const core::Property& max_queue_size_prop, const core::Property& port_prop, utils::net::IpProtocol protocol) {
+  gsl_Expects(!server_thread_.joinable() && !server_);
+  context.getProperty(max_batch_size_prop.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESSOR_EXCEPTION, "Max Batch Size property is invalid");
+
+  uint64_t max_queue_size = 0;
+  context.getProperty(max_queue_size_prop.getName(), max_queue_size);
+  auto max_queue_size_opt = max_queue_size > 0 ? std::optional<uint64_t>(max_queue_size) : std::nullopt;
+
+  int port;
+  context.getProperty(port_prop.getName(), port);
+
+  if (protocol == utils::net::IpProtocol::UDP) {
+    server_ = std::make_unique<utils::net::UdpServer>(max_queue_size_opt, port, logger_);
+  } else if (protocol == utils::net::IpProtocol::TCP) {
+    server_ = std::make_unique<utils::net::TcpServer>(max_queue_size_opt, port, logger_);
+  } else {
+    throw Exception(PROCESSOR_EXCEPTION, "Invalid protocol");
+  }
+
+  server_thread_ = std::thread([this]() { server_->run(); });
+  logger_->log_debug("Started %s server on port %d with %s max queue size and %zu max batch size",
+                     protocol.toString(),
+                     port,
+                     max_queue_size_opt ? std::to_string(*max_queue_size_opt) : "unlimited",
+                     max_batch_size_);
+}
+
+void NetworkListenerProcessor::stopServer() {
+  if (server_) {
+    server_->stop();
+  }
+  if (server_thread_.joinable()) {
+    server_thread_.join();
+  }
+  server_.reset();
+}
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.h b/extensions/standard-processors/processors/NetworkListenerProcessor.h
new file mode 100644
index 000000000..58170e510
--- /dev/null
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.h
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "core/Processor.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Property.h"
+#include "utils/net/Server.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+class NetworkListenerProcessor : public core::Processor {
+ public:
+  NetworkListenerProcessor(const std::string& name, const utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
+    : core::Processor(name, uuid),
+      logger_(std::move(logger)) {
+  }
+  ~NetworkListenerProcessor() override;
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  void notifyStop() override {
+    stopServer();
+  }
+
+ protected:
+  void stopServer();
+  void startServer(
+    const core::ProcessContext& context, const core::Property& max_batch_size_prop, const core::Property& max_queue_size_prop, const core::Property& port_prop, utils::net::IpProtocol protocol);
+  virtual void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) = 0;
+
+  uint64_t max_batch_size_{500};
+  std::unique_ptr<utils::net::Server> server_;
+  std::thread server_thread_;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/PutUDP.cpp b/extensions/standard-processors/processors/PutUDP.cpp
index 3ec575447..95000011d 100644
--- a/extensions/standard-processors/processors/PutUDP.cpp
+++ b/extensions/standard-processors/processors/PutUDP.cpp
@@ -124,7 +124,7 @@ void PutUDP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons
     return names;
   };
 
-  utils::net::resolveHost(hostname.c_str(), port.c_str(), utils::net::IpProtocol::Udp)
+  utils::net::resolveHost(hostname.c_str(), port.c_str(), utils::net::IpProtocol::UDP)
       | utils::map(utils::dereference)
       | utils::map(debug_log_resolved_names)
       | utils::flatMap([](const auto& names) { return utils::net::open_socket(names); })
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
index 27b38551f..8266cd218 100644
--- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -19,7 +19,7 @@
 #include "Catch.h"
 #include "ListenSyslog.h"
 #include "SingleProcessorTestController.h"
-#include "asio.hpp"
+#include "Utils.h"
 
 using ListenSyslog = org::apache::nifi::minifi::processors::ListenSyslog;
 
@@ -207,21 +207,6 @@ void sendUDPPacket(const std::string_view content, uint64_t port) {
   socket.close();
 }
 
-void sendMessagesViaTCP(const std::vector<std::string_view>& contents, uint64_t port) {
-  asio::io_context io_context;
-  asio::ip::tcp::socket socket(io_context);
-  asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port);
-  socket.connect(remote_endpoint);
-  std::error_code err;
-  for (auto& content : contents) {
-    std::string tcp_message(content);
-    tcp_message += '\n';
-    socket.send(asio::buffer(tcp_message, tcp_message.size()), 0, err);
-  }
-  REQUIRE(!err);
-  socket.close();
-}
-
 void check_for_only_basic_attributes(core::FlowFile& flow_file, uint16_t port, std::string_view protocol) {
   CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port"));
   CHECK(protocol == flow_file.getAttribute("syslog.protocol"));
@@ -272,43 +257,6 @@ void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164
   CHECK(original_message.msg_ == flow_file.getAttribute("syslog.msg"));
 }
 
-bool triggerUntil(test::SingleProcessorTestController& controller,
-                  const std::unordered_map<core::Relationship, size_t>& expected_quantities,
-                  std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>>& result,
-                  const std::chrono::milliseconds max_duration,
-                  const std::chrono::milliseconds wait_time = 50ms) {
-  auto start_time = std::chrono::steady_clock::now();
-  while (std::chrono::steady_clock::now() < start_time + max_duration) {
-    for (auto& [relationship, flow_files] : controller.trigger()) {
-      result[relationship].insert(result[relationship].end(), flow_files.begin(), flow_files.end());
-    }
-    bool expected_quantities_met = true;
-    for (const auto& [relationship, expected_quantity] : expected_quantities) {
-      if (result[relationship].size() < expected_quantity) {
-        expected_quantities_met = false;
-        break;
-      }
-    }
-    if (expected_quantities_met)
-      return true;
-    std::this_thread::sleep_for(wait_time);
-  }
-  return false;
-}
-
-bool countLogOccurrencesUntil(const std::string& pattern,
-                              const int occurrences,
-                              const std::chrono::milliseconds max_duration,
-                              const std::chrono::milliseconds wait_time = 50ms) {
-  auto start_time = std::chrono::steady_clock::now();
-  while (std::chrono::steady_clock::now() < start_time + max_duration) {
-    if (LogTestController::getInstance().countOccurrences(pattern) == occurrences)
-      return true;
-    std::this_thread::sleep_for(wait_time);
-  }
-  return false;
-}
-
 TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
 
@@ -335,7 +283,7 @@ TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") {
     sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT);
   }
   std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
-  REQUIRE(triggerUntil(controller, {{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
+  REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1);
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[1]) == invalid_syslog);
 
@@ -393,7 +341,7 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") {
   }
 
   std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
-  REQUIRE(triggerUntil(controller, {{ListenSyslog::Success, 9}, {ListenSyslog::Invalid, 1}}, result, 300ms, 50ms));
+  REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 9}, {ListenSyslog::Invalid, 1}}, result, 300ms, 50ms));
   REQUIRE(result.at(ListenSyslog::Success).size() == 9);
   REQUIRE(result.at(ListenSyslog::Invalid).size() == 1);
 
@@ -467,6 +415,7 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") {
     for (auto i = 0; i < 100; ++i) {
       sendUDPPacket(rfc5424_doc_example_1.unparsed_, SYSLOG_PORT);
     }
+    CHECK(countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms));
   }
 
   SECTION("TCP") {
@@ -475,8 +424,8 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") {
     for (auto i = 0; i < 100; ++i) {
       sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, SYSLOG_PORT);
     }
+    CHECK(countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms));
   }
-  CHECK(countLogOccurrencesUntil("Queue is full. Syslog message ignored.", 50, 300ms, 50ms));
   CHECK(controller.trigger().at(ListenSyslog::Success).size() == 10);
   CHECK(controller.trigger().at(ListenSyslog::Success).size() == 10);
   CHECK(controller.trigger().at(ListenSyslog::Success).size() == 10);
diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
new file mode 100644
index 000000000..aad3f5dca
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ListenTCP.h"
+#include "SingleProcessorTestController.h"
+#include "Utils.h"
+
+using ListenTCP = org::apache::nifi::minifi::processors::ListenTCP;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+constexpr uint64_t PORT = 10254;
+
+void check_for_attributes(core::FlowFile& flow_file) {
+  CHECK(std::to_string(PORT) == flow_file.getAttribute("tcp.port"));
+  CHECK("127.0.0.1" == flow_file.getAttribute("tcp.sender"));
+}
+
+TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") {
+  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+
+  test::SingleProcessorTestController controller{listen_tcp};
+  LogTestController::getInstance().setTrace<ListenTCP>();
+  REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT)));
+  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "2"));
+
+  controller.plan->scheduleProcessor(listen_tcp);
+  sendMessagesViaTCP({"test_message_1"}, PORT);
+  sendMessagesViaTCP({"another_message"}, PORT);
+  ProcessorTriggerResult result;
+  REQUIRE(controller.triggerUntil({{ListenTCP::Success, 2}}, result, 300s, 50ms));
+  CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[0]) == "test_message_1");
+  CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[1]) == "another_message");
+
+  check_for_attributes(*result.at(ListenTCP::Success)[0]);
+  check_for_attributes(*result.at(ListenTCP::Success)[1]);
+}
+
+TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP]") {
+  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+  test::SingleProcessorTestController controller{listen_tcp};
+  LogTestController::getInstance().setTrace<ListenTCP>();
+  REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT)));
+  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "100"));
+
+  REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_tcp));
+  REQUIRE_NOTHROW(controller.plan->reset(true));
+  REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_tcp));
+}
+
+TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") {
+  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+
+  test::SingleProcessorTestController controller{listen_tcp};
+  REQUIRE(listen_tcp->setProperty(ListenTCP::Port, std::to_string(PORT)));
+  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "10"));
+  REQUIRE(listen_tcp->setProperty(ListenTCP::MaxQueueSize, "50"));
+
+  LogTestController::getInstance().setWarn<ListenTCP>();
+
+  controller.plan->scheduleProcessor(listen_tcp);
+  for (auto i = 0; i < 100; ++i) {
+    sendMessagesViaTCP({"test_message"}, PORT);
+  }
+
+  CHECK(countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms));
+  CHECK(controller.trigger().at(ListenTCP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenTCP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenTCP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenTCP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenTCP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenTCP::Success).empty());
+}
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
index 49f4e3f32..9e8e95247 100644
--- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -34,7 +34,7 @@ namespace org::apache::nifi::minifi::processors {
 namespace {
 struct DatagramListener {
   DatagramListener(const char* const hostname, const char* const port)
-    :resolved_names_{utils::net::resolveHost(hostname, port, utils::net::IpProtocol::Udp).value()},
+    :resolved_names_{utils::net::resolveHost(hostname, port, utils::net::IpProtocol::UDP).value()},
      open_socket_{utils::net::open_socket(*resolved_names_)
         | utils::valueOrElse([=]() -> utils::net::OpenSocketResult { throw std::runtime_error{utils::StringUtils::join_pack("Failed to connect to ", hostname, " on port ", port)}; })}
   {
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 0bf8b8824..9d2482ddc 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -89,7 +89,8 @@ else()
 endif()
 
 include(RangeV3)
-list(APPEND LIBMINIFI_LIBRARIES yaml-cpp ZLIB::ZLIB concurrentqueue RapidJSON spdlog Threads::Threads gsl-lite libsodium range-v3 expected-lite date::date date::tz)
+include(Asio)
+list(APPEND LIBMINIFI_LIBRARIES yaml-cpp ZLIB::ZLIB concurrentqueue RapidJSON spdlog Threads::Threads gsl-lite libsodium range-v3 expected-lite date::date date::tz asio)
 if(NOT WIN32)
 	list(APPEND LIBMINIFI_LIBRARIES OSSP::libuuid++)
 endif()
diff --git a/libminifi/include/core/PropertyValidation.h b/libminifi/include/core/PropertyValidation.h
index 66ae49314..ca50e255f 100644
--- a/libminifi/include/core/PropertyValidation.h
+++ b/libminifi/include/core/PropertyValidation.h
@@ -210,12 +210,12 @@ class LongValidator : public PropertyValidator {
   ~LongValidator() override = default;
 
   ValidationResult validate(const std::string &subject, const std::shared_ptr<minifi::state::response::Value> &input) const override {
-    auto in64 = std::dynamic_pointer_cast<minifi::state::response::Int64Value>(input);
-    if (in64) {
+    if (auto in64 = std::dynamic_pointer_cast<minifi::state::response::Int64Value>(input)) {
       return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(in64->getStringValue()).isValid(in64->getValue() >= min_ && in64->getValue() <= max_).build();
-    } else {
-      auto intb = std::dynamic_pointer_cast<minifi::state::response::IntValue>(input);
+    } else if (auto intb = std::dynamic_pointer_cast<minifi::state::response::IntValue>(input)) {
       return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(intb->getStringValue()).isValid(intb->getValue() >= min_ && intb->getValue() <= max_).build();
+    } else {
+      return validate(subject, input->getStringValue());
     }
   }
 
diff --git a/libminifi/include/utils/net/DNS.h b/libminifi/include/utils/net/DNS.h
index 786eeeb5f..2b9a31b57 100644
--- a/libminifi/include/utils/net/DNS.h
+++ b/libminifi/include/utils/net/DNS.h
@@ -21,6 +21,7 @@
 #include <system_error>
 #include "nonstd/expected.hpp"
 #include "utils/gsl.h"
+#include "IpProtocol.h"
 
 struct addrinfo;
 
@@ -29,20 +30,15 @@ struct addrinfo_deleter {
   void operator()(addrinfo*) const noexcept;
 };
 
-enum class IpProtocol {
-  Tcp,
-  Udp
-};
-
-nonstd::expected<gsl::not_null<std::unique_ptr<addrinfo, addrinfo_deleter>>, std::error_code> resolveHost(const char* hostname, const char* port, IpProtocol = IpProtocol::Tcp,
+nonstd::expected<gsl::not_null<std::unique_ptr<addrinfo, addrinfo_deleter>>, std::error_code> resolveHost(const char* hostname, const char* port, IpProtocol = IpProtocol::TCP,
     bool need_canonname = false);
-inline auto resolveHost(const char* const port, const IpProtocol proto = IpProtocol::Tcp, const bool need_canonname = false) {
+inline auto resolveHost(const char* const port, const IpProtocol proto = IpProtocol::TCP, const bool need_canonname = false) {
   return resolveHost(nullptr, port, proto, need_canonname);
 }
-inline auto resolveHost(const char* const hostname, const uint16_t port, const IpProtocol proto = IpProtocol::Tcp, const bool need_canonname = false) {
+inline auto resolveHost(const char* const hostname, const uint16_t port, const IpProtocol proto = IpProtocol::TCP, const bool need_canonname = false) {
   return resolveHost(hostname, std::to_string(port).c_str(), proto, need_canonname);
 }
-inline auto resolveHost(const uint16_t port, const IpProtocol proto = IpProtocol::Tcp, const bool need_canonname = false) {
+inline auto resolveHost(const uint16_t port, const IpProtocol proto = IpProtocol::TCP, const bool need_canonname = false) {
   return resolveHost(nullptr, port, proto, need_canonname);
 }
 }  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/IpProtocol.h b/libminifi/include/utils/net/IpProtocol.h
new file mode 100644
index 000000000..ac91156c5
--- /dev/null
+++ b/libminifi/include/utils/net/IpProtocol.h
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::utils::net {
+
+SMART_ENUM(IpProtocol,
+  (TCP, "TCP"),
+  (UDP, "UDP")
+)
+
+}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/Server.h b/libminifi/include/utils/net/Server.h
new file mode 100644
index 000000000..1266a148a
--- /dev/null
+++ b/libminifi/include/utils/net/Server.h
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <optional>
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "utils/Enum.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "core/logging/Logger.h"
+#include "asio/ts/buffer.hpp"
+#include "asio/ts/internet.hpp"
+#include "asio/streambuf.hpp"
+#include "IpProtocol.h"
+
+namespace org::apache::nifi::minifi::utils::net {
+
+struct Message {
+ public:
+  Message() = default;
+  Message(std::string message_data, IpProtocol protocol, asio::ip::address sender_address, asio::ip::port_type server_port)
+    : message_data(std::move(message_data)),
+      protocol(protocol),
+      server_port(server_port),
+      sender_address(std::move(sender_address)) {
+  }
+
+  std::string message_data;
+  IpProtocol protocol;
+  asio::ip::port_type server_port;
+  asio::ip::address sender_address;
+};
+
+class Server {
+ public:
+  void run() {
+    io_context_.run();
+  }
+  void reset() {
+    io_context_.restart();
+  }
+  void stop() {
+    io_context_.stop();
+  }
+  bool queueEmpty() {
+    return concurrent_queue_.empty();
+  }
+  bool tryDequeue(utils::net::Message& received_message) {
+    return concurrent_queue_.tryDequeue(received_message);
+  }
+  virtual ~Server() {
+    stop();
+  }
+
+ protected:
+  Server(std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
+      : max_queue_size_(max_queue_size), logger_(logger) {}
+
+  utils::ConcurrentQueue<Message> concurrent_queue_;
+  asio::io_context io_context_;
+  std::optional<size_t> max_queue_size_;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/TcpServer.h b/libminifi/include/utils/net/TcpServer.h
new file mode 100644
index 000000000..3ccebdb78
--- /dev/null
+++ b/libminifi/include/utils/net/TcpServer.h
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <optional>
+#include <memory>
+#include <system_error>
+
+#include "Server.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "asio/ts/buffer.hpp"
+#include "asio/ts/internet.hpp"
+#include "asio/streambuf.hpp"
+
+namespace org::apache::nifi::minifi::utils::net {
+
+class TcpSession : public std::enable_shared_from_this<TcpSession> {
+ public:
+  TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger);
+
+  asio::ip::tcp::socket& getSocket();
+  void start();
+  void handleReadUntilNewLine(std::error_code error_code);
+
+ private:
+  utils::ConcurrentQueue<Message>& concurrent_queue_;
+  std::optional<size_t> max_queue_size_;
+  asio::ip::tcp::socket socket_;
+  asio::basic_streambuf<std::allocator<char>> buffer_;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+class TcpServer : public Server {
+ public:
+  TcpServer(std::optional<size_t> max_queue_size,
+            uint16_t port,
+            std::shared_ptr<core::logging::Logger> logger);
+
+ private:
+  void startAccept();
+  void handleAccept(const std::shared_ptr<TcpSession>& session, const std::error_code& error);
+
+  asio::ip::tcp::acceptor acceptor_;
+};
+
+}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/UdpServer.h b/libminifi/include/utils/net/UdpServer.h
new file mode 100644
index 000000000..e9b852b00
--- /dev/null
+++ b/libminifi/include/utils/net/UdpServer.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <optional>
+#include <memory>
+#include <string>
+
+#include "Server.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "asio/ts/buffer.hpp"
+#include "asio/ts/internet.hpp"
+#include "asio/streambuf.hpp"
+
+namespace org::apache::nifi::minifi::utils::net {
+
+class UdpServer : public Server {
+ public:
+  UdpServer(std::optional<size_t> max_queue_size,
+            uint16_t port,
+            std::shared_ptr<core::logging::Logger> logger);
+
+ private:
+  void doReceive();
+
+  asio::ip::udp::socket socket_;
+  asio::ip::udp::endpoint sender_endpoint_;
+  std::string buffer_;
+
+  static constexpr size_t MAX_UDP_PACKET_SIZE = 65535;
+};
+
+}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index bc9e89c1f..fc321c233 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -106,11 +106,7 @@ std::error_code set_non_blocking(const mio::SocketDescriptor fd) noexcept {
 }
 }  // namespace
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace io {
+namespace org::apache::nifi::minifi::io {
 
 
 bool valid_socket(const SocketDescriptor fd) noexcept {
@@ -342,7 +338,7 @@ int Socket::initialize() {
     return nullptr;
   }();
   const bool is_server = hostname == nullptr;
-  const auto addr_info_or_error = utils::net::resolveHost(hostname, port_, utils::net::IpProtocol::Tcp, !is_server);
+  const auto addr_info_or_error = utils::net::resolveHost(hostname, port_, utils::net::IpProtocol::TCP, !is_server);
   if (!addr_info_or_error) {
     logger_->log_error("getaddrinfo: %s", addr_info_or_error.error().message());
     return -1;
@@ -518,8 +514,4 @@ size_t Socket::read(gsl::span<std::byte> buf, bool retrieve_all_bytes) {
   return total_read;
 }
 
-} /* namespace io */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::io
diff --git a/libminifi/src/utils/net/DNS.cpp b/libminifi/src/utils/net/DNS.cpp
index eac09e2d4..b5eac92bb 100644
--- a/libminifi/src/utils/net/DNS.cpp
+++ b/libminifi/src/utils/net/DNS.cpp
@@ -69,14 +69,14 @@ nonstd::expected<gsl::not_null<std::unique_ptr<addrinfo, addrinfo_deleter>>, std
   addrinfo hints{};
   memset(&hints, 0, sizeof hints);  // make sure the struct is empty
   hints.ai_family = AF_UNSPEC;
-  hints.ai_socktype = protocol == IpProtocol::Tcp ? SOCK_STREAM : SOCK_DGRAM;
+  hints.ai_socktype = protocol == IpProtocol::TCP ? SOCK_STREAM : SOCK_DGRAM;
   hints.ai_flags = need_canonname ? AI_CANONNAME : 0;
   if (!hostname)
     hints.ai_flags |= AI_PASSIVE;
   hints.ai_protocol = [protocol]() -> int {
-    switch (protocol) {
-      case IpProtocol::Tcp: return IPPROTO_TCP;
-      case IpProtocol::Udp: return IPPROTO_UDP;
+    switch (protocol.value()) {
+      case IpProtocol::TCP: return IPPROTO_TCP;
+      case IpProtocol::UDP: return IPPROTO_UDP;
     }
     return 0;
   }();
diff --git a/libminifi/src/utils/net/TcpServer.cpp b/libminifi/src/utils/net/TcpServer.cpp
new file mode 100644
index 000000000..6f0bb13de
--- /dev/null
+++ b/libminifi/src/utils/net/TcpServer.cpp
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "utils/net/TcpServer.h"
+
+namespace org::apache::nifi::minifi::utils::net {
+
+TcpSession::TcpSession(asio::io_context& io_context, utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> max_queue_size, std::shared_ptr<core::logging::Logger> logger)
+  : concurrent_queue_(concurrent_queue),
+    max_queue_size_(max_queue_size),
+    socket_(io_context),
+    logger_(std::move(logger)) {
+}
+
+asio::ip::tcp::socket& TcpSession::getSocket() {
+  return socket_;
+}
+
+void TcpSession::start() {
+  asio::async_read_until(socket_,
+                         buffer_,
+                         '\n',
+                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
+                           self->handleReadUntilNewLine(error_code);
+                         });
+}
+
+void TcpSession::handleReadUntilNewLine(std::error_code error_code) {
+  if (error_code)
+    return;
+  std::istream is(&buffer_);
+  std::string message;
+  std::getline(is, message);
+  if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+    concurrent_queue_.enqueue(Message(message, IpProtocol::TCP, socket_.remote_endpoint().address(), socket_.local_endpoint().port()));
+  else
+    logger_->log_warn("Queue is full. TCP message ignored.");
+  asio::async_read_until(socket_,
+                         buffer_,
+                         '\n',
+                         [self = shared_from_this()](const auto& error_code, size_t) -> void {
+                           self->handleReadUntilNewLine(error_code);
+                         });
+}
+
+TcpServer::TcpServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
+    : Server(max_queue_size, std::move(logger)),
+      acceptor_(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) {
+  startAccept();
+}
+
+void TcpServer::startAccept() {
+  auto new_session = std::make_shared<TcpSession>(io_context_, concurrent_queue_, max_queue_size_, logger_);
+  acceptor_.async_accept(new_session->getSocket(),
+                         [this, new_session](const auto& error_code) -> void {
+                           handleAccept(new_session, error_code);
+                         });
+}
+
+void TcpServer::handleAccept(const std::shared_ptr<TcpSession>& session, const std::error_code& error) {
+  if (error)
+    return;
+
+  session->start();
+  auto new_session = std::make_shared<TcpSession>(io_context_, concurrent_queue_, max_queue_size_, logger_);
+  acceptor_.async_accept(new_session->getSocket(),
+                         [this, new_session](const auto& error_code) -> void {
+                           handleAccept(new_session, error_code);
+                         });
+}
+
+}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/utils/net/UdpServer.cpp b/libminifi/src/utils/net/UdpServer.cpp
new file mode 100644
index 000000000..490a081e7
--- /dev/null
+++ b/libminifi/src/utils/net/UdpServer.cpp
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "utils/net/UdpServer.h"
+
+namespace org::apache::nifi::minifi::utils::net {
+
+UdpServer::UdpServer(std::optional<size_t> max_queue_size,
+                     uint16_t port,
+                     std::shared_ptr<core::logging::Logger> logger)
+    : Server(max_queue_size, std::move(logger)),
+      socket_(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v4(), port)) {
+  doReceive();
+}
+
+
+void UdpServer::doReceive() {
+  buffer_.resize(MAX_UDP_PACKET_SIZE);
+  socket_.async_receive_from(asio::buffer(buffer_, MAX_UDP_PACKET_SIZE),
+                             sender_endpoint_,
+                             [this](std::error_code ec, std::size_t bytes_received) {
+                               if (!ec && bytes_received > 0) {
+                                 buffer_.resize(bytes_received);
+                                 if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+                                   concurrent_queue_.enqueue(utils::net::Message(std::move(buffer_), IpProtocol::UDP, sender_endpoint_.address(), socket_.local_endpoint().port()));
+                                 else
+                                   logger_->log_warn("Queue is full. UDP message ignored.");
+                               }
+                               doReceive();
+                             });
+}
+
+}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/test/SingleProcessorTestController.h b/libminifi/test/SingleProcessorTestController.h
index b91b4c098..0cc22be37 100644
--- a/libminifi/test/SingleProcessorTestController.h
+++ b/libminifi/test/SingleProcessorTestController.h
@@ -26,8 +26,14 @@
 #include "TestBase.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
+#include "range/v3/algorithm/all_of.hpp"
+
+using namespace std::chrono_literals;
 
 namespace org::apache::nifi::minifi::test {
+
+using ProcessorTriggerResult = std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>>;
+
 class SingleProcessorTestController : public TestController {
  public:
   explicit SingleProcessorTestController(const std::shared_ptr<core::Processor>& processor)
@@ -57,6 +63,26 @@ class SingleProcessorTestController : public TestController {
     return trigger();
   }
 
+  bool triggerUntil(const std::unordered_map<core::Relationship, size_t>& expected_quantities,
+                    ProcessorTriggerResult& result,
+                    const std::chrono::milliseconds max_duration,
+                    const std::chrono::milliseconds wait_time = 50ms) {
+    auto start_time = std::chrono::steady_clock::now();
+    while (std::chrono::steady_clock::now() < start_time + max_duration) {
+      for (auto& [relationship, flow_files] : trigger()) {
+        result[relationship].insert(result[relationship].end(), flow_files.begin(), flow_files.end());
+      }
+      if (ranges::all_of(expected_quantities, [&result](const auto& kv) {
+        const auto& [relationship, expected_quantity] = kv;
+        return result[relationship].size() >= expected_quantity;
+      })) {
+        return true;
+      }
+      std::this_thread::sleep_for(wait_time);
+    }
+    return false;
+  }
+
   core::Relationship addDynamicRelationship(std::string name) {
     auto relationship = core::Relationship{std::move(name), ""};
     outgoing_connections_.insert_or_assign(relationship, plan->addConnection(processor_, relationship, nullptr));
diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h
index 0864b3ba0..d11757793 100644
--- a/libminifi/test/Utils.h
+++ b/libminifi/test/Utils.h
@@ -21,6 +21,9 @@
 #include <vector>
 
 #include "rapidjson/document.h"
+#include "asio.hpp"
+
+using namespace std::chrono_literals;
 
 #undef GetObject  // windows.h #defines GetObject = GetObjectA or GetObjectW, which conflicts with rapidjson
 
@@ -87,3 +90,31 @@ class ExceptionSubStringMatcher : public Catch::MatcherBase<T> {
  private:
   std::vector<std::string> possible_exception_substrs_;
 };
+
+bool countLogOccurrencesUntil(const std::string& pattern,
+                              const int occurrences,
+                              const std::chrono::milliseconds max_duration,
+                              const std::chrono::milliseconds wait_time = 50ms) {
+  auto start_time = std::chrono::steady_clock::now();
+  while (std::chrono::steady_clock::now() < start_time + max_duration) {
+    if (LogTestController::getInstance().countOccurrences(pattern) == occurrences)
+      return true;
+    std::this_thread::sleep_for(wait_time);
+  }
+  return false;
+}
+
+void sendMessagesViaTCP(const std::vector<std::string_view>& contents, uint64_t port) {
+  asio::io_context io_context;
+  asio::ip::tcp::socket socket(io_context);
+  asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port);
+  socket.connect(remote_endpoint);
+  std::error_code err;
+  for (auto& content : contents) {
+    std::string tcp_message(content);
+    tcp_message += '\n';
+    socket.send(asio::buffer(tcp_message, tcp_message.size()), 0, err);
+  }
+  REQUIRE(!err);
+  socket.close();
+}
diff --git a/libminifi/test/unit/PropertyValidationTests.cpp b/libminifi/test/unit/PropertyValidationTests.cpp
index b91ce2ee9..e8803b15f 100644
--- a/libminifi/test/unit/PropertyValidationTests.cpp
+++ b/libminifi/test/unit/PropertyValidationTests.cpp
@@ -260,4 +260,14 @@ TEST_CASE("TimePeriodValue Property without validator") {
   REQUIRE_THROWS_AS(component.getProperty(prop.getName(), time_period_value), ValueException);
 }
 
+TEST_CASE("Validating listener port property") {
+  auto prop = core::PropertyBuilder::createProperty("Port")
+        ->withType(core::StandardValidators::get().LISTEN_PORT_VALIDATOR)
+        ->build();
+  REQUIRE_NOTHROW(prop.setValue("1234"));
+  REQUIRE_THROWS_AS(prop.setValue("banana"), InvalidValueException);
+  REQUIRE_THROWS_AS(prop.setValue("65536"), InvalidValueException);
+  REQUIRE_THROWS_AS(prop.setValue("-1"), InvalidValueException);
+}
+
 }  // namespace org::apache::nifi::minifi::core


[nifi-minifi-cpp] 03/03: MINIFICPP-1873 - Fix state management path config typo in minifi.properties file

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 77ec00b675bafd3fa9fe14d3537b6e3a8bd5ffd7
Author: Arpad Boda <ab...@apache.org>
AuthorDate: Mon Jun 27 16:20:17 2022 +0200

    MINIFICPP-1873 - Fix state management path config typo in minifi.properties file
    
    Closes #1358
    
    Signed-off-by: Martin Zink <ma...@apache.org>
---
 CONFIGURE.md           | 4 ++--
 conf/minifi.properties | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/CONFIGURE.md b/CONFIGURE.md
index 1d7f25f9e..e8c77d46c 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -174,7 +174,7 @@ separate "subdatabase" is created under the name `"flowfile"`.
      in minifi.properties
      nifi.flowfile.repository.directory.default=minifidb://${MINIFI_HOME}/agent_state/flowfile
 	 nifi.database.content.repository.directory.default=minifidb://${MINIFI_HOME}/agent_state/content
-	 nifi.state.manangement.provider.local.path=minifidb://${MINIFI_HOME}/agent_state/processor_states
+	 nifi.state.management.provider.local.path=minifidb://${MINIFI_HOME}/agent_state/processor_states
 
 We should not simultaneously use the same directory with and without the `minifidb://` scheme.
 Moreover the `"default"` name is restricted and should not be used.
@@ -184,7 +184,7 @@ Moreover the `"default"` name is restricted and should not be used.
      nifi.flowfile.repository.directory.default=minifidb://${MINIFI_HOME}/agent_state/flowfile
 	 nifi.database.content.repository.directory.default=${MINIFI_HOME}/agent_state
 	 ^ error: using the same database directory without the "minifidb://" scheme
-	 nifi.state.manangement.provider.local.path=minifidb://${MINIFI_HOME}/agent_state/default
+	 nifi.state.management.provider.local.path=minifidb://${MINIFI_HOME}/agent_state/default
 	 ^ error: "default" is restricted
 
 ### Configuring Repository encryption
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 8ba1d3975..0d51905db 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -48,7 +48,7 @@ nifi.content.repository.class.name=DatabaseContentRepository
 ## that implements CoreComponentStateManagementProvider
 ## (e.g. an instance of RocksDbPersistableKeyValueStoreService or UnorderedMapPersistableKeyValueStoreService)
 #nifi.state.management.provider.local=
-#nifi.state.manangement.provider.local.path=
+#nifi.state.management.provider.local.path=
 ## To make the default state storage persist every state change, set this to true
 ## this comes at a performance penalty, but makes sure no state is lost even on unclean shutdowns
 #nifi.state.management.provider.local.always.persist=true