You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/11/28 13:34:23 UTC

[nifi-minifi-cpp] branch main updated (3b8fb0d36 -> 20afc8447)

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from 3b8fb0d36 MINIFICPP-1815 - PersistenceTests transiently fails
     new 8829131ec MINIFICPP-1925 Ensure compatibility with the MiNiFi C2 Service
     new 20afc8447 MINIFICPP-1922 Implement ListenUDP processor

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 PROCESSORS.md                                      |  41 +++++++-
 README.md                                          |   6 +-
 conf/minifi.properties                             |   1 +
 docker/test/integration/features/CMakeLists.txt    |   1 +
 .../integration/features/minifi_c2_server.feature  |  20 ++++
 .../minifi/core/DockerTestDirectoryBindings.py     |   1 +
 .../test/integration/minifi/core/FlowContainer.py  |   4 +
 docker/test/integration/minifi/core/ImageStore.py  |  39 +++++++
 ...rverContainer.py => MinifiC2ServerContainer.py} |  12 +--
 .../integration/minifi/core/MinifiContainer.py     |   2 +-
 ...RepoContainer.py => MinifiWithHttpsC2Config.py} |   4 +-
 .../minifi/core/SingleNodeDockerCluster.py         |   8 ++
 .../Minifi_flow_yaml_serializer.py                 |  34 +++++-
 .../resources/minifi-c2-server-ssl/Dockerfile      |   7 ++
 .../minifi-c2-server-ssl/authorities.yaml          |   2 +
 .../minifi-c2-server-ssl/authorizations.yaml       |  46 +++++++++
 .../resources/minifi-c2-server-ssl/c2.properties   |  10 ++
 .../certs/minifi-c2-server-keystore.p12            | Bin 0 -> 2544 bytes
 .../certs/minifi-c2-server-truststore.p12          | Bin 0 -> 2978 bytes
 .../certs/minifi-c2-server.crt                     |  20 ++++
 .../certs/minifi-c2-server.key                     |  28 +++++
 .../minifi-c2-server-ssl/certs/minifi-cpp-flow.crt |  20 ++++
 .../minifi-c2-server-ssl/certs/minifi-cpp-flow.key |  28 +++++
 .../minifi-c2-server-ssl/certs/root-ca.key         |  30 ++++++
 .../minifi-c2-server-ssl/certs/root-ca.pem         |  19 ++++
 .../resources/minifi-c2-server-ssl/config.yml      |  43 ++++++++
 .../resources/minifi-c2-server/Dockerfile          |   2 +
 .../resources/minifi-c2-server/config.yml          |  31 ++++++
 docker/test/integration/steps/steps.py             |  29 ++++++
 extensions/http-curl/protocols/RESTSender.cpp      |  25 +++--
 extensions/http-curl/tests/HTTPHandlers.h          |   2 +-
 .../processors/ListenSyslog.cpp                    |  10 +-
 .../standard-processors/processors/ListenSyslog.h  |   2 -
 .../standard-processors/processors/ListenTCP.cpp   |  10 +-
 .../standard-processors/processors/ListenTCP.h     |   2 -
 .../processors/{ListenTCP.cpp => ListenUDP.cpp}    |  50 +++------
 .../processors/{ListenTCP.h => ListenUDP.h}        |  17 +--
 .../processors/NetworkListenerProcessor.cpp        |   9 +-
 .../processors/NetworkListenerProcessor.h          |  11 +-
 .../tests/unit/ListenSyslogTests.cpp               |  10 +-
 .../tests/unit/ListenTcpTests.cpp                  |   8 +-
 .../tests/unit/ListenUDPTests.cpp                  | 115 +++++++++++++++++++++
 .../standard-processors/tests/unit/PutTCPTests.cpp |  11 +-
 libminifi/include/FlowController.h                 |   4 +-
 libminifi/include/c2/C2Agent.h                     |   2 +-
 libminifi/include/core/FlowConfiguration.h         |   6 +-
 libminifi/include/core/state/UpdateController.h    |   2 +-
 .../include/core/state/nodes/FlowInformation.h     |   2 +-
 libminifi/src/FlowController.cpp                   |   8 +-
 libminifi/src/c2/C2Agent.cpp                       |  15 ++-
 libminifi/src/c2/protocols/RESTProtocol.cpp        |   8 +-
 libminifi/src/core/FlowConfiguration.cpp           |  12 +--
 .../src/core/state/nodes/SupportedOperations.cpp   |   2 +-
 libminifi/test/unit/ControllerTests.cpp            |   2 +-
 54 files changed, 684 insertions(+), 149 deletions(-)
 create mode 100644 docker/test/integration/features/minifi_c2_server.feature
 copy docker/test/integration/minifi/core/{PostgreSQLServerContainer.py => MinifiC2ServerContainer.py} (80%)
 copy docker/test/integration/minifi/core/{MinifiWithProvenanceRepoContainer.py => MinifiWithHttpsC2Config.py} (89%)
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/Dockerfile
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/authorities.yaml
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/authorizations.yaml
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/c2.properties
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server-keystore.p12
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server-truststore.p12
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server.crt
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server.key
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-cpp-flow.crt
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-cpp-flow.key
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/certs/root-ca.key
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/certs/root-ca.pem
 create mode 100644 docker/test/integration/resources/minifi-c2-server-ssl/config.yml
 create mode 100644 docker/test/integration/resources/minifi-c2-server/Dockerfile
 create mode 100644 docker/test/integration/resources/minifi-c2-server/config.yml
 copy extensions/standard-processors/processors/{ListenTCP.cpp => ListenUDP.cpp} (58%)
 copy extensions/standard-processors/processors/{ListenTCP.h => ListenUDP.h} (71%)
 create mode 100644 extensions/standard-processors/tests/unit/ListenUDPTests.cpp


[nifi-minifi-cpp] 01/02: MINIFICPP-1925 Ensure compatibility with the MiNiFi C2 Service

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 8829131ecbdfa54ef28dc205ae468d2f5445a15d
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Nov 28 11:41:12 2022 +0100

    MINIFICPP-1925 Ensure compatibility with the MiNiFi C2 Service
    
    Ensure that config updates and heartbeats from the MiNiFi C2 server are handled properly in the MiNiFi agent
    
    * Read flowid from configuration update parameter
    * Make C2 operation strings case-insensitive
    * Handle null in requested operations
    * Remove Accept header temporarily until MiNiFi C2 handling is fixed
    * Add test coverage for MiNiFi C2 compatibility
    
    Closes #1420
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 conf/minifi.properties                             |   1 +
 docker/test/integration/features/CMakeLists.txt    |   1 +
 .../integration/features/minifi_c2_server.feature  |  20 +++++++++
 .../minifi/core/DockerTestDirectoryBindings.py     |   1 +
 .../test/integration/minifi/core/FlowContainer.py  |   4 ++
 docker/test/integration/minifi/core/ImageStore.py  |  39 +++++++++++++++++
 ...FlowContainer.py => MinifiC2ServerContainer.py} |  23 +++++++----
 .../integration/minifi/core/MinifiContainer.py     |   2 +-
 ...FlowContainer.py => MinifiWithHttpsC2Config.py} |  17 ++------
 .../minifi/core/SingleNodeDockerCluster.py         |   8 ++++
 .../Minifi_flow_yaml_serializer.py                 |  34 ++++++++++++++-
 .../resources/minifi-c2-server-ssl/Dockerfile      |   7 ++++
 .../minifi-c2-server-ssl/authorities.yaml          |   2 +
 .../minifi-c2-server-ssl/authorizations.yaml       |  46 +++++++++++++++++++++
 .../resources/minifi-c2-server-ssl/c2.properties   |  10 +++++
 .../certs/minifi-c2-server-keystore.p12            | Bin 0 -> 2544 bytes
 .../certs/minifi-c2-server-truststore.p12          | Bin 0 -> 2978 bytes
 .../certs/minifi-c2-server.crt                     |  20 +++++++++
 .../certs/minifi-c2-server.key                     |  28 +++++++++++++
 .../minifi-c2-server-ssl/certs/minifi-cpp-flow.crt |  20 +++++++++
 .../minifi-c2-server-ssl/certs/minifi-cpp-flow.key |  28 +++++++++++++
 .../minifi-c2-server-ssl/certs/root-ca.key         |  30 ++++++++++++++
 .../minifi-c2-server-ssl/certs/root-ca.pem         |  19 +++++++++
 .../resources/minifi-c2-server-ssl/config.yml      |  43 +++++++++++++++++++
 .../resources/minifi-c2-server/Dockerfile          |   2 +
 .../resources/minifi-c2-server/config.yml          |  31 ++++++++++++++
 docker/test/integration/steps/steps.py             |  29 +++++++++++++
 extensions/http-curl/protocols/RESTSender.cpp      |  25 +++++++----
 extensions/http-curl/tests/HTTPHandlers.h          |   2 +-
 libminifi/include/FlowController.h                 |   4 +-
 libminifi/include/c2/C2Agent.h                     |   2 +-
 libminifi/include/core/FlowConfiguration.h         |   6 +--
 libminifi/include/core/state/UpdateController.h    |   2 +-
 .../include/core/state/nodes/FlowInformation.h     |   2 +-
 libminifi/src/FlowController.cpp                   |   8 ++--
 libminifi/src/c2/C2Agent.cpp                       |  15 ++++---
 libminifi/src/c2/protocols/RESTProtocol.cpp        |   8 +++-
 libminifi/src/core/FlowConfiguration.cpp           |  12 +++---
 .../src/core/state/nodes/SupportedOperations.cpp   |   2 +-
 libminifi/test/unit/ControllerTests.cpp            |   2 +-
 40 files changed, 495 insertions(+), 60 deletions(-)

diff --git a/conf/minifi.properties b/conf/minifi.properties
index 135c91b21..8dc746e6c 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -79,6 +79,7 @@ nifi.content.repository.class.name=DatabaseContentRepository
 #nifi.c2.flow.base.url=
 #nifi.c2.rest.url=
 #nifi.c2.rest.url.ack=
+#nifi.c2.rest.ssl.context.service=
 nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
 ## Minimize heartbeat payload size by excluding agent manifest from the heartbeat
 nifi.c2.full.heartbeat=false
diff --git a/docker/test/integration/features/CMakeLists.txt b/docker/test/integration/features/CMakeLists.txt
index 901a3d71e..606376295 100644
--- a/docker/test/integration/features/CMakeLists.txt
+++ b/docker/test/integration/features/CMakeLists.txt
@@ -25,6 +25,7 @@ set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/ro
 set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/s2s.feature")
 set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/syslog_listener.feature")
 set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/network_listener.feature")
+set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/minifi_c2_server.feature")
 
 if (ENABLE_AZURE)
     set(ENABLED_BEHAVE_TESTS "${ENABLED_BEHAVE_TESTS};${CMAKE_CURRENT_SOURCE_DIR}/azure_storage.feature")
diff --git a/docker/test/integration/features/minifi_c2_server.feature b/docker/test/integration/features/minifi_c2_server.feature
new file mode 100644
index 000000000..f9bc3f5dd
--- /dev/null
+++ b/docker/test/integration/features/minifi_c2_server.feature
@@ -0,0 +1,20 @@
+Feature: MiNiFi can communicate with Apache NiFi MiNiFi C2 server
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: MiNiFi flow config is updated from MiNiFi C2 server
+    Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/non-existent"
+    And a file with the content "test" is present in "/tmp/input"
+    And a MiNiFi C2 server is set up
+    When all instances start up
+    Then the MiNiFi C2 server logs contain the following message: "acknowledged with a state of FULLY_APPLIED(DONE)" in less than 30 seconds
+    And a flowfile with the content "test" is placed in the monitored directory in less than 10 seconds
+
+  Scenario: MiNiFi flow config is updated from MiNiFi C2 server through SSL
+    Given a file with the content "test" is present in "/tmp/input"
+    And a ssl context service is set up for MiNiFi C2 server
+    And a MiNiFi C2 server is set up with SSL
+    When all instances start up
+    Then the MiNiFi C2 SSL server logs contain the following message: "acknowledged with a state of FULLY_APPLIED(DONE)" in less than 60 seconds
+    And a flowfile with the content "test" is placed in the monitored directory in less than 10 seconds
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index ee79f4e23..be39110d2 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -56,6 +56,7 @@ class DockerTestDirectoryBindings:
         shutil.copytree(test_dir + "/resources/lua", self.data_directories[self.test_id]["resources_dir"] + "/lua")
         shutil.copytree(test_dir + "/resources/elasticsearch/certs", self.data_directories[self.test_id]["resources_dir"] + "/elasticsearch")
         shutil.copytree(test_dir + "/resources/opensearch/certs", self.data_directories[self.test_id]["resources_dir"] + "/opensearch")
+        shutil.copytree(test_dir + "/resources/minifi-c2-server-ssl/certs", self.data_directories[self.test_id]["resources_dir"] + "/minifi-c2-server-ssl")
 
     def get_data_directories(self, test_id):
         return self.data_directories[test_id]
diff --git a/docker/test/integration/minifi/core/FlowContainer.py b/docker/test/integration/minifi/core/FlowContainer.py
index 748991c6c..0ce1655c5 100644
--- a/docker/test/integration/minifi/core/FlowContainer.py
+++ b/docker/test/integration/minifi/core/FlowContainer.py
@@ -22,9 +22,13 @@ class FlowContainer(Container):
         super().__init__(name, engine, vols, network, image_store, command)
         self.start_nodes = []
         self.config_dir = config_dir
+        self.controllers = []
 
     def get_start_nodes(self):
         return self.start_nodes
 
     def add_start_node(self, node):
         self.start_nodes.append(node)
+
+    def add_controller(self, controller):
+        self.controllers.append(controller)
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index 4076f3031..2700c89d0 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -46,6 +46,8 @@ class ImageStore:
             image = self.__build_minifi_cpp_image()
         elif container_engine == "minifi-cpp-with-provenance-repo":
             image = self.__build_minifi_cpp_image_with_provenance_repo()
+        elif container_engine == "minifi-cpp-with-https-c2-config":
+            image = self.__build_minifi_cpp_image_with_https_c2_config()
         elif container_engine == "http-proxy":
             image = self.__build_http_proxy_image()
         elif container_engine == "postgresql-server":
@@ -64,6 +66,10 @@ class ImageStore:
             image = self.__build_elasticsearch_image()
         elif container_engine == "opensearch":
             image = self.__build_opensearch_image()
+        elif container_engine == "minifi-c2-server":
+            image = self.__build_minifi_c2_image()
+        elif container_engine == "minifi-c2-server-ssl":
+            image = self.__build_minifi_c2_ssl_image()
         else:
             raise Exception("There is no associated image for " + container_engine)
 
@@ -106,6 +112,14 @@ class ImageStore:
                 RUN echo nifi.metrics.publisher.class=PrometheusMetricsPublisher >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode,AgentStatus >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.enable=true  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.rest.url=http://minifi-c2-server:10090/c2/config/heartbeat  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.rest.url.ack=http://minifi-c2-server:10090/c2/config/acknowledge  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.flow.base.url=http://minifi-c2-server:10090/c2/config/  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.full.heartbeat=false  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.agent.class=minifi-test-class  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.agent.identifier=minifi-test-id  >> {minifi_root}/conf/minifi.properties
                 USER minificpp
                 """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION,
                            minifi_root=MinifiContainer.MINIFI_ROOT))
@@ -129,6 +143,25 @@ class ImageStore:
             image = self.__build_image(dockerfile, [properties_context])
         return image
 
+    def __build_minifi_cpp_image_with_https_c2_config(self):
+        dockerfile = dedent("""\
+                FROM {base_image}
+                USER root
+                RUN sed -i -e 's/INFO/DEBUG/g' {minifi_root}/conf/minifi-log.properties
+                RUN echo nifi.c2.enable=true  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.rest.url=https://minifi-c2-server:10090/c2/config/heartbeat  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.rest.url.ack=https://minifi-c2-server:10090/c2/config/acknowledge  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.rest.ssl.context.service=SSLContextService  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.flow.base.url=https://minifi-c2-server:10090/c2/config/  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.full.heartbeat=false  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.agent.class=minifi-test-class  >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.c2.agent.identifier=minifi-test-id  >> {minifi_root}/conf/minifi.properties
+                USER minificpp
+                """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION,
+                           minifi_root=MinifiContainer.MINIFI_ROOT))
+
+        return self.__build_image(dockerfile)
+
     def __build_http_proxy_image(self):
         dockerfile = dedent("""\
                 FROM {base_image}
@@ -195,6 +228,12 @@ class ImageStore:
     def __build_opensearch_image(self):
         return self.__build_image_by_path(self.test_dir + "/resources/opensearch", 'opensearch')
 
+    def __build_minifi_c2_image(self):
+        return self.__build_image_by_path(self.test_dir + "/resources/minifi-c2-server", 'minifi-c2-server')
+
+    def __build_minifi_c2_ssl_image(self):
+        return self.__build_image_by_path(self.test_dir + "/resources/minifi-c2-server-ssl", 'minifi-c2-server')
+
     def __build_image(self, dockerfile, context_files=[]):
         conf_dockerfile_buffer = BytesIO()
         docker_context_buffer = BytesIO()
diff --git a/docker/test/integration/minifi/core/FlowContainer.py b/docker/test/integration/minifi/core/MinifiC2ServerContainer.py
similarity index 59%
copy from docker/test/integration/minifi/core/FlowContainer.py
copy to docker/test/integration/minifi/core/MinifiC2ServerContainer.py
index 748991c6c..88d0991e8 100644
--- a/docker/test/integration/minifi/core/FlowContainer.py
+++ b/docker/test/integration/minifi/core/MinifiC2ServerContainer.py
@@ -17,14 +17,21 @@
 from .Container import Container
 
 
-class FlowContainer(Container):
-    def __init__(self, config_dir, name, engine, vols, network, image_store, command):
+class MinifiC2ServerContainer(Container):
+    def __init__(self, name, vols, network, image_store, command=None, ssl=False):
+        engine = "minifi-c2-server-ssl" if ssl else "minifi-c2-server"
         super().__init__(name, engine, vols, network, image_store, command)
-        self.start_nodes = []
-        self.config_dir = config_dir
 
-    def get_start_nodes(self):
-        return self.start_nodes
+    def get_startup_finished_log_entry(self):
+        return "Server Started"
 
-    def add_start_node(self, node):
-        self.start_nodes.append(node)
+    def deploy(self):
+        if not self.set_deployed():
+            return
+
+        self.docker_container = self.client.containers.run(
+            self.image_store.get_image(self.get_engine()),
+            detach=True,
+            name=self.name,
+            network=self.network.name,
+            entrypoint=self.command)
diff --git a/docker/test/integration/minifi/core/MinifiContainer.py b/docker/test/integration/minifi/core/MinifiContainer.py
index 31d728005..8bd376ca5 100644
--- a/docker/test/integration/minifi/core/MinifiContainer.py
+++ b/docker/test/integration/minifi/core/MinifiContainer.py
@@ -34,7 +34,7 @@ class MinifiContainer(FlowContainer):
 
     def _create_config(self):
         serializer = Minifi_flow_yaml_serializer()
-        test_flow_yaml = serializer.serialize(self.start_nodes)
+        test_flow_yaml = serializer.serialize(self.start_nodes, self.controllers)
         logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
         with open(os.path.join(self.config_dir, "config.yml"), 'wb') as config_file:
             config_file.write(test_flow_yaml.encode('utf-8'))
diff --git a/docker/test/integration/minifi/core/FlowContainer.py b/docker/test/integration/minifi/core/MinifiWithHttpsC2Config.py
similarity index 64%
copy from docker/test/integration/minifi/core/FlowContainer.py
copy to docker/test/integration/minifi/core/MinifiWithHttpsC2Config.py
index 748991c6c..79a4d44b8 100644
--- a/docker/test/integration/minifi/core/FlowContainer.py
+++ b/docker/test/integration/minifi/core/MinifiWithHttpsC2Config.py
@@ -13,18 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from .MinifiContainer import MinifiContainer
 
-from .Container import Container
 
-
-class FlowContainer(Container):
-    def __init__(self, config_dir, name, engine, vols, network, image_store, command):
-        super().__init__(name, engine, vols, network, image_store, command)
-        self.start_nodes = []
-        self.config_dir = config_dir
-
-    def get_start_nodes(self):
-        return self.start_nodes
-
-    def add_start_node(self, node):
-        self.start_nodes.append(node)
+class MinifiWithHttpsC2Config(MinifiContainer):
+    def __init__(self, config_dir, name, vols, network, image_store, command=None):
+        super().__init__(config_dir, name, vols, network, image_store, command, engine='minifi-cpp-with-https-c2-config')
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 5ee368cfb..aca62ee05 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -40,6 +40,8 @@ from .SyslogTcpClientContainer import SyslogTcpClientContainer
 from .MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster
 from .TcpClientContainer import TcpClientContainer
 from .PrometheusContainer import PrometheusContainer
+from .MinifiC2ServerContainer import MinifiC2ServerContainer
+from .MinifiWithHttpsC2Config import MinifiWithHttpsC2Config
 
 
 class SingleNodeDockerCluster(Cluster):
@@ -101,6 +103,8 @@ class SingleNodeDockerCluster(Cluster):
             return self.containers.setdefault(name, TransientMinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'minifi-cpp-with-provenance-repo':
             return self.containers.setdefault(name, MinifiWithProvenanceRepoContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
+        elif engine == 'minifi-cpp-with-https-c2-config':
+            return self.containers.setdefault(name, MinifiWithHttpsC2Config(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'kafka-broker':
             if 'zookeeper' not in self.containers:
                 self.containers.setdefault('zookeeper', ZookeeperContainer('zookeeper', self.vols, self.network, self.image_store, command))
@@ -133,6 +137,10 @@ class SingleNodeDockerCluster(Cluster):
             return self.containers.setdefault(name, TcpClientContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == "prometheus":
             return self.containers.setdefault(name, PrometheusContainer(name, self.vols, self.network, self.image_store, command))
+        elif engine == "minifi-c2-server":
+            return self.containers.setdefault(name, MinifiC2ServerContainer(name, self.vols, self.network, self.image_store, command))
+        elif engine == "minifi-c2-server-ssl":
+            return self.containers.setdefault(name, MinifiC2ServerContainer(name, self.vols, self.network, self.image_store, command, ssl=True))
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)
 
diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
index d0606bc79..0af92ab7d 100644
--- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
+++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
@@ -23,13 +23,16 @@ from ..core.Funnel import Funnel
 
 
 class Minifi_flow_yaml_serializer:
-    def serialize(self, start_nodes):
+    def serialize(self, start_nodes, controllers):
         res = None
         visited = None
 
         for node in start_nodes:
             res, visited = self.serialize_node(node, res, visited)
 
+        for controller in controllers:
+            res = self.serialize_controller(controller, res)
+
         return yaml.dump(res, default_flow_style=False)
 
     def serialize_node(self, connectable, root=None, visited=None):
@@ -145,3 +148,32 @@ class Minifi_flow_yaml_serializer:
                     self.serialize_node(conn_procs, res, visited)
 
         return (res, visited)
+
+    def serialize_controller(self, controller, root=None):
+        if root is None:
+            res = {
+                'Flow Controller': {
+                    'name': 'MiNiFi Flow'
+                },
+                'Processors': [],
+                'Funnels': [],
+                'Connections': [],
+                'Remote Processing Groups': [],
+                'Controller Services': []
+            }
+        else:
+            res = root
+
+        if hasattr(controller, 'name'):
+            connectable_name = controller.name
+        else:
+            connectable_name = str(controller.uuid)
+
+        res['Controller Services'].append({
+            'name': connectable_name,
+            'id': controller.id,
+            'class': controller.service_class,
+            'Properties': controller.properties
+        })
+
+        return res
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/Dockerfile b/docker/test/integration/resources/minifi-c2-server-ssl/Dockerfile
new file mode 100644
index 000000000..e87447a4a
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/Dockerfile
@@ -0,0 +1,7 @@
+FROM apache/nifi-minifi-c2:1.18.0
+RUN mkdir $MINIFI_C2_HOME/certs/
+COPY --chown=c2:c2 certs/* $MINIFI_C2_HOME/certs/
+COPY --chown=c2:c2 config.yml $MINIFI_C2_HOME/files/minifi-test-class/config.text.yml.v1
+COPY --chown=c2:c2 authorities.yaml $MINIFI_C2_HOME/conf/
+COPY --chown=c2:c2 authorizations.yaml $MINIFI_C2_HOME/conf/
+COPY --chown=c2:c2 c2.properties $MINIFI_C2_HOME/conf/
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/authorities.yaml b/docker/test/integration/resources/minifi-c2-server-ssl/authorities.yaml
new file mode 100644
index 000000000..8043a37e6
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/authorities.yaml
@@ -0,0 +1,2 @@
+CN=minifi-cpp-flow:
+  - CLASS_MINIFI_CPP
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/authorizations.yaml b/docker/test/integration/resources/minifi-c2-server-ssl/authorizations.yaml
new file mode 100644
index 000000000..a4fdeeb1d
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/authorizations.yaml
@@ -0,0 +1,46 @@
+Default Action: deny
+Paths:
+  /c2/config:
+    Default Action: deny
+    Actions:
+    - Authorization: CLASS_MINIFI_CPP
+      Action: allow
+    - Authorization: ROLE_SUPERUSER
+      Action: allow
+
+    # Default authorization lets anonymous pull any config.  Remove below to change that.
+    - Authorization: ROLE_ANONYMOUS
+      Action: allow
+
+  /c2/config/contentTypes:
+    Default Action: deny
+    Actions:
+    - Authorization: CLASS_MINIFI_CPP
+      Action: allow
+    # Default authorization lets anonymous pull any config.  Remove below to change that.
+    - Authorization: ROLE_ANONYMOUS
+      Action: allow
+
+  /c2/config/heartbeat:
+    Default Action: deny
+    Actions:
+      - Authorization: CLASS_MINIFI_CPP
+        Action: allow
+      - Authorization: ROLE_SUPERUSER
+        Action: allow
+
+      # Default authorization lets anonymous pull any config.  Remove below to change that.
+      - Authorization: ROLE_ANONYMOUS
+        Action: allow
+
+  /c2/config/acknowledge:
+    Default Action: deny
+    Actions:
+      - Authorization: CLASS_MINIFI_CPP
+        Action: allow
+      - Authorization: ROLE_SUPERUSER
+        Action: allow
+
+      # Default authorization lets anonymous pull any config.  Remove below to change that.
+      - Authorization: ROLE_ANONYMOUS
+        Action: allow
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/c2.properties b/docker/test/integration/resources/minifi-c2-server-ssl/c2.properties
new file mode 100644
index 000000000..3cafab269
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/c2.properties
@@ -0,0 +1,10 @@
+minifi.c2.server.port=10090
+
+minifi.c2.server.secure=true
+minifi.c2.server.keystore=/opt/minifi-c2/minifi-c2-1.18.0/certs/minifi-c2-server-keystore.p12
+minifi.c2.server.keystoreType=PKCS12
+minifi.c2.server.keystorePasswd=abcdefgh
+minifi.c2.server.keyPasswd=abcdefgh
+minifi.c2.server.truststore=/opt/minifi-c2/minifi-c2-1.18.0/certs/minifi-c2-server-truststore.p12
+minifi.c2.server.truststoreType=PKCS12
+minifi.c2.server.truststorePasswd=abcdefgh
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server-keystore.p12 b/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server-keystore.p12
new file mode 100644
index 000000000..7d280f90f
Binary files /dev/null and b/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server-keystore.p12 differ
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server-truststore.p12 b/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server-truststore.p12
new file mode 100644
index 000000000..d72e6ebd4
Binary files /dev/null and b/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server-truststore.p12 differ
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server.crt b/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server.crt
new file mode 100644
index 000000000..0fcfff9f0
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server.crt
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDMTCCAhmgAwIBAgIUasTOqNQ87C9VtUiy+N8uLOQTcRUwDQYJKoZIhvcNAQEL
+BQAwDzENMAsGA1UEAwwEbXlDQTAgFw0yMjEwMTQxNTAyNTBaGA8yMDUwMDIyODE1
+MDI1MFowGzEZMBcGA1UEAwwQbWluaWZpLWMyLXNlcnZlcjCCASIwDQYJKoZIhvcN
+AQEBBQADggEPADCCAQoCggEBAKsIVaHbxd9HGI/5Dc9Q75vv7/dDLrlEU7S9/k1S
+lJkBBnF32rBNlwNfZdCgSsdtFzsq++88CQjzbbCdi72hwKA9V60oW8HDI8utDdE6
+QY4qTBhBmrxbwGIE/6TNkSpfz940a18Nhzv0mDTvpjLsYqbDAA6vr1/Od/BSqnXq
+/DLtIEEzX1d1gtRqRonyt44Hhmj58gORka8Kggyh84i9k/Ol9IZdCZoxvgDQOwOK
+XaohByQBPChMKpMDQkpJfkIKRGE1K2Ya7BfGqO0Q+lX6N/zSFE9YIZxrhcT6wjcC
+yjOJGou9vVgnu6Idw6iX6Y0pwOrnWPqpQ/sqIxFKWu2UAy0CAwEAAaN3MHUwHwYD
+VR0jBBgwFoAUbor1/5iMMRzmWD4nVDkQ0IkzMawwCQYDVR0TBAIwADALBgNVHQ8E
+BAMCBPAwGwYDVR0RBBQwEoIQbWluaWZpLWMyLXNlcnZlcjAdBgNVHQ4EFgQUqmzn
+O9XoHVK6afH0IDN3oV4Tv2cwDQYJKoZIhvcNAQELBQADggEBAKdtUTXWcUcSr9U7
+CFaU5DFtJbUIvEkbAs2R8+bK/Y280uOEQ4+9plyCcVPF5/ZvHaRDu0orKwaUWEdZ
+eAjYj2isZACXZdWey6b5kE2s8XAGYrM3W8R6gZzSf0rEdjPyA3TwNb3/RgltBKWV
+nzqNiCjo6Cb4dY2MXKOrmlVtUDKEKDa0zl2FYsrJDJ8KS9chROX01r+kcWXiemXP
+bWWfCdhBCJA6gAQfOHz4Cc/fCLwsMi9xO8ddRNBl/jxJ8Sl9UlImSWlp5BSprSO2
+hu+ucHwoVZbC9As1bkEEwODBUCO5L2XKl5O8MuA/NxgT57L7G1UCxBM9sTWb4bEI
+xd6prvQ=
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server.key b/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server.key
new file mode 100644
index 000000000..fc18ef48e
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-c2-server.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCrCFWh28XfRxiP
++Q3PUO+b7+/3Qy65RFO0vf5NUpSZAQZxd9qwTZcDX2XQoErHbRc7KvvvPAkI822w
+nYu9ocCgPVetKFvBwyPLrQ3ROkGOKkwYQZq8W8BiBP+kzZEqX8/eNGtfDYc79Jg0
+76Yy7GKmwwAOr69fznfwUqp16vwy7SBBM19XdYLUakaJ8reOB4Zo+fIDkZGvCoIM
+ofOIvZPzpfSGXQmaMb4A0DsDil2qIQckATwoTCqTA0JKSX5CCkRhNStmGuwXxqjt
+EPpV+jf80hRPWCGca4XE+sI3AsoziRqLvb1YJ7uiHcOol+mNKcDq51j6qUP7KiMR
+SlrtlAMtAgMBAAECggEAReBUWBo52BSseNnopfxrwLqBQHTeyIudZVlAZifolTBh
+iQdOPjydB6A4sUlj8+FineZcYuwUxubpuEBNwO6ui+k0AodcIahP3h14aTSTZvlp
++HkJNo6H5aQkLBleh0D45NBm08FrsHeonewRa3m/fmFqCxYFIS/yOaoUgbO9UTJ7
+7nQZfurke8a3dxk6zQqrRkAp7MdXiVRYkOv3NOBk6efM8DjvYJSCpC1I0h2XRxJM
+9ZvCvGbmDHpGWnFkSU3Sc19H7latDp+Tef6Oi31tXVDTjmn1Dgyp3fXw6y1qrj99
+7V+lBQO0TEXW3wrqhMfTvJG9VHASEBaeP1cmXiLj4wKBgQDeiwRaw6/APc9bE+rz
+wer9MQ4UzfmZ3nZH0uYggZJhikCQqFZLyuGT041TX3YrNkmqvDD2ue29GA0H0LBa
+GSQkmw0CtBzT/hxjBO5pib1GjdsTRaB82VIXOmSrjgMR2gND/AguOnK4R2EgakXB
+Ah25+7HZ91aHfbeyesrjUneiSwKBgQDEvtdA2+p64Ryo8nR+RaVpqcdjJRa9Iywz
+QMUwJg8uM7bBap3xThtt1otB8MhetqhL916bZHBBK9AdfSIoTq7Q8lrlbE+lQDub
+ytlz0qOv0GsIVKf7pvgGy0IyZTo8ExNrk0ZXi83tEDBH90VCkPZAdYMkTmgRG6Lw
+frwuperFZwKBgG/X/BtFp8l9Bv5mFznkpp4TDlmkXyJWrKlSM/f4RsIgwmwxPhWf
+ZBlwQ+G342K6SPG23QDS1smnEb1ww4C0i/aduj82mBpu5oNZUhzWbbrMxmJ8Jrk4
+W0pzPW7+00oggG2ld9ML6uX0cbrhzia/UoNLHMpHxUQZCb54egkfRCLbAoGAXq9J
+gJlVu1VjKZulnK9/794ZawmKa/PlbbUaMRXf8GhK58KbyGnCoZXC5zUt+QcG76hZ
+C4fGzlZ7jfWO3r8fOseoHwmFOw4yocN561fQFujC2fuD7IRqkTp43TACWq8DhZ4X
+GELcE97anYfO+T4yhMsJFgv14WXfgMY9YmXPGrkCgYA8BVpuN4rAUsJ+4ouePWRX
+Z0FVKzUcenMy+0whYmEjkUtd72ZwCMh1sNp7rgH+3w8U79RJ52iiOLRtcAl9cNEK
++w8GUdSRr4JQcvhh024eakQ713bZaNZWBZLtgvzDe4HOAC+eRo7v7coHHCBpJzG1
+ihO8WGGFhArEq0YuGxdKaA==
+-----END PRIVATE KEY-----
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-cpp-flow.crt b/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-cpp-flow.crt
new file mode 100644
index 000000000..7b6ac1056
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-cpp-flow.crt
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDLzCCAhegAwIBAgIUI5l5u5iR4/JsSBS/Vt4UlvwJ0lswDQYJKoZIhvcNAQEL
+BQAwDzENMAsGA1UEAwwEbXlDQTAgFw0yMjEwMTQxNTAyMDhaGA8yMDUwMDIyODE1
+MDIwOFowGjEYMBYGA1UEAwwPbWluaWZpLWNwcC1mbG93MIIBIjANBgkqhkiG9w0B
+AQEFAAOCAQ8AMIIBCgKCAQEAqFEBLXiOkjDQ8qt6VOHKrCfQmh0a9FL5VNlfYUvV
+IyuVFpL1+6wc9s3/7yd+M8MwkhbzXHQbh/Hh1VHIrmmcNWVesmCs8vU7f9vueBtX
+mqKEzTN+/apNRSABznkkJoHRJFwQjXq7iznmBh9AHyvBIU/O330b7aKdNWHJsYaK
+lX7HGXYti9nG7dmCxDbZFIWu73JU4fh6PJd/lbRYngHcZ6XoZtsX26bAvWRGqfym
+XQ+RK7yURb87iXEUmToCVlYQRoKTme7uDcJowej/6yjNuru9/VQjDnSG930cY+Ak
+Ssxs73ivn8dnNOvcDOiNeEwjkUONBPFWe8GYiSJhZ6m89QIDAQABo3YwdDAfBgNV
+HSMEGDAWgBRuivX/mIwxHOZYPidUORDQiTMxrDAJBgNVHRMEAjAAMAsGA1UdDwQE
+AwIE8DAaBgNVHREEEzARgg9taW5pZmktY3BwLWZsb3cwHQYDVR0OBBYEFCxfop38
+h8apLKD1KQT5iqbzbOTKMA0GCSqGSIb3DQEBCwUAA4IBAQBbsjR5cdOUWEpGi81E
+jNZ8Wtcxl/ebnl4RRwI2tACn1WHLEj65w0dm2BHzs8e6a3JnA8NDwTDctCsUHV2m
+DtxriBxtcPNcaGtzh2IMwX26a+sVfLR1pP1HJi77AGyLA3Rhm1hBws//KZ6seEbc
+qKoQR0o99UcfASQP1uqBC4LF9k4gzhvE8n+KlTJ5neEwZ4jpD505OiCsN5ZMLa+3
+mCyTGdrO4EXSl2ElvaSSxzf5+FfGE2LT+oSrdwQYFuB+9/qr+c5nU+M8e2E0QLvN
+UusnNOw8wp8rEHiSE6DDhp0PhfxhiOVz3l1oJQduzr7L5aO6kgSlT3nWn3PqPaJo
+G0Vd
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-cpp-flow.key b/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-cpp-flow.key
new file mode 100644
index 000000000..c73b44e39
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/certs/minifi-cpp-flow.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCoUQEteI6SMNDy
+q3pU4cqsJ9CaHRr0UvlU2V9hS9UjK5UWkvX7rBz2zf/vJ34zwzCSFvNcdBuH8eHV
+UciuaZw1ZV6yYKzy9Tt/2+54G1eaooTNM379qk1FIAHOeSQmgdEkXBCNeruLOeYG
+H0AfK8EhT87ffRvtop01YcmxhoqVfscZdi2L2cbt2YLENtkUha7vclTh+Ho8l3+V
+tFieAdxnpehm2xfbpsC9ZEap/KZdD5ErvJRFvzuJcRSZOgJWVhBGgpOZ7u4NwmjB
+6P/rKM26u739VCMOdIb3fRxj4CRKzGzveK+fx2c069wM6I14TCORQ40E8VZ7wZiJ
+ImFnqbz1AgMBAAECggEAKYizrbDOHa0GIpvF+CQviwPYKe98s0W2WQW6z5uS4Lbk
+d0mUgaIbE5wJx84LCmLkHWikbPAJyyYZADbKOp+8+EAnegT5KIrzP73ZvrGgkHwC
+IVDPyXC42JHpYDXsgcQPA9XkD8V1egmzhVc4z3hQlBPJjMSmm6FBAec7ih8VG4Zi
+fxZ3b4fLmezQvqzC+MBRTMqkWU0A4Q3+IIqe5hTUReglTD8yeu9rzyM8xPYksFV1
+qOatWB22IS+fdU/0xzKp3at/yjU8a4DiTnyosNN/2txazQdU1LdSlNxwDqlTDPbi
+swybSc1a6KLO+ZeD4yGB1NLvPCQuIwR4ugw6dQoMkQKBgQC/05rcrGgju4QBcfSe
+SHagt5nypeGUy/yXIlUbWoetETHx/NXaXtWZ7vxN7S0u09HTr7F4+wu00td3etMA
+dOTuqGU/YEHFoZutDpg+oauZIL+MecM1uZeKKTHN3wQ0qmLsYOS172fNUcZ/gcRt
+4JylyK45c10oE0SvXKeC9bWsSwKBgQDgn/Hhhp57NwNWdlOuA6oXkmSN/UFZWE7w
+EntMcviYqGGtBVW9O3D9SqdbEb0kO2NByNgpjsjUTAGCsyYSa50O3q28vQw4z3qK
+EWyg6Ep51eHDUQDo1cdoIRFrFxrZLOMUoVTfT1uqAI/vHxQ089s60W+WUmU5wUic
+9fJoZdnzvwKBgHEwnI2gEecby73Kjywi2BTnoaiDZ0OUxlwrvwpf9fUSU2VV6p5r
+HSEy2p/k1qduB78gSdl4USUG0GtJB16am0eUCAJIeybxwFlyZjV20jmOEFkEtEJs
+W9YDjsbK1MF61NpkJjCQrrCBk15DpTOsuOI+M0flIc/25q2PP6zP7b5XAoGAO/d3
+Q5YEyTAum+6K+HHR/uj+H0n1ID0LFdxZPleTNm39ZYt/ED3GNFixxQY/UGTqYq2T
+x8RuqP6BiLr69v/ztfyMtU5i7Oe29xUfwvVArLYEx3fgnkg0LABn/gb1C/WHygIn
+/lXZStFLm7LYWiqf5Fv1RlRI4dpP4Fdol6ZZQVECgYBWCtKu2CmRb4cYbYo8092E
+72Jh5B9fuBqcUFELjSgUzxgL/L7P6mx6evCCUj4o4Zzy+U45tm2AYpKAXhkQh33h
+xDFs1t0X2CHSTvES65CCEniN/RKoOBu59fEbZu6hV1tzoz9iU6lgKNkndDeeeYip
+hIGI5QK6mK7sNyOnfE+b4g==
+-----END PRIVATE KEY-----
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/certs/root-ca.key b/docker/test/integration/resources/minifi-c2-server-ssl/certs/root-ca.key
new file mode 100644
index 000000000..fbb74cc67
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/certs/root-ca.key
@@ -0,0 +1,30 @@
+-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQIgbRh5/zItgICAggA
+MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECKS8TpnvoUisBIIEyBOQzTn7rMTg
+Igkc79mdZZsU0XX8Z9USUAEVRnKylsWxnt0BX+DlsgnX6bP33gHUXdCSllVRlyKu
+vpNfJadLUNXGU0ZWpYmnD+jtd0J2j6qVqQGQgSuZ0UpPe/6cBaiZcM2BCeWNr0J6
+f7/HDc8AgyeHyP3v+ZqfO0c9TC6N9BMVBreKLtejI3jbNMJOPMdT3sIdyaKLJonO
+ozIZES3ZhPR9kKDRAgk4NUEaPF41SBOYnlqIR4zfBVbtzn2CHbejvAt32KHlfJMj
+D53GrdKDZdOGTAal90rHw127F75ScF5dyskst1PBO4VxufYPzmo/KgMoZ0TAPDzh
+DUDI0Glff9jep8mrACp8XmEMD9uqd3a/E75KWZWQ5eyUrBrzr1qIdLsGsD0UO00o
+MEEIpkz2Is6VTJMGcNCUXDHbKUgHagSs5T35OHESC/JqWM02i/GCiubPPeNfDEjJ
+wGCwdilMhNcuJR8D3Rb9xyv/NOxsECuO+0rDgV9kCe3WF4p3qey4Pcf3NC9BwmOZ
+QTVpa59rlLqGSbrB/EgETucvBFGy9ZhCs/A3Wuc2yirtzWiAsky95643scQbrUrD
+P+QtrfaDLi3Gj9xEjGzUSoui7MogDaQtIBVa81tioQ7TKzQl1cPBToAes/8vPQ/5
+Jrfc04wVH8jz1YSlP37ho63K7YNVINe+Rplwmf/6S0vQqqEKH0EhPiyqRflJ5Xy5
+HyM1p5vQujg45TsltL9DG/fxiO7uR9D0D/1dz/kMZszAPFeP3yQRaKMCJqMc0hXE
+SjWOpblm8y+RxPwehY7OTCKJtH9acURLsFAeHDqHKcobyPOCv2D3ekhx7ujuzyyb
+BrdGKMsTu7e+sEFZrNAvqBDr7KJuS3YnRX6y9lNySh+WzSBf+AW6Tzpabz5GBMEz
+UdOaq0MvtsYD3GOJvDBHHJS8AAPkqUvZnsztJSl5soKkbPJdUpZF2B3PH93eREb1
+sjUkTG8V7JrdpGrJvqkuVnzDFT2xVlt4dDByEj7wFkRtjX/WEDPX8HvNI34hPGSN
+5m88kTWQncd0h6+lbxKDz5dFZ06uDt3m44YFjVBZUpKhTNKgKnBFh1yEXN/LczVz
+DtNtTVlY+zK2MFdfZ3GB8aGsTajGmXreoom8BbigH0QXjp4t9cZHMbPHz0BfjQ9O
+eEukx+2LyH4+DovqKGUN9iZrqe5VAin9wsgyGDt2N5qJfZj++jmF9r5m9JNhOKLm
+RcZwjnayRfVXILGqoRI/JOlwfximJReOzAvYiVtEhCdjn3J2QJsNAMOlc5oUpwdM
+8MVzXc5/ccsKVzNbsAfBaG5IbTLP+iH2pbCmBhaZeX7X5eB4Fsx5ZKcDqGXHju8Y
+xJNIlgV9ALU3Oqnv1yOGGb7BStgsGVAD8lUfycXZjCTwiXiOMx4/KUWtnBkiOz4x
+R/9FZEGIupfRY2MqkFVbaMpYH3u1pXTVt+hDzEak6NboXbH2hIdQndAA5plxcU5y
+SD+xEiTCHWKxkH/W8mzzzPYXKpr17MjCwqok0+FiR2tT9NkPj7G2Ob0vKfAcBRBD
+Ou4COgYTK2AORxgzgs0fM8Par+sa5p1wuuUILIvAxkqNqh/e3G70xj5OMtdaUrT+
+pIPFUG0yfz/lqoTxfYdtUw==
+-----END ENCRYPTED PRIVATE KEY-----
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/certs/root-ca.pem b/docker/test/integration/resources/minifi-c2-server-ssl/certs/root-ca.pem
new file mode 100644
index 000000000..948ebd5b2
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/certs/root-ca.pem
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIC/zCCAeegAwIBAgIUOzrVvbxYHge08GMeuuJmOPgWOtswDQYJKoZIhvcNAQEL
+BQAwDzENMAsGA1UEAwwEbXlDQTAeFw0yMjEwMTQxNDQ0MzlaFw0yNzEwMTMxNDQ0
+MzlaMA8xDTALBgNVBAMMBG15Q0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
+AoIBAQCwXjMjfLWgn8YVWkOCarKc40tGCNrRKITgiKZSQvM44hQw9wbYk/yRj67S
+5bZIr5djvnbmJC5XQSj/vAYL92B37qtualPlw/1ziUURSN1MQKUMgbirKAcGFDMJ
+7g6yrCXrq94A/66cQl1fksXfFk8RzSBPMWfzLNTzy96A/2HriYn3J9GFQV3ggEAg
+VcVJhFiyRBBreZEIar+sasDeR5T1lOfn2Ev7QbiWXTr9IqR1mv4dtqKOv9oYld5Y
+AfoQ0P6yis+RLuNcokNIHgUO+ORpy+CUmIOrCbZgVjSetSlZ8SNQkCXwWhKcObaI
+PK15veDo4SyPv2GDEEBbD1Ccqe/nAgMBAAGjUzBRMB0GA1UdDgQWBBRuivX/mIwx
+HOZYPidUORDQiTMxrDAfBgNVHSMEGDAWgBRuivX/mIwxHOZYPidUORDQiTMxrDAP
+BgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAtnKrcaSoSVgHtFBe8
+HBEbK1tzTXrBeFEEQqNZpRoJ6JSq1oY54UlAlBJdNQJFZOPoeaf9vheevFGFhBDJ
+PwRqE5T6wSp9aohUcnKiBzL8VbTIuUgyWM9gwyg/lipTYK6U5sDROUzfEppBJb0E
+gBp7eBoiEcI4tuih3zncbKKvTNR3b/twAL5yI4MEUNcJsi80xefQt0m39RAwpMIu
+khQNH2YLNOl6hl6wRTylCGB6eWZAtB4qGEDtdCXG9uyCnoUO5MFgZZGFiaqN+avF
+cUU2mOwaoPDbWQ7Q+2m9c1fo/iJ8RThAnFhH5/GQe+OaecW8vHmfCbivj7wkRgTW
+nXzz
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/config.yml b/docker/test/integration/resources/minifi-c2-server-ssl/config.yml
new file mode 100644
index 000000000..d4c69534c
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/config.yml
@@ -0,0 +1,43 @@
+MiNiFi Config Version: 3
+Flow Controller:
+  name: MiNiFi Flow
+Processors:
+- name: Get files from /tmp/input
+  id: 2f2a3b47-f5ba-49f6-82b5-bc1c86b96e27
+  class: org.apache.nifi.minifi.processors.GetFile
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 1000 ms
+  Properties:
+    Input Directory: /tmp/input
+- name: Put files to /tmp/output
+  id: e143601d-de4f-44ba-a6ec-d1f97d77ec94
+  class: org.apache.nifi.minifi.processors.PutFile
+  scheduling strategy: EVENT_DRIVEN
+  auto-terminated relationships list:
+  - failure
+  - success
+  Properties:
+    Conflict Resolution Strategy: fail
+    Create Missing Directories: 'true'
+    Directory: /tmp/output
+Connections:
+- name: GetFile/success/PutFile
+  id: 098a56ba-f4bf-4323-a3f3-6f8a5e3586bf
+  source id: 2f2a3b47-f5ba-49f6-82b5-bc1c86b96e27
+  source relationship names:
+  - success
+  destination id: e143601d-de4f-44ba-a6ec-d1f97d77ec94
+Controller Services:
+  - name: SSLContextService
+    id: 2438e3c8-015a-1000-79ca-83af40ec1994
+    class: SSLContextService
+    Properties:
+      Client Certificate:
+          - value: /tmp/minifi-c2-server-ssl/minifi-c2-client.crt
+      Private Key:
+          - value: /tmp/minifi-c2-server-ssl/minifi-c2-client.key
+      Passphrase:
+          - value: abcdefgh
+      CA Certificate:
+          - value: /tmp/minifi-c2-server-ssl/minifi-c2-server.crt
+Remote Process Groups: []
diff --git a/docker/test/integration/resources/minifi-c2-server/Dockerfile b/docker/test/integration/resources/minifi-c2-server/Dockerfile
new file mode 100644
index 000000000..ce13e0312
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server/Dockerfile
@@ -0,0 +1,2 @@
+FROM apache/nifi-minifi-c2:1.18.0
+ADD config.yml $MINIFI_C2_HOME/files/minifi-test-class/config.text.yml.v1
diff --git a/docker/test/integration/resources/minifi-c2-server/config.yml b/docker/test/integration/resources/minifi-c2-server/config.yml
new file mode 100644
index 000000000..ddac4cd77
--- /dev/null
+++ b/docker/test/integration/resources/minifi-c2-server/config.yml
@@ -0,0 +1,31 @@
+MiNiFi Config Version: 3
+Flow Controller:
+  name: MiNiFi Flow
+Processors:
+- name: Get files from /tmp/input
+  id: 2f2a3b47-f5ba-49f6-82b5-bc1c86b96e27
+  class: org.apache.nifi.minifi.processors.GetFile
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 1000 ms
+  Properties:
+    Input Directory: /tmp/input
+- name: Put files to /tmp/output
+  id: e143601d-de4f-44ba-a6ec-d1f97d77ec94
+  class: org.apache.nifi.minifi.processors.PutFile
+  scheduling strategy: EVENT_DRIVEN
+  auto-terminated relationships list:
+  - failure
+  - success
+  Properties:
+    Conflict Resolution Strategy: fail
+    Create Missing Directories: 'true'
+    Directory: /tmp/output
+Connections:
+- name: GetFile/success/PutFile
+  id: 098a56ba-f4bf-4323-a3f3-6f8a5e3586bf
+  source id: 2f2a3b47-f5ba-49f6-82b5-bc1c86b96e27
+  source relationship names:
+  - success
+  destination id: e143601d-de4f-44ba-a6ec-d1f97d77ec94
+Controller Services: []
+Remote Process Groups: []
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 723576715..6ae9b5904 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -963,3 +963,32 @@ def step_impl(context):
 @then(u'Opensearch has a document with "{doc_id}" in "{index}" that has "{value}" set in "{field}"')
 def step_impl(context, doc_id, index, value, field):
     context.test.check_elastic_field_value("opensearch", index_name=index, doc_id=doc_id, field_name=field, field_value=value)
+
+
+# MiNiFi C2 Server
+@given("a ssl context service is set up for MiNiFi C2 server")
+def step_impl(context):
+    ssl_context_service = SSLContextService(cert="/tmp/resources/minifi-c2-server-ssl/minifi-cpp-flow.crt", ca_cert="/tmp/resources/minifi-c2-server-ssl/root-ca.pem", key="/tmp/resources/minifi-c2-server-ssl/minifi-cpp-flow.key", passphrase="abcdefgh")
+    ssl_context_service.name = "SSLContextService"
+    container = context.test.acquire_container("minifi-cpp-flow", "minifi-cpp-with-https-c2-config")
+    container.add_controller(ssl_context_service)
+
+
+@given(u'a MiNiFi C2 server is set up')
+def step_impl(context):
+    context.test.acquire_container("minifi-c2-server", "minifi-c2-server")
+
+
+@then("the MiNiFi C2 server logs contain the following message: \"{log_message}\" in less than {duration}")
+def step_impl(context, log_message, duration):
+    context.test.check_container_log_contents("minifi-c2-server", log_message, timeparse(duration))
+
+
+@then("the MiNiFi C2 SSL server logs contain the following message: \"{log_message}\" in less than {duration}")
+def step_impl(context, log_message, duration):
+    context.test.check_container_log_contents("minifi-c2-server-ssl", log_message, timeparse(duration))
+
+
+@given(u'a MiNiFi C2 server is set up with SSL')
+def step_impl(context):
+    context.test.acquire_container("minifi-c2-server", "minifi-c2-server-ssl")
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index e1ce3e978..b58bf3233 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -103,11 +103,19 @@ C2Payload RESTSender::sendPayload(const std::string& url, const Direction direct
   extensions::curl::HTTPClient client(url, ssl_context_service_);
   client.setKeepAliveProbe(extensions::curl::KeepAliveProbeData{2s, 2s});
   client.setConnectionTimeout(2s);
-  if (direction == Direction::TRANSMIT) {
-    client.set_request_method("POST");
-    if (!ssl_context_service_ && url.find("https://") == 0) {
-      setSecurityContext(client, "POST", url);
+
+  auto setUpHttpRequest = [&](const std::string& http_method) {
+    client.set_request_method(http_method);
+    if (url.find("https://") == 0) {
+      if (!ssl_context_service_) {
+        setSecurityContext(client, http_method, url);
+      } else {
+        client.initialize(http_method, url, ssl_context_service_);
+      }
     }
+  };
+  if (direction == Direction::TRANSMIT) {
+    setUpHttpRequest("POST");
     if (payload.getOperation() == Operation::TRANSFER) {
       // treat nested payloads as files
       for (const auto& file : payload.getNestedPayloads()) {
@@ -148,17 +156,16 @@ C2Payload RESTSender::sendPayload(const std::string& url, const Direction direct
   } else {
     // we do not need to set the upload callback
     // since we are not uploading anything on a get
-    if (!ssl_context_service_ && url.find("https://") == 0) {
-      setSecurityContext(client, "GET", url);
-    }
-    client.set_request_method("GET");
+    setUpHttpRequest("GET");
   }
 
   if (payload.getOperation() == Operation::TRANSFER) {
     auto read = std::make_unique<utils::HTTPReadCallback>(std::numeric_limits<size_t>::max());
     client.setReadCallback(std::move(read));
   } else {
-    client.setRequestHeader("Accept", "application/json");
+    // Due to a bug in MiNiFi C2, the Accept header is not handled properly, so it must be omitted to stay compatible.
+    // TODO(lordgamez): The header should be re-added when the issue in MiNiFi C2 is fixed: https://issues.apache.org/jira/browse/NIFI-10535
+    // client.setRequestHeader("Accept", "application/json");
     client.setContentType("application/json");
   }
   bool isOkay = client.submit();
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index c0f8a9ad2..88484d5a8 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -609,7 +609,7 @@ class HeartbeatHandler : public ServerAwareHandler {
     for (const auto& operation_node : agent_manifest["supportedOperations"].GetArray()) {
       assert(operation_node.HasMember("type"));
       operations.insert(operation_node["type"].GetString());
-      verifyProperties(operation_node, minifi::c2::Operation::parse(operation_node["type"].GetString()), verify_components, disallowed_properties);
+      verifyProperties(operation_node, minifi::c2::Operation::parse(operation_node["type"].GetString(), {}, false), verify_components, disallowed_properties);
     }
 
     assert(operations == minifi::c2::Operation::values());
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index f1354adbb..97710c0d0 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -110,7 +110,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
   int16_t resume() override;
   // Unload the current flow YAML, clean the root process group and all its children
   int16_t stop() override;
-  int16_t applyUpdate(const std::string &source, const std::string &configuration, bool persist) override;
+  int16_t applyUpdate(const std::string &source, const std::string &configuration, bool persist, const std::optional<std::string>& flow_id) override;
   int16_t drainRepositories() override {
     return -1;
   }
@@ -145,7 +145,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
   // first it will validate the payload with the current root node config for flowController
   // like FlowController id/name is the same and new version is greater than the current version
   // after that, it will apply the configuration
-  bool applyConfiguration(const std::string &source, const std::string &configurePayload);
+  bool applyConfiguration(const std::string &source, const std::string &configurePayload, const std::optional<std::string>& flow_id = std::nullopt);
 
   // get name
   std::string getName() const override {
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 3a491d1cb..9ad280cbb 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -171,8 +171,8 @@ class C2Agent : public state::UpdateController {
   void handleAssetUpdate(const C2ContentResponse &resp);
 
   std::optional<std::string> resolveFlowUrl(const std::string& url) const;
-
   std::optional<std::string> resolveUrl(const std::string& url) const;
+  static std::optional<std::string> getFlowIdFromConfigUpdate(const C2ContentResponse &resp);
 
  protected:
   std::timed_mutex metrics_mutex_;
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index 1c2be2e79..860351cb6 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -66,9 +66,9 @@ class FlowConfiguration : public CoreComponent {
    * Constructor that will be used for configuring
    * the flow controller.
    */
-  explicit FlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo,
+  explicit FlowConfiguration(const std::shared_ptr<core::Repository>& repo, std::shared_ptr<core::Repository> flow_file_repo,
                              std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<io::StreamFactory> stream_factory,
-                             std::shared_ptr<Configure> configuration, const std::optional<std::string>& path,
+                             const std::shared_ptr<Configure>& configuration, const std::optional<std::string>& path,
                              std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>());
 
   ~FlowConfiguration() override;
@@ -112,7 +112,7 @@ class FlowConfiguration : public CoreComponent {
     return nullptr;
   }
 
-  std::unique_ptr<core::ProcessGroup> updateFromPayload(const std::string& url, const std::string& yamlConfigPayload);
+  std::unique_ptr<core::ProcessGroup> updateFromPayload(const std::string& url, const std::string& yamlConfigPayload, const std::optional<std::string>& flow_id = std::nullopt);
 
   virtual std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string& /*yamlConfigPayload*/) {
     return nullptr;
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index 4fc93883f..f87cc7a56 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -164,7 +164,7 @@ class StateMonitor : public StateController {
    * < 0 is an error code
    * 0 is success
    */
-  virtual int16_t applyUpdate(const std::string & source, const std::string &configuration, bool persist = false) = 0;
+  virtual int16_t applyUpdate(const std::string & source, const std::string &configuration, bool persist = false, const std::optional<std::string>& flow_id = std::nullopt) = 0;
 
   /**
    * Apply an update that the agent must decode. This is useful for certain operations
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h
index e0f95c1a7..e08eb6684 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -90,7 +90,7 @@ class FlowVersion : public DeviceInformation {
 
   void setFlowVersion(const std::string &url, const std::string &bucket_id, const std::string &flow_id) {
     std::lock_guard<std::mutex> lock(guard);
-    identifier = std::make_shared<FlowIdentifier>(url, bucket_id, flow_id);
+    identifier = std::make_shared<FlowIdentifier>(url, bucket_id, flow_id.empty() ? utils::IdGenerator::getIdGenerator()->generate().to_string() : flow_id);
   }
 
   std::vector<SerializedResponseNode> serialize() override {
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 6ef93b35b..2f82fe8aa 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -103,10 +103,10 @@ FlowController::~FlowController() {
   logger_->log_trace("Destroying FlowController");
 }
 
-bool FlowController::applyConfiguration(const std::string &source, const std::string &configurePayload) {
+bool FlowController::applyConfiguration(const std::string &source, const std::string &configurePayload, const std::optional<std::string>& flow_id) {
   std::unique_ptr<core::ProcessGroup> newRoot;
   try {
-    newRoot = flow_configuration_->updateFromPayload(source, configurePayload);
+    newRoot = flow_configuration_->updateFromPayload(source, configurePayload, flow_id);
   } catch (const std::exception& ex) {
     logger_->log_error("Invalid configuration payload, type: %s, what: %s", typeid(ex).name(), ex.what());
     return false;
@@ -422,8 +422,8 @@ int16_t FlowController::resume() {
   return 0;
 }
 
-int16_t FlowController::applyUpdate(const std::string &source, const std::string &configuration, bool persist) {
-  if (applyConfiguration(source, configuration)) {
+int16_t FlowController::applyUpdate(const std::string &source, const std::string &configuration, bool persist, const std::optional<std::string>& flow_id) {
+  if (applyConfiguration(source, configuration, flow_id)) {
     if (persist) {
       flow_configuration_->persist(configuration);
     }
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index fc61de0b8..7fe53898a 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -430,7 +430,7 @@ C2Payload C2Agent::prepareConfigurationOptions(const C2ContentResponse &resp) co
 void C2Agent::handle_clear(const C2ContentResponse &resp) {
   ClearOperand operand;
   try {
-    operand = ClearOperand::parse(resp.name.c_str());
+    operand = ClearOperand::parse(resp.name.c_str(), {}, false);
   } catch(const std::runtime_error&) {
     logger_->log_debug("Clearing unknown %s", resp.name);
     return;
@@ -485,7 +485,7 @@ void C2Agent::handle_clear(const C2ContentResponse &resp) {
 void C2Agent::handle_describe(const C2ContentResponse &resp) {
   DescribeOperand operand;
   try {
-    operand = DescribeOperand::parse(resp.name.c_str());
+    operand = DescribeOperand::parse(resp.name.c_str(), {}, false);
   } catch(const std::runtime_error&) {
     C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true);
     enqueue_c2_response(std::move(response));
@@ -587,7 +587,7 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) {
 void C2Agent::handle_update(const C2ContentResponse &resp) {
   UpdateOperand operand;
   try {
-    operand = UpdateOperand::parse(resp.name.c_str());
+    operand = UpdateOperand::parse(resp.name.c_str(), {}, false);
   } catch(const std::runtime_error&) {
     C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::NOT_APPLIED, resp.ident, true);
     enqueue_c2_response(std::move(response));
@@ -696,7 +696,7 @@ C2Payload C2Agent::bundleDebugInfo(std::map<std::string, std::unique_ptr<io::Inp
 void C2Agent::handle_transfer(const C2ContentResponse &resp) {
   TransferOperand operand;
   try {
-    operand = TransferOperand::parse(resp.name.c_str());
+    operand = TransferOperand::parse(resp.name.c_str(), {}, false);
   } catch(const std::runtime_error&) {
     throw C2TransferError("Unknown operand '" + resp.name + "'");
   }
@@ -848,6 +848,11 @@ std::optional<std::string> C2Agent::fetchFlow(const std::string& uri) const {
   return response.getRawDataAsString();
 }
 
+std::optional<std::string> C2Agent::getFlowIdFromConfigUpdate(const C2ContentResponse &resp) {
+  auto flow_id = resp.operation_arguments.find("flowId");
+  return flow_id == resp.operation_arguments.end() ? std::nullopt : std::make_optional(flow_id->second.to_string());
+}
+
 bool C2Agent::handleConfigurationUpdate(const C2ContentResponse &resp) {
   auto url = resp.operation_arguments.find("location");
 
@@ -886,7 +891,7 @@ bool C2Agent::handleConfigurationUpdate(const C2ContentResponse &resp) {
     return utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true");
   }();
 
-  int16_t err = {update_sink_->applyUpdate(file_uri, configuration_str, should_persist)};
+  int16_t err = {update_sink_->applyUpdate(file_uri, configuration_str, should_persist, getFlowIdFromConfigUpdate(resp))};
   if (err != 0) {
     logger_->log_error("Flow configuration update failed with error code %" PRIi16, err);
     C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, resp.ident, true);
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp
index 7ef042a09..1e573eb58 100644
--- a/libminifi/src/c2/protocols/RESTProtocol.cpp
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -65,7 +65,9 @@ C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, gsl::span<co
       std::string identifier;
       for (auto key : {"operationid", "operationId", "identifier"}) {
         if (root.HasMember(key)) {
-          identifier = root[key].GetString();
+          if (!root[key].IsNull()) {
+            identifier = root[key].GetString();
+          }
           break;
         }
       }
@@ -73,7 +75,9 @@ C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, gsl::span<co
       int size = 0;
       for (auto key : {"requested_operations", "requestedOperations"}) {
         if (root.HasMember(key)) {
-          size = root[key].Size();
+          if (!root[key].IsNull()) {
+            size = root[key].Size();
+          }
           break;
         }
       }
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index b35cda810..a34002be9 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -29,9 +29,9 @@
 namespace org::apache::nifi::minifi::core {
 
 FlowConfiguration::FlowConfiguration(
-    std::shared_ptr<core::Repository> /*repo*/, std::shared_ptr<core::Repository> flow_file_repo,
+    const std::shared_ptr<core::Repository>& /*repo*/, std::shared_ptr<core::Repository> flow_file_repo,
     std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<io::StreamFactory> stream_factory,
-    std::shared_ptr<Configure> configuration, const std::optional<std::string>& path,
+    const std::shared_ptr<Configure>& configuration, const std::optional<std::string>& path,
     std::shared_ptr<utils::file::FileSystem> filesystem)
     : CoreComponent(core::getClassName<FlowConfiguration>()),
       flow_file_repo_(std::move(flow_file_repo)),
@@ -97,24 +97,24 @@ std::unique_ptr<core::reporting::SiteToSiteProvenanceReportingTask> FlowConfigur
   return processor;
 }
 
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const std::string& url, const std::string& yamlConfigPayload) {
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const std::string& url, const std::string& yamlConfigPayload, const std::optional<std::string>& flow_id) {
   auto old_services = controller_services_;
   auto old_provider = service_provider_;
   controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
   service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
   auto payload = getRootFromPayload(yamlConfigPayload);
   if (!url.empty() && payload != nullptr) {
-    std::string flow_id;
+    std::string payload_flow_id;
     std::string bucket_id;
     auto path_split = utils::StringUtils::split(url, "/");
     for (auto it = path_split.cbegin(); it != path_split.cend(); ++it) {
       if (*it == "flows" && std::next(it) != path_split.cend()) {
-        flow_id = *++it;
+        payload_flow_id = *++it;
       } else if (*it == "buckets" && std::next(it) != path_split.cend()) {
         bucket_id = *++it;
       }
     }
-    flow_version_->setFlowVersion(url, bucket_id, flow_id);
+    flow_version_->setFlowVersion(url, bucket_id, flow_id ? *flow_id : payload_flow_id);
   } else {
     controller_services_ = old_services;
     service_provider_ = old_provider;
diff --git a/libminifi/src/core/state/nodes/SupportedOperations.cpp b/libminifi/src/core/state/nodes/SupportedOperations.cpp
index 453bd4bf9..e3469e1d5 100644
--- a/libminifi/src/core/state/nodes/SupportedOperations.cpp
+++ b/libminifi/src/core/state/nodes/SupportedOperations.cpp
@@ -139,7 +139,7 @@ std::vector<SerializedResponseNode> SupportedOperations::serialize() {
     SerializedResponseNode properties;
     properties.name = "properties";
 
-    fillProperties(properties, minifi::c2::Operation::parse(operation.c_str()));
+    fillProperties(properties, minifi::c2::Operation::parse(operation.c_str(), {}, false));
 
     child.children.push_back(operation_type);
     child.children.push_back(properties);
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
index eca69e38b..8b7a2bec1 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -158,7 +158,7 @@ class TestUpdateSink : public minifi::state::StateMonitor {
    * < 0 is an error code
    * 0 is success
    */
-  int16_t applyUpdate(const std::string& /*source*/, const std::string& /*configuration*/, bool /*persist*/ = false) override {
+  int16_t applyUpdate(const std::string& /*source*/, const std::string& /*configuration*/, bool /*persist*/ = false, const std::optional<std::string>& /*flow_id*/ = std::nullopt) override {
     update_calls++;
     return 0;
   }


[nifi-minifi-cpp] 02/02: MINIFICPP-1922 Implement ListenUDP processor

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 20afc8447a812e99338beec386b7e02d5516b556
Author: Martin Zink <ma...@apache.org>
AuthorDate: Mon Nov 28 12:29:37 2022 +0100

    MINIFICPP-1922 Implement ListenUDP processor
    
    Closes #1430
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |  41 +++++++-
 README.md                                          |   6 +-
 .../processors/ListenSyslog.cpp                    |  10 +-
 .../standard-processors/processors/ListenSyslog.h  |   2 -
 .../standard-processors/processors/ListenTCP.cpp   |  10 +-
 .../standard-processors/processors/ListenTCP.h     |   2 -
 .../processors/{ListenTCP.cpp => ListenUDP.cpp}    |  50 +++------
 .../processors/{ListenTCP.h => ListenUDP.h}        |  17 +--
 .../processors/NetworkListenerProcessor.cpp        |   9 +-
 .../processors/NetworkListenerProcessor.h          |  11 +-
 .../tests/unit/ListenSyslogTests.cpp               |  10 +-
 .../tests/unit/ListenTcpTests.cpp                  |   8 +-
 .../tests/unit/ListenUDPTests.cpp                  | 115 +++++++++++++++++++++
 .../standard-processors/tests/unit/PutTCPTests.cpp |  11 +-
 14 files changed, 200 insertions(+), 102 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 3144e8390..c8cee2b17 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -46,6 +46,7 @@
 - [ListenHTTP](#listenhttp)
 - [ListenSyslog](#listensyslog)
 - [ListenTCP](#listentcp)
+- [ListenUDP](#listenudp)
 - [ListFile](#listfile)
 - [ListS3](#lists3)
 - [ListSFTP](#listsftp)
@@ -1331,10 +1332,42 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ### Output Attributes
 
-| Attribute                | Description                                   | Requirements           |
-|--------------------------|-----------------------------------------------|------------------------|
-| _tcp.port_               | The sending port the messages were received.  | -                      |
-| _tcp.sender_             | The sending host of the messages.             | -                      |
+| Attribute    | Description                                  | Requirements |
+|--------------|----------------------------------------------|--------------|
+| _tcp.port_   | The port on which messages were received.    | -            |
+| _tcp.sender_ | The sending host of the messages.            | -            |
+
+
+
+
+## ListenUDP
+
+### Description
+
+Listens for incoming UDP datagrams. For each datagram the processor produces a single FlowFile.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                          | Default Value | Allowable Values           | Description                                                                                                                                                                                      |
+|-------------------------------|---------------|----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Listening Port**            |               |                            | The port to listen on for communication.                                                                                                                                                         |
+| **Max Batch Size**            | 500           |                            | The maximum number of messages to process at a time.                                                                                                                                             |
+| **Max Size of Message Queue** | 10000         |                            | Maximum number of messages allowed to be buffered before processing them when the processor is triggered. If the buffer is full, the message is ignored. If set to zero the buffer is unlimited. |
+
+### Relationships
+
+| Name    | Description                                                        |
+|---------|--------------------------------------------------------------------|
+| success | Messages received successfully will be sent out this relationship. |
+
+### Output Attributes
+
+| Attribute    | Description                                   | Requirements |
+|--------------|-----------------------------------------------|--------------|
+| _udp.port_   | The port on which messages were received.     | -            |
+| _udp.sender_ | The sending host of the messages.             | -            |
 
 
 ## ListFile
diff --git a/README.md b/README.md
index dc56009b3..096cea87d 100644
--- a/README.md
+++ b/README.md
@@ -63,9 +63,9 @@ MiNiFi - C++ supports the following C++ processors:
 
 The following table lists the base set of processors.
 
-| Extension Set | Processors                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 [...]
-|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
-| **Base**      | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[ListenTCP](PROC [...]
+| Extension Set | Processors                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 [...]
+|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
+| **Base**      | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[ListenTCP](PROC [...]
 
 The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as CURL ), can be disabled with the respective CMAKE flag on the command line.
 
diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp b/extensions/standard-processors/processors/ListenSyslog.cpp
index e08d0cb15..41518a1d1 100644
--- a/extensions/standard-processors/processors/ListenSyslog.cpp
+++ b/extensions/standard-processors/processors/ListenSyslog.cpp
@@ -107,7 +107,7 @@ void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& conte
   context->getProperty(ProtocolProperty.getName(), protocol);
 
   if (protocol == utils::net::IpProtocol::TCP) {
-    startTcpServer(*context);
+    startTcpServer(*context, SSLContextService, ClientAuth);
   } else if (protocol == utils::net::IpProtocol::UDP) {
     startUdpServer(*context);
   } else {
@@ -168,14 +168,6 @@ const core::Property& ListenSyslog::getPortProperty() {
   return Port;
 }
 
-const core::Property& ListenSyslog::getSslContextProperty() {
-  return SSLContextService;
-}
-
-const core::Property& ListenSyslog::getClientAuthProperty() {
-  return ClientAuth;
-}
-
 REGISTER_RESOURCE(ListenSyslog, Processor);
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/ListenSyslog.h b/extensions/standard-processors/processors/ListenSyslog.h
index ebdb16d2c..9844c0c1b 100644
--- a/extensions/standard-processors/processors/ListenSyslog.h
+++ b/extensions/standard-processors/processors/ListenSyslog.h
@@ -70,8 +70,6 @@ class ListenSyslog : public NetworkListenerProcessor {
   const core::Property& getMaxBatchSizeProperty() override;
   const core::Property& getMaxQueueSizeProperty() override;
   const core::Property& getPortProperty() override;
-  const core::Property& getSslContextProperty() override;
-  const core::Property& getClientAuthProperty() override;
 
  private:
   void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) override;
diff --git a/extensions/standard-processors/processors/ListenTCP.cpp b/extensions/standard-processors/processors/ListenTCP.cpp
index 20d856182..94c2d9884 100644
--- a/extensions/standard-processors/processors/ListenTCP.cpp
+++ b/extensions/standard-processors/processors/ListenTCP.cpp
@@ -67,7 +67,7 @@ void ListenTCP::initialize() {
 
 void ListenTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
   gsl_Expects(context);
-  startTcpServer(*context);
+  startTcpServer(*context, SSLContextService, ClientAuth);
 }
 
 void ListenTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
@@ -90,14 +90,6 @@ const core::Property& ListenTCP::getPortProperty() {
   return Port;
 }
 
-const core::Property& ListenTCP::getSslContextProperty() {
-  return SSLContextService;
-}
-
-const core::Property& ListenTCP::getClientAuthProperty() {
-  return ClientAuth;
-}
-
 REGISTER_RESOURCE(ListenTCP, Processor);
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/ListenTCP.h b/extensions/standard-processors/processors/ListenTCP.h
index ed41eba80..567865346 100644
--- a/extensions/standard-processors/processors/ListenTCP.h
+++ b/extensions/standard-processors/processors/ListenTCP.h
@@ -60,8 +60,6 @@ class ListenTCP : public NetworkListenerProcessor {
   const core::Property& getMaxBatchSizeProperty() override;
   const core::Property& getMaxQueueSizeProperty() override;
   const core::Property& getPortProperty() override;
-  const core::Property& getSslContextProperty() override;
-  const core::Property& getClientAuthProperty() override;
 
  private:
   void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) override;
diff --git a/extensions/standard-processors/processors/ListenTCP.cpp b/extensions/standard-processors/processors/ListenUDP.cpp
similarity index 58%
copy from extensions/standard-processors/processors/ListenTCP.cpp
copy to extensions/standard-processors/processors/ListenUDP.cpp
index 20d856182..819919e91 100644
--- a/extensions/standard-processors/processors/ListenTCP.cpp
+++ b/extensions/standard-processors/processors/ListenUDP.cpp
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "ListenTCP.h"
+#include "ListenUDP.h"
 
 #include "core/Resource.h"
 #include "core/PropertyBuilder.h"
@@ -23,14 +23,14 @@
 
 namespace org::apache::nifi::minifi::processors {
 
-const core::Property ListenTCP::Port(
+const core::Property ListenUDP::Port(
     core::PropertyBuilder::createProperty("Listening Port")
         ->withDescription("The port to listen on for communication.")
         ->withType(core::StandardValidators::get().LISTEN_PORT_VALIDATOR)
         ->isRequired(true)
         ->build());
 
-const core::Property ListenTCP::MaxQueueSize(
+const core::Property ListenUDP::MaxQueueSize(
     core::PropertyBuilder::createProperty("Max Size of Message Queue")
         ->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
                           "If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
@@ -38,66 +38,46 @@ const core::Property ListenTCP::MaxQueueSize(
         ->isRequired(true)
         ->build());
 
-const core::Property ListenTCP::MaxBatchSize(
+const core::Property ListenUDP::MaxBatchSize(
     core::PropertyBuilder::createProperty("Max Batch Size")
         ->withDescription("The maximum number of messages to process at a time.")
         ->withDefaultValue<uint64_t>(500)
         ->isRequired(true)
         ->build());
 
-const core::Property ListenTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")
-        ->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.")
-        ->asType<minifi::controllers::SSLContextService>()
-        ->build());
-
-const core::Property ListenTCP::ClientAuth(
-    core::PropertyBuilder::createProperty("Client Auth")
-      ->withDescription("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
-      ->withDefaultValue<std::string>(toString(utils::net::SslServer::ClientAuthOption::NONE))
-      ->withAllowableValues<std::string>(utils::net::SslServer::ClientAuthOption::values())
-      ->build());
 
-const core::Relationship ListenTCP::Success("success", "Messages received successfully will be sent out this relationship.");
+const core::Relationship ListenUDP::Success("success", "Messages received successfully will be sent out this relationship.");
 
-void ListenTCP::initialize() {
+void ListenUDP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void ListenTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+void ListenUDP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
   gsl_Expects(context);
-  startTcpServer(*context);
+  startUdpServer(*context);
 }
 
-void ListenTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
+void ListenUDP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
   auto flow_file = session.create();
   session.writeBuffer(flow_file, message.message_data);
-  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
-  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
+  flow_file->setAttribute("udp.port", std::to_string(message.server_port));
+  flow_file->setAttribute("udp.sender", message.sender_address.to_string());
   session.transfer(flow_file, Success);
 }
 
-const core::Property& ListenTCP::getMaxBatchSizeProperty() {
+const core::Property& ListenUDP::getMaxBatchSizeProperty() {
   return MaxBatchSize;
 }
 
-const core::Property& ListenTCP::getMaxQueueSizeProperty() {
+const core::Property& ListenUDP::getMaxQueueSizeProperty() {
   return MaxQueueSize;
 }
 
-const core::Property& ListenTCP::getPortProperty() {
+const core::Property& ListenUDP::getPortProperty() {
   return Port;
 }
 
-const core::Property& ListenTCP::getSslContextProperty() {
-  return SSLContextService;
-}
-
-const core::Property& ListenTCP::getClientAuthProperty() {
-  return ClientAuth;
-}
-
-REGISTER_RESOURCE(ListenTCP, Processor);
+REGISTER_RESOURCE(ListenUDP, Processor);
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/ListenTCP.h b/extensions/standard-processors/processors/ListenUDP.h
similarity index 71%
copy from extensions/standard-processors/processors/ListenTCP.h
copy to extensions/standard-processors/processors/ListenUDP.h
index ed41eba80..1eb4cfd96 100644
--- a/extensions/standard-processors/processors/ListenTCP.h
+++ b/extensions/standard-processors/processors/ListenUDP.h
@@ -18,35 +18,28 @@
 
 #include <memory>
 #include <string>
-#include <utility>
 
 #include "NetworkListenerProcessor.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "utils/Enum.h"
 
 namespace org::apache::nifi::minifi::processors {
 
-class ListenTCP : public NetworkListenerProcessor {
+class ListenUDP : public NetworkListenerProcessor {
  public:
-  explicit ListenTCP(std::string name, const utils::Identifier& uuid = {})
-    : NetworkListenerProcessor(std::move(name), uuid, core::logging::LoggerFactory<ListenTCP>::getLogger()) {
+  explicit ListenUDP(const std::string& name, const utils::Identifier& uuid = {})
+    : NetworkListenerProcessor(name, uuid, core::logging::LoggerFactory<ListenUDP>::getLogger()) {
   }
 
-  EXTENSIONAPI static constexpr const char* Description = "Listens for incoming TCP connections and reads data from each connection using a line separator as the message demarcator. "
-                                                          "For each message the processor produces a single FlowFile.";
+  EXTENSIONAPI static constexpr const char* Description = "Listens for incoming UDP datagrams. For each datagram the processor produces a single FlowFile.";
 
   EXTENSIONAPI static const core::Property Port;
   EXTENSIONAPI static const core::Property MaxBatchSize;
   EXTENSIONAPI static const core::Property MaxQueueSize;
-  EXTENSIONAPI static const core::Property SSLContextService;
-  EXTENSIONAPI static const core::Property ClientAuth;
   static auto properties() {
     return std::array{
       Port,
       MaxBatchSize,
       MaxQueueSize,
-      SSLContextService,
-      ClientAuth
     };
   }
 
@@ -60,8 +53,6 @@ class ListenTCP : public NetworkListenerProcessor {
   const core::Property& getMaxBatchSizeProperty() override;
   const core::Property& getMaxQueueSizeProperty() override;
   const core::Property& getPortProperty() override;
-  const core::Property& getSslContextProperty() override;
-  const core::Property& getClientAuthProperty() override;
 
  private:
   void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) override;
diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
index fa22d9096..21b74f1ef 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
@@ -61,18 +61,17 @@ void NetworkListenerProcessor::startServer(const ServerOptions& options, utils::
                      max_batch_size_);
 }
 
-void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& context) {
+void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& context, const core::Property& ssl_context_property, const core::Property& client_auth_property) {
   gsl_Expects(!server_thread_.joinable() && !server_);
   auto options = readServerOptions(context);
 
   std::string ssl_value;
-  auto& ssl_prop = getSslContextProperty();
-  if (context.getProperty(ssl_prop.getName(), ssl_value) && !ssl_value.empty()) {
-    auto ssl_data = utils::net::getSslData(context, ssl_prop, logger_);
+  if (context.getProperty(ssl_context_property.getName(), ssl_value) && !ssl_value.empty()) {
+    auto ssl_data = utils::net::getSslData(context, ssl_context_property, logger_);
     if (!ssl_data || !ssl_data->isValid()) {
       throw Exception(PROCESSOR_EXCEPTION, "SSL Context Service is set, but no valid SSL data was found!");
     }
-    auto client_auth = utils::parseEnumProperty<utils::net::SslServer::ClientAuthOption>(context, getClientAuthProperty());
+    auto client_auth = utils::parseEnumProperty<utils::net::SslServer::ClientAuthOption>(context, client_auth_property);
     server_ = std::make_unique<utils::net::SslServer>(options.max_queue_size, options.port, logger_, *ssl_data, client_auth);
   } else {
     server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, options.port, logger_);
diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.h b/extensions/standard-processors/processors/NetworkListenerProcessor.h
index c06de77dd..1799a3fcb 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.h
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.h
@@ -52,22 +52,23 @@ class NetworkListenerProcessor : public core::Processor {
   }
 
  protected:
+  void startTcpServer(const core::ProcessContext& context, const core::Property& ssl_context_property, const core::Property& client_auth_property);
+  void startUdpServer(const core::ProcessContext& context);
+
+ private:
   struct ServerOptions {
     std::optional<uint64_t> max_queue_size;
     int port = 0;
   };
 
   void stopServer();
-  void startTcpServer(const core::ProcessContext& context);
-  void startUdpServer(const core::ProcessContext& context);
-  ServerOptions readServerOptions(const core::ProcessContext& context);
   void startServer(const ServerOptions& options, utils::net::IpProtocol protocol);
+  ServerOptions readServerOptions(const core::ProcessContext& context);
+
   virtual void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) = 0;
   virtual const core::Property& getMaxBatchSizeProperty() = 0;
   virtual const core::Property& getMaxQueueSizeProperty() = 0;
   virtual const core::Property& getPortProperty() = 0;
-  virtual const core::Property& getSslContextProperty() = 0;
-  virtual const core::Property& getClientAuthProperty() = 0;
 
   uint64_t max_batch_size_{500};
   std::unique_ptr<utils::net::Server> server_;
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
index 1de8165c0..3c98aec6e 100644
--- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -249,7 +249,7 @@ void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164
   CHECK(original_message.msg_ == flow_file.getAttribute("syslog.msg"));
 }
 
-TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") {
+TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog][NetworkListenerProcessor]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
 
   SingleProcessorTestController controller{listen_syslog};
@@ -301,7 +301,7 @@ TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") {
   check_for_only_basic_attributes(*result.at(ListenSyslog::Success)[1], SYSLOG_PORT, protocol);
 }
 
-TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") {
+TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog][NetworkListenerProcessor]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
 
   SingleProcessorTestController controller{listen_syslog};
@@ -403,7 +403,7 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") {
 }
 
 
-TEST_CASE("ListenSyslog can be rescheduled", "[ListenSyslog]") {
+TEST_CASE("ListenSyslog can be rescheduled", "[ListenSyslog][NetworkListenerProcessor]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
   SingleProcessorTestController controller{listen_syslog};
   LogTestController::getInstance().setTrace<ListenSyslog>();
@@ -425,7 +425,7 @@ TEST_CASE("ListenSyslog can be rescheduled", "[ListenSyslog]") {
   }
 }
 
-TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") {
+TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog][NetworkListenerProcessor]") {
   const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog");
 
   SingleProcessorTestController controller{listen_syslog};
@@ -479,7 +479,7 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") {
   CHECK(controller.trigger().at(ListenSyslog::Success).empty());
 }
 
-TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog]") {
+TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog][NetworkListenerProcessor]") {
   asio::ip::tcp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
     endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT);
diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
index e8c1f9286..9009383b5 100644
--- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
@@ -37,7 +37,7 @@ void check_for_attributes(core::FlowFile& flow_file) {
   CHECK(ranges::contains(local_addresses, flow_file.getAttribute("tcp.sender")));
 }
 
-TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") {
+TEST_CASE("ListenTCP test multiple messages", "[ListenTCP][NetworkListenerProcessor]") {
   asio::ip::tcp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
     endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
@@ -66,7 +66,7 @@ TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") {
   check_for_attributes(*result.at(ListenTCP::Success)[1]);
 }
 
-TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP]") {
+TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP][NetworkListenerProcessor]") {
   const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
   SingleProcessorTestController controller{listen_tcp};
   LogTestController::getInstance().setTrace<ListenTCP>();
@@ -78,7 +78,7 @@ TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP]") {
   REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_tcp));
 }
 
-TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") {
+TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP][NetworkListenerProcessor]") {
   asio::ip::tcp::endpoint endpoint;
   SECTION("sending through IPv4", "[IPv4]") {
     endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT);
@@ -111,7 +111,7 @@ TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") {
   CHECK(controller.trigger().at(ListenTCP::Success).empty());
 }
 
-TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") {
+TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProcessor]") {
   const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
 
   SingleProcessorTestController controller{listen_tcp};
diff --git a/extensions/standard-processors/tests/unit/ListenUDPTests.cpp b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
new file mode 100644
index 000000000..ecce9120d
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ListenUDP.h"
+#include "SingleProcessorTestController.h"
+#include "Utils.h"
+#include "controllers/SSLContextService.h"
+#include "range/v3/algorithm/contains.hpp"
+
+using ListenUDP = org::apache::nifi::minifi::processors::ListenUDP;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+constexpr uint64_t PORT = 10256;  // set as the ListenUDP Port property and used as the destination port of every test datagram
+
+void check_for_attributes(core::FlowFile& flow_file) {  // Checks the "udp.port" and "udp.sender" attributes of a received flow file.
+  const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};  // accepted loopback spellings: IPv4, IPv4-mapped IPv6, IPv6
+  CHECK(std::to_string(PORT) == flow_file.getAttribute("udp.port"));  // "udp.port" must equal the port the tests use
+  CHECK(ranges::contains(local_addresses, flow_file.getAttribute("udp.sender")));  // sender must be one of the loopback spellings above
+}
+
+TEST_CASE("ListenUDP test multiple messages", "[ListenUDP][NetworkListenerProcessor]") {  // Two datagrams sent to PORT must come out as two flow files.
+  asio::ip::udp::endpoint endpoint;  // destination; assigned by whichever SECTION below runs
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), PORT);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;  // returns from the whole test case body, so nothing below runs for this section
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), PORT);
+  }
+  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+
+  SingleProcessorTestController controller{listen_udp};
+  LogTestController::getInstance().setTrace<ListenUDP>();
+  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "2"));
+
+  controller.plan->scheduleProcessor(listen_udp);  // scheduled before sending; presumably this starts the listener - confirm in ListenUDP
+  REQUIRE(utils::sendUdpDatagram({"test_message_1"}, endpoint));
+  REQUIRE(utils::sendUdpDatagram({"another_message"}, endpoint));
+  ProcessorTriggerResult result;
+  REQUIRE(controller.triggerUntil({{ListenUDP::Success, 2}}, result, 300ms, 50ms));  // NOTE(review): 300ms/50ms look like timeout and poll interval - confirm
+  CHECK(result.at(ListenUDP::Success).size() == 2);
+  CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[0]) == "test_message_1");  // contents are expected in send order
+  CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[1]) == "another_message");
+
+  check_for_attributes(*result.at(ListenUDP::Success)[0]);
+  check_for_attributes(*result.at(ListenUDP::Success)[1]);
+}
+
+TEST_CASE("ListenUDP can be rescheduled", "[ListenUDP][NetworkListenerProcessor]") {  // Schedule, reset the plan, schedule again: none may throw.
+  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+  SingleProcessorTestController controller{listen_udp};
+  LogTestController::getInstance().setTrace<ListenUDP>();
+  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "100"));
+
+  REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_udp));
+  REQUIRE_NOTHROW(controller.plan->reset(true));  // NOTE(review): presumably tears down the first listener so PORT can be reused - confirm
+  REQUIRE_NOTHROW(controller.plan->scheduleProcessor(listen_udp));  // second scheduling uses the same PORT as the first
+}
+
+TEST_CASE("ListenUDP max queue and max batch size test", "[ListenUDP][NetworkListenerProcessor]") {  // 100 datagrams against MaxQueueSize 50 and MaxBatchSize 10.
+  asio::ip::udp::endpoint endpoint;  // destination; assigned by whichever SECTION below runs
+  SECTION("sending through IPv4", "[IPv4]") {
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), PORT);
+  }
+  SECTION("sending through IPv6", "[IPv6]") {
+    if (utils::isIPv6Disabled())
+      return;  // returns from the whole test case body, so nothing below runs for this section
+    endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), PORT);
+  }
+  const auto listen_udp = std::make_shared<ListenUDP>("ListenUDP");
+
+  SingleProcessorTestController controller{listen_udp};
+  REQUIRE(listen_udp->setProperty(ListenUDP::Port, std::to_string(PORT)));
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize, "10"));
+  REQUIRE(listen_udp->setProperty(ListenUDP::MaxQueueSize, "50"));
+
+  LogTestController::getInstance().setWarn<ListenUDP>();  // NOTE(review): presumably the "Queue is full" message is logged at warn level - confirm
+
+  controller.plan->scheduleProcessor(listen_udp);  // scheduled before sending and never triggered while sending
+  for (auto i = 0; i < 100; ++i) {
+    REQUIRE(utils::sendUdpDatagram({"test_message"}, endpoint));
+  }
+
+  CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms));  // 100 sent, MaxQueueSize 50: 50 "ignored" log lines expected
+  CHECK(controller.trigger().at(ListenUDP::Success).size() == 10);  // each trigger is expected to yield one batch of MaxBatchSize (10) flow files
+  CHECK(controller.trigger().at(ListenUDP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenUDP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenUDP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenUDP::Success).size() == 10);
+  CHECK(controller.trigger().at(ListenUDP::Success).empty());  // five batches of 10 account for the 50 queued messages, so nothing is left
+}
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
index 6398d2bc0..f44e0586a 100644
--- a/extensions/standard-processors/tests/unit/PutTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
@@ -413,8 +413,8 @@ TEST_CASE("PutTCP test non-routable server", "[PutTCP]") {
   trigger_expect_failure(test_fixture, "message for non-routable server");
 
   CHECK((LogTestController::getInstance().contains("Connection timed out", 0ms)
-    || LogTestController::getInstance().contains("Operation timed out", 0ms)
-    || LogTestController::getInstance().contains("host has failed to respond", 0ms)));
+      || LogTestController::getInstance().contains("Operation timed out", 0ms)
+      || LogTestController::getInstance().contains("host has failed to respond", 0ms)));
 }
 
 TEST_CASE("PutTCP test invalid server cert", "[PutTCP]") {
@@ -427,8 +427,7 @@ TEST_CASE("PutTCP test invalid server cert", "[PutTCP]") {
 
   trigger_expect_failure(test_fixture, "message for invalid-cert server");
 
-  CHECK((LogTestController::getInstance().contains("certificate verify failed", 0ms)
-      || LogTestController::getInstance().contains("asio.ssl error", 0ms)));
+  CHECK(LogTestController::getInstance().matchesRegex("Handshake with .* failed", 0ms));
 }
 
 TEST_CASE("PutTCP test missing client cert", "[PutTCP]") {
@@ -441,8 +440,8 @@ TEST_CASE("PutTCP test missing client cert", "[PutTCP]") {
 
   trigger_expect_failure(test_fixture, "message for invalid-cert server");
 
-  CHECK((LogTestController::getInstance().contains("sslv3 alert handshake failure", 0ms)
-      || LogTestController::getInstance().contains("asio.ssl error", 0ms)));}
+  CHECK(LogTestController::getInstance().matchesRegex("Handshake with .* failed", 0ms));
+}
 
 TEST_CASE("PutTCP test idle connection expiration", "[PutTCP]") {
   PutTCPTestFixture test_fixture;