You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2022/04/28 14:20:44 UTC

[nifi-minifi-cpp] branch main updated (fbf2a349b -> 863f5d85d)

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from fbf2a349b MINIFICPP-1810 Provide logs to be read from docker logs
     new e8736bd04 MINIFICPP-1780 Restart agent after C2 property update
     new c59807878 MINIFICPP-1796 Fix getting raw value of log properties
     new 863f5d85d MINIFICPP-1808 Improve RawSocketProtocol authorization error logging

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 extensions/coap/tests/CoapIntegrationBase.h        |   3 +-
 extensions/http-curl/protocols/RESTSender.cpp      |   2 +-
 .../http-curl/tests/C2DescribeManifestTest.cpp     |   5 +
 extensions/http-curl/tests/C2PauseResumeTest.cpp   |   3 +-
 .../http-curl/tests/C2PropertiesUpdateTests.cpp    |  30 +-
 .../http-curl/tests/ConfigTestAccessor.h           |  16 +-
 .../tests/ControllerServiceIntegrationTests.cpp    |  12 +-
 extensions/http-curl/tests/HTTPHandlers.h          |   4 +-
 extensions/http-curl/tests/HTTPIntegrationBase.h   |   4 +
 extensions/systemd/CMakeLists.txt                  |   2 +-
 extensions/systemd/ConsumeJournald.cpp             |   2 +-
 extensions/systemd/ConsumeJournald.h               |   4 +-
 libminifi/include/FlowController.h                 |  21 +-
 libminifi/include/agent/build_description.h        |  36 +-
 libminifi/include/c2/C2Agent.h                     |  10 +-
 libminifi/include/c2/C2Client.h                    |  14 +-
 libminifi/include/c2/C2Payload.h                   |   2 +-
 libminifi/include/core/state/Value.h               | 124 +++---
 .../include/core/state/nodes/AgentInformation.h    | 207 ++++------
 .../include/core/state/nodes/FlowInformation.h     |  12 +-
 libminifi/include/core/state/nodes/MetricsBase.h   |   2 +-
 libminifi/include/properties/Configuration.h       |   1 +
 libminifi/include/properties/Configure.h           |   1 +
 .../include/utils/FifoExecutor.h                   |  13 +-
 libminifi/include/utils/SmallString.h              |   4 +
 libminifi/include/utils/file/FileUtils.h           |   6 +-
 .../include/utils/meta/type_list.h                 |  22 +-
 libminifi/src/Configuration.cpp                    |   1 +
 libminifi/src/Configure.cpp                        |  29 +-
 libminifi/src/FlowController.cpp                   |  27 +-
 libminifi/src/c2/C2Agent.cpp                       |  48 +--
 libminifi/src/c2/C2Client.cpp                      |  32 +-
 libminifi/src/core/state/Value.cpp                 |  48 ++-
 libminifi/src/sitetosite/RawSocketProtocol.cpp     |  26 +-
 .../src/utils/FifoExecutor.cpp                     |  15 +-
 libminifi/src/utils/file/FileUtils.cpp             |   7 +
 libminifi/test/aws-tests/FetchS3ObjectTests.cpp    |   8 +-
 libminifi/test/flow-tests/TestControllerWithFlow.h |   3 +-
 libminifi/test/integration/IntegrationBase.h       | 116 ++++--
 .../test/integration/ProvenanceReportingTest.cpp   |  18 +-
 .../test/persistence-tests/PersistenceTests.cpp    |   4 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp         |   2 +-
 libminifi/test/unit/ProvenanceTestHelper.h         |   3 +-
 main/AgentDocs.cpp                                 |  40 +-
 main/AgentDocs.h                                   |  19 +-
 main/MiNiFiMain.cpp                                | 449 +++++++++++----------
 nanofi/src/cxx/C2CallbackAgent.cpp                 |  14 +-
 47 files changed, 738 insertions(+), 733 deletions(-)
 copy main/MiNiFiWindowsService.h => extensions/http-curl/tests/ConfigTestAccessor.h (73%)
 rename extensions/systemd/WorkerThread.h => libminifi/include/utils/FifoExecutor.h (81%)
 copy extensions/systemd/Common.h => libminifi/include/utils/meta/type_list.h (72%)
 rename extensions/systemd/WorkerThread.cpp => libminifi/src/utils/FifoExecutor.cpp (77%)


[nifi-minifi-cpp] 01/03: MINIFICPP-1780 Restart agent after C2 property update

Posted by fg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e8736bd04414090890f948dc02de406a40b12103
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Mar 29 16:59:20 2022 +0200

    MINIFICPP-1780 Restart agent after C2 property update
    
    Co-authored-by: Marton Szasz <sz...@apache.org>
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1299
---
 extensions/coap/tests/CoapIntegrationBase.h        |   3 +-
 extensions/http-curl/protocols/RESTSender.cpp      |   2 +-
 extensions/http-curl/tests/C2PauseResumeTest.cpp   |   3 +-
 .../http-curl/tests/C2PropertiesUpdateTests.cpp    |  23 +-
 .../tests/ControllerServiceIntegrationTests.cpp    |  12 +-
 extensions/http-curl/tests/HTTPHandlers.h          |   2 +-
 extensions/http-curl/tests/HTTPIntegrationBase.h   |   4 +
 extensions/systemd/CMakeLists.txt                  |   2 +-
 extensions/systemd/ConsumeJournald.cpp             |   2 +-
 extensions/systemd/ConsumeJournald.h               |   4 +-
 libminifi/include/FlowController.h                 |  21 +-
 libminifi/include/agent/build_description.h        |  36 +-
 libminifi/include/c2/C2Agent.h                     |  10 +-
 libminifi/include/c2/C2Client.h                    |  14 +-
 libminifi/include/c2/C2Payload.h                   |   2 +-
 libminifi/include/core/state/Value.h               | 124 +++---
 .../include/core/state/nodes/AgentInformation.h    | 207 ++++------
 .../include/core/state/nodes/FlowInformation.h     |  12 +-
 libminifi/include/core/state/nodes/MetricsBase.h   |   2 +-
 libminifi/include/properties/Configuration.h       |   1 +
 .../include/utils/FifoExecutor.h                   |  13 +-
 libminifi/include/utils/SmallString.h              |   4 +
 libminifi/include/utils/file/FileUtils.h           |   6 +-
 .../include/utils/meta/type_list.h                 |  36 +-
 libminifi/src/Configuration.cpp                    |   1 +
 libminifi/src/FlowController.cpp                   |  25 +-
 libminifi/src/c2/C2Agent.cpp                       |  48 +--
 libminifi/src/c2/C2Client.cpp                      |  30 +-
 libminifi/src/core/state/Value.cpp                 |  48 ++-
 .../src/utils/FifoExecutor.cpp                     |  15 +-
 libminifi/src/utils/file/FileUtils.cpp             |   7 +
 libminifi/test/aws-tests/FetchS3ObjectTests.cpp    |   8 +-
 libminifi/test/flow-tests/TestControllerWithFlow.h |   3 +-
 libminifi/test/integration/IntegrationBase.h       | 116 ++++--
 .../test/integration/ProvenanceReportingTest.cpp   |  18 +-
 .../test/persistence-tests/PersistenceTests.cpp    |   4 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp         |   2 +-
 libminifi/test/unit/ProvenanceTestHelper.h         |   3 +-
 main/AgentDocs.cpp                                 |  40 +-
 main/AgentDocs.h                                   |  19 +-
 main/MiNiFiMain.cpp                                | 449 +++++++++++----------
 nanofi/src/cxx/C2CallbackAgent.cpp                 |  14 +-
 42 files changed, 689 insertions(+), 706 deletions(-)

diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index 74f7c344f..520aaaf76 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -74,7 +74,8 @@ class CoapIntegrationBase : public IntegrationBase {
 
     std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
 
-    std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME);
+    std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+      std::make_shared<utils::file::FileSystem>(), []{});
 
     controller->load();
     controller->start();
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index 64c7408ea..af15b4dbf 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -141,7 +141,7 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi
   }
 
   if (payload.getOperation() == Operation::TRANSFER) {
-    file_callback = std::unique_ptr<utils::ByteOutputCallback>(new utils::ByteOutputCallback(std::numeric_limits<size_t>::max()));
+    file_callback = std::make_unique<utils::ByteOutputCallback>(std::numeric_limits<size_t>::max());
     read.pos = 0;
     read.ptr = file_callback.get();
     client.setReadCallback(&read);
diff --git a/extensions/http-curl/tests/C2PauseResumeTest.cpp b/extensions/http-curl/tests/C2PauseResumeTest.cpp
index d4cbbf83c..bfb69fd18 100644
--- a/extensions/http-curl/tests/C2PauseResumeTest.cpp
+++ b/extensions/http-curl/tests/C2PauseResumeTest.cpp
@@ -131,7 +131,8 @@ int main(int argc, char **argv) {
     test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
 
   std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(
-      test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME);
+      test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+      std::make_shared<utils::file::FileSystem>(), []{});
 
   core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
 
diff --git a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
index 6d96de6ba..d92497c0b 100644
--- a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
+++ b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
@@ -80,7 +80,12 @@ class C2HeartbeatHandler : public ServerAwareHandler {
 
 class VerifyPropertyUpdate : public HTTPIntegrationBase {
  public:
+  VerifyPropertyUpdate() :fn_{[]{}} {}
   explicit VerifyPropertyUpdate(std::function<void()> fn) : fn_(std::move(fn)) {}
+  VerifyPropertyUpdate(const VerifyPropertyUpdate&) = delete;
+  VerifyPropertyUpdate(VerifyPropertyUpdate&&) = default;
+  VerifyPropertyUpdate& operator=(const VerifyPropertyUpdate&) = delete;
+  VerifyPropertyUpdate& operator=(VerifyPropertyUpdate&&) = default;
 
   void testSetup() {}
 
@@ -89,6 +94,8 @@ class VerifyPropertyUpdate : public HTTPIntegrationBase {
   }
 
   std::function<void()> fn_;
+
+  [[nodiscard]] int getRestartRequestedCount() const noexcept { return restart_requested_count_; }
 };
 
 static const std::string properties_file =
@@ -96,7 +103,7 @@ static const std::string properties_file =
     "nifi.c2.agent.protocol.class=RESTSender\n"
     "nifi.c2.enable=true\n"
     "nifi.c2.agent.class=test\n"
-    "nifi.c2.agent.heartbeat.period=100\n";
+    "nifi.c2.agent.heartbeat.period=500\n";
 
 static const std::string log_properties_file =
     "logger.root=INFO,ostream\n";
@@ -151,10 +158,18 @@ int main() {
     assert(!log_test_controller->contains("DummyClass3::before", 0s));
   }
 
-  VerifyPropertyUpdate harness([&] {
+  // On msvc, the passed lambda can't capture a reference to the object under construction, so we need to late-init harness.
+  VerifyPropertyUpdate harness;
+  harness = VerifyPropertyUpdate([&] {
     assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.isAcknowledged("79");}));
-    assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.getApplyCount("FULLY_APPLIED") == 1;}));
-    assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.getApplyCount("NO_OPERATION") > 0;}));
+    assert(utils::verifyEventHappenedInPollTime(3s, [&] {
+      return ack_handler.getApplyCount("FULLY_APPLIED") == 1
+          && harness.getRestartRequestedCount() == 1;
+    }));
+    assert(utils::verifyEventHappenedInPollTime(3s, [&] {
+      return ack_handler.getApplyCount("NO_OPERATION") > 0
+          && harness.getRestartRequestedCount() == 1;  // only one, i.e. no additional restart requests compared to the previous update.
+    }));
     // update operation acknowledged
     {
       // verify final log levels
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index 839769788..72669b64b 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -69,13 +69,15 @@ int main(int argc, char **argv) {
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(configuration);
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file));
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::make_unique<core::YamlConfiguration>(
+      test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
   std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
 
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
-                                                                                                content_repo,
-                                                                                                DEFAULT_ROOT_GROUP_NAME);
+  const auto controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
+      content_repo,
+      DEFAULT_ROOT_GROUP_NAME,
+      std::make_shared<utils::file::FileSystem>(),
+      []{});
 
   disabled = false;
   std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>();
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 92478a00a..7bae42aed 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -476,7 +476,7 @@ class HeartbeatHandler : public ServerAwareHandler {
           classes.push_back(proc["type"].GetString());
         }
 
-        auto group = minifi::BuildDescription::getClassDescriptions(str);
+        auto group = minifi::BuildDescription{}.getClassDescriptions(str);
         for (const auto& proc : group.processors_) {
           assert(std::find(classes.begin(), classes.end(), proc.class_name_) != std::end(classes));
           (void)proc;
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 1136f217b..6870a14fb 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -47,6 +47,10 @@ class HTTPIntegrationBase : public IntegrationBase {
       : IntegrationBase(waitTime),
         server(nullptr) {
   }
+  HTTPIntegrationBase(const HTTPIntegrationBase&) = delete;
+  HTTPIntegrationBase(HTTPIntegrationBase&&) = default;
+  HTTPIntegrationBase& operator=(const HTTPIntegrationBase&) = delete;
+  HTTPIntegrationBase& operator=(HTTPIntegrationBase&&) = default;
 
   virtual void setUrl(const std::string &url, ServerAwareHandler *handler);
 
diff --git a/extensions/systemd/CMakeLists.txt b/extensions/systemd/CMakeLists.txt
index 1cb668ba5..41bd52bf1 100644
--- a/extensions/systemd/CMakeLists.txt
+++ b/extensions/systemd/CMakeLists.txt
@@ -19,7 +19,7 @@
 
 include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
-add_library(minifi-systemd SHARED ConsumeJournald.cpp WorkerThread.cpp libwrapper/LibWrapper.cpp libwrapper/DlopenWrapper.cpp)
+add_library(minifi-systemd SHARED ConsumeJournald.cpp libwrapper/LibWrapper.cpp libwrapper/DlopenWrapper.cpp)
 
 target_link_libraries(minifi-systemd ${LIBMINIFI} Threads::Threads date::date)
 
diff --git a/extensions/systemd/ConsumeJournald.cpp b/extensions/systemd/ConsumeJournald.cpp
index 94f3ca8b1..98e800ae2 100644
--- a/extensions/systemd/ConsumeJournald.cpp
+++ b/extensions/systemd/ConsumeJournald.cpp
@@ -81,7 +81,7 @@ void ConsumeJournald::initialize() {
   setSupportedProperties({BatchSize, PayloadFormat, IncludeTimestamp, JournalType, ProcessOldMessages, TimestampFormat});
   setSupportedRelationships({Success});
 
-  worker_ = std::make_unique<Worker>();
+  worker_ = std::make_unique<utils::FifoExecutor>();
 }
 
 void ConsumeJournald::notifyStop() {
diff --git a/extensions/systemd/ConsumeJournald.h b/extensions/systemd/ConsumeJournald.h
index a96c07e03..830e657e2 100644
--- a/extensions/systemd/ConsumeJournald.h
+++ b/extensions/systemd/ConsumeJournald.h
@@ -35,7 +35,7 @@
 #include "libwrapper/LibWrapper.h"
 #include "utils/Deleters.h"
 #include "utils/gsl.h"
-#include "WorkerThread.h"
+#include "utils/FifoExecutor.h"
 
 namespace org { namespace apache { namespace nifi { namespace minifi { namespace extensions { namespace systemd {
 
@@ -97,7 +97,7 @@ class ConsumeJournald final : public core::Processor {
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ConsumeJournald>::getLogger();
   core::CoreComponentStateManager* state_manager_;
   std::unique_ptr<libwrapper::LibWrapper> libwrapper_;
-  std::unique_ptr<Worker> worker_;
+  std::unique_ptr<utils::FifoExecutor> worker_;
   std::unique_ptr<libwrapper::Journal> journal_;
 
   std::size_t batch_size_ = 1000;
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 0d5f6d7cd..92fa9aeb6 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -22,6 +22,7 @@
 
 #include <algorithm>
 #include <atomic>
+#include <functional>
 #include <map>
 #include <memory>
 #include <mutex>
@@ -57,10 +58,7 @@
 #include "utils/Id.h"
 #include "utils/file/FileSystem.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 namespace state {
 class ProcessorController;
@@ -77,12 +75,14 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
  public:
   FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
                  std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration,
-                 std::shared_ptr<core::ContentRepository> content_repo, std::string name = DEFAULT_ROOT_GROUP_NAME,
-                 std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>());
+                 std::shared_ptr<core::ContentRepository> content_repo, const std::string& name = DEFAULT_ROOT_GROUP_NAME,
+                 std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>(),
+                 std::function<void()> request_restart = []{});
 
   FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
                  std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration,
-                 std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<utils::file::FileSystem> filesystem);
+                 std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<utils::file::FileSystem> filesystem,
+                 std::function<void()> request_restart = []{});
 
   ~FlowController() override;
 
@@ -181,7 +181,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
    * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE manifest
    * @return the agent manifest response node
    */
-  std::shared_ptr<state::response::ResponseNode> getAgentManifest() const override;
+  std::shared_ptr<state::response::ResponseNode> getAgentManifest() override;
 
   uint64_t getUptime() override;
 
@@ -254,9 +254,6 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
   std::map<utils::Identifier, std::unique_ptr<state::ProcessorController>> processor_to_controller_;
 };
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
 
 #endif  // LIBMINIFI_INCLUDE_FLOWCONTROLLER_H_
diff --git a/libminifi/include/agent/build_description.h b/libminifi/include/agent/build_description.h
index 7eec05062..6017fd71f 100644
--- a/libminifi/include/agent/build_description.h
+++ b/libminifi/include/agent/build_description.h
@@ -32,10 +32,7 @@
 #include "core/Annotation.h"
 #include "io/validation.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 class ClassDescription {
  public:
@@ -71,6 +68,10 @@ struct Components {
   std::vector<ClassDescription> processors_;
   std::vector<ClassDescription> controller_services_;
   std::vector<ClassDescription> other_components_;
+
+  [[nodiscard]] bool empty() const noexcept {
+    return processors_.empty() && controller_services_.empty() && other_components_.empty();
+  }
 };
 
 struct BundleDetails {
@@ -121,13 +122,8 @@ class ExternalBuildDescription {
 
 class BuildDescription {
  public:
-  static struct Components getClassDescriptions(const std::string& group = "minifi-system") {
-    static std::map<std::string, struct Components> class_mappings;
-#ifndef WIN32
-    if (UNLIKELY(IsNullOrEmpty(class_mappings[group].processors_) && IsNullOrEmpty(class_mappings[group].processors_))) {
-#else
-      if (class_mappings[group].processors_.empty()) {
-#endif
+  struct Components getClassDescriptions(const std::string& group = "minifi-system") {
+    if (class_mappings_[group].empty()) {
       for (const auto& clazz : core::ClassLoader::getDefaultClassLoader().getClasses(group)) {
         std::string class_name = clazz;
         auto lastOfIdx = clazz.find_last_of("::");
@@ -158,22 +154,22 @@ class BuildDescription {
             description.inputRequirement_ = processor->getInputRequirementAsString();
             description.isSingleThreaded_ = processor->isSingleThreaded();
             description.class_relationships_ = processor->getSupportedRelationships();
-            class_mappings[group].processors_.emplace_back(description);
+            class_mappings_[group].processors_.emplace_back(description);
           } else if (is_controller_service) {
-            class_mappings[group].controller_services_.emplace_back(description);
+            class_mappings_[group].controller_services_.emplace_back(description);
           } else {
-            class_mappings[group].other_components_.emplace_back(description);
+            class_mappings_[group].other_components_.emplace_back(description);
           }
         }
       }
     }
-    return class_mappings[group];
+    return class_mappings_[group];
   }
-}; // NOLINT
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+ private:
+  std::map<std::string, struct Components> class_mappings_;
+};
+
+}  // namespace org::apache::nifi::minifi
 
 #endif  // LIBMINIFI_INCLUDE_AGENT_BUILD_DESCRIPTION_H_
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 38b33c03a..b32b029f6 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -68,8 +68,9 @@ class C2Agent : public state::UpdateController {
   C2Agent(core::controller::ControllerServiceProvider *controller,
           state::Pausable *pause_handler,
           state::StateMonitor* updateSink,
-          const std::shared_ptr<Configure> &configure,
-          const std::shared_ptr<utils::file::FileSystem> &filesystem = std::make_shared<utils::file::FileSystem>());
+          std::shared_ptr<Configure> configure,
+          std::shared_ptr<utils::file::FileSystem> filesystem,
+          std::function<void()> request_restart);
 
   ~C2Agent() noexcept override {
     delete protocol_.load();
@@ -93,8 +94,6 @@ class C2Agent : public state::UpdateController {
   std::optional<std::string> fetchFlow(const std::string& uri) const;
 
  protected:
-  void restart_agent();
-
   /**
    * Check the collection of triggers for any updates that need to be handled.
    * This is an optional step
@@ -246,6 +245,9 @@ class C2Agent : public state::UpdateController {
   bool manifest_sent_;
 
   const uint64_t C2RESPONSE_POLL_MS = 100;
+
+  std::atomic<bool> restart_needed_ = false;
+  std::function<void()> request_restart_;
 };
 
 }  // namespace c2
diff --git a/libminifi/include/c2/C2Client.h b/libminifi/include/c2/C2Client.h
index 14d4dcdd1..7180b281e 100644
--- a/libminifi/include/c2/C2Client.h
+++ b/libminifi/include/c2/C2Client.h
@@ -36,11 +36,7 @@
 #include "core/Flow.h"
 #include "utils/file/FileSystem.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
+namespace org::apache::nifi::minifi::c2 {
 
 class C2Client : public core::Flow, public state::response::NodeReporter {
  public:
@@ -48,6 +44,7 @@ class C2Client : public core::Flow, public state::response::NodeReporter {
       std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo,
       std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo,
       std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<utils::file::FileSystem> filesystem,
+      std::function<void()> request_restart,
       std::shared_ptr<core::logging::Logger> logger = core::logging::LoggerFactory<C2Client>::getLogger());
 
   void initialize(core::controller::ControllerServiceProvider *controller, state::Pausable *pause_handler, state::StateMonitor* update_sink);
@@ -84,10 +81,7 @@ class C2Client : public core::Flow, public state::response::NodeReporter {
 
  protected:
   std::atomic<bool> flow_update_{false};
+  std::function<void()> request_restart_;
 };
 
-}  // namespace c2
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::c2
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index dfb21d457..badc5e0eb 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -230,7 +230,7 @@ class C2Payload : public state::Update {
 
   friend std::ostream& operator<<(std::ostream& out, const C2Payload& payload);
 
-  std::string str() const {
+  [[nodiscard]] std::string str() const {
     std::stringstream ss;
     ss << *this;
     return std::move(ss).str();
diff --git a/libminifi/include/core/state/Value.h b/libminifi/include/core/state/Value.h
index 230eb6f65..6d8b77219 100644
--- a/libminifi/include/core/state/Value.h
+++ b/libminifi/include/core/state/Value.h
@@ -24,18 +24,15 @@
 #include <iostream>
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 #include <typeinfo>
 #include "utils/ValueParser.h"
 #include "utils/ValueCaster.h"
 #include "utils/Export.h"
+#include "utils/meta/type_list.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+namespace org::apache::nifi::minifi::state::response {
 
 /**
  * Purpose: Represents an AST value
@@ -48,17 +45,18 @@ class Value {
   using ParseException = utils::internal::ParseException;
 
  public:
-  explicit Value(const std::string &value)
-      : string_value(value),
+  explicit Value(std::string value)
+      : string_value(std::move(value)),
         type_id(std::type_index(typeid(std::string))) {
   }
 
   virtual ~Value() = default;
-  std::string getStringValue() const {
+
+  [[nodiscard]] std::string getStringValue() const {
     return string_value;
   }
 
-  const char* c_str() const {
+  [[nodiscard]] const char* c_str() const {
     return string_value.c_str();
   }
 
@@ -67,7 +65,7 @@ class Value {
     return convertValueImpl<typename std::common_type<T>::type>(ref);
   }
 
-  bool empty() {
+  [[nodiscard]] bool empty() const noexcept {
     return string_value.empty();
   }
 
@@ -178,7 +176,7 @@ class UInt32Value : public Value {
     setTypeId<uint32_t>();
   }
 
-  uint32_t getValue() const {
+  [[nodiscard]] uint32_t getValue() const {
     return value;
   }
 
@@ -210,7 +208,7 @@ class UInt32Value : public Value {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  uint32_t value;
+  uint32_t value{};
 };
 
 class IntValue : public Value {
@@ -225,7 +223,7 @@ class IntValue : public Value {
       : Value(strvalue) {
     utils::internal::ValueParser(strvalue).parse(value).parseEnd();
   }
-  int getValue() const {
+  [[nodiscard]] int getValue() const {
     return value;
   }
 
@@ -256,7 +254,7 @@ class IntValue : public Value {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  int value;
+  int value{};
 };
 
 class BoolValue : public Value {
@@ -272,7 +270,7 @@ class BoolValue : public Value {
     utils::internal::ValueParser(strvalue).parse(value).parseEnd();
   }
 
-  bool getValue() const {
+  [[nodiscard]] bool getValue() const {
     return value;
   }
 
@@ -302,7 +300,7 @@ class BoolValue : public Value {
     return true;
   }
 
-  bool value;
+  bool value{};
 
  private:
   template<typename T>
@@ -329,7 +327,7 @@ class UInt64Value : public Value {
     setTypeId<uint64_t>();
   }
 
-  uint64_t getValue() const {
+  [[nodiscard]] uint64_t getValue() const {
     return value;
   }
 
@@ -358,7 +356,7 @@ class UInt64Value : public Value {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  uint64_t value;
+  uint64_t value{};
 };
 
 class Int64Value : public Value {
@@ -374,7 +372,7 @@ class Int64Value : public Value {
     setTypeId<int64_t>();
   }
 
-  int64_t getValue() {
+  [[nodiscard]] int64_t getValue() const {
     return value;
   }
 
@@ -404,7 +402,7 @@ class Int64Value : public Value {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  int64_t value;
+  int64_t value{};
 };
 
 class DoubleValue : public Value {
@@ -420,37 +418,37 @@ class DoubleValue : public Value {
     setTypeId<double>();
   }
 
-  double getValue() {
+  [[nodiscard]] double getValue() const {
     return value;
   }
 
  protected:
-  virtual bool getValue(int& ref) {
+  bool getValue(int& ref) override {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  virtual bool getValue(uint32_t& ref) {
+  bool getValue(uint32_t& ref) override {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  virtual bool getValue(int64_t& ref ) {
+  bool getValue(int64_t& ref) override {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  virtual bool getValue(uint64_t& ref) {
+  bool getValue(uint64_t& ref) override {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  virtual bool getValue(bool&) {
+  bool getValue(bool&) override {
     return false;
   }
 
-  virtual bool getValue(double& ref) {
+  bool getValue(double& ref) override {
     ref = value;
     return true;
   }
 
-  double value;
+  double value{};
 };
 
 static inline std::shared_ptr<Value> createValue(const bool &object) {
@@ -497,56 +495,50 @@ static inline std::shared_ptr<Value> createValue(const double &object) {
  * Purpose: ValueNode is the AST container for a value
  */
 class ValueNode {
+  using supported_types = utils::meta::type_list<int, uint32_t, size_t, int64_t, uint64_t, bool, char*, const char*, double, std::string>;
+
  public:
-  ValueNode()
-      : value_(nullptr) {
-  }
+  ValueNode() = default;
 
-  ValueNode(ValueNode &&vn) = default;
-  ValueNode(const ValueNode &vn) = default;
+  template<typename T>
+  requires (supported_types::contains<T>())  // NOLINT
+  /* implicit, because it doesn't change the meaning, and it simplifies construction of maps */
+  ValueNode(const T value)  // NOLINT
+      :value_{createValue(value)}
+  {}
 
   /**
    * Define the representations and eventual storage relationships through
    * createValue
    */
   template<typename T>
-  auto operator=(const T ref) -> typename std::enable_if<std::is_same<T, int >::value ||
-  std::is_same<T, uint32_t >::value ||
-  std::is_same<T, size_t >::value ||
-  std::is_same<T, int64_t>::value ||
-  std::is_same<T, uint64_t >::value ||
-  std::is_same<T, bool >::value ||
-  std::is_same<T, char* >::value ||
-  std::is_same<T, const char* >::value ||
-  std::is_same<T, double>::value ||
-  std::is_same<T, std::string>::value, ValueNode&>::type {
+  requires (supported_types::contains<T>())  // NOLINT
+  ValueNode& operator=(const T ref) {
     value_ = createValue(ref);
     return *this;
   }
 
-  ValueNode &operator=(const ValueNode &ref) = default;
-
   inline bool operator==(const ValueNode &rhs) const {
     return to_string() == rhs.to_string();
   }
 
-  inline bool operator==(const char*rhs) const {
+  inline bool operator==(const char* rhs) const {
     return to_string() == rhs;
   }
 
-  friend bool operator==(const char *lhs, const ValueNode& rhs) {
+  friend bool operator==(const char* lhs, const ValueNode& rhs) {
     return lhs == rhs.to_string();
   }
 
-  std::string to_string() const {
+  [[nodiscard]] std::string to_string() const {
     return value_ ? value_->getStringValue() : "";
   }
 
-  std::shared_ptr<Value> getValue() const {
+  [[nodiscard]] std::shared_ptr<Value> getValue() const {
     return value_;
   }
 
-  bool empty() const {
+  [[nodiscard]] bool empty() const noexcept {
     return value_ == nullptr || value_->empty();
   }
 
@@ -556,33 +548,23 @@ class ValueNode {
 
 struct SerializedResponseNode {
   std::string name;
-  ValueNode value;
-  bool array;
-  bool collapsible;
+  ValueNode value{};
+  bool array = false;
+  bool collapsible = true;
   bool keep_empty = false;
-  std::vector<SerializedResponseNode> children;
-
-  SerializedResponseNode(bool collapsible = true) // NOLINT
-      : array(false),
-        collapsible(collapsible) {
-  }
+  std::vector<SerializedResponseNode> children{};
 
-  SerializedResponseNode(const SerializedResponseNode &other) = default;
-
-  SerializedResponseNode &operator=(const SerializedResponseNode &other) = default;
-
-  bool empty() const {
+  [[nodiscard]] bool empty() const noexcept {
     return value.empty() && children.empty();
   }
+
+  [[nodiscard]] std::string to_string() const;
 };
 
+inline std::string to_string(const SerializedResponseNode& node) { return node.to_string(); }
+
 std::string hashResponseNodes(const std::vector<SerializedResponseNode>& nodes);
 
-}  // namespace response
-}  // namespace state
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::state::response
 
 #endif  // LIBMINIFI_INCLUDE_CORE_STATE_VALUE_H_
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index 904ada129..8520c1e40 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -64,12 +64,7 @@
 #include "utils/Export.h"
 #include "SupportedOperations.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+namespace org::apache::nifi::minifi::state::response {
 
 #define GROUP_STR "org.apache.nifi.minifi"
 
@@ -91,7 +86,7 @@ class ComponentManifest : public DeviceInformation {
     std::vector<SerializedResponseNode> serialized;
     SerializedResponseNode resp;
     resp.name = "componentManifest";
-    struct Components group = BuildDescription::getClassDescriptions(getName());
+    struct Components group = build_description_.getClassDescriptions(getName());
     serializeClassDescription(group.processors_, "processors", resp);
     serializeClassDescription(group.controller_services_, "controllerServices", resp);
     serialized.push_back(resp);
@@ -316,6 +311,9 @@ class ComponentManifest : public DeviceInformation {
       response.children.push_back(type);
     }
   }
+
+ private:
+  BuildDescription build_description_;
 };
 
 class ExternalManifest : public ComponentManifest {
@@ -499,11 +497,13 @@ class AgentStatus : public StateMonitorNode {
   }
 
   SerializedResponseNode serializeComponents() const {
-    SerializedResponseNode components_node(false);
+    SerializedResponseNode components_node;
+    components_node.collapsible = false;
     components_node.name = "components";
     if (monitor_ != nullptr) {
       monitor_->executeOnAllComponents([&components_node](StateController& component){
-        SerializedResponseNode component_node(false);
+        SerializedResponseNode component_node;
+        component_node.collapsible = false;
         component_node.name = component.getComponentName();
 
         SerializedResponseNode uuid_node;
@@ -585,7 +585,7 @@ class AgentMonitor {
     }
   }
 
-  void setStateMonitor(state::StateMonitor* &monitor) {
+  void setStateMonitor(state::StateMonitor* monitor) {
     monitor_ = monitor;
   }
 
@@ -599,15 +599,15 @@ class AgentMonitor {
  */
 class AgentManifest : public DeviceInformation {
  public:
-  AgentManifest(std::string name, const utils::Identifier& uuid)
-    : DeviceInformation(std::move(name), uuid) {
+  AgentManifest(const std::string& name, const utils::Identifier& uuid)
+    : DeviceInformation(name, uuid) {
   }
 
-  explicit AgentManifest(std::string name)
-    : DeviceInformation(std::move(name)) {
+  explicit AgentManifest(const std::string& name)
+    : DeviceInformation(name) {
   }
 
-  std::string getName() const {
+  std::string getName() const override {
     return "agentManifest";
   }
 
@@ -620,80 +620,38 @@ class AgentManifest : public DeviceInformation {
   }
 
   void setConfigurationReader(std::function<std::optional<std::string>(const std::string&)> configuration_reader) {
-    configuration_reader_ = configuration_reader;
-  }
-
-  std::vector<SerializedResponseNode> serialize() {
-    static std::vector<SerializedResponseNode> serialized;
-    if (serialized.empty()) {
-      SerializedResponseNode ident;
-
-      ident.name = "identifier";
-      ident.value = AgentBuild::BUILD_IDENTIFIER;
-
-      SerializedResponseNode type;
-
-      type.name = "agentType";
-      type.value = "cpp";
-
-      SerializedResponseNode version;
-
-      version.name = "version";
-      version.value = AgentBuild::VERSION;
-
-      SerializedResponseNode buildInfo;
-      buildInfo.name = "buildInfo";
-
-      SerializedResponseNode build_version;
-      build_version.name = "version";
-      build_version.value = AgentBuild::VERSION;
-
-      SerializedResponseNode build_rev;
-      build_rev.name = "revision";
-      build_rev.value = AgentBuild::BUILD_REV;
-
-      SerializedResponseNode build_date;
-      build_date.name = "timestamp";
-      build_date.value = (uint64_t) std::stoull(AgentBuild::BUILD_DATE);
-
-      SerializedResponseNode compiler_command;
-      compiler_command.name = "compiler";
-      compiler_command.value = AgentBuild::COMPILER;
-
-      SerializedResponseNode compiler_flags;
-      compiler_flags.name = "flags";
-      compiler_flags.value = AgentBuild::COMPILER_FLAGS;
-
-      buildInfo.children.push_back(compiler_flags);
-      buildInfo.children.push_back(compiler_command);
-
-      buildInfo.children.push_back(build_version);
-      buildInfo.children.push_back(build_rev);
-      buildInfo.children.push_back(build_date);
-
-      Bundles bundles("bundles");
-
-      serialized.push_back(ident);
-      serialized.push_back(type);
-      serialized.push_back(buildInfo);
-      // serialize the bundle information.
-      for (auto bundle : bundles.serialize()) {
-        serialized.push_back(bundle);
-      }
-
-      SchedulingDefaults defaults("schedulingDefaults");
-
-      for (auto defaultNode : defaults.serialize()) {
-        serialized.push_back(defaultNode);
-      }
-
-      SupportedOperations supported_operations("supportedOperations");
-      supported_operations.setStateMonitor(monitor_);
-      supported_operations.setUpdatePolicyController(update_policy_controller_);
-      supported_operations.setConfigurationReader(configuration_reader_);
-      for (const auto& operation : supported_operations.serialize()) {
-        serialized.push_back(operation);
-      }
+    configuration_reader_ = std::move(configuration_reader);
+  }
+
+  std::vector<SerializedResponseNode> serialize() override {
+    std::vector<SerializedResponseNode> serialized = {
+        {.name = "identifier", .value = AgentBuild::BUILD_IDENTIFIER},
+        {.name = "agentType", .value = "cpp"},
+        {.name = "buildInfo", .children = {
+            {.name = "flags", .value = AgentBuild::COMPILER_FLAGS},
+            {.name = "compiler", .value = AgentBuild::COMPILER},
+            {.name = "version", .value = AgentBuild::VERSION},
+            {.name = "revision", .value = AgentBuild::BUILD_REV},
+            {.name = "timestamp", .value = static_cast<uint64_t>(std::stoull(AgentBuild::BUILD_DATE))}
+        }}
+    };
+    {
+      auto bundles = Bundles{"bundles"}.serialize();
+      std::move(std::begin(bundles), std::end(bundles), std::back_inserter(serialized));
+    }
+    {
+      auto schedulingDefaults = SchedulingDefaults{"schedulingDefaults"}.serialize();
+      std::move(std::begin(schedulingDefaults), std::end(schedulingDefaults), std::back_inserter(serialized));
+    }
+    {
+      auto supportedOperations = [this]() {
+        SupportedOperations supported_operations("supportedOperations");
+        supported_operations.setStateMonitor(monitor_);
+        supported_operations.setUpdatePolicyController(update_policy_controller_);
+        supported_operations.setConfigurationReader(configuration_reader_);
+        return supported_operations.serialize();
+      }();
+      std::move(std::begin(supportedOperations), std::end(supportedOperations), std::back_inserter(serialized));
     }
     return serialized;
   }
@@ -721,52 +679,42 @@ class AgentNode : public DeviceInformation, public AgentMonitor, public AgentIde
   }
 
   void setConfigurationReader(std::function<std::optional<std::string>(const std::string&)> configuration_reader) {
-    configuration_reader_ = configuration_reader;
+    configuration_reader_ = std::move(configuration_reader);
   }
 
  protected:
-  std::vector<SerializedResponseNode> serialize() {
-    std::vector<SerializedResponseNode> serialized;
-
-    SerializedResponseNode ident;
-
-    ident.name = "identifier";
-    ident.value = provider_->getAgentIdentifier();
-    serialized.push_back(ident);
+  std::vector<SerializedResponseNode> serialize() override {
+    std::vector<SerializedResponseNode> serialized = {
+        {.name = "identifier", .value = provider_->getAgentIdentifier()},
+    };
 
     const auto agent_class = provider_->getAgentClass();
     if (agent_class) {
-      SerializedResponseNode agentClass;
-      agentClass.name = "agentClass";
-      agentClass.value = *agent_class;
-      serialized.push_back(agentClass);
+      serialized.push_back({.name = "agentClass", .value = *agent_class});
     }
 
-    SerializedResponseNode agentManifestHash;
-    agentManifestHash.name = "agentManifestHash";
-    agentManifestHash.value = getAgentManifestHash();
-    serialized.push_back(agentManifestHash);
-
+    serialized.push_back({.name = "agentManifestHash", .value = getAgentManifestHash()});
     return serialized;
   }
 
   std::vector<SerializedResponseNode> getAgentManifest() const {
-    SerializedResponseNode agentManifest;
-    agentManifest.name = "agentManifest";
-    AgentManifest manifest{"manifest"};
-    manifest.setStateMonitor(monitor_);
-    manifest.setUpdatePolicyController(update_policy_controller_);
-    manifest.setConfigurationReader(configuration_reader_);
-    agentManifest.children = manifest.serialize();
-    return std::vector<SerializedResponseNode>{ agentManifest };
-  }
-
-  std::string getAgentManifestHash() {
-    if (!agentManifestHash_.has_value()) {
-      agentManifestHash_ = hashResponseNodes(getAgentManifest());
+    if (agent_manifest_cache_) { return std::vector{*agent_manifest_cache_}; }
+    agent_manifest_cache_ = {.name = "agentManifest", .children = [this] {
+      AgentManifest manifest{"manifest"};
+      manifest.setStateMonitor(monitor_);
+      manifest.setUpdatePolicyController(update_policy_controller_);
+      manifest.setConfigurationReader(configuration_reader_);
+      return manifest.serialize();
+    }()};
+    agent_manifest_hash_cache_.clear();
+    return std::vector{ *agent_manifest_cache_ };
+  }
+
+  std::string getAgentManifestHash() const {
+    if (agent_manifest_hash_cache_.empty()) {
+      agent_manifest_hash_cache_ = hashResponseNodes(getAgentManifest());
     }
-
-    return *agentManifestHash_;
+    return agent_manifest_hash_cache_;
   }
 
   std::vector<SerializedResponseNode> getAgentStatus() const {
@@ -787,9 +735,11 @@ class AgentNode : public DeviceInformation, public AgentMonitor, public AgentIde
   }
 
  private:
-  std::optional<std::string> agentManifestHash_;
+  mutable std::optional<SerializedResponseNode> agent_manifest_cache_;
+  mutable std::string agent_manifest_hash_cache_;
   controllers::UpdatePolicyControllerService* update_policy_controller_ = nullptr;
   std::function<std::optional<std::string>(const std::string&)> configuration_reader_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<AgentNode>::getLogger();
 };
 
 /**
@@ -811,7 +761,7 @@ class AgentInformation : public AgentNode {
     setArray(false);
   }
 
-  std::string getName() const {
+  std::string getName() const override {
     return "agentInfo";
   }
 
@@ -819,7 +769,7 @@ class AgentInformation : public AgentNode {
     include_agent_status_ = include;
   }
 
-  std::vector<SerializedResponseNode> serialize() {
+  std::vector<SerializedResponseNode> serialize() override {
     std::vector<SerializedResponseNode> serialized(AgentNode::serialize());
     if (include_agent_manifest_) {
       auto manifest = getAgentManifest();
@@ -837,11 +787,6 @@ class AgentInformation : public AgentNode {
   bool include_agent_status_;
 };
 
-}  // namespace response
-}  // namespace state
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::state::response
 
 #endif  // LIBMINIFI_INCLUDE_CORE_STATE_NODES_AGENTINFORMATION_H_
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h
index b10e504fb..aae2e355c 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -194,11 +194,13 @@ class FlowInformation : public FlowMonitor {
     serialized.push_back(uri);
 
     if (!connections_.empty()) {
-      SerializedResponseNode queues(false);
+      SerializedResponseNode queues;
+      queues.collapsible = false;
       queues.name = "queues";
 
       for (auto &queue : connections_) {
-        SerializedResponseNode repoNode(false);
+        SerializedResponseNode repoNode;
+        repoNode.collapsible = false;
         repoNode.name = queue.second->getName();
 
         SerializedResponseNode queueUUIDNode;
@@ -233,11 +235,13 @@ class FlowInformation : public FlowMonitor {
     }
 
     if (nullptr != monitor_) {
-      SerializedResponseNode componentsNode(false);
+      SerializedResponseNode componentsNode;
+      componentsNode.collapsible = false;
       componentsNode.name = "components";
 
       monitor_->executeOnAllComponents([&componentsNode](StateController& component){
-        SerializedResponseNode componentNode(false);
+        SerializedResponseNode componentNode;
+        componentNode.collapsible = false;
         componentNode.name = component.getComponentName();
 
         SerializedResponseNode uuidNode;
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h b/libminifi/include/core/state/nodes/MetricsBase.h
index e0af1043a..d16c2bcc4 100644
--- a/libminifi/include/core/state/nodes/MetricsBase.h
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -207,7 +207,7 @@ class NodeReporter {
    * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE manifest
    * @return the agent manifest response node
    */
-  virtual std::shared_ptr<state::response::ResponseNode> getAgentManifest() const = 0;
+  virtual std::shared_ptr<state::response::ResponseNode> getAgentManifest() = 0;
 };
 
 /**
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index d0b7343ae..f7db69cd4 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -107,6 +107,7 @@ class Configuration : public Properties {
   static constexpr const char *nifi_c2_agent_coap_port = "nifi.c2.agent.coap.port";
   static constexpr const char *nifi_c2_agent_protocol_class = "nifi.c2.agent.protocol.class";
   static constexpr const char *nifi_c2_agent_identifier = "nifi.c2.agent.identifier";
+  static constexpr const char *nifi_c2_agent_identifier_fallback = "nifi.c2.agent.identifier.fallback";
   static constexpr const char *nifi_c2_agent_trigger_classes = "nifi.c2.agent.trigger.classes";
   static constexpr const char *nifi_c2_root_classes = "nifi.c2.root.classes";
   static constexpr const char *nifi_c2_root_class_definitions = "nifi.c2.root.class.definitions";
diff --git a/extensions/systemd/WorkerThread.h b/libminifi/include/utils/FifoExecutor.h
similarity index 81%
rename from extensions/systemd/WorkerThread.h
rename to libminifi/include/utils/FifoExecutor.h
index 503fcb1b4..1aca59d56 100644
--- a/extensions/systemd/WorkerThread.h
+++ b/libminifi/include/utils/FifoExecutor.h
@@ -23,7 +23,7 @@
 
 #include "utils/MinifiConcurrentQueue.h"
 
-namespace org { namespace apache { namespace nifi { namespace minifi { namespace extensions { namespace systemd {
+namespace org::apache::nifi::minifi::utils {
 
 namespace detail {
 class WorkerThread final {
@@ -48,9 +48,9 @@ class WorkerThread final {
 }  // namespace detail
 
 /**
- * A worker that executes arbitrary functions with no parameters asynchronously on an internal thread, returning a future to the result.
+ * Executes arbitrary functions with no parameters asynchronously on an internal thread, returning a future to the result.
  */
-class Worker final {
+class FifoExecutor final {
  public:
   template<typename Func>
   auto enqueue(Func func) -> std::future<decltype(func())> {
@@ -64,9 +64,4 @@ class Worker final {
   detail::WorkerThread worker_thread_;
 };
 
-}  // namespace systemd
-}  // namespace extensions
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/SmallString.h b/libminifi/include/utils/SmallString.h
index 94259c197..31c149e32 100644
--- a/libminifi/include/utils/SmallString.h
+++ b/libminifi/include/utils/SmallString.h
@@ -34,6 +34,10 @@ class SmallString : public std::array<char, N + 1> {
     return {c_str()};
   }
 
+  [[nodiscard]] std::string_view view() const noexcept {
+    return std::string_view{this->data(), N};
+  }
+
   constexpr size_t length() const noexcept {
     return N;
   }
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 75ad4e833..c4b1b68cb 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -607,12 +607,14 @@ inline std::error_code hide_file(const char* const file_name) {
 
 uint64_t computeChecksum(const std::string &file_name, uint64_t up_to_position);
 
-inline std::string get_file_content(const std::string &file_name) {
-  std::ifstream file(file_name);
+inline std::string get_content(const std::string &file_name) {
+  std::ifstream file(file_name, std::ifstream::binary);
   std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
   return content;
 }
 
+void put_content(const std::filesystem::path& filename, std::string_view new_contents);
+
 bool contains(const std::filesystem::path& file_path, std::string_view text_to_search);
 
 
diff --git a/main/AgentDocs.h b/libminifi/include/utils/meta/type_list.h
similarity index 59%
copy from main/AgentDocs.h
copy to libminifi/include/utils/meta/type_list.h
index 0402adf11..a94899abd 100644
--- a/main/AgentDocs.h
+++ b/libminifi/include/utils/meta/type_list.h
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,30 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef MAIN_AGENTDOCS_H_
-#define MAIN_AGENTDOCS_H_
-
-#include <iostream>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace docs {
+#pragma once
+#include <type_traits>
 
-class AgentDocs {
- public:
-  AgentDocs() = default;
-  ~AgentDocs() = default;
-  void generate(const std::string &docsdir, std::ostream &genStream);
- private:
-  inline std::string extractClassName(const std::string &processor) const;
+namespace org::apache::nifi::minifi::utils::meta {
+template<typename... Types>
+struct type_list {
+  template<typename T>
+  [[nodiscard]] constexpr static bool contains() noexcept {
+    return (std::is_same_v<T, Types> || ...);
+  }
 };
-
-} /* namespace docs */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif  // MAIN_AGENTDOCS_H_
+}  // namespace org::apache::nifi::minifi::utils::meta
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index 78e3e36af..4e50b2447 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -83,6 +83,7 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP
   core::ConfigurationProperty{Configuration::nifi_c2_agent_coap_port, gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::nifi_c2_agent_protocol_class},
   core::ConfigurationProperty{Configuration::nifi_c2_agent_identifier},
+  core::ConfigurationProperty{Configuration::nifi_c2_agent_identifier_fallback},
   core::ConfigurationProperty{Configuration::nifi_c2_agent_trigger_classes},
   core::ConfigurationProperty{Configuration::nifi_c2_root_classes},
   core::ConfigurationProperty{Configuration::nifi_c2_root_class_definitions},
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index b6fe4cc3e..ef76b9f8c 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -45,18 +45,16 @@
 #include "io/NetworkPrioritizer.h"
 #include "io/FileStream.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
                                std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration,
-                               std::shared_ptr<core::ContentRepository> content_repo, const std::string /*name*/,
-                               std::shared_ptr<utils::file::FileSystem> filesystem)
+                               std::shared_ptr<core::ContentRepository> content_repo, const std::string& /*name*/,
+                               std::shared_ptr<utils::file::FileSystem> filesystem, std::function<void()> request_restart)
     : core::controller::ForwardingControllerServiceProvider(core::getClassName<FlowController>()),
       c2::C2Client(std::move(configure), std::move(provenance_repo), std::move(flow_file_repo),
-                   std::move(content_repo), std::move(flow_configuration), std::move(filesystem)),
+                   std::move(content_repo), std::move(flow_configuration), std::move(filesystem),
+                   std::move(request_restart), core::logging::LoggerFactory<c2::C2Client>::getLogger()),
       running_(false),
       updating_(false),
       initialized_(false),
@@ -76,9 +74,10 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
 
 FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
                  std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration,
-                 std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<utils::file::FileSystem> filesystem)
+                 std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<utils::file::FileSystem> filesystem,
+                 std::function<void()> request_restart)
       : FlowController(std::move(provenance_repo), std::move(flow_file_repo), std::move(configure), std::move(flow_configuration),
-                       std::move(content_repo), DEFAULT_ROOT_GROUP_NAME, std::move(filesystem)) {}
+                       std::move(content_repo), DEFAULT_ROOT_GROUP_NAME, std::move(filesystem), std::move(request_restart)) {}
 
 std::optional<std::chrono::milliseconds> FlowController::loadShutdownTimeoutFromConfiguration() {
   std::string shutdown_timeout_str;
@@ -426,13 +425,14 @@ int16_t FlowController::clearConnection(const std::string &connection) {
   return -1;
 }
 
-std::shared_ptr<state::response::ResponseNode> FlowController::getAgentManifest() const {
+std::shared_ptr<state::response::ResponseNode> FlowController::getAgentManifest() {
   auto agentInfo = std::make_shared<state::response::AgentInformation>("agentInfo");
   agentInfo->setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(getControllerService(c2::C2Agent::UPDATE_NAME)).get());
   agentInfo->setAgentIdentificationProvider(configuration_);
   agentInfo->setConfigurationReader([this](const std::string& key){
     return configuration_->getString(key);
   });
+  agentInfo->setStateMonitor(this);
   agentInfo->includeAgentStatus(false);
   return agentInfo;
 }
@@ -552,7 +552,4 @@ state::StateController* FlowController::getProcessorController(const std::string
   return foundController.get();
 }
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 4d1d55df6..dbc349ea2 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -47,27 +47,25 @@
 
 using namespace std::literals::chrono_literals;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
+namespace org::apache::nifi::minifi::c2 {
 
 C2Agent::C2Agent(core::controller::ControllerServiceProvider *controller,
                  state::Pausable *pause_handler,
                  state::StateMonitor* updateSink,
-                 const std::shared_ptr<Configure> &configuration,
-                 const std::shared_ptr<utils::file::FileSystem> &filesystem)
+                 std::shared_ptr<Configure> configuration,
+                 std::shared_ptr<utils::file::FileSystem> filesystem,
+                 std::function<void()> request_restart)
     : heart_beat_period_(3s),
       max_c2_responses(5),
       update_sink_(updateSink),
       update_service_(nullptr),
       controller_(controller),
       pause_handler_(pause_handler),
-      configuration_(configuration),
-      filesystem_(filesystem),
+      configuration_(std::move(configuration)),
+      filesystem_(std::move(filesystem)),
       protocol_(nullptr),
-      thread_pool_(2, false, nullptr, "C2 threadpool") {
+      thread_pool_(2, false, nullptr, "C2 threadpool"),
+      request_restart_(std::move(request_restart)) {
   manifest_sent_ = false;
 
   last_run_ = std::chrono::steady_clock::now();
@@ -80,7 +78,7 @@ C2Agent::C2Agent(core::controller::ControllerServiceProvider *controller,
     // create a stubbed service for updating the flow identifier
   }
 
-  configure(configuration, false);
+  configure(configuration_, false);
 
   functions_.emplace_back([this] {return produce();});
   functions_.emplace_back([this] {return consume();});
@@ -347,7 +345,7 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
       update_sink_->stop();
       C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true);
       protocol_.load()->consumePayload(std::move(response));
-      restart_agent();
+      restart_needed_ = true;
     }
       break;
     case Operation::START:
@@ -634,6 +632,7 @@ void C2Agent::handlePropertyUpdate(const C2ContentResponse &resp) {
   }
   C2Payload response(Operation::ACKNOWLEDGE, result, resp.ident, true);
   enqueue_c2_response(std::move(response));
+  if (result != state::UpdateState::NO_OPERATION) { restart_needed_ = true; }
 }
 
 /**
@@ -715,19 +714,6 @@ void C2Agent::handle_transfer(const C2ContentResponse &resp) {
   }
 }
 
-void C2Agent::restart_agent() {
-  std::string cwd = utils::Environment::getCurrentWorkingDirectory();
-  if (cwd.empty()) {
-    logger_->log_error("Could not restart the agent because the working directory could not be determined");
-    return;
-  }
-
-  std::string command = cwd + "/bin/minifi.sh restart";
-  if (system(command.c_str()) != 0) {
-    logger_->log_error("System command '%s' failed", command);
-  }
-}
-
 utils::TaskRescheduleInfo C2Agent::produce() {
   // place priority on messages to send to the c2 server
   if (protocol_.load() != nullptr) {
@@ -755,6 +741,12 @@ utils::TaskRescheduleInfo C2Agent::produce() {
           }
         });
 
+    if (restart_needed_ && requests.empty()) {
+      configuration_->commitChanges();
+      request_restart_();
+      return utils::TaskRescheduleInfo::Done();
+    }
+
     try {
       performHeartBeat();
     }
@@ -911,8 +903,4 @@ void C2Agent::enqueue_c2_server_response(C2Payload &&resp) {
   responses.enqueue(std::move(resp));
 }
 
-}  // namespace c2
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::c2
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index 14009c5b2..a4f7dd887 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+#include <filesystem>
 #include <memory>
 #include <map>
 #include "c2/C2Client.h"
@@ -30,22 +31,20 @@
 #include "c2/C2Agent.h"
 #include "core/state/nodes/FlowInformation.h"
 #include "utils/file/FileSystem.h"
+#include "utils/file/FileUtils.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
+namespace org::apache::nifi::minifi::c2 {
 
 C2Client::C2Client(
     std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo,
     std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo,
     std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<utils::file::FileSystem> filesystem,
-    std::shared_ptr<core::logging::Logger> logger)
+    std::function<void()> request_restart, std::shared_ptr<core::logging::Logger> logger)
     : core::Flow(std::move(provenance_repo), std::move(flow_file_repo), std::move(content_repo), std::move(flow_configuration)),
       configuration_(std::move(configuration)),
       filesystem_(std::move(filesystem)),
-      logger_(std::move(logger)) {}
+      logger_(std::move(logger)),
+      request_restart_(std::move(request_restart)) {}
 
 void C2Client::stopC2() {
   if (c2_agent_) {
@@ -68,7 +67,14 @@ void C2Client::initialize(core::controller::ControllerServiceProvider *controlle
     logger_->log_info("Agent class is not predefined");
   }
 
-  configuration_->setFallbackAgentIdentifier(getControllerUUID().to_string());
+  // Set a persistent fallback agent id. This is needed so that the C2 server can identify the same agent after a restart, even if nifi.c2.agent.identifier is not specified.
+  if (auto id = configuration_->get(Configuration::nifi_c2_agent_identifier_fallback)) {
+    configuration_->setFallbackAgentIdentifier(*id);
+  } else {
+    const auto agent_id = getControllerUUID().to_string();
+    configuration_->setFallbackAgentIdentifier(agent_id);
+    configuration_->set(Configuration::nifi_c2_agent_identifier_fallback, agent_id, PropertyChangeLifetime::PERSISTENT);
+  }
 
   {
     std::lock_guard<std::mutex> lock(initialization_mutex_);
@@ -141,7 +147,7 @@ void C2Client::initialize(core::controller::ControllerServiceProvider *controlle
   if (!initialized_) {
     // C2Agent is initialized once, meaning that a C2-triggered flow/configuration update
     // might not be equal to a fresh restart
-    c2_agent_ = std::make_unique<c2::C2Agent>(controller, pause_handler, update_sink, configuration_, filesystem_);
+    c2_agent_ = std::make_unique<c2::C2Agent>(controller, pause_handler, update_sink, configuration_, filesystem_, request_restart_);
     c2_agent_->start();
     initialized_ = true;
   }
@@ -355,8 +361,4 @@ void C2Client::updateResponseNodeConnections() {
   }
 }
 
-}  // namespace c2
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::c2
diff --git a/libminifi/src/core/state/Value.cpp b/libminifi/src/core/state/Value.cpp
index 49cc8fda7..120b228a8 100644
--- a/libminifi/src/core/state/Value.cpp
+++ b/libminifi/src/core/state/Value.cpp
@@ -20,13 +20,11 @@
 #include <openssl/sha.h>
 #include <utility>
 #include <string>
+#include "rapidjson/document.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stringbuffer.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+namespace org::apache::nifi::minifi::state::response {
 
 const std::type_index Value::UINT64_TYPE = std::type_index(typeid(uint64_t));
 const std::type_index Value::INT64_TYPE = std::type_index(typeid(int64_t));
@@ -58,10 +56,36 @@ std::string hashResponseNodes(const std::vector<SerializedResponseNode>& nodes)
   return utils::StringUtils::to_hex(digest, true /*uppercase*/);
 }
 
-} /* namespace response */
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+namespace {
+rapidjson::Value nodeToJson(const SerializedResponseNode& node, rapidjson::MemoryPoolAllocator<rapidjson::CrtAllocator>& alloc) {
+  if (node.value.empty()) {
+    if (node.array) {
+      rapidjson::Value result(rapidjson::kArrayType);
+      for (const auto& elem: node.children) {
+        result.PushBack(nodeToJson(elem, alloc), alloc);
+      }
+      return result;
+    } else {
+      rapidjson::Value result(rapidjson::kObjectType);
+      for (const auto& elem: node.children) {
+        result.AddMember(rapidjson::Value(elem.name.c_str(), alloc), nodeToJson(elem, alloc), alloc);
+      }
+      return result;
+    }
+  } else {
+    return rapidjson::Value(node.value.to_string().c_str(), alloc);
+  }
+}
+}  // namespace
+
+std::string SerializedResponseNode::to_string() const {
+  rapidjson::Document doc;
+  doc.SetObject();
+  doc.AddMember(rapidjson::Value(name.c_str(), doc.GetAllocator()), nodeToJson(*this, doc.GetAllocator()), doc.GetAllocator());
+  rapidjson::StringBuffer buf;
+  rapidjson::Writer<rapidjson::StringBuffer> writer{buf};
+  doc.Accept(writer);
+  return buf.GetString();
+}
+}  // namespace org::apache::nifi::minifi::state::response
 
diff --git a/extensions/systemd/WorkerThread.cpp b/libminifi/src/utils/FifoExecutor.cpp
similarity index 77%
rename from extensions/systemd/WorkerThread.cpp
rename to libminifi/src/utils/FifoExecutor.cpp
index 4213d29f8..f7bf0a807 100644
--- a/extensions/systemd/WorkerThread.cpp
+++ b/libminifi/src/utils/FifoExecutor.cpp
@@ -15,11 +15,9 @@
  * limitations under the License.
  */
 
-#include "WorkerThread.h"
+#include "utils/FifoExecutor.h"
 
-namespace org { namespace apache { namespace nifi { namespace minifi { namespace extensions { namespace systemd {
-
-namespace detail {
+namespace org::apache::nifi::minifi::utils::detail {
 WorkerThread::WorkerThread()
     : thread_{&WorkerThread::run, this} {}
 
@@ -33,11 +31,4 @@ void WorkerThread::run() noexcept {
     task_queue_.consumeWait([](std::packaged_task<void()>&& f) { f(); });
   }
 }
-}  // namespace detail
-
-}  // namespace systemd
-}  // namespace extensions
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::utils::detail
diff --git a/libminifi/src/utils/file/FileUtils.cpp b/libminifi/src/utils/file/FileUtils.cpp
index 456211130..2dc6f41f8 100644
--- a/libminifi/src/utils/file/FileUtils.cpp
+++ b/libminifi/src/utils/file/FileUtils.cpp
@@ -106,6 +106,13 @@ std::chrono::time_point<std::chrono::system_clock> to_sys_time_point(const std::
 #endif
 }
 
+void put_content(const std::filesystem::path& filename, std::string_view new_contents) {
+  std::ofstream ofs;
+  ofs.exceptions(std::ofstream::badbit | std::ofstream::failbit);
+  ofs.open(filename, std::ofstream::binary);
+  ofs.write(new_contents.data(), gsl::narrow<std::streamsize>(new_contents.size()));
+}
+
 }  // namespace file
 }  // namespace utils
 }  // namespace minifi
diff --git a/libminifi/test/aws-tests/FetchS3ObjectTests.cpp b/libminifi/test/aws-tests/FetchS3ObjectTests.cpp
index 844f33d0a..06999e321 100644
--- a/libminifi/test/aws-tests/FetchS3ObjectTests.cpp
+++ b/libminifi/test/aws-tests/FetchS3ObjectTests.cpp
@@ -26,7 +26,7 @@
 namespace {
 
 using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
-using org::apache::nifi::minifi::utils::file::get_file_content;
+using org::apache::nifi::minifi::utils::file::get_content;
 using org::apache::nifi::minifi::utils::file::get_separator;
 
 class FetchS3ObjectTestsFixture : public FlowProcessorS3TestsFixture<minifi::aws::processors::FetchS3Object> {
@@ -120,7 +120,7 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test default properties", "[awsS3Co
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.expirationTimeRuleId value:" + S3_EXPIRATION_TIME_RULE_ID));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.sseAlgorithm value:" + S3_SSEALGORITHM_STR));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.version value:" + S3_VERSION_1));
-  REQUIRE(get_file_content(output_dir + get_separator() + INPUT_FILENAME) == S3_CONTENT);
+  REQUIRE(get_content(output_dir + get_separator() + INPUT_FILENAME) == S3_CONTENT);
   REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetVersionId().empty());
   REQUIRE(!mock_s3_request_sender_ptr->get_object_request.VersionIdHasBeenSet());
   REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetRequestPayer() == Aws::S3::Model::RequestPayer::NOT_SET);
@@ -140,7 +140,7 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test empty optional S3 results", "[
   REQUIRE(!LogTestController::getInstance().contains("key:s3.expirationTimeRuleId", std::chrono::seconds(0), std::chrono::milliseconds(0)));
   REQUIRE(!LogTestController::getInstance().contains("key:s3.sseAlgorithm", std::chrono::seconds(0), std::chrono::milliseconds(0)));
   REQUIRE(!LogTestController::getInstance().contains("key:s3.version", std::chrono::seconds(0), std::chrono::milliseconds(0)));
-  REQUIRE(get_file_content(output_dir + get_separator() + INPUT_FILENAME).empty());
+  REQUIRE(get_content(output_dir + get_separator() + INPUT_FILENAME).empty());
 }
 
 TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test subdirectories on AWS", "[awsS3Config]") {
@@ -150,7 +150,7 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test subdirectories on AWS", "[awsS
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:filename value:logs.txt"));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:path value:dir1/dir2"));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:absolute.path value:dir1/dir2/logs.txt"));
-  REQUIRE(get_file_content(output_dir + get_separator() + INPUT_FILENAME).empty());
+  REQUIRE(get_content(output_dir + get_separator() + INPUT_FILENAME).empty());
 }
 
 TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test optional values are set in request", "[awsS3Config]") {
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h b/libminifi/test/flow-tests/TestControllerWithFlow.h
index 9841ea8ba..9151c6209 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -60,7 +60,8 @@ class TestControllerWithFlow: public TestController{
     controller_ = std::make_shared<minifi::FlowController>(
         prov_repo, ff_repo, configuration_,
         std::move(flow),
-        content_repo, DEFAULT_ROOT_GROUP_NAME);
+        content_repo, DEFAULT_ROOT_GROUP_NAME,
+        std::make_shared<utils::file::FileSystem>(), []{});
     controller_->load(std::move(root));
   }
 
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index d247bd4bd..1c43656db 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -19,6 +19,7 @@
 
 #define DEFAULT_WAITTIME_MSECS 3000
 
+#include <future>
 #include <memory>
 #include <optional>
 #include <string>
@@ -34,6 +35,7 @@
 #include "core/ConfigurableComponent.h"
 #include "controllers/SSLContextService.h"
 #include "HTTPUtils.h"
+#include "utils/FifoExecutor.h"
 
 namespace minifi = org::apache::nifi::minifi;
 namespace core = minifi::core;
@@ -42,7 +44,30 @@ namespace utils = minifi::utils;
 class IntegrationBase {
  public:
   explicit IntegrationBase(std::chrono::milliseconds waitTime = std::chrono::milliseconds(DEFAULT_WAITTIME_MSECS));
-
+  IntegrationBase(const IntegrationBase&) = delete;
+  IntegrationBase(IntegrationBase&& other) noexcept
+      :configuration{std::move(other.configuration)},
+      flowController_{std::move(other.flowController_)},
+      wait_time_{other.wait_time_},
+      port{std::move(other.port)},
+      scheme{std::move(other.scheme)},
+      key_dir{std::move(other.key_dir)},
+      state_dir{std::move(other.state_dir)},
+      restart_requested_count_{other.restart_requested_count_.load()}
+  {}
+  IntegrationBase& operator=(const IntegrationBase&) = delete;
+  IntegrationBase& operator=(IntegrationBase&& other) noexcept {
+    if (&other == this) return *this;
+    configuration = std::move(other.configuration);
+    flowController_ = std::move(other.flowController_);
+    wait_time_ = other.wait_time_;
+    port = std::move(other.port);
+    scheme = std::move(other.scheme);
+    key_dir = std::move(other.key_dir);
+    state_dir = std::move(other.state_dir);
+    restart_requested_count_ = other.restart_requested_count_.load();
+    return *this;
+  }
   virtual ~IntegrationBase() = default;
 
   virtual void run(const std::optional<std::string>& test_file_location = {}, const std::optional<std::string>& home_path = {});
@@ -93,6 +118,7 @@ class IntegrationBase {
   std::string port, scheme;
   std::string key_dir;
   std::string state_dir;
+  std::atomic<int> restart_requested_count_{0};
 };
 
 IntegrationBase::IntegrationBase(std::chrono::milliseconds waitTime)
@@ -111,6 +137,7 @@ void IntegrationBase::configureSecurity() {
 }
 
 void IntegrationBase::run(const std::optional<std::string>& test_file_location, const std::optional<std::string>& home_path) {
+  using namespace std::literals::chrono_literals;
   testSetup();
 
   std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
@@ -126,46 +153,67 @@ void IntegrationBase::run(const std::optional<std::string>& test_file_location,
 
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(configuration);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
-
-  bool should_encrypt_flow_config = (configuration->get(minifi::Configure::nifi_flow_configuration_encrypt)
-                                     | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
 
-  std::shared_ptr<utils::file::FileSystem> filesystem;
-  if (home_path) {
-    filesystem = std::make_shared<utils::file::FileSystem>(
-        should_encrypt_flow_config,
-        utils::crypto::EncryptionProvider::create(*home_path));
-  } else {
-    filesystem = std::make_shared<utils::file::FileSystem>();
-  }
-
-  std::unique_ptr<core::FlowConfiguration> flow_config = std::unique_ptr<core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location, filesystem));
+  std::atomic<bool> running = true;
+  utils::FifoExecutor assertion_runner;
+  std::future<void> assertions_done;
+  while (running) {
+    running = false;  // Stop running after this iteration, unless restart is explicitly requested
 
-  auto controller_service_provider = flow_config->getControllerServiceProvider();
-  char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";
-  state_dir = utils::file::create_temp_directory(state_dir_name_template);
-  if (!configuration->get(minifi::Configure::nifi_state_management_provider_local_path)) {
-    configuration->set(minifi::Configure::nifi_state_management_provider_local_path, state_dir);
-  }
-  core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider.get(), configuration);
+    std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
 
-  std::shared_ptr<core::ProcessGroup> pg(flow_config->getRoot());
-  queryRootProcessGroup(pg);
+    bool should_encrypt_flow_config = (configuration->get(minifi::Configure::nifi_flow_configuration_encrypt)
+        | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
 
-  std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+    std::shared_ptr<utils::file::FileSystem> filesystem;
+    if (home_path) {
+      filesystem = std::make_shared<utils::file::FileSystem>(
+          should_encrypt_flow_config,
+          utils::crypto::EncryptionProvider::create(*home_path));
+    } else {
+      filesystem = std::make_shared<utils::file::FileSystem>();
+    }
 
-  flowController_ = std::make_unique<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(flow_config), content_repo, DEFAULT_ROOT_GROUP_NAME);
-  flowController_->load();
-  updateProperties(*flowController_);
-  flowController_->start();
+    auto flow_config = std::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location, filesystem);
 
-  runAssertions();
+    auto controller_service_provider = flow_config->getControllerServiceProvider();
+    char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";
+    state_dir = utils::file::create_temp_directory(state_dir_name_template);
+    if (!configuration->get(minifi::Configure::nifi_state_management_provider_local_path)) {
+      configuration->set(minifi::Configure::nifi_state_management_provider_local_path, state_dir);
+    }
+    core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider.get(), configuration);
+
+    std::shared_ptr<core::ProcessGroup> pg(flow_config->getRoot());
+    queryRootProcessGroup(pg);
+
+    std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+    const auto request_restart = [&, this] {
+      ++restart_requested_count_;
+      running = true;
+    };
+    flowController_ = std::make_unique<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(flow_config), content_repo, DEFAULT_ROOT_GROUP_NAME,
+        std::make_shared<utils::file::FileSystem>(), request_restart);
+    flowController_->load();
+    updateProperties(*flowController_);
+    flowController_->start();
+
+    assertions_done = assertion_runner.enqueue([this] { runAssertions(); });
+    std::future_status status = std::future_status::ready;
+    while (!running && (status = assertions_done.wait_for(10ms)) == std::future_status::timeout) { /* wait */ }
+    if (running && status != std::future_status::timeout) {
+      // cancel restart, because assertions have finished running
+      running = false;
+    }
 
-  shutdownBeforeFlowController();
-  flowController_->unload();
-  flowController_->stopC2();
+    if (!running) {
+      // Only stop servers if we're shutting down
+      shutdownBeforeFlowController();
+    }
+    flowController_->unload();
+    flowController_->stopC2();
+  }
 
   cleanup();
 }
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index 0a78b2fd5..80aad7d0e 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -20,17 +20,11 @@
 #undef NDEBUG
 #include <cassert>
 #include <chrono>
-#include <fstream>
-#include <utility>
 #include <memory>
+#include <utility>
 #include <string>
 #include <thread>
-#include <type_traits>
-#include <vector>
 #include "utils/file/FileUtils.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
 #include "core/ProcessGroup.h"
 #include "core/yaml/YamlConfiguration.h"
 #include "FlowController.h"
@@ -38,7 +32,6 @@
 #include "../unit/ProvenanceTestHelper.h"
 #include "io/StreamFactory.h"
 #include "../TestBase.h"
-#include "../Catch.h"
 #include "utils/IntegrationTestUtils.h"
 
 int main(int argc, char **argv) {
@@ -63,12 +56,13 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::make_unique<core::YamlConfiguration>(
+      test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
   std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
 
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(
-      test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME);
+  const auto controller = std::make_shared<minifi::FlowController>(
+      test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+      std::make_shared<utils::file::FileSystem>(), []{});
 
   core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
 
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index d28011596..276a06445 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -176,7 +176,7 @@ TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
 
   auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, ff_repository, content_repo, nullptr, config, "");
   auto flowController = std::make_shared<minifi::FlowController>(
-      prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "");
+      prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", std::make_shared<utils::file::FileSystem>(), []{});
 
   {
     TestFlow flow(ff_repository, content_repo, prov_repo, setupMergeProcessor, MergeContent::Merge);
@@ -290,7 +290,7 @@ TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") {
 
   auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, ff_repository, content_repo, nullptr, config, "");
   auto flowController = std::make_shared<minifi::FlowController>(
-      prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "");
+      prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", std::make_shared<utils::file::FileSystem>(), []{});
 
   {
     TestFlow flow(ff_repository, content_repo, prov_repo, setupContentUpdaterProcessor, {"success", "d"});
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 3ca9f950b..5e96aec40 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -290,7 +290,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, ff_repository, content_repo, nullptr, config, "");
   auto flowController = std::make_shared<minifi::FlowController>(
-      prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "");
+      prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", std::make_shared<utils::file::FileSystem>(), []{});
 
   std::string data = "banana";
   minifi::io::BufferStream content(data);
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 760904bd8..65c91bf1f 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -244,7 +244,8 @@ class TestFlowController : public org::apache::nifi::minifi::FlowController {
   TestFlowController(std::shared_ptr<org::apache::nifi::minifi::core::Repository> repo, std::shared_ptr<org::apache::nifi::minifi::core::Repository> flow_file_repo,
       const std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& /*content_repo*/)
       :org::apache::nifi::minifi::FlowController(repo, flow_file_repo, std::make_shared<org::apache::nifi::minifi::Configure>(), nullptr,
-          std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(), "") {
+          std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(), "",
+          std::make_shared<org::apache::nifi::minifi::utils::file::FileSystem>(), []{}) {
   }
 
   ~TestFlowController() override = default;
diff --git a/main/AgentDocs.cpp b/main/AgentDocs.cpp
index 222e76a2e..bd5b834af 100644
--- a/main/AgentDocs.cpp
+++ b/main/AgentDocs.cpp
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -34,18 +33,13 @@
 #include "core/Relationship.h"
 #include "io/validation.h"
 #include "utils/file/FileUtils.h"
-#include "agent/build_description.h"
 #include "agent/agent_docs.h"
 #include "agent/agent_version.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace docs {
+namespace org::apache::nifi::minifi::docs {
 
 std::string AgentDocs::extractClassName(const std::string &processor) const {
-  auto positionOfLastDot = processor.find_last_of(".");
+  auto positionOfLastDot = processor.find_last_of('.');
   if (positionOfLastDot != std::string::npos) {
     return processor.substr(positionOfLastDot + 1);
   }
@@ -55,7 +49,7 @@ std::string AgentDocs::extractClassName(const std::string &processor) const {
 void AgentDocs::generate(const std::string &docsdir, std::ostream &genStream) {
   std::map<std::string, ClassDescription> processorSet;
   for (const auto &group : minifi::AgentBuild::getExtensions()) {
-    struct Components descriptions = BuildDescription::getClassDescriptions(group);
+    struct Components descriptions = build_description_.getClassDescriptions(group);
     for (const auto &processorName : descriptions.processors_) {
       processorSet.insert(std::make_pair(extractClassName(processorName.class_name_), processorName));
     }
@@ -64,18 +58,18 @@ void AgentDocs::generate(const std::string &docsdir, std::ostream &genStream) {
     const std::string &filename = docsdir + utils::file::get_separator() + processor.first;
     std::ofstream outfile(filename);
 
-    std::string description;
-
-    bool foundDescription = minifi::AgentDocs::getDescription(processor.first, description);
-
-    if (!foundDescription) {
-      foundDescription = minifi::AgentDocs::getDescription(processor.second.class_name_, description);
-    }
+    {
+      std::string description;
+      bool foundDescription = minifi::AgentDocs::getDescription(processor.first, description);
+      if (!foundDescription) {
+        foundDescription = minifi::AgentDocs::getDescription(processor.second.class_name_, description);
+      }
 
-    outfile << "## " << processor.first << std::endl << std::endl;
-    if (foundDescription) {
-      outfile << "### Description " << std::endl << std::endl;
-      outfile << description << std::endl;
+      outfile << "## " << processor.first << std::endl << std::endl;
+      if (foundDescription) {
+        outfile << "### Description " << std::endl << std::endl;
+        outfile << description << std::endl;
+      }
     }
 
     outfile << "### Properties " << std::endl << std::endl;
@@ -165,8 +159,4 @@ void AgentDocs::generate(const std::string &docsdir, std::ostream &genStream) {
   }
 }
 
-} /* namespace docs */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::docs
diff --git a/main/AgentDocs.h b/main/AgentDocs.h
index 0402adf11..1b880b3eb 100644
--- a/main/AgentDocs.h
+++ b/main/AgentDocs.h
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,26 +18,18 @@
 #define MAIN_AGENTDOCS_H_
 
 #include <iostream>
+#include "agent/build_description.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace docs {
+namespace org::apache::nifi::minifi::docs {
 
 class AgentDocs {
  public:
-  AgentDocs() = default;
-  ~AgentDocs() = default;
   void generate(const std::string &docsdir, std::ostream &genStream);
  private:
-  inline std::string extractClassName(const std::string &processor) const;
+  [[nodiscard]] inline std::string extractClassName(const std::string &processor) const;
+  BuildDescription build_description_;
 };
 
-} /* namespace docs */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::docs
 
 #endif  // MAIN_AGENTDOCS_H_
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index e90b8665d..70afd8085 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -43,6 +43,7 @@
 #include <signal.h>
 #include <sodium.h>
 
+#include <atomic>
 #include <cstdlib>
 #include <iostream>
 #include <memory>
@@ -68,8 +69,8 @@ namespace core = minifi::core;
 namespace utils = minifi::utils;
 
  // Variables that allow us to avoid a timed wait.
-sem_t *running;
-//! Flow Controller
+static sem_t *flow_controller_running;
+static sem_t *process_running;
 
 /**
  * Removed the stop command from the signal handler so that we could trigger
@@ -83,24 +84,27 @@ sem_t *running;
 
 #ifdef WIN32
 BOOL WINAPI consoleSignalHandler(DWORD signal) {
+  if (!process_running) { exit(0); return TRUE; }
   if (signal == CTRL_C_EVENT || signal == CTRL_BREAK_EVENT) {
-    sem_post(running);
-    if (sem_wait(running) == -1)
-      perror("sem_wait");
+    int ret = ETIMEDOUT;
+    while (ret == ETIMEDOUT) {
+      if (flow_controller_running) { sem_post(flow_controller_running); }
+      const struct timespec timeout_100ms { .tv_sec = 0, .tv_nsec = 100000000};
+      ret = sem_timedwait(process_running, &timeout_100ms);
+    }
+    return TRUE;
   }
-
-  return TRUE;
+  return FALSE;
 }
 
 void SignalExitProcess() {
-  sem_post(running);
+  sem_post(flow_controller_running);
 }
 #endif
 
 void sigHandler(int signal) {
   if (signal == SIGINT || signal == SIGTERM) {
-    // avoid stopping the controller here.
-    sem_post(running);
+    sem_post(flow_controller_running);
   }
 }
 
@@ -148,20 +152,6 @@ int main(int argc, char **argv) {
     return -1;
   }
 
-  uint16_t stop_wait_time = STOP_WAIT_TIME_MS;
-
-  std::string graceful_shutdown_seconds;
-  std::string prov_repo_class = "provenancerepository";
-  std::string flow_repo_class = "flowfilerepository";
-  std::string nifi_configuration_class_name = "yamlconfiguration";
-  std::string content_repo_class = "filesystemrepository";
-
-  running = sem_open("/MiNiFiMain", O_CREAT, 0644, 0);
-  if (running == SEM_FAILED || running == 0) {
-    logger->log_error("could not initialize semaphore");
-    perror("initialization failure");
-  }
-
 #ifdef WIN32
   if (!SetConsoleCtrlHandler(consoleSignalHandler, TRUE)) {
     logger->log_error("Cannot install signal handler");
@@ -184,237 +174,274 @@ int main(int argc, char **argv) {
     return -1;
   }
 #endif
-
   // Determine MINIFI_HOME
   const std::string minifiHome = determineMinifiHome(logger);
   if (minifiHome.empty()) {
     // determineMinifiHome already logged everything we need
     return -1;
   }
-
   // chdir to MINIFI_HOME
   if (!utils::Environment::setCurrentWorkingDirectory(minifiHome.c_str())) {
     logger->log_error("Failed to change working directory to MINIFI_HOME (%s)", minifiHome);
     return -1;
   }
+  const auto flow_controller_semaphore_path = "/MiNiFiMain";
+  const auto process_semaphore_path = "/MiNiFiProc";
 
-  const auto log_properties = std::make_shared<core::logging::LoggerProperties>();
-  log_properties->setHome(minifiHome);
-  log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
-  core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
-
-  std::shared_ptr<minifi::Properties> uid_properties = std::make_shared<minifi::Properties>("UID properties");
-  uid_properties->setHome(minifiHome);
-  uid_properties->loadConfigureFile(DEFAULT_UID_PROPERTIES_FILE);
-  utils::IdGenerator::getIdGenerator()->initialize(uid_properties);
-
-  // Make a record of minifi home in the configured log file.
-  logger->log_info("MINIFI_HOME=%s", minifiHome);
-
-  auto decryptor = minifi::Decryptor::create(minifiHome);
-  if (decryptor) {
-    logger->log_info("Found encryption key, will decrypt sensitive properties in the configuration");
-  } else {
-    logger->log_info("No encryption key found, will not decrypt sensitive properties in the configuration");
+  process_running = sem_open(process_semaphore_path, O_CREAT, 0644, 0);
+  if (process_running == SEM_FAILED) {
+    logger->log_error("could not initialize process semaphore");
+    perror("sem_open");
+    return -1;
   }
 
-  const std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>(std::move(decryptor), std::move(log_properties));
-  configure->setHome(minifiHome);
-  configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
-
-  minifi::core::extension::ExtensionManager::get().initialize(configure);
-
-  if (argc >= 3 && std::string("docs") == argv[1]) {
-    if (utils::file::create_dir(argv[2]) != 0) {
-      std::cerr << "Working directory doesn't exist and cannot be created: " << argv[2] << std::endl;
-      exit(1);
-    }
-
-    std::cerr << "Dumping docs to " << argv[2] << std::endl;
-    if (argc == 4) {
-      std::string filepath, filename;
-      utils::file::PathUtils::getFileNameAndPath(argv[3], filepath, filename);
-      if (filepath == argv[2]) {
-        std::cerr << "Target file should be out of the working directory: " << filepath << std::endl;
-        exit(1);
-      }
-      std::ofstream outref(argv[3]);
-      dumpDocs(configure, argv[2], outref);
-    } else {
-      dumpDocs(configure, argv[2], std::cout);
+  std::atomic<bool> restart_token{false};
+  const auto request_restart = [&] {
+    if (!restart_token.exchange(true)) {
+      // only do sem_post if a restart is not already in progress (the flag was unset before the exchange)
+      sem_post(flow_controller_running);
+      logger->log_info("Initiating restart...");
     }
-    exit(0);
-  }
-
+  };
 
-  if (configure->get(minifi::Configure::nifi_graceful_shutdown_seconds, graceful_shutdown_seconds)) {
-    try {
-      stop_wait_time = std::stoi(graceful_shutdown_seconds);
-    }
-    catch (const std::out_of_range &e) {
-      logger->log_error("%s is out of range. %s", minifi::Configure::nifi_graceful_shutdown_seconds, e.what());
-    }
-    catch (const std::invalid_argument &e) {
-      logger->log_error("%s contains an invalid argument set. %s", minifi::Configure::nifi_graceful_shutdown_seconds, e.what());
+  do {
+    flow_controller_running = sem_open(flow_controller_semaphore_path, O_CREAT, 0644, 0);
+    if (flow_controller_running == SEM_FAILED) {
+      logger->log_error("could not initialize flow controller semaphore");
+      perror("sem_open");
+      return -1;
     }
-  }
-  else {
-    logger->log_debug("%s not set, defaulting to %d", minifi::Configure::nifi_graceful_shutdown_seconds,
-      STOP_WAIT_TIME_MS);
-  }
-
-  configure->get(minifi::Configure::nifi_provenance_repository_class_name, prov_repo_class);
-  // Create repos for flow record and provenance
-  std::shared_ptr<core::Repository> prov_repo = core::createRepository(prov_repo_class, true, "provenance");
 
-  if (!prov_repo->initialize(configure)) {
-    logger->log_error("Provenance repository failed to initialize, exiting..");
-    exit(1);
-  }
-
-  configure->get(minifi::Configure::nifi_flow_repository_class_name, flow_repo_class);
-
-  std::shared_ptr<core::Repository> flow_repo = core::createRepository(flow_repo_class, true, "flowfile");
-
-  if (!flow_repo->initialize(configure)) {
-    logger->log_error("Flow file repository failed to initialize, exiting..");
-    exit(1);
-  }
+    uint16_t stop_wait_time = STOP_WAIT_TIME_MS;
 
-  configure->get(minifi::Configure::nifi_content_repository_class_name, content_repo_class);
+    std::string graceful_shutdown_seconds;
+    std::string prov_repo_class = "provenancerepository";
+    std::string flow_repo_class = "flowfilerepository";
+    std::string nifi_configuration_class_name = "yamlconfiguration";
+    std::string content_repo_class = "filesystemrepository";
 
-  std::shared_ptr<core::ContentRepository> content_repo = core::createContentRepository(content_repo_class, true, "content");
+    const auto log_properties = std::make_shared<core::logging::LoggerProperties>();
+    log_properties->setHome(minifiHome);
+    log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
+    core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
 
-  if (!content_repo->initialize(configure)) {
-    logger->log_error("Content repository failed to initialize, exiting..");
-    exit(1);
-  }
-
-  std::string content_repo_path;
-  if (configure->get(minifi::Configure::nifi_dbcontent_repository_directory_default, content_repo_path)) {
-    core::logging::LOG_INFO(logger) << "setting default dir to " << content_repo_path;
-    minifi::setDefaultDirectory(content_repo_path);
-  }
+    std::shared_ptr<minifi::Properties> uid_properties = std::make_shared<minifi::Properties>("UID properties");
+    uid_properties->setHome(minifiHome);
+    uid_properties->loadConfigureFile(DEFAULT_UID_PROPERTIES_FILE);
+    utils::IdGenerator::getIdGenerator()->initialize(uid_properties);
 
-  configure->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name);
+    // Make a record of minifi home in the configured log file.
+    logger->log_info("MINIFI_HOME=%s", minifiHome);
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configure);
+    auto decryptor = minifi::Decryptor::create(minifiHome);
+    if (decryptor) {
+      logger->log_info("Found encryption key, will decrypt sensitive properties in the configuration");
+    } else {
+      logger->log_info("No encryption key found, will not decrypt sensitive properties in the configuration");
+    }
 
-  bool should_encrypt_flow_config = (configure->get(minifi::Configure::nifi_flow_configuration_encrypt)
-      | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
+    const std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>(std::move(decryptor), std::move(log_properties));
+    configure->setHome(minifiHome);
+    configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
 
-  auto filesystem = std::make_shared<utils::file::FileSystem>(
-      should_encrypt_flow_config,
-      utils::crypto::EncryptionProvider::create(minifiHome));
+    minifi::core::extension::ExtensionManager::get().initialize(configure);
 
-  std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(
-      prov_repo, flow_repo, content_repo, configure, stream_factory, nifi_configuration_class_name,
-      configure->get(minifi::Configure::nifi_flow_configuration_file), filesystem);
+    if (argc >= 3 && std::string("docs") == argv[1]) {
+      if (utils::file::create_dir(argv[2]) != 0) {
+        std::cerr << "Working directory doesn't exist and cannot be created: " << argv[2] << std::endl;
+        exit(1);
+      }
 
-  const auto controller = std::make_unique<minifi::FlowController>(
-      prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, filesystem);
+      std::cerr << "Dumping docs to " << argv[2] << std::endl;
+      if (argc == 4) {
+        std::string filepath, filename;
+        utils::file::PathUtils::getFileNameAndPath(argv[3], filepath, filename);
+        if (filepath == argv[2]) {
+          std::cerr << "Target file should be out of the working directory: " << filepath << std::endl;
+          exit(1);
+        }
+        std::ofstream outref(argv[3]);
+        dumpDocs(configure, argv[2], outref);
+      } else {
+        dumpDocs(configure, argv[2], std::cout);
+      }
+      exit(0);
+    }
 
-  const bool disk_space_watchdog_enable = (configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) | utils::map([](const std::string& v){ return v == "true"; })).value_or(true);
-  std::unique_ptr<utils::CallBackTimer> disk_space_watchdog;
-  if (disk_space_watchdog_enable) {
-    try {
-      const auto repo_paths = [&] {
-        std::vector<std::string> repo_paths;
-        repo_paths.reserve(3);
-        // REPOSITORY_DIRECTORY is a dummy path used by noop repositories
-        const auto path_valid = [](const std::string& p) { return !p.empty() && p != REPOSITORY_DIRECTORY; };
-        auto prov_repo_path = prov_repo->getDirectory();
-        auto flow_repo_path = flow_repo->getDirectory();
-        auto content_repo_storage_path = content_repo->getStoragePath();
-        if (!prov_repo->isNoop() && path_valid(prov_repo_path)) { repo_paths.push_back(std::move(prov_repo_path)); }
-        if (!flow_repo->isNoop() && path_valid(flow_repo_path)) { repo_paths.push_back(std::move(flow_repo_path)); }
-        if (path_valid(content_repo_storage_path)) { repo_paths.push_back(std::move(content_repo_storage_path)); }
-        return repo_paths;
-      }();
-      const auto available_spaces = minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get());
-      const auto config = minifi::disk_space_watchdog::read_config(*configure);
-      const auto min_space = [](const std::vector<std::uintmax_t>& spaces) {
-        const auto it = std::min_element(std::begin(spaces), std::end(spaces));
-        return it != spaces.end() ? *it : (std::numeric_limits<std::uintmax_t>::max)();
-      };
-      if (min_space(available_spaces) <= config.stop_threshold_bytes) {
-        logger->log_error("Cannot start MiNiFi due to insufficient available disk space");
-        return -1;
+    if (configure->get(minifi::Configure::nifi_graceful_shutdown_seconds, graceful_shutdown_seconds)) {
+      try {
+        stop_wait_time = std::stoi(graceful_shutdown_seconds);
       }
-      auto interval_switch = minifi::disk_space_watchdog::disk_space_interval_switch(config);
-      disk_space_watchdog = std::make_unique<utils::CallBackTimer>(config.interval, [interval_switch, min_space, repo_paths, logger, &controller]() mutable {
-        const auto stop = [&]{ controller->stop(); controller->unload(); };
-        const auto restart = [&]{ controller->load(); controller->start(); };
-        const auto switch_state = interval_switch(min_space(minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get())));
-        if (switch_state.state == utils::IntervalSwitchState::LOWER && switch_state.switched) {
-          logger->log_warn("Stopping flow controller due to insufficient disk space");
-          stop();
-        } else if (switch_state.state == utils::IntervalSwitchState::UPPER && switch_state.switched) {
-          logger->log_info("Restarting flow controller");
-          restart();
-        }
-      });
-    } catch (const std::runtime_error& error) {
-      logger->log_error(error.what());
-      return -1;
+      catch (const std::out_of_range& e) {
+        logger->log_error("%s is out of range. %s", minifi::Configure::nifi_graceful_shutdown_seconds, e.what());
+      }
+      catch (const std::invalid_argument& e) {
+        logger->log_error("%s contains an invalid argument set. %s", minifi::Configure::nifi_graceful_shutdown_seconds, e.what());
+      }
+    } else {
+      logger->log_debug("%s not set, defaulting to %d", minifi::Configure::nifi_graceful_shutdown_seconds,
+          STOP_WAIT_TIME_MS);
     }
-  }
 
-  logger->log_info("Loading FlowController");
+    configure->get(minifi::Configure::nifi_provenance_repository_class_name, prov_repo_class);
+    // Create repos for flow record and provenance
+    std::shared_ptr<core::Repository> prov_repo = core::createRepository(prov_repo_class, true, "provenance");
 
-  // Load flow from specified configuration file
-  try {
-    controller->load();
-  }
-  catch (std::exception &e) {
-    logger->log_error("Failed to load configuration due to exception: %s", e.what());
-    return -1;
-  }
-  catch (...) {
-    logger->log_error("Failed to load configuration due to unknown exception");
-    return -1;
-  }
+    if (!prov_repo->initialize(configure)) {
+      logger->log_error("Provenance repository failed to initialize, exiting..");
+      exit(1);
+    }
 
-  // Start Processing the flow
-  controller->start();
+    configure->get(minifi::Configure::nifi_flow_repository_class_name, flow_repo_class);
 
-  if (disk_space_watchdog) { disk_space_watchdog->start(); }
+    std::shared_ptr<core::Repository> flow_repo = core::createRepository(flow_repo_class, true, "flowfile");
 
-  logger->log_info("MiNiFi started");
+    if (!flow_repo->initialize(configure)) {
+      logger->log_error("Flow file repository failed to initialize, exiting..");
+      exit(1);
+    }
 
-  /**
-   * Sem wait provides us the ability to have a controlled
-   * yield without the need for a more complex construct and
-   * a spin lock
-   */
-  int ret_val;
-  while ((ret_val = sem_wait(running)) == -1 && errno == EINTR);
-  if (ret_val == -1) perror("sem_wait");
+    configure->get(minifi::Configure::nifi_content_repository_class_name, content_repo_class);
 
-  while ((ret_val = sem_close(running)) == -1 && errno == EINTR);
-  if (ret_val == -1) perror("sem_close");
+    std::shared_ptr<core::ContentRepository> content_repo = core::createContentRepository(content_repo_class, true, "content");
 
-  while ((ret_val = sem_unlink("/MiNiFiMain")) == -1 && errno == EINTR);
-  if (ret_val == -1) perror("sem_unlink");
+    if (!content_repo->initialize(configure)) {
+      logger->log_error("Content repository failed to initialize, exiting..");
+      exit(1);
+    }
 
-  disk_space_watchdog = nullptr;
+    std::string content_repo_path;
+    if (configure->get(minifi::Configure::nifi_dbcontent_repository_directory_default, content_repo_path)) {
+      core::logging::LOG_INFO(logger) << "setting default dir to " << content_repo_path;
+      minifi::setDefaultDirectory(content_repo_path);
+    }
 
-  /**
-   * Trigger unload -- wait stop_wait_time
-   */
-  controller->waitUnload(stop_wait_time);
+    configure->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name);
+
+    std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configure);
+
+    bool should_encrypt_flow_config = (configure->get(minifi::Configure::nifi_flow_configuration_encrypt)
+        | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
+
+    auto filesystem = std::make_shared<utils::file::FileSystem>(
+        should_encrypt_flow_config,
+        utils::crypto::EncryptionProvider::create(minifiHome));
+
+    std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(
+        prov_repo, flow_repo, content_repo, configure, stream_factory, nifi_configuration_class_name,
+        configure->get(minifi::Configure::nifi_flow_configuration_file), filesystem);
+
+    const auto controller = std::make_unique<minifi::FlowController>(
+        prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, filesystem, request_restart);
+
+    const bool disk_space_watchdog_enable = (configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) | utils::map([](const std::string& v) { return v == "true"; })).value_or(true);
+    std::unique_ptr<utils::CallBackTimer> disk_space_watchdog;
+    if (disk_space_watchdog_enable) {
+      try {
+        const auto repo_paths = [&] {
+          std::vector<std::string> repo_paths;
+          repo_paths.reserve(3);
+          // REPOSITORY_DIRECTORY is a dummy path used by noop repositories
+          const auto path_valid = [](const std::string& p) { return !p.empty() && p != REPOSITORY_DIRECTORY; };
+          auto prov_repo_path = prov_repo->getDirectory();
+          auto flow_repo_path = flow_repo->getDirectory();
+          auto content_repo_storage_path = content_repo->getStoragePath();
+          if (!prov_repo->isNoop() && path_valid(prov_repo_path)) { repo_paths.push_back(std::move(prov_repo_path)); }
+          if (!flow_repo->isNoop() && path_valid(flow_repo_path)) { repo_paths.push_back(std::move(flow_repo_path)); }
+          if (path_valid(content_repo_storage_path)) { repo_paths.push_back(std::move(content_repo_storage_path)); }
+          return repo_paths;
+        }();
+        const auto available_spaces = minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get());
+        const auto config = minifi::disk_space_watchdog::read_config(*configure);
+        const auto min_space = [](const std::vector<std::uintmax_t>& spaces) {
+          const auto it = std::min_element(std::begin(spaces), std::end(spaces));
+          return it != spaces.end() ? *it : (std::numeric_limits<std::uintmax_t>::max)();
+        };
+        if (min_space(available_spaces) <= config.stop_threshold_bytes) {
+          logger->log_error("Cannot start MiNiFi due to insufficient available disk space");
+          return -1;
+        }
+        auto interval_switch = minifi::disk_space_watchdog::disk_space_interval_switch(config);
+        disk_space_watchdog = std::make_unique<utils::CallBackTimer>(config.interval, [interval_switch, min_space, repo_paths, logger, &controller]() mutable {
+          const auto stop = [&] {
+            controller->stop();
+            controller->unload();
+          };
+          const auto restart = [&] {
+            controller->load();
+            controller->start();
+          };
+          const auto switch_state = interval_switch(min_space(minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get())));
+          if (switch_state.state == utils::IntervalSwitchState::LOWER && switch_state.switched) {
+            logger->log_warn("Stopping flow controller due to insufficient disk space");
+            stop();
+          } else if (switch_state.state == utils::IntervalSwitchState::UPPER && switch_state.switched) {
+            logger->log_info("Restarting flow controller");
+            restart();
+          }
+        });
+      } catch (const std::runtime_error& error) {
+        logger->log_error(error.what());
+        return -1;
+      }
+    }
 
-  controller->stopC2();
+    logger->log_info("Loading FlowController");
 
-  flow_repo = nullptr;
+    // Load flow from specified configuration file
+    try {
+      controller->load();
+    }
+    catch (std::exception& e) {
+      logger->log_error("Failed to load configuration due to exception: %s", e.what());
+      return -1;
+    }
+    catch (...) {
+      logger->log_error("Failed to load configuration due to unknown exception");
+      return -1;
+    }
 
-  prov_repo = nullptr;
+    // Start Processing the flow
+    controller->start();
+
+    if (disk_space_watchdog) { disk_space_watchdog->start(); }
+
+    logger->log_info("MiNiFi started");
+
+    /**
+     * Sem wait provides us the ability to have a controlled
+     * yield without the need for a more complex construct and
+     * a spin lock
+     */
+    int ret_val;
+    while ((ret_val = sem_wait(flow_controller_running)) == -1 && errno == EINTR) {}
+    if (ret_val == -1) perror("sem_wait");
+
+    while ((ret_val = sem_close(flow_controller_running)) == -1 && errno == EINTR) {}
+    if (ret_val == -1) perror("sem_close");
+    flow_controller_running = nullptr;
+
+    while ((ret_val = sem_unlink(flow_controller_semaphore_path)) == -1 && errno == EINTR) {}
+    if (ret_val == -1) perror("sem_unlink");
+
+    disk_space_watchdog = nullptr;
+
+    /**
+     * Trigger unload -- wait stop_wait_time
+     */
+    controller->waitUnload(stop_wait_time);
+    controller->stopC2();
+    flow_repo = nullptr;
+    prov_repo = nullptr;
+  } while ([&] {
+    const auto restart_token_temp = restart_token.exchange(false);
+    if (restart_token_temp) {
+      logger->log_info("Restarting MiNiFi");
+    }
+    return restart_token_temp;
+  }());
 
+  if (process_running) { sem_post(process_running); }
   logger->log_info("MiNiFi exit");
-
-#ifdef WIN32
-  sem_post(running);
-#endif
-
   return 0;
 }
diff --git a/nanofi/src/cxx/C2CallbackAgent.cpp b/nanofi/src/cxx/C2CallbackAgent.cpp
index 9a7c755c9..d4af51260 100644
--- a/nanofi/src/cxx/C2CallbackAgent.cpp
+++ b/nanofi/src/cxx/C2CallbackAgent.cpp
@@ -28,15 +28,11 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/FileManager.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
+namespace org::apache::nifi::minifi::c2 {
 
 C2CallbackAgent::C2CallbackAgent(core::controller::ControllerServiceProvider* controller, state::Pausable* pause_handler, state::StateMonitor* updateSink,
                                  const std::shared_ptr<Configure> &configuration)
-    : C2Agent(controller, pause_handler, updateSink, configuration),
+    : C2Agent(controller, pause_handler, updateSink, configuration, std::make_shared<utils::file::FileSystem>(), []{}),
       stop(nullptr) {
 }
 
@@ -72,8 +68,4 @@ void C2CallbackAgent::handle_c2_server_response(const C2ContentResponse &resp) {
   }
 }
 
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} /* namespace org::apache::nifi::minifi::c2 */


[nifi-minifi-cpp] 02/03: MINIFICPP-1796 Fix getting raw value of log properties

Posted by fg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit c59807878d9049915e5c32f196ad57eb32f821e1
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Apr 13 13:57:01 2022 +0200

    MINIFICPP-1796 Fix getting raw value of log properties
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1301
---
 .../http-curl/tests/C2DescribeManifestTest.cpp     |  5 ++++
 .../http-curl/tests/C2PropertiesUpdateTests.cpp    |  7 +-----
 extensions/http-curl/tests/ConfigTestAccessor.h    | 29 ++++++++++++++++++++++
 extensions/http-curl/tests/HTTPHandlers.h          |  2 +-
 libminifi/include/properties/Configure.h           |  1 +
 libminifi/src/Configure.cpp                        | 29 ++++++++++++++--------
 libminifi/src/FlowController.cpp                   |  2 +-
 libminifi/src/c2/C2Client.cpp                      |  2 +-
 8 files changed, 57 insertions(+), 20 deletions(-)

diff --git a/extensions/http-curl/tests/C2DescribeManifestTest.cpp b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
index 242800ee4..42e7d1aac 100644
--- a/extensions/http-curl/tests/C2DescribeManifestTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
@@ -23,6 +23,7 @@
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
 #include "properties/Configuration.h"
+#include "ConfigTestAccessor.h"
 
 class DescribeManifestHandler: public HeartbeatHandler {
  public:
@@ -56,12 +57,16 @@ int main(int argc, char **argv) {
   harness.setKeyDir(args.key_dir);
   DescribeManifestHandler responder(harness.getConfiguration(), verified);
 
+  auto logger_properties = std::make_shared<core::logging::LoggerProperties>();
+  ConfigTestAccessor::call_setLoggerProperties(harness.getConfiguration(), logger_properties);
+
   harness.getConfiguration()->set(minifi::Configuration::nifi_rest_api_password, encrypted_value);
   harness.getConfiguration()->set(std::string(minifi::Configuration::nifi_rest_api_password) + ".protected", utils::crypto::EncryptionType::name());
   harness.getConfiguration()->set(minifi::Configuration::nifi_server_name, "server_name");
   harness.getConfiguration()->set(minifi::Configuration::nifi_framework_dir, "framework_path");
   harness.getConfiguration()->set(minifi::Configuration::nifi_sensitive_props_additional_keys,
     std::string(minifi::Configuration::nifi_framework_dir) + ", " + std::string(minifi::Configuration::nifi_server_name));
+  harness.getConfiguration()->set(minifi::Configuration::nifi_log_appender_rolling_directory, "/var/log/minifi");
 
   harness.setUrl(args.url, &responder);
   harness.run(args.test_file);
diff --git a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
index d92497c0b..27773ddd6 100644
--- a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
+++ b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
@@ -28,6 +28,7 @@
 #include "spdlog/sinks/dist_sink.h"
 #include "LogUtils.h"
 #include "properties/PropertiesFile.h"
+#include "ConfigTestAccessor.h"
 
 struct PropertyChange {
   std::string name;
@@ -116,12 +117,6 @@ namespace test {
 struct DummyClass3 {};
 }  // namespace test
 
-struct ConfigTestAccessor {
-  static void call_setLoggerProperties(const std::shared_ptr<minifi::Configure>& config, std::shared_ptr<core::logging::LoggerProperties> props) {
-    config->setLoggerProperties(props);
-  }
-};
-
 int main() {
   TempDirectory tmp_dir;
 
diff --git a/extensions/http-curl/tests/ConfigTestAccessor.h b/extensions/http-curl/tests/ConfigTestAccessor.h
new file mode 100644
index 000000000..e1dac61a0
--- /dev/null
+++ b/extensions/http-curl/tests/ConfigTestAccessor.h
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+
+#include "properties/Configure.h"
+#include "core/logging/LoggerProperties.h"
+
+struct ConfigTestAccessor {
+  static void call_setLoggerProperties(const std::shared_ptr<minifi::Configure>& config, std::shared_ptr<core::logging::LoggerProperties> props) {
+    config->setLoggerProperties(props);
+  }
+};
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 7bae42aed..cdba06fbe 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -562,7 +562,7 @@ class HeartbeatHandler : public ServerAwareHandler {
           std::unordered_map<std::string, std::string> config_property;
           if (ranges::find(disallowed_properties, property.name) == ranges::end(disallowed_properties)) {
             config_property.emplace("propertyName", property.name);
-            if (auto value = configuration_->getString(std::string(property.name))) {
+            if (auto value = configuration_->getRawValue(std::string(property.name))) {
               config_property.emplace("propertyValue", *value);
             }
             config_property.emplace("validator", property.validator->getName());
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 88b68787d..76d8b8fc2 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -42,6 +42,7 @@ class Configure : public Configuration, public core::AgentIdentificationProvider
   bool get(const std::string& key, std::string& value) const;
   bool get(const std::string& key, const std::string& alternate_key, std::string& value) const;
   std::optional<std::string> get(const std::string& key) const;
+  std::optional<std::string> getRawValue(const std::string& key) const;
 
   std::optional<std::string> getAgentClass() const override;
   std::string getAgentIdentifier() const override;
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 2aa42e0b5..7067c2684 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -30,19 +30,14 @@ namespace nifi {
 namespace minifi {
 
 bool Configure::get(const std::string& key, std::string& value) const {
-  static constexpr std::string_view log_prefix = "nifi.log.";
-  if (utils::StringUtils::startsWith(key, log_prefix)) {
-    if (logger_properties_) {
-      return logger_properties_->getString(key.substr(log_prefix.length()), value);
+  if (auto opt_value = getRawValue(key)) {
+    value = *opt_value;
+    if (decryptor_ && isEncrypted(key)) {
+      value = decryptor_->decrypt(value);
     }
-    return false;
-  }
-
-  bool found = getString(key, value);
-  if (decryptor_ && found && isEncrypted(key)) {
-    value = decryptor_->decrypt(value);
+    return true;
   }
-  return found;
+  return false;
 }
 
 bool Configure::get(const std::string& key, const std::string& alternate_key, std::string& value) const {
@@ -69,6 +64,18 @@ std::optional<std::string> Configure::get(const std::string& key) const {
   return std::nullopt;
 }
 
+std::optional<std::string> Configure::getRawValue(const std::string& key) const {
+  static constexpr std::string_view log_prefix = "nifi.log.";
+  if (utils::StringUtils::startsWith(key, log_prefix)) {
+    if (logger_properties_) {
+      return logger_properties_->getString(key.substr(log_prefix.length()));
+    }
+    return std::nullopt;
+  }
+
+  return getString(key);
+}
+
 bool Configure::isEncrypted(const std::string& key) const {
   gsl_Expects(decryptor_);
   const auto encryption_marker = getString(key + ".protected");
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index ef76b9f8c..25a1a4eb5 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -430,7 +430,7 @@ std::shared_ptr<state::response::ResponseNode> FlowController::getAgentManifest(
   agentInfo->setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(getControllerService(c2::C2Agent::UPDATE_NAME)).get());
   agentInfo->setAgentIdentificationProvider(configuration_);
   agentInfo->setConfigurationReader([this](const std::string& key){
-    return configuration_->getString(key);
+    return configuration_->getRawValue(key);
   });
   agentInfo->setStateMonitor(this);
   agentInfo->includeAgentStatus(false);
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index a4f7dd887..5eff01af9 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -117,7 +117,7 @@ void C2Client::initialize(core::controller::ControllerServiceProvider *controlle
       }
       if (agent_node != nullptr) {
         agent_node->setConfigurationReader([this](const std::string& key){
-          return configuration_->getString(key);
+          return configuration_->getRawValue(key);
         });
       }
       auto configuration_checksums = dynamic_cast<state::response::ConfigurationChecksums*>(response_node.get());


[nifi-minifi-cpp] 03/03: MINIFICPP-1808 Improve RawSocketProtocol authorization error logging

Posted by fg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 863f5d85d81e88c1a46737d7f2fcbd4d62a1d748
Author: Martin Zink <ma...@apache.org>
AuthorDate: Thu Apr 21 12:01:45 2022 +0200

    MINIFICPP-1808 Improve RawSocketProtocol authorization error logging
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1314
---
 libminifi/src/sitetosite/RawSocketProtocol.cpp | 26 +++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 28f037b89..ebe7230b3 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -310,7 +310,9 @@ bool RawSiteToSiteClient::handShake() {
     }
   }
 
-  std::string error;
+  auto logPortStateError = [this](const std::string& error) {
+    logger_->log_error("Site2Site HandShake Failed because destination port, %s, is %s", port_id_.to_string(), error);
+  };
 
   switch (code) {
     case PROPERTIES_OK:
@@ -318,23 +320,21 @@ bool RawSiteToSiteClient::handShake() {
       peer_state_ = HANDSHAKED;
       return true;
     case PORT_NOT_IN_VALID_STATE:
-      error = "in invalid state";
-      break;
+      logPortStateError("in invalid state");
+      return false;
     case UNKNOWN_PORT:
-      error = "an unknown port";
-      break;
+      logPortStateError("an unknown port");
+      return false;
     case PORTS_DESTINATION_FULL:
-      error = "full";
-      break;
-    // Unknown error
+      logPortStateError("full");
+      return false;
+    case UNAUTHORIZED:
+      logger_->log_error("Site2Site HandShake on port %s failed: UNAUTHORIZED", port_id_.to_string());
+      return false;
     default:
-      logger_->log_error("HandShake Failed because of unknown respond code %d", code);
+      logger_->log_error("Site2Site HandShake on port %s failed: unknown respond code %d", port_id_.to_string(), code);
       return false;
   }
-
-  // All known error cases handled here
-  logger_->log_error("Site2Site HandShake Failed because destination port, %s, is %s", port_id_.to_string(), error);
-  return false;
 }
 
 void RawSiteToSiteClient::tearDown() {