You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/11/09 13:28:24 UTC

[nifi-minifi-cpp] branch main updated (4b6daa2f9 -> 852641804)

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from 4b6daa2f9 MINIFICPP-1977 Upgrade pybind11 to support Python 3.11
     new a7da4380f MINIFICPP-1934 PutTCP processor
     new 9627f2ebf MINIFICPP-1927 Fix ExecuteProcess arg escaping
     new 852641804 MINIFICPP-1966 Add AgentStatus to Prometheus metrics

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 METRICS.md                                         |  25 +-
 PROCESSORS.md                                      |  45 +-
 README.md                                          |   7 +-
 cmake/Asio.cmake                                   |   4 +-
 .../test/integration/features/prometheus.feature   |   1 +
 docker/test/integration/minifi/core/ImageStore.py  |   2 +-
 .../integration/minifi/core/PrometheusChecker.py   |  13 +
 .../processors/ExecuteProcess.cpp                  | 379 +++++++-------
 .../processors/ExecuteProcess.h                    |  82 ++-
 .../standard-processors/processors/PutTCP.cpp      | 581 +++++++++++++++++++++
 extensions/standard-processors/processors/PutTCP.h | 129 +++++
 .../standard-processors/tests/CMakeLists.txt       |  10 +-
 .../tests/integration/TestExecuteProcess.cpp       | 104 ----
 .../tests/resource_apps/EchoParameters.cpp         |  36 +-
 .../tests/unit/ExecuteProcessTests.cpp             | 175 +++++++
 .../tests/unit/ListenSyslogTests.cpp               |  11 +-
 .../tests/unit/ListenTcpTests.cpp                  |  18 +-
 .../standard-processors/tests/unit/PutTCPTests.cpp | 517 ++++++++++++++++++
 .../standard-processors/tests/unit/PutUDPTests.cpp |   2 +-
 .../tests/unit/resources/alice_by_A.pem            |  46 ++
 .../tests/unit/resources/alice_by_B.pem            |  46 ++
 .../tests/unit/resources/ca_A.crt                  |  21 +
 .../tests/unit/resources/ca_B.crt                  |  21 +
 .../tests/unit/resources/ca_cert.crt               |  20 -
 .../tests/unit/resources/cert_and_private_key.pem  |  46 --
 .../tests/unit/resources/localhost_by_A.pem        |  46 ++
 .../tests/unit/resources/localhost_by_B.pem        |  46 ++
 .../include/core/state/nodes/AgentInformation.h    |  40 +-
 .../include/core/state/nodes/ResponseNodeLoader.h  |   1 +
 libminifi/include/utils/StringUtils.h              |   9 +
 libminifi/include/utils/TimeUtil.h                 |   4 +-
 .../src/core/state/nodes/AgentInformation.cpp      |   1 +
 .../src/core/state/nodes/ResponseNodeLoader.cpp    |  10 +
 libminifi/src/utils/net/SslServer.cpp              |   1 -
 libminifi/src/utils/net/TcpServer.cpp              |   1 -
 35 files changed, 2043 insertions(+), 457 deletions(-)
 create mode 100644 extensions/standard-processors/processors/PutTCP.cpp
 create mode 100644 extensions/standard-processors/processors/PutTCP.h
 delete mode 100644 extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
 copy python/library/python_lib.cpp => extensions/standard-processors/tests/resource_apps/EchoParameters.cpp (65%)
 create mode 100644 extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp
 create mode 100644 extensions/standard-processors/tests/unit/PutTCPTests.cpp
 create mode 100644 extensions/standard-processors/tests/unit/resources/alice_by_A.pem
 create mode 100644 extensions/standard-processors/tests/unit/resources/alice_by_B.pem
 create mode 100644 extensions/standard-processors/tests/unit/resources/ca_A.crt
 create mode 100644 extensions/standard-processors/tests/unit/resources/ca_B.crt
 delete mode 100644 extensions/standard-processors/tests/unit/resources/ca_cert.crt
 delete mode 100644 extensions/standard-processors/tests/unit/resources/cert_and_private_key.pem
 create mode 100644 extensions/standard-processors/tests/unit/resources/localhost_by_A.pem
 create mode 100644 extensions/standard-processors/tests/unit/resources/localhost_by_B.pem


[nifi-minifi-cpp] 03/03: MINIFICPP-1966 Add AgentStatus to Prometheus metrics

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 852641804c56e520e5772274437c19fce3953bda
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Nov 9 13:34:48 2022 +0100

    MINIFICPP-1966 Add AgentStatus to Prometheus metrics
    
    AgentStatus, which contains agent-specific metrics such as the agent's CPU
    and memory utilization, was missing from the Prometheus metrics. This adds
    it.
    
    Closes #1438
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 METRICS.md                                         | 25 +++++++++++++-
 .../test/integration/features/prometheus.feature   |  1 +
 docker/test/integration/minifi/core/ImageStore.py  |  2 +-
 .../integration/minifi/core/PrometheusChecker.py   | 13 +++++++
 .../include/core/state/nodes/AgentInformation.h    | 40 +++++++++++++++++++++-
 .../include/core/state/nodes/ResponseNodeLoader.h  |  1 +
 .../src/core/state/nodes/AgentInformation.cpp      |  1 +
 .../src/core/state/nodes/ResponseNodeLoader.cpp    | 10 ++++++
 8 files changed, 90 insertions(+), 3 deletions(-)

diff --git a/METRICS.md b/METRICS.md
index 543b67f4c..fa0732c06 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -124,7 +124,7 @@ DeviceInfoNode is a system level metric that reports metrics about the system re
 
 ### FlowInformation
 
-DeviceInfoNode is a system level metric that reports metrics about the system resources used and available
+FlowInformation is a system level metric that reports component and queue related metrics.
 
 | Metric name          | Labels                           | Description                                |
 |----------------------|----------------------------------|--------------------------------------------|
@@ -141,6 +141,29 @@ DeviceInfoNode is a system level metric that reports metrics about the system re
 | component_uuid  | UUID of the component                                        |
 | component_name  | Name of the component                                        |
 
+### AgentStatus
+
+AgentStatus is a system level metric that defines current agent status including repository, component and resource usage information.
+
+| Metric name              | Labels                         | Description                                                                                                |
+|--------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------|
+| is_running               | repository_name                | Is the repository running (1 or 0)                                                                         |
+| is_full                  | repository_name                | Is the repository full (1 or 0)                                                                            |
+| repository_size          | repository_name                | Current size of the repository                                                                             |
+| uptime_milliseconds      | -                              | Agent uptime in milliseconds                                                                               |
+| is_running               | component_uuid, component_name | Check if the component is running (1 or 0)                                                                 |
+| agent_memory_usage_bytes | -                              | Memory used by the agent process in bytes                                                                  |
+| agent_cpu_utilization    | -                              | CPU utilization of the agent process (between 0 and 1). In case of a query error the returned value is -1. |
+
+| Label           | Description                                              |
+|-----------------|----------------------------------------------------------|
+| repository_name | Name of the reported repository                          |
+| connection_uuid | UUID of the connection defined in the flow configuration |
+| connection_name | Name of the connection defined in the flow configuration |
+| component_uuid  | UUID of the component                                    |
+| component_name  | Name of the component                                    |
+
+
 ## Processor Metrics
 
 Processor level metrics can be accessed for any processor provided by MiNiFi. These metrics correspond to the name of the processor appended by the "Metrics" suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
diff --git a/docker/test/integration/features/prometheus.feature b/docker/test/integration/features/prometheus.feature
index 8c37a0109..e1af51ede 100644
--- a/docker/test/integration/features/prometheus.feature
+++ b/docker/test/integration/features/prometheus.feature
@@ -16,6 +16,7 @@ Feature: MiNiFi can publish metrics to Prometheus server
     And "PutFileMetrics" processor metric is published to the Prometheus server in less than 60 seconds for "PutFile" processor
     And "FlowInformation" is published to the Prometheus server in less than 60 seconds
     And "DeviceInfoNode" is published to the Prometheus server in less than 60 seconds
+    And "AgentStatus" is published to the Prometheus server in less than 60 seconds
 
   Scenario: Multiple GetFile metrics are reported by Prometheus
     Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/input"
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index 7f45e8273..4076f3031 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -105,7 +105,7 @@ class ImageStore:
                 RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.metrics.publisher.class=PrometheusMetricsPublisher >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode,AgentStatus >> {minifi_root}/conf/minifi.properties
                 USER minificpp
                 """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION,
                            minifi_root=MinifiContainer.MINIFI_ROOT))
diff --git a/docker/test/integration/minifi/core/PrometheusChecker.py b/docker/test/integration/minifi/core/PrometheusChecker.py
index 936ae4178..14dc8eda6 100644
--- a/docker/test/integration/minifi/core/PrometheusChecker.py
+++ b/docker/test/integration/minifi/core/PrometheusChecker.py
@@ -37,6 +37,8 @@ class PrometheusChecker:
             return self.verify_flow_information_metrics()
         elif metric_class == "DeviceInfoNode":
             return self.verify_device_info_node_metrics()
+        elif metric_class == "AgentStatus":
+            return self.verify_agent_status_metrics()
         else:
             raise Exception("Metric class '%s' verification is not implemented" % metric_class)
 
@@ -64,6 +66,17 @@ class PrometheusChecker:
     def verify_device_info_node_metrics(self):
         return self.verify_metrics_exist(['minifi_physical_mem', 'minifi_memory_usage', 'minifi_cpu_utilization'], 'DeviceInfoNode')
 
+    def verify_agent_status_metrics(self):
+        label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}]
+        for labels in label_list:
+            if not (self.verify_metric_exists('minifi_is_running', 'AgentStatus', labels)
+                    and self.verify_metric_exists('minifi_is_full', 'AgentStatus', labels)
+                    and self.verify_metric_exists('minifi_repository_size', 'AgentStatus', labels)):
+                return False
+        return self.verify_metric_exists('minifi_uptime_milliseconds', 'AgentStatus') and \
+            self.verify_metric_exists('minifi_agent_memory_usage_bytes', 'AgentStatus') and \
+            self.verify_metric_exists('minifi_agent_cpu_utilization', 'AgentStatus')
+
     def verify_metric_exists(self, metric_name, metric_class, labels={}):
         labels['metric_class'] = metric_class
         labels['agent_identifier'] = "Agent1"
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index 42515d120..827a3e7d6 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -418,14 +418,22 @@ class AgentStatus : public StateMonitorNode {
       : StateMonitorNode(name) {
   }
 
+  MINIFIAPI static constexpr const char* Description = "Metric node that defines current agent status including repository, component and resource usage information.";
+
   std::string getName() const override {
-    return "status";
+    return "AgentStatus";
   }
 
   void setRepositories(const std::map<std::string, std::shared_ptr<core::Repository>> &repositories) {
     repositories_ = repositories;
   }
 
+  void addRepository(const std::shared_ptr<core::Repository> &repo) {
+    if (nullptr != repo) {
+      repositories_.insert(std::make_pair(repo->getName(), repo));
+    }
+  }
+
   std::vector<SerializedResponseNode> serialize() override {
     std::vector<SerializedResponseNode> serialized;
     auto serializedRepositories = serializeRepositories();
@@ -444,6 +452,36 @@ class AgentStatus : public StateMonitorNode {
     return serialized;
   }
 
+  std::vector<PublishedMetric> calculateMetrics() override {
+    std::vector<PublishedMetric> metrics;
+    for (const auto& [_, repo] : repositories_) {
+      metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getName()}}});
+      metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getName()}}});
+      metrics.push_back({"repository_size", static_cast<double>(repo->getRepoSize()), {{"metric_class", getName()}, {"repository_name", repo->getName()}}});
+    }
+    if (nullptr != monitor_) {
+      auto uptime = monitor_->getUptime();
+      metrics.push_back({"uptime_milliseconds", static_cast<double>(uptime), {{"metric_class", getName()}}});
+    }
+
+    if (nullptr != monitor_) {
+      monitor_->executeOnAllComponents([this, &metrics](StateController& component){
+        metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0),
+          {{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", getName()}}});
+      });
+    }
+
+    metrics.push_back({"agent_memory_usage_bytes", static_cast<double>(utils::OsUtils::getCurrentProcessPhysicalMemoryUsage()), {{"metric_class", getName()}}});
+
+    double cpu_usage = -1.0;
+    {
+      std::lock_guard<std::mutex> guard(cpu_load_tracker_mutex_);
+      cpu_usage = cpu_load_tracker_.getCpuUsageAndRestartCollection();
+    }
+    metrics.push_back({"agent_cpu_utilization", cpu_usage, {{"metric_class", getName()}}});
+    return metrics;
+  }
+
  protected:
   SerializedResponseNode serializeRepositories() const {
     SerializedResponseNode repositories;
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index 58d2c8d85..8ad250f9f 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -52,6 +52,7 @@ class ResponseNodeLoader {
   void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node);
   void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node);
   void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node);
+  void initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node);
   void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node);
   void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root);
 
diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp b/libminifi/src/core/state/nodes/AgentInformation.cpp
index 0609f25ec..7bbf44d3b 100644
--- a/libminifi/src/core/state/nodes/AgentInformation.cpp
+++ b/libminifi/src/core/state/nodes/AgentInformation.cpp
@@ -24,5 +24,6 @@ utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_;
 std::mutex AgentStatus::cpu_load_tracker_mutex_;
 
 REGISTER_RESOURCE(AgentInformation, DescriptionOnly);
+REGISTER_RESOURCE(AgentStatus, DescriptionOnly);
 
 }  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index ad1389223..149fe14e1 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -126,6 +126,15 @@ void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode>
   }
 }
 
+void ResponseNodeLoader::initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) {
+  auto agent_status = dynamic_cast<state::response::AgentStatus*>(response_node.get());
+  if (agent_status != nullptr) {
+    agent_status->addRepository(provenance_repo_);
+    agent_status->addRepository(flow_file_repo_);
+    agent_status->setStateMonitor(update_sink_);
+  }
+}
+
 void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) {
   auto configuration_checksums = dynamic_cast<state::response::ConfigurationChecksums*>(response_node.get());
   if (configuration_checksums) {
@@ -169,6 +178,7 @@ std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::loadResponseNodes
     initializeAgentIdentifier(response_node);
     initializeAgentMonitor(response_node);
     initializeAgentNode(response_node);
+    initializeAgentStatus(response_node);
     initializeConfigurationChecksums(response_node);
     initializeFlowMonitor(response_node, root);
   }


[nifi-minifi-cpp] 01/03: MINIFICPP-1934 PutTCP processor

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit a7da4380f71cfe31038966d29487a3d7f7e9833c
Author: Martin Zink <ma...@apache.org>
AuthorDate: Wed Nov 9 13:28:00 2022 +0100

    MINIFICPP-1934 PutTCP processor
    
    Closes #1419
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |  29 +-
 README.md                                          |   7 +-
 cmake/Asio.cmake                                   |   4 +-
 .../standard-processors/processors/PutTCP.cpp      | 581 +++++++++++++++++++++
 extensions/standard-processors/processors/PutTCP.h | 129 +++++
 .../tests/unit/ListenSyslogTests.cpp               |  11 +-
 .../tests/unit/ListenTcpTests.cpp                  |  18 +-
 .../standard-processors/tests/unit/PutTCPTests.cpp | 517 ++++++++++++++++++
 .../standard-processors/tests/unit/PutUDPTests.cpp |   2 +-
 .../tests/unit/resources/alice_by_A.pem            |  46 ++
 .../tests/unit/resources/alice_by_B.pem            |  46 ++
 .../tests/unit/resources/ca_A.crt                  |  21 +
 .../tests/unit/resources/ca_B.crt                  |  21 +
 .../tests/unit/resources/ca_cert.crt               |  20 -
 .../tests/unit/resources/cert_and_private_key.pem  |  46 --
 .../tests/unit/resources/localhost_by_A.pem        |  46 ++
 .../tests/unit/resources/localhost_by_B.pem        |  46 ++
 libminifi/include/utils/StringUtils.h              |   9 +
 libminifi/include/utils/TimeUtil.h                 |   4 +-
 libminifi/src/utils/net/SslServer.cpp              |   1 -
 libminifi/src/utils/net/TcpServer.cpp              |   1 -
 21 files changed, 1513 insertions(+), 92 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 8a0aac5de..d052e5c6c 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -67,6 +67,7 @@
 - [PutSFTP](#putsftp)
 - [PutSplunkHTTP](#putsplunkhttp)
 - [PutSQL](#putsql)
+- [PutTCP](#puttcp)
 - [PutUDP](#putudp)
 - [QueryDatabaseTable](#querydatabasetable)
 - [QuerySplunkIndexingStatus](#querysplunkindexingstatus)
@@ -2167,6 +2168,32 @@ In the list below, the names of required properties appear in bold. Any other pr
 | success | After a successful SQL update operation, the incoming FlowFile sent here |
 
 
+## PutTCP
+
+### Description
+The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP connection. To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specif [...]
+
+### Properties
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                           | Default Value | Allowable Values | Description                                                                                                                                                                                                                                                                                                                                                                                                        |
+|--------------------------------|---------------|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Hostname**                   | localhost     |                  | The ip address or hostname of the destination.<br/>**Supports Expression Language: true**                                                                                                                                                                                                                                                                                                                          |
+| **Port**                       |               |                  | The port or service on the destination.<br/>**Supports Expression Language: true**                                                                                                                                                                                                                                                                                                                                 |
+| **Idle Connection Expiration** | 15 seconds    |                  | The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.<br/>**Supports Expression Language: true**                                                                                                                                                                                                                   |
+| **Timeout**                    | 15 seconds    |                  | The timeout for connecting to and communicating with the destination.<br/>**Supports Expression Language: true**                                                                                                                                                                                                                                                                                                   |
+| **Connection Per FlowFile**    | false         |                  | Specifies whether to send each FlowFile's content on an individual connection.                                                                                                                                                                                                                                                                                                                                     |
+| Outgoing Message Delimiter     |               |                  | Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors.<br/>**Supports Expression Language: true** |
+| SSL Context Service            |               |                  | The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.                                                                                                                                                                                                                                                                          |
+| Max Size of Socket Send Buffer |               |                  | The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be.                                                                                                                                                                                                                                                      |
+
+### Relationships
+| Name    | Description                                                                |
+|---------|----------------------------------------------------------------------------|
+| success | FlowFiles that are sent to the destination are sent out this relationship. |
+| failure | FlowFiles that encountered IO errors are sent out this relationship.       |
+
+
 ## PutUDP
 
 ### Description
@@ -2187,7 +2214,7 @@ In the list below, the names of required properties appear in bold. Any other pr
 | Name    | Description                                                                   |
 |---------|-------------------------------------------------------------------------------|
 | success | FlowFiles that are sent to the destination are sent out this relationship.    |
-| failure | FlowFiles that encounter IO or network errors are send out this relationship. |
+| failure | FlowFiles that encounter IO or network errors are sent out this relationship. |
 
 ## QueryDatabaseTable
 
diff --git a/README.md b/README.md
index 0f6a5ab61..dc56009b3 100644
--- a/README.md
+++ b/README.md
@@ -63,9 +63,9 @@ MiNiFi - C++ supports the following C++ processors:
 
 The following table lists the base set of processors.
 
-| Extension Set | Processors                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 [...]
-|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
-| **Base**      | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[ListenTCP](PROC [...]
+| Extension Set | Processors                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 [...]
+|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
+| **Base**      | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[ListenTCP](PROC [...]
 
 The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as CURL ), can be disabled with the respective CMAKE flag on the command line.
 
@@ -588,4 +588,3 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-
diff --git a/cmake/Asio.cmake b/cmake/Asio.cmake
index 1953c765c..14c8f9592 100644
--- a/cmake/Asio.cmake
+++ b/cmake/Asio.cmake
@@ -18,8 +18,8 @@
 include(FetchContent)
 
 FetchContent_Declare(asio
-        URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-22-1.tar.gz
-        URL_HASH SHA256=30cb54a5de5e465d10ec0c2026d6b5917f5e89fffabdbabeb1475846fc9a2cf0)
+        URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-24-0.tar.gz
+        URL_HASH SHA256=cbcaaba0f66722787b1a7c33afe1befb3a012b5af3ad7da7ff0f6b8c9b7a8a5b)
 
 FetchContent_GetProperties(asio)
 if(NOT asio_POPULATED)
diff --git a/extensions/standard-processors/processors/PutTCP.cpp b/extensions/standard-processors/processors/PutTCP.cpp
new file mode 100644
index 000000000..92b8b020b
--- /dev/null
+++ b/extensions/standard-processors/processors/PutTCP.cpp
@@ -0,0 +1,581 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<tcp::socket>;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+// Property descriptors of the PutTCP processor, assembled with core::PropertyBuilder.
+// Their values are read in onSchedule() and, for Hostname and Port, again per flow file in onTrigger().
+
+// Destination host. Defaults to "localhost"; expression language is supported.
+const core::Property PutTCP::Hostname = core::PropertyBuilder::createProperty("Hostname")
+    ->withDescription("The ip address or hostname of the destination.")
+    ->withDefaultValue("localhost")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+// Destination port or service name. No default value; expression language is supported.
+const core::Property PutTCP::Port = core::PropertyBuilder::createProperty("Port")
+    ->withDescription("The port or service on the destination.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+// Idle time after which a cached connection is dropped. onSchedule() only stores values greater than 0 ms,
+// otherwise expiration is left unset.
+const core::Property PutTCP::IdleConnectionExpiration = core::PropertyBuilder::createProperty("Idle Connection Expiration")
+    ->withDescription("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+// Deadline used for connecting and writing. onSchedule() falls back to 15 seconds when the value is missing or not positive.
+const core::Property PutTCP::Timeout = core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+// When true, onSchedule() disables the connection cache so that no handler is kept between flow files.
+const core::Property PutTCP::ConnectionPerFlowFile = core::PropertyBuilder::createProperty("Connection Per FlowFile")
+    ->withDescription("Specifies whether to send each FlowFile's content on an individual connection.")
+    ->withDefaultValue(false)
+    ->isRequired(true)
+    ->supportsExpressionLanguage(false)
+    ->build();
+
+// Optional byte sequence written after the content of every flow file.
+const core::Property PutTCP::OutgoingMessageDelimiter = core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+    ->withDescription("Specifies the delimiter to use when sending messages out over the same TCP stream. "
+                      "The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. "
+                      "Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+// Optional name of an SSLContextService controller service; when it resolves, onTrigger() creates SSL connection handlers.
+const core::Property PutTCP::SSLContextService = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()
+    ->build();
+
+// Optional send buffer size that handleConnect() applies to the socket as a socket option after a successful connect.
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+    ->withDescription("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be.")
+    ->isRequired(false)
+    ->asType<core::DataSizeValue>()
+    ->build();
+
+// Relationships of the processor: flow files are routed to Success after a completed send and to Failure otherwise.
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that encountered IO errors are sent out this relationship."};
+
+// Number of bytes read from the flow file content stream, and handed to asio::async_write, per write step.
+constexpr size_t chunk_size = 1024;
+
+// Forwards the name and uuid to the core::Processor base; all other members keep their in-class defaults.
+PutTCP::PutTCP(const std::string& name, const utils::Identifier& uuid)
+    : Processor(name, uuid) {}
+
+// Defaulted here, in the translation unit that also defines the connection handlers.
+PutTCP::~PutTCP() = default;
+
+// Registers the supported properties and relationships declared in the header.
+void PutTCP::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+// No-op: nothing is done when the processor is notified about a stop.
+void PutTCP::notifyStop() {}
+
+// Reads the processor configuration and caches it in the member variables used by onTrigger().
+// Throws a PROCESSOR_EXCEPTION when Hostname or Port is missing or empty before expression-language evaluation.
+// A missing or unusable SSL Context Service is only logged; the processor then uses plain TCP handlers.
+void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessionFactory*) {
+  gsl_Expects(context);
+
+  // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files
+  if (context->getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
+  }
+  if (context->getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
+  }
+
+  // Only a positive duration enables idle-connection expiration.
+  if (auto idle_connection_expiration = context->getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) {
+    idle_connection_expiration_ = idle_connection_expiration->getMilliseconds();
+  } else {
+    idle_connection_expiration_.reset();
+  }
+
+  // A missing or non-positive timeout falls back to 15 seconds.
+  if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms) {
+    timeout_ = timeout->getMilliseconds();
+  } else {
+    timeout_ = 15s;
+  }
+
+  std::string context_name;
+  ssl_context_service_.reset();
+  if (context->getProperty(SSLContextService.getName(), context_name) && !IsNullOrEmpty(context_name)) {
+    if (auto controller_service = context->getControllerService(context_name)) {
+      // Cast the service that was just looked up instead of asking the context for it a second time.
+      ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(controller_service);
+      if (!ssl_context_service_) {
+        logger_->log_error("%s is not a SSL Context Service", context_name);
+      }
+    } else {
+      logger_->log_error("Invalid controller service: %s", context_name);
+    }
+  }
+
+  // The delimiter is stored as raw bytes; an unset property yields an empty delimiter.
+  delimiter_ = utils::span_to<std::vector>(gsl::make_span(context->getProperty(OutgoingMessageDelimiter).value_or(std::string{})).as_span<const std::byte>());
+
+  // An empty optional means "no connection cache", i.e. a fresh handler is created for every flow file.
+  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false)) {
+    connections_.reset();
+  } else {
+    connections_.emplace();
+  }
+
+  if (auto max_size_of_socket_send_buffer = context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer)) {
+    max_size_of_socket_send_buffer_ = max_size_of_socket_send_buffer->getValue();
+  } else {
+    max_size_of_socket_send_buffer_.reset();
+  }
+}
+
+namespace {
+// Owns one connection to the endpoint described by connection_id_. SocketType selects the transport
+// (TcpSocket or SslSocket). All asynchronous asio operations run on the handler's private io_context_,
+// which is driven by io_context_.run() inside establishConnection() and sendDataToSocket().
+template<class SocketType>
+class ConnectionHandler : public ConnectionHandlerBase {
+ public:
+  ConnectionHandler(detail::ConnectionId connection_id,
+                    std::chrono::milliseconds timeout,
+                    std::shared_ptr<core::logging::Logger> logger,
+                    std::optional<size_t> max_size_of_socket_send_buffer,
+                    std::shared_ptr<controllers::SSLContextService> ssl_context_service)
+      : connection_id_(std::move(connection_id)),
+        timeout_(timeout),
+        logger_(std::move(logger)),
+        max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
+        ssl_context_service_(std::move(ssl_context_service)) {
+  }
+
+  ~ConnectionHandler() override = default;
+
+  // Sends the stream content followed by the delimiter, connecting first when there is no open socket.
+  nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) override;
+
+ private:
+  // Returns the cached socket when it is open, otherwise resolves the host and connects.
+  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
+
+  // True when the last successful send happened within the last `dur`; false when nothing was sent yet.
+  [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override {
+    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - dur);
+  }
+
+  // Drops the socket and all per-connection state so that the next sendData() reconnects.
+  void reset() override {
+    last_used_.reset();
+    socket_.reset();
+    // io_context::reset() is the deprecated spelling of restart(); sendDataToSocket() already uses restart().
+    io_context_.restart();
+    last_error_.clear();
+    deadline_.expires_at(asio::steady_timer::time_point::max());
+  }
+
+  // Completion handlers of the asynchronous connect / handshake / write chain.
+  void checkDeadline(std::error_code error_code, SocketType* socket);
+  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket);
+
+  void handleConnect(std::error_code error,
+                     tcp::resolver::results_type::iterator endpoint_iter,
+                     const std::shared_ptr<SocketType>& socket);
+  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
+                               const std::shared_ptr<SocketType>& socket);
+  void handleHandshake(std::error_code error,
+                       const tcp::resolver::results_type::iterator& endpoint_iter,
+                       const std::shared_ptr<SocketType>& socket);
+
+  void handleWrite(std::error_code error,
+                   std::size_t bytes_written,
+                   const std::shared_ptr<io::InputStream>& flow_file_content_stream,
+                   const std::vector<std::byte>& delimiter,
+                   const std::shared_ptr<SocketType>& socket);
+
+  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket);
+
+  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> establishConnection(const tcp::resolver::results_type& resolved_query);
+  nonstd::expected<tcp::resolver::results_type, std::error_code> resolveHostname();
+  nonstd::expected<void, std::error_code> sendDataToSocket(const std::shared_ptr<SocketType>& socket,
+                                                           const std::shared_ptr<io::InputStream>& flow_file_content_stream,
+                                                           const std::vector<std::byte>& delimiter);
+
+  // True once a send has completed successfully since construction or the last reset().
+  [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); }
+
+  // Declaration order matters: deadline_ is constructed from io_context_, so io_context_ has to come first.
+  detail::ConnectionId connection_id_;
+  std::optional<std::chrono::steady_clock::time_point> last_used_;  // set after every successful send
+  asio::io_context io_context_;
+  std::error_code last_error_;  // outcome of the most recent asynchronous chain
+  asio::steady_timer deadline_{io_context_};
+  std::chrono::milliseconds timeout_;
+  std::shared_ptr<SocketType> socket_;
+
+  std::shared_ptr<core::logging::Logger> logger_;
+  std::optional<size_t> max_size_of_socket_send_buffer_;
+
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+};
+
+// Obtains a connected socket and writes the stream content plus delimiter to it.
+// Returns the error of whichever step failed first.
+template<class SocketType>
+nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) {
+  auto connected_socket = getSocket();
+  if (!connected_socket)
+    return nonstd::make_unexpected(connected_socket.error());
+  return sendDataToSocket(*connected_socket, flow_file_content_stream, delimiter);
+}
+
+// Returns the cached socket while it is open; otherwise resolves the hostname, connects,
+// caches the new socket in socket_ and returns it. Errors from either step are passed through.
+template<class SocketType>
+nonstd::expected<std::shared_ptr<SocketType>, std::error_code> ConnectionHandler<SocketType>::getSocket() {
+  const bool has_open_socket = socket_ && socket_->lowest_layer().is_open();
+  if (has_open_socket)
+    return socket_;
+
+  auto endpoints = resolveHostname();
+  if (!endpoints)
+    return nonstd::make_unexpected(endpoints.error());
+
+  auto connected_socket = establishConnection(*endpoints);
+  if (!connected_socket)
+    return nonstd::make_unexpected(connected_socket.error());
+
+  socket_ = std::move(*connected_socket);
+  return socket_;
+}
+
+// Wait handler of deadline_. A wait that ends with operation_aborted (timer cancelled or re-armed) is ignored.
+// Any other completion is treated as an expired deadline: the timeout is recorded in last_error_ and the socket
+// is closed, which makes the pending connect/handshake/write handler observe a closed socket.
+template<class SocketType>
+void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, SocketType* socket) {
+  if (error_code != asio::error::operation_aborted) {
+    // Park the timer at "never" and keep a wait registered; the operation handlers cancel it when they finish.
+    deadline_.expires_at(asio::steady_timer::time_point::max());
+    last_error_ = asio::error::timed_out;
+    // `socket` is a by-value parameter, so it has to be captured by value: a by-reference capture would
+    // dangle as soon as this function returns, long before the wait handler is invoked.
+    deadline_.async_wait([this, socket](std::error_code wait_error) { checkDeadline(wait_error, socket); });
+    socket->lowest_layer().close();
+  }
+}
+
+// Starts one asynchronous connection attempt to the endpoint at endpoint_iter.
+// When the iterator is exhausted it only cancels the deadline and returns; last_error_ is left as it was.
+// `socket` is captured by reference in the handlers, so the referenced shared_ptr has to outlive the run of io_context_.
+template<class SocketType>
+void ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator endpoint_iter, const std::shared_ptr<SocketType>& socket) {
+  const tcp::resolver::results_type::iterator no_more_endpoints{};
+  if (endpoint_iter == no_more_endpoints) {
+    logger_->log_trace("No more endpoints to try");
+    deadline_.cancel();
+    return;
+  }
+
+  // Every attempt starts with a clean error state and a freshly armed deadline.
+  last_error_.clear();
+  deadline_.expires_after(timeout_);
+  deadline_.async_wait([this, &socket](std::error_code deadline_error) -> void {
+    checkDeadline(deadline_error, socket.get());
+  });
+
+  const auto on_connect = [&socket, endpoint_iter, this](std::error_code connect_error) {
+    handleConnect(connect_error, endpoint_iter, socket);
+  };
+  socket->lowest_layer().async_connect(endpoint_iter->endpoint(), on_connect);
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
+                                                  tcp::resolver::results_type::iterator endpoint_iter,
+                                                  const std::shared_ptr<SocketType>& socket) {
+  bool connection_failed_before_deadline = error.operator bool();
+  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
+
+  if (connection_failed_due_to_deadline) {
+    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " timed out";
+    socket->lowest_layer().close();
+    return startConnect(++endpoint_iter, socket);
+  }
+
+  if (connection_failed_before_deadline) {
+    core::logging::LOG_TRACE(logger_) << "Connecting to " << endpoint_iter->endpoint() << " failed due to " << error.message();
+    last_error_ = error;
+    socket->lowest_layer().close();
+    return startConnect(++endpoint_iter, socket);
+  }
+
+  if (max_size_of_socket_send_buffer_)
+    socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+
+  handleConnectionSuccess(endpoint_iter, socket);
+}
+
+// Generic handshake handler for socket types without an explicit specialization (SslSocket has one below).
+// It always throws std::invalid_argument, since a handshake is only meaningful for SSL sockets.
+template<class SocketType>
+void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
+                                                    const tcp::resolver::results_type::iterator&,
+                                                    const std::shared_ptr<SocketType>&) {
+  throw std::invalid_argument("Handshake called without SSL");
+}
+
+template<>
+void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
+                                                   const tcp::resolver::results_type::iterator& endpoint_iter,
+                                                   const std::shared_ptr<SslSocket>& socket) {
+  if (!error) {
+    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << endpoint_iter->endpoint();
+    deadline_.cancel();
+    return;
+  }
+  core::logging::LOG_TRACE(logger_) << "Handshake with " << endpoint_iter->endpoint() << " failed due to " << error.message();
+  last_error_ = error;
+  socket->lowest_layer().close();
+  startConnect(std::next(endpoint_iter), socket);
+}
+
+// Plain TCP: the connection is usable as soon as the connect succeeded. Logs the endpoint, switches the
+// socket to non-blocking mode and cancels the pending deadline wait.
+template<>
+void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
+                                                           const std::shared_ptr<TcpSocket>& socket) {
+  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
+  socket->lowest_layer().non_blocking(true);
+  deadline_.cancel();
+}
+
+// SSL: after the TCP connect succeeded, start the asynchronous client-side handshake. The deadline is not
+// cancelled here; handleHandshake() deals with the outcome.
+template<>
+void ConnectionHandler<SslSocket>::handleConnectionSuccess(const tcp::resolver::results_type::iterator& endpoint_iter,
+                                                           const std::shared_ptr<SslSocket>& socket) {
+  core::logging::LOG_TRACE(logger_) << "Connected to " << endpoint_iter->endpoint();
+  // `socket` is captured by reference: the referenced shared_ptr has to stay alive until the handler has run.
+  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, endpoint_iter](const std::error_code handshake_error) {
+    handleHandshake(handshake_error, endpoint_iter, socket);
+  });
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
+                                                std::size_t bytes_written,
+                                                const std::shared_ptr<io::InputStream>& flow_file_content_stream,
+                                                const std::vector<std::byte>& delimiter,
+                                                const std::shared_ptr<SocketType>& socket) {
+  bool write_failed_before_deadline = error.operator bool();
+  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
+
+  if (write_failed_due_to_deadline) {
+    logger_->log_trace("Writing flowfile to socket timed out");
+    socket->lowest_layer().close();
+    deadline_.cancel();
+    return;
+  }
+
+  if (write_failed_before_deadline) {
+    last_error_ = error;
+    logger_->log_trace("Writing flowfile to socket failed due to %s", error.message());
+    socket->lowest_layer().close();
+    deadline_.cancel();
+    return;
+  }
+
+  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written);
+  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
+    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code error, std::size_t bytes_written) {
+      handleDelimiterWrite(error, bytes_written, socket);
+    });
+  } else {
+    std::vector<std::byte> data_chunk;
+    data_chunk.resize(chunk_size);
+    gsl::span<std::byte> buffer{data_chunk};
+    size_t num_read = flow_file_content_stream->read(buffer);
+    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const std::error_code err, std::size_t bytes_written) {
+      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
+    });
+  }
+}
+
+// Completion handler of the delimiter write, the last step of sending one flow file.
+// The deadline wait is cancelled in every case; on timeout or error the socket is closed as well.
+template<class SocketType>
+void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
+  const bool closed_by_deadline = !socket->lowest_layer().is_open();
+
+  if (!closed_by_deadline && !error) {
+    logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", bytes_written);
+    deadline_.cancel();
+    return;
+  }
+
+  if (closed_by_deadline) {
+    // The timeout takes precedence; last_error_ was already set by checkDeadline().
+    logger_->log_trace("Writing delimiter to socket timed out");
+  } else {
+    last_error_ = error;
+    logger_->log_trace("Writing delimiter to socket failed due to %s", error.message());
+  }
+  socket->lowest_layer().close();
+  deadline_.cancel();
+}
+
+
+// Plain TCP: creates a socket, tries the resolved endpoints one after the other and runs io_context_ until
+// the connect chain has finished. Returns the socket, or last_error_ when the chain recorded an error.
+template<>
+nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> ConnectionHandler<TcpSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
+  auto socket = std::make_shared<TcpSocket>(io_context_);
+  // startConnect() arms deadline_ for every attempt it starts, so the timer is not armed a second time here.
+  // Arming it again would only replace the wait that was just registered, and if startConnect() returned
+  // without starting an attempt, that extra wait would keep io_context_.run() blocked.
+  startConnect(resolved_query.begin(), socket);
+  io_context_.run();
+  if (last_error_)
+    return nonstd::make_unexpected(last_error_);
+  return socket;
+}
+
+// Builds an asio SSL context from an SSL context service: the CA certificate is used for peer verification,
+// and the certificate and private key files are loaded when their paths are not empty.
+// The asio calls used here are the throwing overloads, so a file that cannot be loaded raises an exception.
+asio::ssl::context getSslContext(const auto& ssl_context_service) {
+  gsl_Expects(ssl_context_service);
+  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
+  ssl_context.load_verify_file(ssl_context_service->getCACertificate());
+  ssl_context.set_verify_mode(asio::ssl::verify_peer);
+  // The passphrase callback has to be registered before the private key is loaded; registering it afterwards
+  // is too late for it to be consulted while an encrypted key is being read.
+  ssl_context.set_password_callback([password = ssl_context_service->getPassphrase()](std::size_t&, asio::ssl::context_base::password_purpose&) { return password; });
+  if (auto cert_file = ssl_context_service->getCertificateFile(); !cert_file.empty())
+    ssl_context.use_certificate_file(cert_file, asio::ssl::context::pem);
+  if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); !private_key_file.empty())
+    ssl_context.use_private_key_file(private_key_file, asio::ssl::context::pem);
+  return ssl_context;
+}
+
+// SSL: builds the SSL context, creates the stream, then connects and handshakes with the resolved endpoints
+// while running io_context_. Returns the socket, or last_error_ when the chain recorded an error.
+template<>
+nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> ConnectionHandler<SslSocket>::establishConnection(const tcp::resolver::results_type& resolved_query) {
+  auto ssl_context = getSslContext(ssl_context_service_);
+  auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
+  // startConnect() arms deadline_ and registers its wait handler for every attempt it starts. A second
+  // async_wait here would run checkDeadline() twice on expiry, and would keep io_context_.run() waiting
+  // on the timer when no attempt was started.
+  startConnect(resolved_query.begin(), socket);
+  io_context_.run();
+  if (last_error_)
+    return nonstd::make_unexpected(last_error_);
+  return socket;
+}
+
+// Writes the content of the stream followed by the delimiter to an already connected socket.
+// The first chunk is written here; handleWrite() and handleDelimiterWrite() continue the chain while
+// io_context_.run() blocks. Returns not_socket for a missing or closed socket and last_error_ when the chain
+// recorded an error; on success last_used_ is updated.
+template<class SocketType>
+nonstd::expected<void, std::error_code> ConnectionHandler<SocketType>::sendDataToSocket(const std::shared_ptr<SocketType>& socket,
+                                                                                        const std::shared_ptr<io::InputStream>& flow_file_content_stream,
+                                                                                        const std::vector<std::byte>& delimiter) {
+  if (!socket || !socket->lowest_layer().is_open())
+    return nonstd::make_unexpected(asio::error::not_socket);
+
+  // One deadline covers the whole send. The wait is registered exactly once: a duplicate registration would
+  // only make checkDeadline() run twice for the same expiry.
+  deadline_.expires_after(timeout_);
+  deadline_.async_wait([this, &socket](std::error_code deadline_error) -> void {
+    checkDeadline(deadline_error, socket.get());
+  });
+  // A previous run() has returned, so the context has to be restarted before it can run handlers again.
+  io_context_.restart();
+
+  // This buffer may live on the stack: run() below does not return before the write that uses it has completed.
+  std::vector<std::byte> data_chunk;
+  data_chunk.resize(chunk_size);
+
+  gsl::span<std::byte> buffer{data_chunk};
+  size_t num_read = flow_file_content_stream->read(buffer);
+  logger_->log_trace("read %zu bytes from flowfile", num_read);
+  asio::async_write(*socket, asio::buffer(data_chunk, num_read),
+      [this, &flow_file_content_stream, &delimiter, &socket](const std::error_code err, std::size_t bytes_written) {
+        handleWrite(err, bytes_written, flow_file_content_stream, delimiter, socket);
+      });
+  io_context_.run();
+  if (last_error_)
+    return nonstd::make_unexpected(last_error_);
+  last_used_ = std::chrono::steady_clock::now();
+  return {};
+}
+
+// Resolves the hostname and port of connection_id_ with the error_code overload of the resolver.
+// Returns the resolved endpoints, or the resolver's error code.
+template<class SocketType>
+nonstd::expected<tcp::resolver::results_type, std::error_code> ConnectionHandler<SocketType>::resolveHostname() {
+  std::error_code resolve_error;
+  tcp::resolver hostname_resolver(io_context_);
+  auto endpoints = hostname_resolver.resolve(connection_id_.getHostname(), connection_id_.getPort(), resolve_error);
+  if (!resolve_error)
+    return endpoints;
+  return nonstd::make_unexpected(resolve_error);
+}
+}  // namespace
+
+// Takes one flow file from the session, evaluates Hostname and Port for it, picks (or creates) the
+// connection handler for that endpoint and hands the content stream to processFlowFile().
+// Yields when there is no flow file; routes to Failure when the endpoint is invalid or the content
+// stream cannot be obtained.
+void PutTCP::onTrigger(core::ProcessContext* context, core::ProcessSession* const session) {
+  gsl_Expects(context && session);
+
+  const auto flow_file = session->get();
+  if (!flow_file) {
+    yield();
+    return;
+  }
+
+  removeExpiredConnections();
+
+  auto hostname = context->getProperty(Hostname, flow_file).value_or(std::string{});
+  auto port = context->getProperty(Port, flow_file).value_or(std::string{});
+  const bool endpoint_is_valid = !hostname.empty() && !port.empty();
+  if (!endpoint_is_valid) {
+    logger_->log_error("[%s] invalid target endpoint: hostname: %s, port: %s", flow_file->getUUIDStr(),
+        hostname.empty() ? "(empty)" : hostname.c_str(),
+        port.empty() ? "(empty)" : port.c_str());
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  auto flow_file_content_stream = session->getFlowFileContentStream(flow_file);
+  if (!flow_file_content_stream) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  auto connection_id = detail::ConnectionId(std::move(hostname), std::move(port));
+
+  // connections_ is empty when a new connection is requested for every flow file.
+  const bool has_cached_handler = connections_ && connections_->contains(connection_id);
+  std::shared_ptr<ConnectionHandlerBase> handler;
+  if (has_cached_handler) {
+    handler = connections_->at(connection_id);
+  } else {
+    if (ssl_context_service_) {
+      handler = std::make_shared<ConnectionHandler<SslSocket>>(connection_id, timeout_, logger_, max_size_of_socket_send_buffer_, ssl_context_service_);
+    } else {
+      handler = std::make_shared<ConnectionHandler<TcpSocket>>(connection_id, timeout_, logger_, max_size_of_socket_send_buffer_, nullptr);
+    }
+    if (connections_) {
+      (*connections_)[connection_id] = handler;
+    }
+  }
+
+  gsl_Expects(handler);
+
+  processFlowFile(handler, flow_file_content_stream, *session, flow_file);
+}
+
+// Erases cached handlers that are null or, when idle expiration is configured, have not completed a send
+// within idle_connection_expiration_. Does nothing when the connection cache is disabled.
+void PutTCP::removeExpiredConnections() {
+  if (!connections_)
+    return;
+
+  const auto is_stale = [this](auto& entry) -> bool {
+    const auto& handler = entry.second;
+    if (!handler)
+      return true;
+    return idle_connection_expiration_ && !handler->hasBeenUsedIn(*idle_connection_expiration_);
+  };
+  std::erase_if(*connections_, is_stale);
+}
+
+// Sends the flow file content through the handler and routes the flow file to Success or Failure.
+// A failure on a handler that has completed a send before is retried once after resetting the handler.
+// On final failure the handler is reset and the error message is logged.
+void PutTCP::processFlowFile(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
+                             const std::shared_ptr<io::InputStream>& flow_file_content_stream,
+                             core::ProcessSession& session,
+                             const std::shared_ptr<core::FlowFile>& flow_file) {
+  auto send_result = connection_handler->sendData(flow_file_content_stream, delimiter_);
+
+  const bool failed_on_reused_connection = !send_result && connection_handler->hasBeenUsed();
+  if (failed_on_reused_connection) {
+    logger_->log_warn("%s with reused connection, retrying...", send_result.error().message());
+    connection_handler->reset();
+    // NOTE(review): the retry receives the same stream object and nothing here rewinds it; confirm that
+    // continuing from the stream's current read position is the intended behaviour.
+    send_result = connection_handler->sendData(flow_file_content_stream, delimiter_);
+  }
+
+  if (send_result) {
+    session.transfer(flow_file, Success);
+    return;
+  }
+
+  const std::error_code send_error = send_result.error();
+  gsl_Expects(send_error);
+  connection_handler->reset();
+  logger_->log_error("%s", send_error.message());
+  session.transfer(flow_file, Failure);
+}
+
+REGISTER_RESOURCE(PutTCP, Processor);
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/PutTCP.h b/extensions/standard-processors/processors/PutTCP.h
new file mode 100644
index 000000000..4b8999e0f
--- /dev/null
+++ b/extensions/standard-processors/processors/PutTCP.h
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <utility>
+
+#include "io/InputStream.h"
+#include "Processor.h"
+#include "utils/Export.h"
+#include "controllers/SSLContextService.h"
+
+#include "utils/expected.h"
+#include "utils/StringUtils.h"  // for string <=> on libc++
+
+namespace org::apache::nifi::minifi::processors::detail {
+
+// Identifies a connection target by hostname and port (both kept as strings).
+// Used as the key of PutTCP's connection cache.
+class ConnectionId {
+ public:
+  ConnectionId(std::string hostname, std::string port) : hostname_(std::move(hostname)), port_(std::move(port)) {}
+
+  // Defaulted three-way comparison: compares hostname_ first, then port_.
+  auto operator<=>(const ConnectionId&) const = default;
+
+  // The returned views refer to the members of this object and are only valid while it is alive.
+  [[nodiscard]] std::string_view getHostname() const { return hostname_; }
+  [[nodiscard]] std::string_view getPort() const { return port_; }
+
+ private:
+  std::string hostname_;
+  std::string port_;
+};
+}  // namespace org::apache::nifi::minifi::processors::detail
+
+namespace std {
+// Hash support for ConnectionId so that it can be used as an unordered_map key:
+// the hashes of the hostname and the port are combined with utils::hash_combine.
+template <>
+struct hash<org::apache::nifi::minifi::processors::detail::ConnectionId> {
+  size_t operator()(const org::apache::nifi::minifi::processors::detail::ConnectionId& connection_id) const {
+    const std::hash<std::string_view> string_view_hasher{};
+    const auto hostname_hash = string_view_hasher(connection_id.getHostname());
+    const auto port_hash = string_view_hasher(connection_id.getPort());
+    return org::apache::nifi::minifi::utils::hash_combine(hostname_hash, port_hash);
+  }
+};
+}  // namespace std
+
+namespace org::apache::nifi::minifi::processors {
+// Socket-type independent interface of a connection handler. PutTCP stores handlers through this base
+// so that plain TCP and SSL handlers can share one connection cache.
+class ConnectionHandlerBase {
+ public:
+  virtual ~ConnectionHandlerBase() = default;
+
+  // Whether the handler has been used at all.
+  [[nodiscard]] virtual bool hasBeenUsed() const = 0;
+  // Whether the handler has been used within the last `dur`.
+  [[nodiscard]] virtual bool hasBeenUsedIn(std::chrono::milliseconds dur) const = 0;
+  // Sends the content of the stream and the delimiter; returns an error code on failure.
+  virtual nonstd::expected<void, std::error_code> sendData(const std::shared_ptr<io::InputStream>& flow_file_content_stream, const std::vector<std::byte>& delimiter) = 0;
+  // Discards the connection state of the handler.
+  virtual void reset() = 0;
+};
+
+// Processor that sends the content of incoming flow files over TCP, optionally secured via an SSL context service.
+class PutTCP final : public core::Processor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. "
+      "By default, the FlowFiles are transmitted over the same TCP connection. To assist the TCP server with determining message boundaries, "
+      "an optional \"Outgoing Message Delimiter\" string can be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. "
+      "An optional \"Connection Per FlowFile\" parameter can be specified to change the behaviour so that each FlowFiles content is transmitted over a single TCP connection "
+      "which is closed after the FlowFile has been sent.";
+  // Supported properties.
+  EXTENSIONAPI static const core::Property Hostname;
+  EXTENSIONAPI static const core::Property Port;
+  EXTENSIONAPI static const core::Property IdleConnectionExpiration;
+  EXTENSIONAPI static const core::Property Timeout;
+  EXTENSIONAPI static const core::Property ConnectionPerFlowFile;
+  EXTENSIONAPI static const core::Property OutgoingMessageDelimiter;
+  EXTENSIONAPI static const core::Property SSLContextService;
+  EXTENSIONAPI static const core::Property MaxSizeOfSocketSendBuffer;
+
+  static auto properties() { return std::array{Hostname, Port, IdleConnectionExpiration, Timeout, ConnectionPerFlowFile, OutgoingMessageDelimiter, SSLContextService, MaxSizeOfSocketSendBuffer}; }
+
+  // Supported relationships.
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+  static auto relationships() { return std::array{Success, Failure}; }
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+  // NOTE(review): presumably single threaded because the members below are accessed without locking - confirm.
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  explicit PutTCP(const std::string& name, const utils::Identifier& uuid = {});
+  PutTCP(const PutTCP&) = delete;
+  PutTCP& operator=(const PutTCP&) = delete;
+  ~PutTCP() final;
+
+  void initialize() final;
+  void notifyStop() final;
+  void onSchedule(core::ProcessContext*, core::ProcessSessionFactory *) final;
+  void onTrigger(core::ProcessContext*, core::ProcessSession*) final;
+
+ private:
+  // Drops cached connection handlers that are null or have been idle for too long.
+  void removeExpiredConnections();
+  // Sends one flow file through the given handler and transfers it to Success or Failure.
+  void processFlowFile(std::shared_ptr<ConnectionHandlerBase>& connection_handler,
+                       const std::shared_ptr<io::InputStream>& flow_file_content_stream,
+                       core::ProcessSession& session,
+                       const std::shared_ptr<core::FlowFile>& flow_file);
+
+  // Bytes appended after the content of every flow file (may be empty).
+  std::vector<std::byte> delimiter_;
+  // Handlers keyed by target endpoint; an empty optional means a new handler is created for every flow file.
+  std::optional<std::unordered_map<detail::ConnectionId, std::shared_ptr<ConnectionHandlerBase>>> connections_;
+  // Unset when idle connections never expire.
+  std::optional<std::chrono::milliseconds> idle_connection_expiration_;
+  std::optional<size_t> max_size_of_socket_send_buffer_;
+  std::chrono::milliseconds timeout_ = std::chrono::seconds(15);
+  // Null when no usable SSL context service is configured; plain TCP handlers are created in that case.
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutTCP>::getLogger();
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
index 302af6c3a..1de8165c0 100644
--- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -495,11 +495,11 @@ TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog]") {
   auto ssl_context_service = controller.plan->addController("SSLContextService", "SSLContextService");
   const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(),
-    minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
+      minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(),
-    minifi::utils::file::concat_path(executable_dir, "resources/cert_and_private_key.pem")));
+      minifi::utils::file::concat_path(executable_dir, "resources/localhost_by_A.pem")));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(),
-    minifi::utils::file::concat_path(executable_dir, "resources/cert_and_private_key.pem")));
+      minifi::utils::file::concat_path(executable_dir, "resources/localhost_by_A.pem")));
   LogTestController::getInstance().setTrace<ListenSyslog>();
   REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, std::to_string(SYSLOG_PORT)));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2"));
@@ -508,8 +508,9 @@ TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog]") {
   REQUIRE(listen_syslog->setProperty(ListenSyslog::SSLContextService, "SSLContextService"));
   ssl_context_service->enable();
   controller.plan->scheduleProcessor(listen_syslog);
-  REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
-  REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt")));
+  REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));
+  REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, minifi::utils::file::concat_path(executable_dir, "/resources/ca_A.crt")));
+
   std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
   REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms));
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1);
diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
index 247f2e6ff..e8c1f9286 100644
--- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
@@ -119,11 +119,11 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") {
   LogTestController::getInstance().setTrace<ListenTCP>();
   const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::CACertificate.getName(),
-    minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
+      minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::ClientCertificate.getName(),
-    minifi::utils::file::concat_path(executable_dir, "resources/cert_and_private_key.pem")));
+      minifi::utils::file::concat_path(executable_dir, "resources/localhost_by_A.pem")));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::PrivateKey.getName(),
-    minifi::utils::file::concat_path(executable_dir, "resources/cert_and_private_key.pem")));
+      minifi::utils::file::concat_path(executable_dir, "resources/localhost_by_A.pem")));
   REQUIRE(controller.plan->setProperty(ssl_context_service, controllers::SSLContextService::Passphrase.getName(), "Password12"));
   REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::Port.getName(), std::to_string(PORT)));
   REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::MaxBatchSize.getName(), "2"));
@@ -159,7 +159,7 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") {
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message: expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
+      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));
     }
   }
 
@@ -190,14 +190,14 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") {
     controller.plan->scheduleProcessor(listen_tcp);
 
     minifi::utils::net::SslData ssl_data;
-    ssl_data.ca_loc = minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_cert.crt";
-    ssl_data.cert_loc = minifi::utils::file::FileUtils::get_executable_dir() + "/resources/cert_and_private_key.pem";
-    ssl_data.key_loc = minifi::utils::file::FileUtils::get_executable_dir() + "/resources/cert_and_private_key.pem";
+    ssl_data.ca_loc = minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_A.crt";
+    ssl_data.cert_loc = minifi::utils::file::FileUtils::get_executable_dir() + "/resources/localhost_by_A.pem";
+    ssl_data.key_loc = minifi::utils::file::FileUtils::get_executable_dir() + "/resources/localhost_by_A.pem";
     ssl_data.key_pw = "Password12";
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message : expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_cert.crt", ssl_data));
+      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_A.crt", ssl_data));
     }
   }
 
@@ -214,7 +214,7 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") {
     ssl_context_service->enable();
     controller.plan->scheduleProcessor(listen_tcp);
 
-    REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt")));
+    REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, minifi::utils::file::concat_path(executable_dir, "/resources/ca_A.crt")));
   }
 
   ProcessorTriggerResult result;
diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
new file mode 100644
index 000000000..6398d2bc0
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
@@ -0,0 +1,517 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {  // Interface the test fixture reaches via dynamic_cast to inspect and close a test server's sessions.
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;  // Number of sessions the server has recorded.
+  virtual void closeSessions() = 0;  // Closes the sockets of the recorded sessions.
+};
+
+template<class SessionType>  // SessionType must expose getSocket(), as used by closeSessions() below.
+class SessionAwareServer : public ISessionAwareServer {  // Keeps weak references to sessions and implements ISessionAwareServer on top of them.
+ protected:
+  size_t getNumberOfSessions() const override {  // Counts every recorded entry; entries are never pruned here, so expired sessions are included.
+    std::lock_guard lock_guard{mutex_};
+    return sessions_.size();
+  }
+
+  void closeSessions() override {  // Shuts down and closes the socket of every session that is still alive and open.
+    std::lock_guard lock_guard{mutex_};
+    for (const auto& session_weak : sessions_) {
+      if (auto session = session_weak.lock()) {  // Skip sessions that have already been destroyed.
+        auto& socket = session->getSocket();
+        if (socket.is_open()) {
+          socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+          session->getSocket().close();
+        }
+      }
+    }
+  }
+
+  mutable std::mutex mutex_;  // mutable so the const getNumberOfSessions() can lock it.
+  std::vector<std::weak_ptr<SessionType>> sessions_;  // Filled by the derived classes' createSession() overrides.
+};
+
+class SessionAwareTcpServer : public TcpServer, public SessionAwareServer<TcpSession> {  // TcpServer that records every session it creates.
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr<TcpSession> createSession() override {  // Delegates to TcpServer::createSession() and remembers the result in sessions_.
+    std::lock_guard lock_guard{mutex_};  // Held for the whole call, including the base-class createSession().
+    auto session = TcpServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", session.get());
+    sessions_.emplace_back(session);  // Stored as weak_ptr, so the list does not keep the session alive.
+    return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public SessionAwareServer<SslSession> {  // SslServer that records every session it creates.
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr<SslSession> createSession() override {  // Delegates to SslServer::createSession() and remembers the result in sessions_.
+    std::lock_guard lock_guard{mutex_};  // Held for the whole call, including the base-class createSession().
+    auto session = SslServer::createSession();
+    logger_->log_trace("SessionAwareSslServer::createSession %p", session.get());
+    sessions_.emplace_back(session);  // Stored as weak_ptr, so the list does not keep the session alive.
+    return session;
+  }
+};
+
+utils::net::SslData createSslDataForServer() {  // Builds the SslData for the test SSL server from files under <executable dir>/resources.
+  const std::filesystem::path executable_dir = minifi::utils::file::FileUtils::get_executable_dir();
+  utils::net::SslData ssl_data;
+  ssl_data.ca_loc = (executable_dir / "resources" / "ca_A.crt").string();
+  ssl_data.cert_loc = (executable_dir / "resources" / "localhost_by_A.pem").string();  // Same file is used for certificate and key.
+  ssl_data.key_loc = (executable_dir / "resources" / "localhost_by_A.pem").string();
+  return ssl_data;  // key_pw is not set here.
+}
+}  // namespace
+
+class PutTCPTestFixture {  // Test fixture: owns the PutTCP processor under test, its controller, and the test servers keyed by port.
+ public:
+  PutTCPTestFixture() {  // Raises log levels and sets default Hostname, Timeout and OutgoingMessageDelimiter.
+    LogTestController::getInstance().setTrace<PutTCP>();
+    LogTestController::getInstance().setInfo<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<utils::net::Server>();
+    put_tcp_->setProperty(PutTCP::Hostname, "${literal('localhost')}");
+    put_tcp_->setProperty(PutTCP::Timeout, "200 ms");
+    put_tcp_->setProperty(PutTCP::OutgoingMessageDelimiter, "\n");
+  }
+
+  ~PutTCPTestFixture() {
+    stopServers();  // Joins the server threads; a std::thread must not be destroyed while still joinable.
+  }
+
+  void stopServers() {  // For every registered server: stop the listener, join its thread, then release the listener.
+    for (auto& [port, server] : listeners_) {
+      auto& listener = server.listener_;
+      auto& server_thread = server.server_thread_;
+      if (listener)
+        listener->stop();
+      if (server_thread.joinable())
+        server_thread.join();
+      listener.reset();
+    }
+  }
+
+  size_t getNumberOfActiveSessions(std::optional<uint16_t> port = std::nullopt) {  // Recorded sessions of the listener on `port` (the only listener when unset), minus one.
+    if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(getListener(port))) {
+      return session_aware_listener->getNumberOfSessions() - 1;  // There is always one inactive session waiting for a new connection
+    }
+    return -1;  // Listener is not session-aware; -1 converts to the maximum size_t value.
+  }
+
+  void closeActiveConnections() {  // Closes the sessions of every session-aware listener, then waits a fixed 200ms.
+    for (auto& [port, server] : listeners_) {
+      if (auto session_aware_listener = dynamic_cast<ISessionAwareServer*>(server.listener_.get())) {
+        session_aware_listener->closeSessions();
+      }
+    }
+    std::this_thread::sleep_for(200ms);  // Presumably lets the closes take effect before the next trigger -- TODO confirm.
+  }
+
+  auto trigger(std::string_view message, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {  // Forwards to controller_.trigger().
+    return controller_.trigger(message, std::move(input_flow_file_attributes));
+  }
+
+  auto getContent(const auto& flow_file) {  // Forwards to controller_.plan->getContent().
+    return controller_.plan->getContent(flow_file);
+  }
+
+  std::optional<utils::net::Message> tryDequeueReceivedMessage(std::optional<uint16_t> port = std::nullopt) {  // Polls the listener for a message; nullopt if none arrives in time.
+    auto timeout = 200ms;  // Total time to keep polling.
+    auto interval = 10ms;  // Pause between polls.
+
+    auto start_time = std::chrono::system_clock::now();  // NOTE(review): system_clock is not monotonic; steady_clock is the usual choice for timeouts.
+    utils::net::Message result;
+    while (start_time + timeout > std::chrono::system_clock::now()) {
+      if (getListener(port)->tryDequeue(result))
+        return result;
+      std::this_thread::sleep_for(interval);
+    }
+    return std::nullopt;
+  }
+
+  void addSSLContextToPutTCP(const std::filesystem::path& ca_cert, const std::optional<std::filesystem::path>& client_cert_key) {  // Adds and enables an SSLContextService and points PutTCP at it.
+    const std::filesystem::path ca_dir = std::filesystem::path(minifi::utils::file::FileUtils::get_executable_dir()) / "resources";  // Both file arguments are resolved against this directory.
+    auto ssl_context_service_node = controller_.plan->addController("SSLContextService", "SSLContextService");
+    REQUIRE(controller_.plan->setProperty(ssl_context_service_node, SSLContextService::CACertificate.getName(), (ca_dir / ca_cert).string()));
+    if (client_cert_key) {  // Client certificate and private key come from the same file; both are skipped when unset.
+      REQUIRE(controller_.plan->setProperty(ssl_context_service_node, SSLContextService::ClientCertificate.getName(), (ca_dir / *client_cert_key).string()));
+      REQUIRE(controller_.plan->setProperty(ssl_context_service_node, SSLContextService::PrivateKey.getName(), (ca_dir / *client_cert_key).string()));
+    }
+    ssl_context_service_node->enable();
+
+    put_tcp_->setProperty(PutTCP::SSLContextService, "SSLContextService");
+  }
+
+  void setHostname(const std::string& hostname) {  // Sets PutTCP's Hostname property; REQUIREs that it was accepted.
+    REQUIRE(controller_.plan->setProperty(put_tcp_, PutTCP::Hostname.getName(), hostname));
+  }
+
+  void enableConnectionPerFlowFile() {  // Sets PutTCP's ConnectionPerFlowFile property to "true".
+    REQUIRE(controller_.plan->setProperty(put_tcp_, PutTCP::ConnectionPerFlowFile.getName(), "true"));
+  }
+
+  void setIdleConnectionExpiration(const std::string& idle_connection_expiration_str) {  // Sets PutTCP's IdleConnectionExpiration property.
+    REQUIRE(controller_.plan->setProperty(put_tcp_, PutTCP::IdleConnectionExpiration.getName(), idle_connection_expiration_str));
+  }
+
+  uint16_t addTCPServer() {  // Starts a plain TCP server on a random port in [10000, 32767] and returns that port.
+    uint16_t port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine_);
+    listeners_[port].startTCPServer(port);  // NOTE(review): a repeated random port would reuse the entry and trip the gsl_Expects in startTCPServer.
+    return port;
+  }
+
+  uint16_t addSSLServer() {  // Starts an SSL server on a random port in [10000, 32767] and returns that port.
+    uint16_t port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine_);
+    listeners_[port].startSSLServer(port);  // NOTE(review): same port-collision caveat as addTCPServer.
+    return port;
+  }
+
+  void setPutTCPPort(uint16_t port) {  // Sets the Port property to the number wrapped as ${literal('<port>')}.
+    put_tcp_->setProperty(PutTCP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
+  }
+
+  void setPutTCPPort(std::string port_str) {  // Sets the Port property to the given string verbatim.
+    put_tcp_->setProperty(PutTCP::Port, std::move(port_str));
+  }
+
+  [[nodiscard]] uint16_t getSinglePort() const {  // Returns the port of the only registered server.
+    gsl_Expects(listeners_.size() == 1);  // Precondition: exactly one server has been added.
+    return listeners_.begin()->first;
+  }
+
+ private:
+  utils::net::Server* getListener(std::optional<uint16_t> port) {  // Listener for `port`, or for the single registered server when unset.
+    if (!port)
+      port = getSinglePort();
+    return listeners_.at(*port).listener_.get();  // at() throws if no server was registered on that port.
+  }
+
+  const std::shared_ptr<PutTCP> put_tcp_ = std::make_shared<PutTCP>("PutTCP");  // Processor under test.
+  test::SingleProcessorTestController controller_{put_tcp_};
+
+  std::mt19937 random_engine_{std::random_device{}()};  // NOLINT: "Missing space before {  [whitespace/braces] [5]"
+  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
+
+  class Server {  // A listener together with the thread that runs it.
+   public:
+    Server() = default;
+
+    void startTCPServer(uint16_t port) {  // Creates a SessionAwareTcpServer on `port` and runs it on a new thread.
+      gsl_Expects(!listener_ && !server_thread_.joinable());  // Precondition: this Server has not been started yet.
+      listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port, core::logging::LoggerFactory<utils::net::Server>::getLogger());
+      server_thread_ = std::thread([this]() { listener_->run(); });  // Captures this; the thread is joined in stopServers().
+    }
+
+    void startSSLServer(uint16_t port) {  // Creates a SessionAwareSslServer on `port` and runs it on a new thread.
+      gsl_Expects(!listener_ && !server_thread_.joinable());  // Precondition: this Server has not been started yet.
+      listener_ = std::make_unique<SessionAwareSslServer>(std::nullopt,
+                                                          port,
+                                                          core::logging::LoggerFactory<utils::net::Server>::getLogger(),
+                                                          createSslDataForServer(),
+                                                          utils::net::SslServer::ClientAuthOption::REQUIRED);
+      server_thread_ = std::thread([this]() { listener_->run(); });  // Captures this; the thread is joined in stopServers().
+    }
+
+    std::unique_ptr<utils::net::Server> listener_;
+    std::thread server_thread_;
+  };
+  std::unordered_map<uint16_t, Server> listeners_;  // Port -> server listening on that port.
+};
+
+void trigger_expect_success(PutTCPTestFixture& test_fixture, const std::string_view message, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {  // Triggers PutTCP with `message` and checks it is routed to Success only.
+  const auto result = test_fixture.trigger(message, std::move(input_flow_file_attributes));
+  const auto& success_flow_files = result.at(PutTCP::Success);
+  CHECK(success_flow_files.size() == 1);  // Exactly one flow file on Success ...
+  CHECK(result.at(PutTCP::Failure).empty());  // ... and none on Failure.
+  if (!success_flow_files.empty())  // Avoids indexing an empty vector when the size check above did not hold.
+    CHECK(test_fixture.getContent(success_flow_files[0]) == message);
+}
+
+void trigger_expect_failure(PutTCPTestFixture& test_fixture, const std::string_view message) {  // Triggers PutTCP with `message` and checks it is routed to Failure only.
+  const auto result = test_fixture.trigger(message);
+  const auto &failure_flow_files = result.at(PutTCP::Failure);
+  CHECK(failure_flow_files.size() == 1);  // Exactly one flow file on Failure ...
+  CHECK(result.at(PutTCP::Success).empty());  // ... and none on Success.
+  if (!failure_flow_files.empty())  // Avoids indexing an empty vector when the size check above did not hold.
+    CHECK(test_fixture.getContent(failure_flow_files[0]) == message);
+}
+
+void receive_success(PutTCPTestFixture& test_fixture, const std::string_view expected_message, std::optional<uint16_t> port = std::nullopt) {  // Checks the server on `port` received `expected_message` next.
+  auto received_message = test_fixture.tryDequeueReceivedMessage(port);  // Empty optional if nothing arrived within the fixture's polling window.
+  CHECK(received_message);
+  if (received_message) {  // Only inspect the fields when a message was actually dequeued.
+    CHECK(received_message->message_data == expected_message);
+    CHECK(received_message->protocol == utils::net::IpProtocol::TCP);
+    CHECK(!received_message->sender_address.to_string().empty());
+  }
+}
+
+constexpr std::string_view first_message = "message 1";  // Test payloads; each one is a character longer than the previous.
+constexpr std::string_view second_message = "message 22";
+constexpr std::string_view third_message = "message 333";
+constexpr std::string_view fourth_message = "message 4444";
+constexpr std::string_view fifth_message = "message 55555";
+constexpr std::string_view sixth_message = "message 666666";
+
+TEST_CASE("Server closes in-use socket", "[PutTCP]") {  // PutTCP must keep succeeding after the server closes the connection it was using.
+  PutTCPTestFixture test_fixture;
+  SECTION("No SSL") {
+    auto port = test_fixture.addTCPServer();
+    test_fixture.setPutTCPPort(port);
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+    auto port = test_fixture.addSSLServer();
+    test_fixture.setPutTCPPort(port);
+  }
+
+  trigger_expect_success(test_fixture, first_message);
+  trigger_expect_success(test_fixture, second_message);
+  trigger_expect_success(test_fixture, third_message);
+
+  receive_success(test_fixture, first_message);
+  receive_success(test_fixture, second_message);
+  receive_success(test_fixture, third_message);
+
+  CHECK(1 == test_fixture.getNumberOfActiveSessions());  // Expectation: the three sends shared a single connection.
+
+  test_fixture.closeActiveConnections();  // Server side closes its sockets.
+
+  trigger_expect_success(test_fixture, fourth_message);
+  trigger_expect_success(test_fixture, fifth_message);
+  trigger_expect_success(test_fixture, sixth_message);
+
+  test_fixture.tryDequeueReceivedMessage();  // Result is discarded; presumably only gives the server time to receive -- TODO confirm.
+
+  CHECK(LogTestController::getInstance().matchesRegex("warning.*with reused connection, retrying"));
+  CHECK(2 == test_fixture.getNumberOfActiveSessions());  // Expectation: one additional session after the server-side close.
+}
+
+TEST_CASE("Connection per flow file", "[PutTCP]") {  // With ConnectionPerFlowFile enabled, six flow files are expected to produce six sessions.
+  PutTCPTestFixture test_fixture;
+  SECTION("No SSL") {
+    auto port = test_fixture.addTCPServer();
+    test_fixture.setPutTCPPort(port);
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+    auto port = test_fixture.addSSLServer();
+    test_fixture.setPutTCPPort(port);
+  }
+
+  test_fixture.enableConnectionPerFlowFile();
+
+  trigger_expect_success(test_fixture, first_message);
+  trigger_expect_success(test_fixture, second_message);
+  trigger_expect_success(test_fixture, third_message);
+
+  receive_success(test_fixture, first_message);
+  receive_success(test_fixture, second_message);
+  receive_success(test_fixture, third_message);
+
+  trigger_expect_success(test_fixture, fourth_message);
+  trigger_expect_success(test_fixture, fifth_message);
+  trigger_expect_success(test_fixture, sixth_message);
+
+  receive_success(test_fixture, fourth_message);
+  receive_success(test_fixture, fifth_message);
+  receive_success(test_fixture, sixth_message);
+
+  CHECK(6 == test_fixture.getNumberOfActiveSessions());  // One recorded session per flow file sent.
+}
+
+TEST_CASE("PutTCP test invalid host", "[PutTCP]") {  // An unresolvable hostname must route the flow file to Failure.
+  PutTCPTestFixture test_fixture;
+  SECTION("No SSL") {
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+  }
+
+  test_fixture.setPutTCPPort(1235);  // No test server is started in this case.
+  test_fixture.setHostname("invalid_hostname");
+  trigger_expect_failure(test_fixture, "message for invalid host");
+
+  CHECK((LogTestController::getInstance().contains("Host not found", 0ms)  // Either wording is accepted; presumably platform-dependent error text.
+      || LogTestController::getInstance().contains("No such host is known", 0ms)));
+}
+
+TEST_CASE("PutTCP test invalid server", "[PutTCP]") {  // Connecting to a port with no test server must route the flow file to Failure.
+  PutTCPTestFixture test_fixture;
+  SECTION("No SSL") {
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+  }
+  test_fixture.setPutTCPPort(1235);  // No test server is started; assumes nothing else listens on this port.
+  test_fixture.setHostname("localhost");
+  trigger_expect_failure(test_fixture, "message for invalid server");
+
+  CHECK((LogTestController::getInstance().contains("Connection refused", 0ms)  // Any of the three wordings is accepted; presumably platform-dependent.
+      || LogTestController::getInstance().contains("No connection could be made because the target machine actively refused it", 0ms)
+      || LogTestController::getInstance().contains("A connection attempt failed because the connected party did not properly respond", 0ms)));
+}
+
+TEST_CASE("PutTCP test non-routable server", "[PutTCP]") {  // A connection attempt that times out must route the flow file to Failure.
+  PutTCPTestFixture test_fixture;
+  SECTION("No SSL") {
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+  }
+  test_fixture.setHostname("192.168.255.255");  // Assumed to be unreachable from the test machine -- depends on the local network.
+  test_fixture.setPutTCPPort(1235);
+  trigger_expect_failure(test_fixture, "message for non-routable server");
+
+  CHECK((LogTestController::getInstance().contains("Connection timed out", 0ms)  // Any of the three wordings is accepted; presumably platform-dependent.
+    || LogTestController::getInstance().contains("Operation timed out", 0ms)
+    || LogTestController::getInstance().contains("host has failed to respond", 0ms)));
+}
+
+TEST_CASE("PutTCP test invalid server cert", "[PutTCP]") {  // Client configured with CA B must fail against the test server, which uses CA A files.
+  PutTCPTestFixture test_fixture;
+
+  test_fixture.addSSLContextToPutTCP("ca_B.crt", "alice_by_B.pem");  // Differs from the server's ca_A.crt / localhost_by_A.pem.
+  test_fixture.setHostname("localhost");
+  auto port = test_fixture.addSSLServer();
+  test_fixture.setPutTCPPort(port);
+
+  trigger_expect_failure(test_fixture, "message for invalid-cert server");
+
+  CHECK((LogTestController::getInstance().contains("certificate verify failed", 0ms)  // Either log text is accepted.
+      || LogTestController::getInstance().contains("asio.ssl error", 0ms)));
+}
+
+TEST_CASE("PutTCP test missing client cert", "[PutTCP]") {  // Without a client certificate, sending to the test SSL server must fail.
+  PutTCPTestFixture test_fixture;
+
+  test_fixture.addSSLContextToPutTCP("ca_A.crt", std::nullopt);  // CA only: no client certificate or private key is configured.
+  test_fixture.setHostname("localhost");
+  auto port = test_fixture.addSSLServer();  // The fixture starts this server with ClientAuthOption::REQUIRED.
+  test_fixture.setPutTCPPort(port);
+
+  trigger_expect_failure(test_fixture, "message for invalid-cert server");  // NOTE(review): payload text matches the invalid-server-cert test; looks copy-pasted.
+
+  CHECK((LogTestController::getInstance().contains("sslv3 alert handshake failure", 0ms)  // Either log text is accepted.
+      || LogTestController::getInstance().contains("asio.ssl error", 0ms)));}
+
+TEST_CASE("PutTCP test idle connection expiration", "[PutTCP]") {  // A connection idle for longer than the expiration is expected to be replaced.
+  PutTCPTestFixture test_fixture;
+
+  SECTION("No SSL") {
+    auto port = test_fixture.addTCPServer();
+    test_fixture.setPutTCPPort(port);
+  }
+  SECTION("SSL") {
+    auto port = test_fixture.addSSLServer();
+    test_fixture.setPutTCPPort(port);
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+  }
+
+  test_fixture.setIdleConnectionExpiration("100ms");
+  trigger_expect_success(test_fixture, first_message);
+  std::this_thread::sleep_for(110ms);  // Sleeps slightly longer than the 100ms expiration set above.
+  trigger_expect_success(test_fixture, second_message);
+
+  receive_success(test_fixture, first_message);
+  receive_success(test_fixture, second_message);
+
+  CHECK(2 == test_fixture.getNumberOfActiveSessions());  // Expectation: the second send used a new connection.
+}
+
+TEST_CASE("PutTCP test long flow file chunked sending", "[PutTCP]") {  // A 3500-byte payload must arrive at the server intact.
+  PutTCPTestFixture test_fixture;
+  SECTION("No SSL") {
+    auto port = test_fixture.addTCPServer();
+    test_fixture.setPutTCPPort(port);
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+    auto port = test_fixture.addSSLServer();
+    test_fixture.setPutTCPPort(port);
+  }
+  std::string long_message(3500, 'a');  // 3500 copies of 'a'.
+  trigger_expect_success(test_fixture, long_message);
+  receive_success(test_fixture, long_message);
+}
+
+TEST_CASE("PutTCP test multiple servers", "[PutTCP]") {  // One PutTCP sends to five servers, with the target port taken from a flow file attribute.
+  PutTCPTestFixture test_fixture;
+  size_t number_of_servers = 5;
+  std::vector<uint16_t> ports;
+  SECTION("No SSL") {
+    for (size_t i = 0; i < number_of_servers; ++i) {
+      ports.push_back(test_fixture.addTCPServer());
+    }
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+    for (size_t i = 0; i < number_of_servers; ++i) {
+      ports.push_back(test_fixture.addSSLServer());
+    }
+  }
+
+  test_fixture.setPutTCPPort("${tcp_port}");  // Presumably resolved per flow file from the tcp_port attribute passed below.
+
+  for (auto i = 0; i < 3; ++i) {  // Three rounds over all servers.
+    for (auto& port : ports) {
+      std::string message = "Test message ";
+      message.append(std::to_string(port));
+      trigger_expect_success(test_fixture, message, {{"tcp_port", std::to_string(port)}});
+      receive_success(test_fixture, message, port);
+    }
+  }
+  for (auto& port : ports) {
+    CHECK(1 == test_fixture.getNumberOfActiveSessions(port));  // Expectation: each server saw a single connection across all rounds.
+  }
+}
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
index fd313d928..ca63f5da5 100644
--- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -57,7 +57,7 @@ TEST_CASE("PutUDP", "[putudp]") {
   put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
   put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
-  utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>().getLogger()};
+  utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
 
   auto server_thread = std::thread([&listener]() { listener.run(); });
   auto cleanup_server = gsl::finally([&]{
diff --git a/extensions/standard-processors/tests/unit/resources/alice_by_A.pem b/extensions/standard-processors/tests/unit/resources/alice_by_A.pem
new file mode 100644
index 000000000..605fe9366
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/resources/alice_by_A.pem
@@ -0,0 +1,46 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEogIBAAKCAQEAqhZR+Hsx397YYy2sQOI0IkxO6rkJvftLrjRpy1YwVfArimkU
+umZkWdpE7FAt1BIkzBlSsqXzeY/+W53YjOBcLK+xrQHpIquTGG6iL4btM6hWPBow
+hxuz0TpW5SpsuupQbRi4hbWVuQzTCKV68VM01/590Su2l0MPoamSMthK8H2ubodz
+R1VFwlTaLZmRJ20hyowsuKLOdc8fyzDXH5JPR9+STHsPpl+OccDvTG8iKlOZMa8z
+d3GXWBhSfcPgP+WzJWLn1bVN7UbKfgneUYSRAvf+ocsT1OZ7T+eam07ROsZBgpN1
+VVycmFalRqsNddT814tUIgkRXEsXzC1bP/eV2QIDAQABAoIBAH7O50xHpRaQkWnY
+Gm3BeDb+B3ROgqnm2jTGFP4pgx3/Uqb90xtpzXWEGxDIcnKDGHYmhxZ0TYMbTPtH
+QrU9bNtQHjqriwJzQtbbXQXsJZr27Vwf9oA0sirSwQhYSfpNSasc3C2sBTWTDx+K
+KJAVhfdnYKx7V8WMlPHld/96bNzA0AqGgn8FGYDiPAdiY7Ega6/iEMgtwGgIe56c
+k5YeaXOeV9b5gZGFZyDXcnbAgC24gcuSI70YmoYgSBYJC1NftLEa4NLysetpEA2A
+de1kQxm54ZfeeC+whtT693jIvp8cg6Ck/yCNj+qGXFbBWjibojs/uN8PadBOl8DU
+hdGrG2UCgYEA0tCiEfaMykaZn4GTmUq+drx2l5eP6GBhmkCpw5b0AS7Xpi/sh1Hm
++v36+ffVdPsYylVDMCCLxrugx+pkCwk2I+fHxRJ0tGGOCBclpQqE0mgYQdBmGyMQ
+hsXea+9IhbbeGkqgxxHWGHzVPtq7NFOLSt6LGF6+RXhfHcaBq/ypFccCgYEAzor2
+nhQr0q6UpaVUmZp3fliVfhsv7qQEcsCXbAssLbKM36vyD8m9A6V94QLfL/Z6jBx7
+3Edh18OrN0dfIlcF3J8jiD/3mGiGGldsF/dgJWQB8DpP51tYldR7ni1bW+ZDtlX7
+XmcWKTGZLqXqzYS8bsCcKUOLd2g//p1mnbcfd18CgYA8cG4Wok3I7Ca88SREzYX/
+epaxbVVntMImvCUvmwaHlEtlLNYuEZAcI1ah9aiv6hE4aOtjT+Fi74Xv6sYV1+U6
+tAe0+06ULGfQ7/nt8C8WN5vEup+bZhkl2nKjFS4Aj+XrObwQdo+f46IrbABBxzXn
+GBheu0LnndP/MFsa2MwNHwKBgEmAFkcm8nlk+yz/at3GpGNn7rsTvbj00Uhs1PXz
+++K/OXaXX+rSZdsYV3VtajNNSUr3D/TRyjXYQePIGEjGIyXh0+k2qkuoVqClH6hf
+te1Ya4AroCe60AlxthQSHALWLJ6EdpGfqbk7F0IMdURxygS3slrU2JrDlJJtPQk/
+E4mNAoGAJjht0RIhJLr+Gss+7SocEYFd8klFpRTWpx80pN5hRdUfH84Q3OOXnq2F
+QoHD7WLMM8Ec/paSSZjrmvk00Ptp4s2/Z1SRhY2BQjbk32xP0/CkGGK8SPVW6Sb7
+hAol6soYGroGcCGsRPdRE6hF5+BH8VYGh5vPlELyDnNym7Kp0wU=
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIIDITCCAgkCFDA8Dkntpi2PSSJDZGYjjG03qNboMA0GCSqGSIb3DQEBCwUAMFQx
+CzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UECgwNRXhhbXBsZSwgSW5j
+LjEgMB4GA1UEAwwXQ2VydGlmaWNhdGUgQXV0aG9yaXR5IEEwHhcNMjIwOTE5MDkw
+NTM3WhcNMzIwOTE2MDkwNTM3WjBGMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0Ex
+FjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xEjAQBgNVBAMMCWFsaWNlLmNvbTCCASIw
+DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKoWUfh7Md/e2GMtrEDiNCJMTuq5
+Cb37S640actWMFXwK4ppFLpmZFnaROxQLdQSJMwZUrKl83mP/lud2IzgXCyvsa0B
+6SKrkxhuoi+G7TOoVjwaMIcbs9E6VuUqbLrqUG0YuIW1lbkM0wilevFTNNf+fdEr
+tpdDD6GpkjLYSvB9rm6Hc0dVRcJU2i2ZkSdtIcqMLLiiznXPH8sw1x+ST0ffkkx7
+D6ZfjnHA70xvIipTmTGvM3dxl1gYUn3D4D/lsyVi59W1Te1Gyn4J3lGEkQL3/qHL
+E9Tme0/nmptO0TrGQYKTdVVcnJhWpUarDXXU/NeLVCIJEVxLF8wtWz/3ldkCAwEA
+ATANBgkqhkiG9w0BAQsFAAOCAQEAR2vpt91QSLfoh0qIW+bknV+ZilZdgRGh+kXm
+deqo+Drkz3BgmbXCIG6GGWF6LaS+iNt5YYyHUBKqLkvAfwtocLSVgNKYcgqG3kLZ
+qfoLrlT/IhHQ7WE6NOFQKcoJ/vuBMU7zjROjbbw2NdkO7hpJr2NQC5CgfPy89eJ6
+ly7wf3zxsVHk8fUnl1MgSb4lft4v5E73s9SpfRkKYr2BrkMCHQYawRAm9um9pW2S
+Qmk1L6OKkkCoR+LYrLyWY3s84NGVjP/fk7XHtvh2YtlB3sT4/yluOc8kXTp0DiuW
+UyvWTGYI+hHvZ4ol8LOttlV4Nwo4d8qgyuHlgiw1dnQBUZB7/g==
+-----END CERTIFICATE-----
diff --git a/extensions/standard-processors/tests/unit/resources/alice_by_B.pem b/extensions/standard-processors/tests/unit/resources/alice_by_B.pem
new file mode 100644
index 000000000..f8e1328e0
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/resources/alice_by_B.pem
@@ -0,0 +1,46 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAqq6fA21gSE4zJvfEwJ2hTa0tTfONsWLIfho0jNKvTytwoD12
+tj41mPdM8kWvYY/oEPQ8WCXAy4UmgwxxWvCmtDmm/NXT5B6/IMr+png82sdkPABY
+qoPxIb0lvRoSQ0DIYZhRLI1v6TdAFMHUVyksSihoaDZZ7/6Ne+0p0xHTqiyTrlgX
+axMCvmS1S/i6eCUfNKtBA3Y0nXLj1D24RYn7gxPKnAXMpCwoxW1+zRFA9SotyaO4
+mh3cp4MFALxdimu69q5/rduK9XlAXvqxlCb80mJ3he4TEio2J2KL+kiFksGKtj7W
+e8a6UgIzOYYTmyZ4ScYGnwvSxYeAkQX0oRPzbwIDAQABAoIBAA5aRsbcALez07tY
+JHRqDPFiOagPbf/XpbJs87RP1ywaJAtlf8ENdCZbzV2mHHxgxIwAbb6f1hmHJdjp
+R/L0v5/yJSent3y8VSglycon3D4tfDFLeilElRdYN38yXQzIutDyJQlRD3MWEU5t
+ijSWIsJNqZHx2BhFWJJuPBEis6DgoZXUIQLMCNDUPTofJS486yoYxdLhMC+i4+6q
+DnQ+k7a32tPhSJGKXVCeIbGA2gJFcxoRQvFuJuMCofCC1jJ1BlGKbaeal0RC9ICe
+TSk+SPmZMGMiOcLXBcJTCRxQP71mqtxn4cYJdvQDmXP+vZyVWBXB272YRwSoTbTL
+yKHsn4ECgYEA0/1V4tuOCfCP94Phk0zwaepeZxeGdRTQhum57V5L3xmlgkrrUx+5
+u6JzrMrxpO8h7l619jHPPaV0u0BE1bwjGeOPuPA9WCWqfzETDL/j0NbTOWRLnX5h
+91Ag8BYoGxt4gK856S/Me7PRPljNWiv4bDjGjXrYq/GDpSU9ZPnW9y8CgYEAzh3o
+Gz9XGEI9EjcwCZtuPoKmalvXFC2tNNkkX6FgZRire0mekJzKjbNgR7xrWYtUaZ55
+ZTEGL9bseRSOcfWeZBpJIclEIgyajYs8tW8RaTZoRQmu3fP+2IlFoU9SiSQR40hX
+7eeJOduHhbxXGW/JKuf7pBbGMSTH9n+MqfV/t8ECgYEAhEmmDABYvekp3hqlbOdp
+a57+tDShCnUnv9kg1niuvhViDFG2UlQM8oNozh6C9xrnQLpHsM/adKzIkIWFrx9N
+hD1WleENVvGCWQcFzUH953f3revhp/GTLuMI+unIs0nMQ/mVGOhkIZnP7Kk71JZ1
+2wr/FJDhn0MClM8NZfLm668CgYAYkbANL2uuVJb7COENBB4MDX7QxsnIeflfh1Ky
+o4XeBybJt2jTTB1I7szXQDp7ngQd4uoNid527WOauzyPkPukaw20nU0l0eLKZIKE
+Dg1BQV8Ee7cAdgk2voYySEZKWqZXNVRl42eUIfrxkhW/zndoSebRFHXjfcLoOyQF
+TmI/AQKBgEKHOHbemvkKtdE4GPz436Y9vodqOMe2qFAoW+nhOP4stGZhZpOyVMvk
+Y7PxpG+1bwPVg4ouSJyoq9Mw3Qnz5Reuot1h67rrw75CyUSDwuAdLpnS08t7T7LF
+N7b5nWOMcbUdaCaiAh3Mv8/9vo46ZdpdiFYgDLOCYiiLifva36rK
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIIDITCCAgkCFDYWRAThkKd/J+oMW7tZBqEPx4XPMA0GCSqGSIb3DQEBCwUAMFQx
+CzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UECgwNRXhhbXBsZSwgSW5j
+LjEgMB4GA1UEAwwXQ2VydGlmaWNhdGUgQXV0aG9yaXR5IEIwHhcNMjIwOTE5MDkw
+NTM3WhcNMzIwOTE2MDkwNTM3WjBGMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0Ex
+FjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xEjAQBgNVBAMMCWFsaWNlLmNvbTCCASIw
+DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKqunwNtYEhOMyb3xMCdoU2tLU3z
+jbFiyH4aNIzSr08rcKA9drY+NZj3TPJFr2GP6BD0PFglwMuFJoMMcVrwprQ5pvzV
+0+QevyDK/qZ4PNrHZDwAWKqD8SG9Jb0aEkNAyGGYUSyNb+k3QBTB1FcpLEooaGg2
+We/+jXvtKdMR06osk65YF2sTAr5ktUv4unglHzSrQQN2NJ1y49Q9uEWJ+4MTypwF
+zKQsKMVtfs0RQPUqLcmjuJod3KeDBQC8XYpruvauf63bivV5QF76sZQm/NJid4Xu
+ExIqNidii/pIhZLBirY+1nvGulICMzmGE5smeEnGBp8L0sWHgJEF9KET828CAwEA
+ATANBgkqhkiG9w0BAQsFAAOCAQEAgc0RbVwCNVpCZjUyhVxBlqrS1S0K8ygdyVPG
+7/fcejKSA7aUEA4x5pehvNwhDHXnW9jiEdWbQLyJaNFyuQT/4R8tCZi0q6nQF7NN
+shL0B19QaHErSPHYudecshbB7VrsiYjG9Q3O8QMrulfLcz3b6RLqUTLCOSK7Nclk
+Nv+ONad80OCzjBUIOnIHzkfDRDChzsF90EGtyLtIXaUO/K7WZDlw6+Gf9rVtyH6S
+USyUzcKVDobxCcJkmlRbmSL3oExTQCukhH6aNhUyvUFtlaKEXqizFe+/ujtE0Ymg
+sAJWmEYct+9H6iINgq24kPn/h29EbvYcGWIEg20U8+AznHaMPQ==
+-----END CERTIFICATE-----
diff --git a/extensions/standard-processors/tests/unit/resources/ca_A.crt b/extensions/standard-processors/tests/unit/resources/ca_A.crt
new file mode 100644
index 000000000..3d283a6c0
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/resources/ca_A.crt
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDizCCAnOgAwIBAgIUca6kHRI3RSvFxnz4ksg2M33A3IowDQYJKoZIhvcNAQEL
+BQAwVDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRYwFAYDVQQKDA1FeGFtcGxl
+LCBJbmMuMSAwHgYDVQQDDBdDZXJ0aWZpY2F0ZSBBdXRob3JpdHkgQTAgFw0yMjA5
+MTkwOTA1MzZaGA8yMDUyMDkxMTA5MDUzNlowVDELMAkGA1UEBhMCVVMxCzAJBgNV
+BAgMAkNBMRYwFAYDVQQKDA1FeGFtcGxlLCBJbmMuMSAwHgYDVQQDDBdDZXJ0aWZp
+Y2F0ZSBBdXRob3JpdHkgQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
+AKsd1Yd8ds19WNU9ag6oDfgyxGzfnm4Nl8SrG34nA9D3DB7yXu3yN2feVPBNqDCU
+FnOCsQqiRKOROfNPEfz8fqqOqUcd8TK1RNg3JWtbOjy+BklBqm8NK3fdDkrD+Fuq
+sKOdqho5Xuy36Ec4y4citEW7FcRdu9LrAr81NbcOG0AU7a+SRdiROVUmSIDhQhDP
+j37HO9Rya6DizNSTIvQ4xQ/iQTzGqdZD9wy/AUQt+E7VrTslpIi48dWSjM6mZkGA
+1TcfAeDjJa7HrbnIZkvRhH5tUiHzCbQq+8N5SkSFssP8wd++8rydD0gWjxkOIHtR
+SGPoq5cp5uKAq4j7DXasneECAwEAAaNTMFEwHQYDVR0OBBYEFH6swKPx3nUFLoaW
+WiiwhERbyC5AMB8GA1UdIwQYMBaAFH6swKPx3nUFLoaWWiiwhERbyC5AMA8GA1Ud
+EwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBACepoYKN+U5mqP5R6s/6/CM4
+3iBgcVRwAWNLd+cMhGzbSMvQbwji9AvE4lUxoLULIRl9EeedlKuEv01Eic4RGMq4
+1hG8mn3mSjITqQKYS+2o3sIKqtnfR86uQzQyixTTGiKJzpPV6vzQgtvkniCTPlgI
+eu59pNfQLUlYrgtJ+lTv/2/MPyS2I137DsjG+7ASVbDZ6uDbEp1/KyrgJB1skB+6
+s2Pxicf9X8mpfpuTqFiGyJUOdHmgYpx6ZxyAgMCm4C+a5e8I283d0xX06coChy00
+fh23THQ9O8HVQYejzHFfCoshIkj9l0Kkw6Um2aS4KLaZxAky+Kn+wgqFbgcJY0s=
+-----END CERTIFICATE-----
diff --git a/extensions/standard-processors/tests/unit/resources/ca_B.crt b/extensions/standard-processors/tests/unit/resources/ca_B.crt
new file mode 100644
index 000000000..5beee660a
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/resources/ca_B.crt
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDizCCAnOgAwIBAgIUIRojQbIHUpmTeT1hp7BsxG8gFDswDQYJKoZIhvcNAQEL
+BQAwVDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRYwFAYDVQQKDA1FeGFtcGxl
+LCBJbmMuMSAwHgYDVQQDDBdDZXJ0aWZpY2F0ZSBBdXRob3JpdHkgQjAgFw0yMjA5
+MTkwOTA1MzZaGA8yMDUyMDkxMTA5MDUzNlowVDELMAkGA1UEBhMCVVMxCzAJBgNV
+BAgMAkNBMRYwFAYDVQQKDA1FeGFtcGxlLCBJbmMuMSAwHgYDVQQDDBdDZXJ0aWZp
+Y2F0ZSBBdXRob3JpdHkgQjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
+AJfR48sziDgKH8+PfcVUMWag7ctGoBXgfOZ8h5gubQm5KrTD0rqHvf/8dLGvJ0aq
+tKVYnoFFjHikDIJZxuMYF6Vbq39FNinZugMCQsJ3gTWREq7tr3MLDfN+lD9rCxAr
+RDbfwaXN907ljXbsNoq3km9Bd43qAxDDND5N74o9wefFLLxcNo08d5aTN3LZY9g0
+b83ps+kc9Ysm9JBzFN10DJYIWwRWvCZL6hX10fWqrV9OcJgilCQk5PJgaZBppQgi
+hiTjq36vlBCTL3RO2MXecPSJLfigwKkT4WZwrG1E26jhh0lGVK6pdOs99JeTtzfr
+hC4lR8ExD8wFwvcn/8jFXxsCAwEAAaNTMFEwHQYDVR0OBBYEFDDrW/6QNEfo/7Br
+fpz8Vmm7KjWvMB8GA1UdIwQYMBaAFDDrW/6QNEfo/7Brfpz8Vmm7KjWvMA8GA1Ud
+EwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAErDWgtX/547fwHv6+sOoSGT
+FHmVXba0IlcfW404r1Kp0MeNYZitQvi4SHL6CkxY+/RdgppxkbE5D8hUhk9PrSeo
+5E4ebPsnvSRlzDVWLm2ZpbLRO6BcrzbEN0b3ylZ3Kw/+SarDNBMSTDWZ5knWXUk6
+3Ckg2gg8VCLKxQK0IDTHtXq+WTKGmVf34dYbLfWHUnYr1DLUsxgnX4llHm3xOrzp
+ZqvW5cEdlj6+SW1azQgbFrEeWH7ebK5E4GBQ8LhRWbIpo6g2kzaGKTkijrk9agMs
+ByzjRdLitbwt07VNE9cNDVv0kC1PLZcz1TgNnaOl5CABqw0yMjLO5LEXUK4BYkU=
+-----END CERTIFICATE-----
diff --git a/extensions/standard-processors/tests/unit/resources/ca_cert.crt b/extensions/standard-processors/tests/unit/resources/ca_cert.crt
deleted file mode 100644
index e801553c9..000000000
--- a/extensions/standard-processors/tests/unit/resources/ca_cert.crt
+++ /dev/null
@@ -1,20 +0,0 @@
------BEGIN CERTIFICATE-----
-MIIDNDCCAhwCCQDoXhDkdH/BBjANBgkqhkiG9w0BAQsFADBcMQswCQYDVQQGEwJV
-UzELMAkGA1UECAwCQ0ExFjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xKDAmBgNVBAMM
-H0dvb2QgUm9vdCBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkwHhcNMTkwNzEyMDk1MjUz
-WhcNNDkwNzA0MDk1MjUzWjBcMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExFjAU
-BgNVBAoMDUV4YW1wbGUsIEluYy4xKDAmBgNVBAMMH0dvb2QgUm9vdCBDZXJ0aWZp
-Y2F0ZSBBdXRob3JpdHkwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDG
-t5qp++4NAO83uASsVx7xRc3YS1Ss6La2opJTeXSnnsL6d+eLIUZrO6R/vofjLMPb
-qHisnQXAtl560d/XPBXm/ydp2IBLJQJW9aRxa/zqcf4tDTdBLKXYHhqKSQDJGS78
-vOuNuhf6T+p1guqnLYxwlRp6V8DMY/nC5n+IgByr9Jp2QtqJceH5WdyABVauqtMo
-LKXdbhfU6lDZ1XIZNeoKY8u2s34UQLUvOGaP/FzYHvKev1KzFF/nR3+svK8cvxXM
-EuqHM5tdtIp1ugjvR66PUIT00HoT00wS6VIpBdHq/8uXJeY77lr52xyVdk282tlw
-wr9/W0AGXjVMW3O+VRhFAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAE5HYjHkh3fI
-qakhENGL5PSszmOz5yQRrggP2ZJeEAoFjy5fbf/zUPIPMgMa0qM4QI+2C0iGlem1
-c1MCGNk5BiDPWMaUjppYmPZWkXzYu9Nl1dizXYidcnTiiBTROkpMij2fzCErymx9
-CmYxfeFyeJ5uAHSWSOGCfvlxi0vHvHn/+5rm0eqHcGP2c9ivW/SC/6RCXnHuIS9O
-O/UHrQPQe7YmdBgCHw4K4UHLZkYPH6osMPdII09PbZBB1TgrogbuA6TMp9NU6LrX
-WNN3nhFaVVjEb8tawMabfG9PU/1PKGRuNdaLsYb3IXhT0I/SWobD3MJ9xcO9sAhv
-QKZuUQf4ntI=
------END CERTIFICATE-----
diff --git a/extensions/standard-processors/tests/unit/resources/cert_and_private_key.pem b/extensions/standard-processors/tests/unit/resources/cert_and_private_key.pem
deleted file mode 100644
index e4891c58b..000000000
--- a/extensions/standard-processors/tests/unit/resources/cert_and_private_key.pem
+++ /dev/null
@@ -1,46 +0,0 @@
------BEGIN RSA PRIVATE KEY-----
-MIIEpQIBAAKCAQEAtdEk8MHL8RFiOuHz/hfyjEe3PxumlNnubzn7u6EdJpiLhOLs
-cWcvQAbYvbsyI6duGzKh1FRNmS8+Q/gp/36vmssZy96B4K+axiG7WBqHfjPJbjfa
-NHbz3D7D7i36r/Vu/pXsBMPb/DSJ978maY3oikSB9906bx73XJHAx3RLxgLHl4po
-vt20SDOY9R2Klcbew4pDXuUpRdEK0h1+fdZkYU92YiAz2O86eYMvwgorCX9tBrcw
-sih/cYYiTQ9DMZ3DPJ0HfDH5T5gIPH/+5/YVHaTBdthhIZ13UNY/X8XcOVO/nlzC
-3/MtBiTxKj4zIyqncbClm4BqzW3S5hxUQXR32QIDAQABAoIBAAXoCmwrz4VATFGf
-X37EpmN6PPC25D13qvBAEPZycHD9iaLCgG3arUVGM6pON33DBaeqiGlOZ8rvJvWs
-TSj4o5nCuU7PJqb27W88T0q4aehmpEeJVvRXXOqtu020fq1Sqs1ob2dkOXRC/Kxo
-sEXDj2dWfGZh8HEFr4F5VqrkE0YWaQLaNHf9g6vAuOtlNMnhu5iM7mNq0qQi4Qg0
-zmOpEyAK5obhPEa8eYgjuWUeuul342wpMEGaFqD3lr4rnhcESZtm/S37L6lJ24UF
-SIBPzuEjEDlthlh3tqgKyQFsHvcMN4XN4850J/nMoX8jDcvnV7iYYFykHEb6y6FZ
-+ZlftNECgYEA2hS+s7yer2bQGw3LsKqwpNmpckLeAb84JEbra6i7A7SPUMEYwjOm
-Q2ePpz6ZBVDs7qODBgMV2g9a9GCbSzgV7SeQ0367dCPugONckVQvMVU5wPtSkFKF
-8jLn66+6YK64ASTqLLVd8TkU3bdByWcsh3JTR/lmDwlSeGjbm1OETj0CgYEA1W41
-Pi8ZbGDrpc9/CnyFCLMaipq9cAm4n/M58CVf0ogxeAXShIcDboTfv7lqpnqM2vg7
-sSRjyHz0++5VZTNSnQDlLIndQHQ6NKKC1tb0zNKlRuL2gwMHwMmWLqCjbLsqSP5E
-lHEM8fn2EVAMiKRod01kOY7OnUnPSSgMD7QvJc0CgYEAyWqZi0WtRhDuKd5+/0dW
-6JqDrp1lkDV887xwmLl5KH3uU8ZUSKENcXnHqs7c45UPj4SDcd0NpJ3EAqrrIvjE
-/4kocL2/AhBhqrbS+wLGp4iwU7WLVvJw9fXgT8S4na0hEyV2Bx7nifCPfgtQfmSF
-Mv/7PSFyCncwrTcjhP0I2H0CgYEAkTNbEaUlXLBLYRDbUx0HvLVstyMzAgf7DQaC
-QjiLCkYRsZ/0aqkX0pafSmYwgnYZYddDdO5W3Ez2tnacri7OY3X6c+SPG4x3FNwC
-u3qeLMKaIrHCF7t2CNicTbiHti9XQzWJHpwSvITbvUeCX2vKjm+eYfIf6q4OUazn
-F7/z23kCgYEA0r2N20DkEN9uD281SZbjAmkjYMMvUkDyChk2oWnODZcFkEhuaGM9
-0TBw1vjpoO67H/tSNEUziXMNLtnH4p5gRzP84ZGpMPxe4MK6pIwbDxhDyPlLCwOG
-dYnaYyWqsiMbqtv6LbMh/uatuMpCzqJEQZz80xDNUp3k90muk3+0M4w=
------END RSA PRIVATE KEY-----
------BEGIN CERTIFICATE-----
-MIIDJTCCAg0CCQDclmfqcI6z1DANBgkqhkiG9w0BAQsFADBcMQswCQYDVQQGEwJV
-UzELMAkGA1UECAwCQ0ExFjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xKDAmBgNVBAMM
-H0dvb2QgUm9vdCBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkwHhcNMTkwNzEyMDk1MjUz
-WhcNMjkwNzA5MDk1MjUzWjBNMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExFjAU
-BgNVBAoMDUV4YW1wbGUsIEluYy4xGTAXBgNVBAMMEGdvb2QuZXhhbXBsZS5jb20w
-ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC10STwwcvxEWI64fP+F/KM
-R7c/G6aU2e5vOfu7oR0mmIuE4uxxZy9ABti9uzIjp24bMqHUVE2ZLz5D+Cn/fq+a
-yxnL3oHgr5rGIbtYGod+M8luN9o0dvPcPsPuLfqv9W7+lewEw9v8NIn3vyZpjeiK
-RIH33TpvHvdckcDHdEvGAseXimi+3bRIM5j1HYqVxt7DikNe5SlF0QrSHX591mRh
-T3ZiIDPY7zp5gy/CCisJf20GtzCyKH9xhiJND0MxncM8nQd8MflPmAg8f/7n9hUd
-pMF22GEhnXdQ1j9fxdw5U7+eXMLf8y0GJPEqPjMjKqdxsKWbgGrNbdLmHFRBdHfZ
-AgMBAAEwDQYJKoZIhvcNAQELBQADggEBAIh6k/epw3dWtRuMwXxjqEobi/RD/8Nk
-52kX6x8WTcnglrSzPSvkhnfR5PQ9whY2Zbw0aVdenejlGZEi8cAxwmJbN4NIhQwW
-FjHYYQA0MPgFGq/4XFT9E49aS212+ivUBRoPlWfw7QmCdGq3z6eQGfVtIGGLfSGH
-cvnC9Z4VdY0RJrnzgRKd7iq/RW66u3Uyg1fdOKCp9F5PSwwl+6dPgKO84muWjRi4
-9y+htcXSboEtYQy/ncul0MeJ8fGTY1YEG2QUolmCKeJ8a2e6SHcX0Unu+6tAD8sK
-fjpZAOI1lgRrhrIKhi3Rx0aCkhayhvvScDQL0ODA5ciCu5EJHRhWCbg=
------END CERTIFICATE-----
diff --git a/extensions/standard-processors/tests/unit/resources/localhost_by_A.pem b/extensions/standard-processors/tests/unit/resources/localhost_by_A.pem
new file mode 100644
index 000000000..a40251478
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/resources/localhost_by_A.pem
@@ -0,0 +1,46 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEA1/8IgNpuq16MNaqaHNSCucyl0NbIFavnX+nOl+3Yu9/lHvQ6
+HY4PEU1Ma9tDOL2VON1PrFx+tJ1CTY6RDn5Ppj6pOpIeIxx1sGkxvJVeMhsmwojB
+3jQUJE6rgrUvsUr1YzvbJwwtfgr+PJD4uxS8V58kQdblNcWGZT8BMzracb3btNFn
+2n62JuXRKTUTwXxk9PEcYJdeWkUlu309dTvt8ipeygLMxUwP+oiRjTB5lDA4VydH
+qYWBi0iVPgcn5Qcxnxl3nVyTEHPszs/8fsCsqkxkhTmkyBeFAeDMoIxNDVIz92C0
+uzt2zURCBv0EOB0oIig9TTn1M27O421dYGAoIwIDAQABAoIBAHplUVs66/V9+TO7
+/eKSZZWFqvyhiPYG2HDYW7JqHCOyJvKYcIoo4s7qH4EK2ZfAjluPxUMlksMkTdsH
+C5nL57SL03eWLy+0Q9h4c6+qcJsyGY+o0TrqBfPhBH1n0KPFlzHpTDFfTDQdZJ3L
+hLb2dBeu3WvVq0MCMDsVLcfq9Lf4VIimKnufYai97p174IPovYLAhfhJiSrAVb+z
+eBBvpiBalFlZc46HaeC0pZrD4k5e5B/5J45b3KeCmmcWvjhfxaJfNAUKIK+fPtZO
+1txrN6+lEZBl3EpfUFWxfMAH73tPVUabO6Ap5OGVK2ahvBQw0UDqBMRM6o6LdX+9
+WKgcOhECgYEA74Zn61Qz2R+J1WRBlZYBAcm/w669B9Mwc/itLyOG77G8UTfqHgkp
+vjCNGEtG6doztmkaN50OuTp3/2iStLdFm1IDfJmaslZFHwsFSX9YeLPoUy3P93ri
+ePrsnnmdKqX1WBovm53kv4bqI9yY8OApD7s7inaMrX9PRbcWg60Q/KsCgYEA5tpU
+GGZEQ7R7m5/R5r5+4uZh9enoD0MDEkg0jm06N8pzk6CEzlynYwPHohkWF58C2lo2
+noufofVRJc+2MKnfL/JulCEL3dVyWVak3fnpl129BMdKJ9ZDsMlAlWeYqP0b9dwE
+Mye1r5ef8rJ8fLBeb8jZrM3+Tlh9OwV7dV7tkmkCgYBMr2VZ7G1lGDnSvfRZZdsQ
+rXzds3YFqVGb74PS0bcDyo2WGyazUw+wOm8R1hfwCtH/loq0P25VUyjT9rDxdrOs
+VIeVPsBOVFxw4eBhdYnnqwG4j7RDcW5MeVmEKz9sRhHUkR2o3tY7k4Am1xuIEtxS
+kwku/WFwso+4rDNjGOeVXQKBgQDj+ZgywEuJ0SKAfUP8awNDb+AtyeCxsavG0ieU
+v6lOj0+z6kE8yaND1OfA3KVEjnNyzsRBrgDnICwS+x0g0aDm6LLq9feSCsfyEe6e
+h753DMstfOFuldojK5vr73KC7/I8yhobqotx7Hq4Yistt76LBf5w+Ly7Aggp0TAq
+qKRUgQKBgDDzyJ8fvVYh/UzI/efY/nhIwkeD3KErszrnle39rKkFfaRaHM66IYF4
+5+1q3UdKGLdt6k1S4B3aLZxAVt7F6SzTFd3amGDJNi0Om0MJ8LnftUL5uBzTloem
+GB+XehWJz0NJcDLdTXN86f1j2LZUEiFR62nSt0uwq3mwMuGrqaEG
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIIDITCCAgkCFDA8Dkntpi2PSSJDZGYjjG03qNbnMA0GCSqGSIb3DQEBCwUAMFQx
+CzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UECgwNRXhhbXBsZSwgSW5j
+LjEgMB4GA1UEAwwXQ2VydGlmaWNhdGUgQXV0aG9yaXR5IEEwHhcNMjIwOTE5MDkw
+NTM3WhcNMzIwOTE2MDkwNTM3WjBGMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0Ex
+FjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIw
+DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANf/CIDabqtejDWqmhzUgrnMpdDW
+yBWr51/pzpft2Lvf5R70Oh2ODxFNTGvbQzi9lTjdT6xcfrSdQk2OkQ5+T6Y+qTqS
+HiMcdbBpMbyVXjIbJsKIwd40FCROq4K1L7FK9WM72ycMLX4K/jyQ+LsUvFefJEHW
+5TXFhmU/ATM62nG927TRZ9p+tibl0Sk1E8F8ZPTxHGCXXlpFJbt9PXU77fIqXsoC
+zMVMD/qIkY0weZQwOFcnR6mFgYtIlT4HJ+UHMZ8Zd51ckxBz7M7P/H7ArKpMZIU5
+pMgXhQHgzKCMTQ1SM/dgtLs7ds1EQgb9BDgdKCIoPU059TNuzuNtXWBgKCMCAwEA
+ATANBgkqhkiG9w0BAQsFAAOCAQEAJUfkmbPE2mrHu12gmsm6nSU7M1l1KELzMTRH
+eZf2NYaqLOOqlz7McsKgu5LJTRmEXi9ufUC1HQfKJLOaj2LkLmVgKzTrP33GQ4wf
+a7WOLeWs90kbiXV71iBBBXEuusMnMzuvBbcJTohwI5/svwCqEISpnSpVLi66dAej
+BTTT0MD5KZWcznMeD/nOMIu+5j0tNBGdCHwLXxbmyuqzBFmMmAJAm9WILGhAZGKk
+5IXbHrZYMEOMXoY2e/NnaMK9Q81q1YgeXZWBKLTF2g3RSrKFm84jBouPR1j7qmsr
+xST6nM0Ngr28dzYCLxOY7p6xqYVo6rxxMfSz2jkM6pycLgwy/w==
+-----END CERTIFICATE-----
diff --git a/extensions/standard-processors/tests/unit/resources/localhost_by_B.pem b/extensions/standard-processors/tests/unit/resources/localhost_by_B.pem
new file mode 100644
index 000000000..0912e95ea
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/resources/localhost_by_B.pem
@@ -0,0 +1,46 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAwfjLXc6dHBaYsoWQaqsqwhJY68G7+35NVIU8BAKabHcUN5us
+IWieeg2Po1CPLoBfyfAq+06fj1T8D/3irUD/EB3QvDx1gCc+zY9BobIJKLi3Xeit
+zE7gEDDV1FFw8E2fBAptTMn/1GXyD5xsE9HQvtTffgELNXzl5Zp6pS5V6oN2X9Np
+3lExRFMmrEZmKuZMhA4UjLahA21Hv8GNlI4tqFazj6/timvSJHtqVGruhm/PAw7F
+1lOZ34t6/h/suayqRpDSZF+E7gnaFL8px/tpQwpLfHEixiMXjVrGBJRFnvHH8sdR
+wpU2mSR4j6KoUxS1wbW0iiOim3Fwy7vOd3ezvQIDAQABAoIBACWGAaFmBNKYNHXk
+jKl171GXxwfkdH8UUdVV6ORFtKXi61BOlx/nYzDtSqonPWubfexMv6PZ89gAcrqN
+PLqTZkQx4F1pvLlL3kRZwDKNhGQSR7as+mIZqBK5v8PQ9W4nNenMMpS2Rv1Js2f9
+tJKo9h7Ug1+WyBpSzQ57sdoeepRg/9pA489XjdqPIuyDSydhCEqqVe7slU62q1d+
+AjxTZ5tmD8dnbV0TWOphvBM8bVQc4UA/eMGIrS0jbkDLIuT63ZrPzVtaEqYK56O4
+IsWjaweFeIUJhtSyZ8/OA+7kqndxEZLCBY4XmUt+z2tyqtNg7rPcOwD04LWBPjW2
+KV60DOkCgYEA4+6kg8G098u7EkilpkuAmgnskm8Lw7Ic8IhJa8n7UVLFg4PNuPz9
+dcYcpy2wSu5CzSqNWNe9lePXX7VGVpSLvNxDg0R8L3m8m0Pu5sv17RdmKKJJz9D3
+kYCPgm6qitoTOKK2hOEiqVHzS4RydjaouyZDTFs1U/eHOkmR33JMP4MCgYEA2duV
+HKBwgP2PxVKUwwyqPK4spHVAfGh0cZWzTShvfB8DiLO9RyjmD9g9HMlEqc2FI9O5
+vhEsrWbVHQQ/kJHPLTCF7OKYYR+K9b8rezRvcQuvxCOnOcsatpahl5Fgt2o28SDn
+eu/2dh2NmTrM4jqNx4tvha5EVXUmbr7Qf+dgm78CgYEA0Kk7elLutJqRm19eJiqg
+hGPpavS1tGVuENTzQfYaWIyJvKgAwQT5k8PVn0Y4SaBtDx2RYG/AY2O9WyS8S66Z
+bj/Gnnknpt6vRwSdxDOb43y0TSako9cNjOpAdouRHKQfTI3IwUTJUnBvZgbOMmI/
+fXS9zz0ASOolpbqMDB66prkCgYB7CCHul8DRZ+EQq7FtcbKWMDrv6XOwjoDsQIGQ
+2nwTWaRySCdlj3hVjGX+4r9PMcy1zfVAnIxhpQhHqcWIDIA24gdQHyu09c5ROFQC
+8TraWaI6n3PqFISShwDdCvHWwzoh9NYlPG0wiUIVPfrE7BJzlZA2q5LVvCInOsWe
+5flOGwKBgCOyshMpw8FPCuNE8gEONH7aQ03MphLGzMcDVD7rl3I8d35z/IRhNgas
+V+I+Cfp6Tde7Ad8fNXX7ogoxxX/1UvkWGKg70ogqW9cKBqLsy7Pa+JMH201roJWR
+aODbnz02V1pQ8MBT4u7QG6PNyyue5W6h/2ADZCYvQJM8lrppKFE+
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIIDITCCAgkCFDYWRAThkKd/J+oMW7tZBqEPx4XOMA0GCSqGSIb3DQEBCwUAMFQx
+CzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UECgwNRXhhbXBsZSwgSW5j
+LjEgMB4GA1UEAwwXQ2VydGlmaWNhdGUgQXV0aG9yaXR5IEIwHhcNMjIwOTE5MDkw
+NTM3WhcNMzIwOTE2MDkwNTM3WjBGMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0Ex
+FjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIw
+DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMH4y13OnRwWmLKFkGqrKsISWOvB
+u/t+TVSFPAQCmmx3FDebrCFonnoNj6NQjy6AX8nwKvtOn49U/A/94q1A/xAd0Lw8
+dYAnPs2PQaGyCSi4t13orcxO4BAw1dRRcPBNnwQKbUzJ/9Rl8g+cbBPR0L7U334B
+CzV85eWaeqUuVeqDdl/Tad5RMURTJqxGZirmTIQOFIy2oQNtR7/BjZSOLahWs4+v
+7Ypr0iR7alRq7oZvzwMOxdZTmd+Lev4f7LmsqkaQ0mRfhO4J2hS/Kcf7aUMKS3xx
+IsYjF41axgSURZ7xx/LHUcKVNpkkeI+iqFMUtcG1tIojoptxcMu7znd3s70CAwEA
+ATANBgkqhkiG9w0BAQsFAAOCAQEAPa5w9kshcNgeOdsWJnKrGy31Jmhbi00a0ue0
+PSv1K49wvRIiHjk49DhOjHLRDoyEZ6AHme4dJIZ7G4GL4dKyW8eVi22nCN/2G6+u
+vssUXXNTTnaOXIXVVtnyTeMr4JHcysn0wMsMsApCvkpyB2euC+uvA8ppvfr6Zdng
+3okbQGhTvhkBZM2/jbtPb8O1XzXepPeYlXMiOcRsSA4oy5sYi8BFXuODCtH2qJD4
+zuSGEpWrDbzqUPGmXSoLALzpObI4v2yDLgrYZMMfOXOmtmeD1gfyIptl/pSeAko8
+lXxhgAY2ef2P1j2SCMIwNTPtIIrqJLmCt7EUWXjSpnnEIWP6bA==
+-----END CERTIFICATE-----
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 942f0e13a..2b9b6d581 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -37,6 +37,15 @@
 #include "utils/gsl.h"
 #include "utils/meta/detected.h"
 
+// libc++ doesn't define operator<=> on strings, and apparently the operator rewrite rules don't automagically make one
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION < 16000
+#include <compare>
+
+constexpr std::strong_ordering operator<=>(const std::string& lhs, const std::string& rhs) noexcept {
+  return lhs.compare(rhs) <=> 0;
+}
+#endif
+
 namespace org::apache::nifi::minifi {
 namespace utils {
 
diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h
index ce32146e9..2aab23ba4 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -33,7 +33,7 @@
 #include <memory>
 
 // libc++ doesn't define operator<=> on durations, and apparently the operator rewrite rules don't automagically make one
-#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION < 16000
 #include <compare>
 #endif
 
@@ -41,7 +41,7 @@
 
 #define TIME_FORMAT "%Y-%m-%d %H:%M:%S"
 
-#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION < 16000
 template<typename Rep1, typename Period1, typename Rep2, typename Period2>
 std::strong_ordering operator<=>(std::chrono::duration<Rep1, Period1> lhs, std::chrono::duration<Rep2, Period2> rhs) {
   if (lhs < rhs) {
diff --git a/libminifi/src/utils/net/SslServer.cpp b/libminifi/src/utils/net/SslServer.cpp
index 2d1e14807..682486c40 100644
--- a/libminifi/src/utils/net/SslServer.cpp
+++ b/libminifi/src/utils/net/SslServer.cpp
@@ -81,7 +81,6 @@ SslServer::SslServer(std::optional<size_t> max_queue_size, uint16_t port, std::s
     } else if (client_auth == ClientAuthOption::WANT) {
       context_.set_verify_mode(asio::ssl::verify_peer);
     }
-  startAccept();
 }
 
 std::shared_ptr<SslSession> SslServer::createSession() {
diff --git a/libminifi/src/utils/net/TcpServer.cpp b/libminifi/src/utils/net/TcpServer.cpp
index e3a53db55..742ef7f25 100644
--- a/libminifi/src/utils/net/TcpServer.cpp
+++ b/libminifi/src/utils/net/TcpServer.cpp
@@ -58,7 +58,6 @@ void TcpSession::handleReadUntilNewLine(std::error_code error_code) {
 
 TcpServer::TcpServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger)
     : SessionHandlingServer<TcpSession>(max_queue_size, port, std::move(logger)) {
-  startAccept();
 }
 
 std::shared_ptr<TcpSession> TcpServer::createSession() {


[nifi-minifi-cpp] 02/03: MINIFICPP-1927 Fix ExecuteProcess arg escaping

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 9627f2ebf9699c0eae6710a15fc713ebcf0d71e4
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Nov 9 13:29:58 2022 +0100

    MINIFICPP-1927 Fix ExecuteProcess arg escaping
    
    Arguments were split at whitespace with no way of escaping, despite the
    property description stating that "White space can be escaped by
    enclosing it in double-quotes". This fixes quote escaping for arguments.
    
    Additionally, ExecuteProcess was refactored, and tests were rewritten.
    
    Closes #1414
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |  16 +-
 .../processors/ExecuteProcess.cpp                  | 379 +++++++++++----------
 .../processors/ExecuteProcess.h                    |  82 ++---
 .../standard-processors/tests/CMakeLists.txt       |  10 +-
 .../tests/integration/TestExecuteProcess.cpp       | 104 ------
 .../tests/resource_apps/EchoParameters.cpp         |  34 ++
 .../tests/unit/ExecuteProcessTests.cpp             | 175 ++++++++++
 7 files changed, 461 insertions(+), 339 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index d052e5c6c..3144e8390 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -551,18 +551,18 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ### Description
 
-Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected to be long-running,the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual format,as it typically does not make sense to split binary data on arbitrary time-based intervals.
+Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual format, as it typically does not make sense to split binary data on arbitrary time-based intervals. This processor is not available on Windows systems.
 ### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name                  | Default Value | Allowable Values | Description                                                                                                                                                                 |
-|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| Batch Duration        | 0 sec         |                  | If the process is expected to be long-running and produce textual output, a batch duration can be specified.                                                                |
-| Command               |               |                  | Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.<br/>**Supports Expression Language: true**  |
-| Command Arguments     |               |                  | The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.<br/>**Supports Expression Language: true** |
-| Redirect Error Stream | false         |                  | If true will redirect any error stream output of the process to the output stream.                                                                                          |
-| Working Directory     |               |                  | The directory to use as the current working directory when executing the command<br/>**Supports Expression Language: true**                                                 |
+| Name                  | Default Value | Allowable Values | Description                                                                                                                      |
+|-----------------------|---------------|------------------|----------------------------------------------------------------------------------------------------------------------------------|
+| Batch Duration        | 0 sec         |                  | If the process is expected to be long-running and produce textual output, a batch duration can be specified.                     |
+| Command               |               |                  | Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.  |
+| Command Arguments     |               |                  | The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes. |
+| Redirect Error Stream | false         |                  | If true will redirect any error stream output of the process to the output stream.                                               |
+| Working Directory     |               |                  | The directory to use as the current working directory when executing the command                                                 |
 ### Relationships
 
 | Name    | Description                                            |
diff --git a/extensions/standard-processors/processors/ExecuteProcess.cpp b/extensions/standard-processors/processors/ExecuteProcess.cpp
index 7f43e1e0b..dfa32fedc 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.cpp
+++ b/extensions/standard-processors/processors/ExecuteProcess.cpp
@@ -17,10 +17,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#ifndef WIN32
 #include "ExecuteProcess.h"
 #include <cstring>
 #include <memory>
 #include <string>
+#include <iomanip>
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/PropertyBuilder.h"
@@ -29,43 +31,37 @@
 #include "utils/TimeUtil.h"
 #include "core/TypedValues.h"
 #include "utils/gsl.h"
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#pragma clang diagnostic ignored "-Wunused-result"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#pragma GCC diagnostic ignored "-Wunused-result"
-#endif
+#include "utils/Environment.h"
 
 using namespace std::literals::chrono_literals;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-#ifndef WIN32
+namespace org::apache::nifi::minifi::processors {
 core::Property ExecuteProcess::Command(
-    core::PropertyBuilder::createProperty("Command")->withDescription("Specifies the command to be executed; if just the name of an executable"
-                                                                      " is provided, it must be in the user's environment PATH.")->supportsExpressionLanguage(true)->withDefaultValue("")->build());
+    core::PropertyBuilder::createProperty("Command")
+      ->withDescription("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
+      ->build());
+
 core::Property ExecuteProcess::CommandArguments(
-    core::PropertyBuilder::createProperty("Command Arguments")->withDescription("The arguments to supply to the executable delimited by white space. White "
-                                                                                "space can be escaped by enclosing it in "
-                                                                                "double-quotes.")->supportsExpressionLanguage(true)->withDefaultValue("")->build());
+    core::PropertyBuilder::createProperty("Command Arguments")
+      ->withDescription("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
+      ->build());
+
 core::Property ExecuteProcess::WorkingDir(
-    core::PropertyBuilder::createProperty("Working Directory")->withDescription("The directory to use as the current working directory when executing the command")->supportsExpressionLanguage(true)
-        ->withDefaultValue("")->build());
+    core::PropertyBuilder::createProperty("Working Directory")
+      ->withDescription("The directory to use as the current working directory when executing the command")
+      ->build());
 
 core::Property ExecuteProcess::BatchDuration(
-    core::PropertyBuilder::createProperty("Batch Duration")->withDescription("If the process is expected to be long-running and produce textual output, a "
-                                                                             "batch duration can be specified.")->withDefaultValue<core::TimePeriodValue>("0 sec")->build());
+    core::PropertyBuilder::createProperty("Batch Duration")
+      ->withDescription("If the process is expected to be long-running and produce textual output, a batch duration can be specified.")
+      ->withDefaultValue<core::TimePeriodValue>("0 sec")
+      ->build());
 
 core::Property ExecuteProcess::RedirectErrorStream(
-    core::PropertyBuilder::createProperty("Redirect Error Stream")->withDescription("If true will redirect any error stream output of the process to the output stream.")->withDefaultValue<bool>(false)
-        ->build());
+    core::PropertyBuilder::createProperty("Redirect Error Stream")
+      ->withDescription("If true will redirect any error stream output of the process to the output stream.")
+      ->withDefaultValue<bool>(false)
+      ->build());
 
 core::Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship.");
 
@@ -74,172 +70,201 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+/**
+ * Reads the processor configuration and caches it in member variables.
+ *
+ * Command, Command Arguments and Working Directory are read as plain property values, without a flow file.
+ * full_command_ is always built as "<command> <arguments>", so it contains at least the separating space
+ * even when both properties are unset.
+ *
+ * @param context source of the property values, must not be null
+ */
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    // NOTE(review): count() is usually a 64-bit integer while %d expects an int - confirm the logger copes with this
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    // an unparsable boolean value falls back to false
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
-    return;
+  full_command_ = command_ + " " + command_argument_;
+}
+
+/**
+ * Splits full_command_ into the argument list of the child process.
+ *
+ * Tokens are separated by white space. std::quoted lets a token that starts with a double quote contain
+ * white space: the surrounding quotes are stripped and backslash acts as the escape character.
+ * Empty tokens are skipped, so an explicitly empty argument ("") is not passed on.
+ *
+ * @return the parsed tokens in their original order, empty if full_command_ holds no token
+ */
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::stringstream input_stream{full_command_};
+  while (input_stream) {
+    std::string word;
+    input_stream >> std::quoted(word);
+    // a failed extraction leaves word empty, which is how the final iteration is filtered out
+    if (!word.empty()) {
+      args.push_back(word);
+    }
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+/**
+ * Runs in the forked child: connects the pipe to the standard streams and replaces the process image
+ * with the configured command.
+ *
+ * Never returns: on success execvp() takes over the process, on any failure the child exits with status 1.
+ */
+void ExecuteProcess::executeChildProcess() {
+  auto args = readArgs();
+  if (args.empty()) {
+    // the command consisted of white space only: argv[0] would be a null pointer, which must not reach execvp()
+    logger_->log_error("Failed to execute child process: no command was specified");
+    exit(1);
+  }
+
+  // execvp() expects a null-terminated array of mutable C strings; args keeps owning the storage
+  std::vector<char*> argv;
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+
+  static constexpr int STDOUT = 1;
+  static constexpr int STDERR = 2;
+  if (dup2(pipefd_[1], STDOUT) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
+    exit(1);
+  }
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    exit(1);
+  }
+  // the child never reads from the pipe
+  close(pipefd_[0]);
+  // the write end lives on in the duplicated standard descriptors, the original descriptor would only leak
+  // into the executed program (skip it in the unlikely case the pipe itself occupies a standard descriptor)
+  if (pipefd_[1] > STDERR) {
+    close(pipefd_[1]);
+  }
+  // NOTE(review): exit() runs the atexit handlers and static destructors inherited from the parent;
+  // _exit() may be the safer choice in a forked child - confirm how the logger flushes before switching
+  execvp(argv[0], argv.data());
+  // execvp() only returns if it failed
+  logger_->log_error("Failed to execute child process");
+  exit(1);
+}
+
+/**
+ * Reads the output of the child process in batches and emits one flow file per batch.
+ *
+ * Each iteration sleeps for batch_duration_, then performs a single read of at most 4096 bytes.
+ * The bytes read are written to a new flow file carrying the "command" and "command.arguments" attributes,
+ * which is transferred to Success and committed immediately.
+ * The loop ends when read() reports end-of-file or an error.
+ *
+ * @param session session used to create, write, transfer and commit the flow files
+ */
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    // the sleep also precedes the very first read, so the first batch appears after one full batch duration
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      logger_->log_error("Flow file could not be created!");
+      // the bytes of this batch are discarded, reading continues with the next batch
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      logger_->log_error("Flow file could not be created!");
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+/**
+ * Reads the whole output of the child process and routes it to Success as a single flow file.
+ *
+ * The output is collected through a 4096 byte buffer: whenever the buffer fills up it is flushed to the
+ * flow file, and whatever is left when read() reports end-of-file or an error is flushed at the end.
+ * Nothing is transferred if the child produced no output or the flow file could not be created.
+ *
+ * @param session session used to create, write and transfer the flow file
+ */
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t read_to_buffer = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, sizeof(buffer));
+  while (num_read > 0) {
+    if (num_read == static_cast<ssize_t>(sizeof(buffer) - read_to_buffer)) {
+      // we reach the max buffer size
+      logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
+      if (!writeToFlowFile(session, flow_file, buffer)) {
+        // The chunk is dropped, but the buffer is still rewound and reading goes on: retrying with the
+        // same full buffer would loop forever without reading, and a pipe that is no longer drained
+        // could leave the child blocked on write.
+        logger_->log_error("Dropping %zu bytes of the process output", sizeof(buffer));
+      }
+      // Rewind
+      read_to_buffer = 0;
+      buf_ptr = buffer;
+    } else {
+      read_to_buffer += num_read;
+      buf_ptr += num_read;
+    }
+    num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - read_to_buffer));
+  }
+
+  if (read_to_buffer > 0) {
+    logger_->log_debug("Execute Command Respond %zu", read_to_buffer);
+    // child exits and close the pipe
+    const auto buffer_span = gsl::make_span(buffer, read_to_buffer);
+    if (!writeToFlowFile(session, flow_file, buffer_span)) {
+      return;
     }
   }
+  if (flow_file) {
+    session.transfer(flow_file, Success);
+  }
+}
+
+/**
+ * Runs in the parent after a successful fork: collects the output of the child, reaps the child
+ * and releases the read end of the pipe.
+ *
+ * @param session session used to create the flow files holding the output
+ */
+void ExecuteProcess::collectChildProcessOutput(core::ProcessSession& session) {
+  // the parent isn't going to write to the pipe
+  close(pipefd_[1]);
+  if (batch_duration_ > 0ms) {
+    readOutputInBatches(session);
+  } else {
+    readOutput(session);
+  }
+
+  // Reap exactly the child started by this trigger: wait() would return for whichever child of the
+  // process terminates first, which is not necessarily pid_.
+  int status = 0;
+  pid_t waited_pid = -1;
+  do {
+    waited_pid = waitpid(pid_, &status, 0);
+  } while (waited_pid == -1 && errno == EINTR);  // retry if a signal interrupted the wait
+
+  if (waited_pid == -1) {
+    // status was never filled in, so there is no exit code to report
+    logger_->log_error("Failed to wait for the child process pid %d", pid_);
+  } else if (WIFEXITED(status)) {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WEXITSTATUS(status), pid_);
+  } else {
+    // not a normal exit: the value logged as status is the terminating signal number
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WTERMSIG(status), pid_);
+  }
+
+  close(pipefd_[0]);
+  // 0 marks that no child is running, the destructor only signals a positive pid_
+  pid_ = 0;
+}
+
+/**
+ * Launches the configured command in a child process and turns its output into flow files.
+ *
+ * Switches to the configured working directory, opens a pipe and forks: the child executes the command
+ * with its output connected to the pipe, the parent reads the pipe until the child is done.
+ * The processor yields if there is no command or if the set-up fails.
+ *
+ * @param context must not be null
+ * @param session session the output flow files are created in, must not be null
+ */
+void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  gsl_Expects(context && session);
+  // full_command_ is assembled as "<command> <arguments>", so it is never empty: look for an actual token
+  if (full_command_.find_first_not_of(" \t\n\v\f\r") == std::string::npos) {
+    logger_->log_error("No command was specified, nothing to execute");
+    yield();
+    return;
+  }
+  if (!utils::Environment::setCurrentWorkingDirectory(working_dir_.c_str())) {
+    logger_->log_error("Execute Command can not chdir %s", working_dir_);
+    yield();
+    return;
+  }
+  logger_->log_info("Execute Command %s", full_command_);
+
+  if (pipe(pipefd_) == -1) {
+    logger_->log_error("Failed to create pipe for the child process");
+    yield();
+    return;
+  }
+  switch (pid_ = fork()) {
+    case -1:
+      executeProcessForkFailed();
+      break;
+    case 0:  // this is the code the child runs
+      executeChildProcess();
+      break;
+    default:  // this is the code the parent runs
+      collectChildProcessOutput(*session);
+      break;
+  }
 }
 
 REGISTER_RESOURCE(ExecuteProcess, Processor);
 
-#endif
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#if defined(__clang__)
-#pragma clang diagnostic pop
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic pop
+}  // namespace org::apache::nifi::minifi::processors
 #endif
diff --git a/extensions/standard-processors/processors/ExecuteProcess.h b/extensions/standard-processors/processors/ExecuteProcess.h
index 5556c3181..f61681b8b 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.h
+++ b/extensions/standard-processors/processors/ExecuteProcess.h
@@ -17,24 +17,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_EXECUTEPROCESS_H_
-#define EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_EXECUTEPROCESS_H_
+#ifndef WIN32
+#pragma once
 
 #include <errno.h>
 #include <signal.h>
 #include <stdio.h>
 #include <sys/types.h>
+#include <sys/wait.h>
 
 #include <chrono>
 #include <iostream>
 #include <memory>
 #include <string>
 #include <thread>
+#include <vector>
 
-#ifndef WIN32
-#include <sys/wait.h>
-
-#endif
 #include "core/Core.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/Processor.h"
@@ -42,31 +40,26 @@
 #include "FlowFileRecord.h"
 #include "utils/gsl.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-#ifndef WIN32
+namespace org::apache::nifi::minifi::processors {
 
-// ExecuteProcess Class
 class ExecuteProcess : public core::Processor {
  public:
-  ExecuteProcess(const std::string& name, const utils::Identifier& uuid = {}) // NOLINT
-      : Processor(name, uuid) {
-    _redirectErrorStream = false;
-    _workingDir = ".";
-    _processRunning = false;
-    _pid = 0;
+  explicit ExecuteProcess(const std::string& name, const utils::Identifier& uuid = {})
+      : Processor(name, uuid),
+        working_dir_("."),
+        redirect_error_stream_(false),
+        pid_(0) {
   }
   ~ExecuteProcess() override {
-    if (_processRunning && _pid > 0)
-      kill(_pid, SIGTERM);
+    if (pid_ > 0) {
+      kill(pid_, SIGTERM);
+    }
   }
 
   EXTENSIONAPI static constexpr const char* Description = "Runs an operating system command specified by the user and writes the output of that command to a FlowFile. "
       "If the command is expected to be long-running, the Processor can output the partial data on a specified interval. "
-      "When this option is used, the output is expected to be in textual format, as it typically does not make sense to split binary data on arbitrary time-based intervals.";
+      "When this option is used, the output is expected to be in textual format, as it typically does not make sense to split binary data on arbitrary time-based intervals. "
+      "This processor is not available on Windows systems.";
 
   EXTENSIONAPI static core::Property Command;
   EXTENSIONAPI static core::Property CommandArguments;
@@ -88,38 +81,33 @@ class ExecuteProcess : public core::Processor {
 
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
-  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
- public:
-  // OnTrigger method, implemented by NiFi ExecuteProcess
   void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
-  // Initialize, over write by NiFi ExecuteProcess
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *session_factory) override;
   void initialize() override;
 
  private:
-  // Logger
+  std::vector<std::string> readArgs() const;
+  void executeProcessForkFailed();
+  void executeChildProcess();
+  void collectChildProcessOutput(core::ProcessSession& session);
+  void readOutputInBatches(core::ProcessSession& session);
+  void readOutput(core::ProcessSession& session);
+  bool writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const;
+
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ExecuteProcess>::getLogger();
-  // Property
-  std::string _command;
-  std::string _commandArgument;
-  std::string _workingDir;
-  std::chrono::milliseconds _batchDuration  = std::chrono::milliseconds(0);
-  bool _redirectErrorStream;
-  // Full command
-  std::string _fullCommand;
-  // whether the process is running
-  bool _processRunning;
-  int _pipefd[2];
-  pid_t _pid;
+  std::string command_;
+  std::string command_argument_;
+  std::string working_dir_;
+  std::chrono::milliseconds batch_duration_  = std::chrono::milliseconds(0);
+  bool redirect_error_stream_;
+  std::string full_command_;
+  int pipefd_[2]{};
+  pid_t pid_;
 };
 
+}  // namespace org::apache::nifi::minifi::processors
 #endif
-}  // namespace processors
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_EXECUTEPROCESS_H_
diff --git a/extensions/standard-processors/tests/CMakeLists.txt b/extensions/standard-processors/tests/CMakeLists.txt
index db9acd9b1..01e162de2 100644
--- a/extensions/standard-processors/tests/CMakeLists.txt
+++ b/extensions/standard-processors/tests/CMakeLists.txt
@@ -20,6 +20,7 @@
 
 file(GLOB PROCESSOR_UNIT_TESTS  "unit/*.cpp")
 file(GLOB PROCESSOR_INTEGRATION_TESTS "integration/*.cpp")
+file(GLOB RESOURCE_APPS "resource_apps/*.cpp")
 if(OPENSSL_OFF)
     list(REMOVE_ITEM PROCESSOR_INTEGRATION_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/integration/SecureSocketGetTCPTest.cpp")
     list(REMOVE_ITEM PROCESSOR_INTEGRATION_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/integration/TLSServerSocketSupportedProtocolsTest.cpp")
@@ -85,9 +86,6 @@ FOREACH(testfile ${PROCESSOR_INTEGRATION_TESTS})
 ENDFOREACH()
 message("-- Finished building ${INT_TEST_COUNT} integration test file(s)...")
 
-
-add_test(NAME TestExecuteProcess COMMAND TestExecuteProcess )
-
 if(NOT OPENSSL_OFF)
     add_test(NAME SecureSocketGetTCPTest COMMAND SecureSocketGetTCPTest "${TEST_RESOURCES}/TestGetTCPSecure.yml"  "${TEST_RESOURCES}/")
     add_test(NAME SecureSocketGetTCPTestEmptyPass COMMAND SecureSocketGetTCPTest "${TEST_RESOURCES}/TestGetTCPSecureEmptyPass.yml"  "${TEST_RESOURCES}/")
@@ -101,3 +99,9 @@ endif()
 add_test(NAME TailFileTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFile.yml"  "${TEST_RESOURCES}/")
 
 add_test(NAME TailFileCronTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFileCron.yml"  "${TEST_RESOURCES}/")
+
+# Build every source file collected in RESOURCE_APPS (resource_apps/*.cpp) as a standalone executable
+# named after the file (without directory and extension) and emit it into ${CMAKE_BINARY_DIR}/bin.
+FOREACH(resourcefile ${RESOURCE_APPS})
+    get_filename_component(resourcefilename "${resourcefile}" NAME_WE)
+    add_executable("${resourcefilename}" "${resourcefile}")
+    set_target_properties(${resourcefilename} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin")
+ENDFOREACH()
diff --git a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
deleted file mode 100644
index 9d6ae3da1..000000000
--- a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#include <type_traits> //NOLINT
-#include <sys/stat.h> //NOLINT
-#include <chrono> //NOLINT
-#include <thread> //NOLINT
-#undef NDEBUG
-#include <cassert>
-#include <string>
-#include <utility>
-#include <memory>
-#include <vector>
-#include <fstream>
-#include "core/repository/VolatileContentRepository.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "FlowController.h"
-#include "processors/GetFile.h"
-#include "core/Core.h"
-#include "core/FlowFile.h"
-#include "core/Processor.h"
-#include "core/controller/ControllerServiceNode.h"
-#include "core/controller/ControllerServiceProvider.h"
-#include "processors/ExecuteProcess.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
-#include "TestBase.h"
-#include "Catch.h"
-
-int main(int /*argc*/, char ** /*argv*/) {
-#ifndef WIN32
-  TestController testController;
-  std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ExecuteProcess>("executeProcess");
-  processor->setMaxConcurrentTasks(1);
-
-  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestThreadedRepository>();
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<TestFlowController>(test_repo, test_repo, content_repo);
-
-  utils::Identifier processoruuid = processor->getUUID();
-  assert(processoruuid);
-  auto connection = std::make_unique<minifi::Connection>(test_repo, content_repo, "executeProcessConnection");
-  connection->addRelationship(core::Relationship("success", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor.get());
-  connection->setDestination(processor.get());
-
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(processoruuid);
-
-  processor->addConnection(connection.get());
-  assert(processor->getName() == "executeProcess");
-
-  std::shared_ptr<core::FlowFile> record;
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-
-  processor->initialize();
-
-  std::atomic<bool> is_ready(false);
-
-  std::vector<std::thread> processor_workers;
-
-  auto node2 = std::make_shared<core::ProcessorNode>(processor.get());
-  auto contextset = std::make_shared<core::ProcessContext>(node2, nullptr, test_repo, test_repo);
-  core::ProcessSessionFactory factory(contextset);
-  processor->onSchedule(contextset.get(), &factory);
-
-  processor_workers.push_back(std::thread([processor, test_repo, &is_ready]() {
-    auto node = std::make_shared<core::ProcessorNode>(processor.get());
-    auto context = std::make_shared<core::ProcessContext>(node, nullptr, test_repo, test_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command, "sleep 0.5");
-    auto session = std::make_shared<core::ProcessSession>(context);
-    while (!is_ready.load(std::memory_order_relaxed)) {
-    }
-    processor->onTrigger(context.get(), session.get());
-  }));
-
-  is_ready.store(true, std::memory_order_relaxed);
-
-  std::for_each(processor_workers.begin(), processor_workers.end(), [](std::thread &t) {
-    t.join();
-  });
-
-  auto execp = std::static_pointer_cast<org::apache::nifi::minifi::processors::ExecuteProcess>(processor);
-#endif
-}
diff --git a/extensions/standard-processors/tests/resource_apps/EchoParameters.cpp b/extensions/standard-processors/tests/resource_apps/EchoParameters.cpp
new file mode 100644
index 000000000..86468e32e
--- /dev/null
+++ b/extensions/standard-processors/tests/resource_apps/EchoParameters.cpp
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <exception>
+#include <iostream>
+#include <string>
+#include <thread>
+
+/**
+ * Test helper: prints argv[2] and then every further argument on its own line, sleeping for
+ * argv[1] milliseconds before each argument after the first one.
+ *
+ * @return 0 on success, 1 if fewer than two arguments were given or the delay is needed but is not a valid integer
+ */
+int main(int argc, char** argv) {
+  static constexpr const char* usage = "Usage: ./EchoParameters <delay between parameters milliseconds> <text to write>";
+  if (argc < 3) {
+    std::cerr << usage << std::endl;
+    return 1;
+  }
+
+  // The delay is only used between parameters, so it is parsed once, and only if there is more than one
+  // parameter to print. An invalid value is reported instead of escaping as an uncaught exception.
+  std::chrono::milliseconds delay{0};
+  if (argc > 3) {
+    try {
+      delay = std::chrono::milliseconds(std::stoi(argv[1]));
+    } catch (const std::exception&) {
+      std::cerr << usage << std::endl;
+      return 1;
+    }
+  }
+
+  // std::endl flushes after every line, so a reader on the other end of a pipe sees each parameter right away
+  std::cout << argv[2] << std::endl;
+  for (int i = 3; i < argc; ++i) {
+    std::this_thread::sleep_for(delay);
+    std::cout << argv[i] << std::endl;
+  }
+  return 0;
+}
diff --git a/extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp b/extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp
new file mode 100644
index 000000000..a40a47f66
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ExecuteProcess.h"
+#include "SingleProcessorTestController.h"
+#include "utils/file/FileUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+#ifndef WIN32
+
+// Test fixture: creates an ExecuteProcess processor, hands it to a SingleProcessorTestController and calls setTrace for it.
+class ExecuteProcessTestsFixture {
+ public:
+  ExecuteProcessTestsFixture() : execute_process_(std::make_shared<processors::ExecuteProcess>("ExecuteProcess")), controller_(execute_process_) {
+    LogTestController::getInstance().setTrace<processors::ExecuteProcess>();
+  }
+
+ protected:
+  std::shared_ptr<processors::ExecuteProcess> execute_process_;  // must be declared before controller_, which is constructed from it
+  test::SingleProcessorTestController controller_;
+};
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can run a single command", "[ExecuteProcess]") {
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, "echo -n test"));
+  // Only Command is set (it carries the whole command line); schedule the processor and trigger it once.
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto trigger_result = controller_.trigger();
+  // Exactly one flow file is expected on Success, with the echoed text and an empty command.arguments attribute.
+  auto flow_files = trigger_result.at(processors::ExecuteProcess::Success);
+  REQUIRE(flow_files.size() == 1);
+  CHECK(controller_.plan->getContent(flow_files.front()) == "test");
+  CHECK(flow_files.front()->getAttribute("command") == "echo -n test");
+  CHECK(flow_files.front()->getAttribute("command.arguments") == "");
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can run an executable with a parameter", "[ExecuteProcess]") {
+  std::string echo_args = "0 test_data";
+  auto echo_binary = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters");
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, echo_binary));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, echo_args));
+  // Arguments follow the EchoParameters usage string: <delay in milliseconds> <text to write>. Trigger once.
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto trigger_result = controller_.trigger();
+  // One flow file is expected: the text plus a newline, with the command and its arguments as attributes.
+  auto flow_files = trigger_result.at(processors::ExecuteProcess::Success);
+  REQUIRE(flow_files.size() == 1);
+  CHECK(controller_.plan->getContent(flow_files.front()) == "test_data\n");
+  CHECK(flow_files.front()->getAttribute("command") == echo_binary);
+  CHECK(flow_files.front()->getAttribute("command.arguments") == echo_args);
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can run an executable with escaped parameters", "[ExecuteProcess]") {
+  std::string echo_args = R"(0 test_data test_data2 "test data 3" "\"test data 4\")";
+  auto echo_binary = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters");
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, echo_binary));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, echo_args));
+  // The argument string mixes plain words, a double-quoted phrase and a phrase with backslash-escaped quotes.
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto trigger_result = controller_.trigger();
+  // Expected output: one line per parsed argument, outer quotes removed and the escaped quotes kept literally.
+  auto flow_files = trigger_result.at(processors::ExecuteProcess::Success);
+  REQUIRE(flow_files.size() == 1);
+  CHECK(controller_.plan->getContent(flow_files.front()) == "test_data\ntest_data2\ntest data 3\n\"test data 4\"\n");
+  CHECK(flow_files.front()->getAttribute("command") == echo_binary);
+  CHECK(flow_files.front()->getAttribute("command.arguments") == echo_args);
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess does not produce a flowfile if no output is generated", "[ExecuteProcess]") {
+  auto echo_binary = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters");
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, echo_binary));
+  // No arguments are passed and RedirectErrorStream is not set by this test; trigger the processor once.
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto trigger_result = controller_.trigger();
+  // No flow file is expected on the Success relationship.
+  auto flow_files = trigger_result.at(processors::ExecuteProcess::Success);
+  REQUIRE(flow_files.empty());
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can redirect error stream to stdout", "[ExecuteProcess]") {
+  auto echo_binary = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters");
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, echo_binary));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::RedirectErrorStream, "true"));
+  // No arguments are passed, so the only output expected from EchoParameters is its usage text on stderr.
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto trigger_result = controller_.trigger();
+  // With the error stream redirected, that usage text is expected as the content of a single flow file.
+  auto flow_files = trigger_result.at(processors::ExecuteProcess::Success);
+  REQUIRE(flow_files.size() == 1);
+  CHECK(controller_.plan->getContent(flow_files.front()) == "Usage: ./EchoParameters <delay between parameters milliseconds> <text to write>\n");
+  CHECK(flow_files.front()->getAttribute("command") == echo_binary);
+  CHECK(flow_files.front()->getAttribute("command.arguments") == "");
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can change workdir", "[ExecuteProcess]") {
+  std::string echo_args = "0 test_data";
+  auto relative_command = "./EchoParameters";
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, relative_command));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, echo_args));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::WorkingDir, minifi::utils::file::get_executable_dir()));
+  // The command is given relative to WorkingDir, which is set to the directory returned by get_executable_dir().
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto trigger_result = controller_.trigger();
+  // One flow file is expected; its command attribute should hold the relative command exactly as configured.
+  auto flow_files = trigger_result.at(processors::ExecuteProcess::Success);
+  REQUIRE(flow_files.size() == 1);
+  CHECK(controller_.plan->getContent(flow_files.front()) == "test_data\n");
+  CHECK(flow_files.front()->getAttribute("command") == relative_command);
+  CHECK(flow_files.front()->getAttribute("command.arguments") == echo_args);
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can forward long running output in batches", "[ExecuteProcess]") {
+  std::string echo_args = "100 test_data1 test_data2";
+  auto echo_binary = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters");
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, echo_binary));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, echo_args));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::BatchDuration, "10 ms"));
+  // EchoParameters is asked for a 100 ms pause between the two texts, while BatchDuration is only 10 ms.
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto trigger_result = controller_.trigger();
+  // Each text is expected in its own flow file, in output order, both carrying the same command attributes.
+  auto flow_files = trigger_result.at(processors::ExecuteProcess::Success);
+  REQUIRE(flow_files.size() == 2);
+  const std::string expected_contents[] = {"test_data1\n", "test_data2\n"};
+  for (std::size_t i = 0; i < flow_files.size(); ++i) {
+    CHECK(controller_.plan->getContent(flow_files[i]) == expected_contents[i]);
+    CHECK(flow_files[i]->getAttribute("command") == echo_binary);
+    CHECK(flow_files[i]->getAttribute("command.arguments") == echo_args);
+  }
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess buffer long outputs", "[ExecuteProcess]") {
+  auto command = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters");
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, command));
+  std::string param1;
+  SECTION("Exact buffer size output") {
+    param1.assign(4095, 'a');  // buffer size is 4096, so 4095 'a' characters plus '\n' character should be exactly the buffer size
+  }
+  SECTION("Larger than buffer size output") {
+    param1.assign(8200, 'a');
+  }
+
+  std::string arguments = "0 " + param1;
+  const std::string expected_output = param1 + "\n";  // built up front so that no assertion operand has to mutate param1
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, arguments));
+  // Run the command once with the text chosen by the active SECTION.
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto result = controller_.trigger();
+  // The whole text plus the trailing newline is expected in a single flow file.
+  auto success_flow_files = result.at(processors::ExecuteProcess::Success);
+  REQUIRE(success_flow_files.size() == 1);
+  CHECK(controller_.plan->getContent(success_flow_files[0]) == expected_output);
+  CHECK(success_flow_files[0]->getAttribute("command") == command);
+  CHECK(success_flow_files[0]->getAttribute("command.arguments") == arguments);
+}
+
+#endif
+}  // namespace org::apache::nifi::minifi::test