You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/03/25 12:58:24 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1776 - Parallel heartbeat and flow update can cause crash

This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c0e86a  MINIFICPP-1776 - Parallel heartbeat and flow update can cause crash
1c0e86a is described below

commit 1c0e86a40a67e32241ae776339dbf3f861607568
Author: Adam Markovics <nu...@gmail.com>
AuthorDate: Fri Mar 25 13:08:28 2022 +0100

    MINIFICPP-1776 - Parallel heartbeat and flow update can cause crash
    
    Closes #1284
    Signed-off-by: Martin Zink <ma...@apache.org>
---
 .../tests/C2ClearCoreComponentStateTest.cpp        | 13 ++--
 .../tests/C2DescribeCoreComponentStateTest.cpp     | 13 ++--
 extensions/http-curl/tests/VerifyInvokeHTTP.h      | 19 +++---
 .../tests/integration/TailFileTest.cpp             |  6 +-
 libminifi/include/FlowController.h                 | 17 +++--
 libminifi/include/c2/C2Client.h                    |  1 +
 libminifi/include/core/state/UpdateController.h    | 15 ++---
 .../include/core/state/nodes/AgentInformation.h    | 12 ++--
 .../include/core/state/nodes/FlowInformation.h     | 22 ++++---
 libminifi/src/FlowController.cpp                   | 74 +++++++++++++---------
 libminifi/src/c2/C2Agent.cpp                       | 31 +++++----
 libminifi/src/c2/C2Client.cpp                      | 21 +++++-
 libminifi/src/c2/ControllerSocketProtocol.cpp      | 28 ++++----
 .../src/core/state/nodes/SupportedOperations.cpp   |  6 +-
 .../integration/OnScheduleErrorHandlingTests.cpp   | 24 +++++--
 .../integration/StateTransactionalityTests.cpp     | 29 ++++++---
 libminifi/test/pcap-tests/PcapTest.cpp             |  7 +-
 libminifi/test/unit/ControllerTests.cpp            | 11 ++--
 18 files changed, 208 insertions(+), 141 deletions(-)

diff --git a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
index 37719e8..b85f5da 100644
--- a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
@@ -51,10 +51,15 @@ class VerifyC2ClearCoreComponentState : public VerifyC2Base {
 
  protected:
   void updateProperties(minifi::FlowController& flow_controller) override {
-    dynamic_cast<minifi::state::ProcessorController*>(flow_controller.getComponents("TailFile1")[0])
-        ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_1_);
-    dynamic_cast<minifi::state::ProcessorController*>(flow_controller.getComponents("TailFile2")[0])
-        ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_2_);
+    auto setFileName = [] (const std::string& fileName, minifi::state::StateController& component){
+      auto* processor = dynamic_cast<minifi::state::ProcessorController&>(component).getProcessor();
+      processor->setProperty(minifi::processors::TailFile::FileName, fileName);
+    };
+
+    flow_controller.executeOnComponent("TailFile1",
+      [&](minifi::state::StateController& component) {setFileName(test_file_1_, component);});
+    flow_controller.executeOnComponent("TailFile2",
+      [&](minifi::state::StateController& component) {setFileName(test_file_2_, component);});
   }
 
   TestController testController;
diff --git a/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
index 77a3ffe..cf2f619 100644
--- a/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
@@ -43,10 +43,15 @@ class VerifyC2DescribeCoreComponentState : public VerifyC2Describe {
 
  protected:
   void updateProperties(minifi::FlowController& flow_controller) override {
-    dynamic_cast<minifi::state::ProcessorController*>(flow_controller.getComponents("TailFile1")[0])
-        ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_1_);
-    dynamic_cast<minifi::state::ProcessorController*>(flow_controller.getComponents("TailFile2")[0])
-        ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_2_);
+    auto setFileName = [] (const std::string& fileName, minifi::state::StateController& component){
+      auto* processor = dynamic_cast<minifi::state::ProcessorController&>(component).getProcessor();
+      processor->setProperty(minifi::processors::TailFile::FileName, fileName);
+    };
+
+    flow_controller.executeOnComponent("TailFile1",
+      [&](minifi::state::StateController& component) {setFileName(test_file_1_, component);});
+    flow_controller.executeOnComponent("TailFile2",
+      [&](minifi::state::StateController& component) {setFileName(test_file_2_, component);});
   }
 
   TestController testController;
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTP.h b/extensions/http-curl/tests/VerifyInvokeHTTP.h
index c4552f9..92687ea 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTP.h
+++ b/extensions/http-curl/tests/VerifyInvokeHTTP.h
@@ -64,15 +64,16 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
   }
 
   void setProperty(const std::string& property, const std::string& value) {
-    const auto components = flowController_->getComponents("InvokeHTTP");
-    assert(!components.empty());
-
-    const auto stateController = components[0];
-    assert(stateController);
-    const auto processorController = dynamic_cast<minifi::state::ProcessorController*>(stateController);
-    assert(processorController);
-    auto proc = processorController->getProcessor();
-    proc->setProperty(property, value);
+    bool executed = false;
+    flowController_->executeOnComponent("InvokeHTTP", [&](minifi::state::StateController& component) {
+      const auto processorController = dynamic_cast<minifi::state::ProcessorController*>(&component);
+      assert(processorController);
+      auto proc = processorController->getProcessor();
+      proc->setProperty(property, value);
+      executed = true;
+    });
+
+    assert(executed);
   }
 
   virtual void setupFlow(const std::optional<std::string>& flow_yml_path) {
diff --git a/extensions/standard-processors/tests/integration/TailFileTest.cpp b/extensions/standard-processors/tests/integration/TailFileTest.cpp
index d99e117..42e1a7f 100644
--- a/extensions/standard-processors/tests/integration/TailFileTest.cpp
+++ b/extensions/standard-processors/tests/integration/TailFileTest.cpp
@@ -71,13 +71,13 @@ class TailFileTestHarness : public IntegrationBase {
 
  protected:
   void updateProperties(minifi::FlowController& fc) override {
-    for (auto comp : fc.getComponents("tf")) {
-      auto proc = dynamic_cast<minifi::state::ProcessorController*>(comp);
+    fc.executeOnComponent("tf", [this] (minifi::state::StateController& component) {
+      auto proc = dynamic_cast<minifi::state::ProcessorController*>(&component);
       if (nullptr != proc) {
         proc->getProcessor()->setProperty(minifi::processors::TailFile::FileName, ss.str());
         proc->getProcessor()->setProperty(minifi::processors::TailFile::StateFile, statefile);
       }
-    }
+    });
   }
 
   std::string statefile;
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 2a3b10c..0d5f6d7 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -114,9 +114,8 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
     return -1;
   }
 
-  std::vector<state::StateController*> getComponents(const std::string &name) override;
-
-  std::vector<state::StateController*> getAllComponents() override;
+  void executeOnComponent(const std::string &name, std::function<void(state::StateController&)> func) override;
+  void executeOnAllComponents(std::function<void(state::StateController&)> func) override;
 
   int16_t clearConnection(const std::string &connection) override;
 
@@ -234,11 +233,15 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
   std::chrono::steady_clock::time_point start_time_;
 
  private:
-  void getProcessorController(const std::string& name, std::vector<state::StateController*>& controllerVec,
-                              const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory);
+  std::vector<state::StateController*> getAllComponents();
+
+  state::StateController* getComponent(const std::string &name);
+
+  state::StateController* getProcessorController(const std::string& name,
+                                                 const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory);
 
-  void getAllProcessorControllers(std::vector<state::StateController*>& controllerVec,
-                                  const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory);
+  std::vector<state::StateController*> getAllProcessorControllers(
+          const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory);
 
   std::unique_ptr<state::ProcessorController> createController(core::Processor& processor);
 
diff --git a/libminifi/include/c2/C2Client.h b/libminifi/include/c2/C2Client.h
index 068ec30..14d4dcd 100644
--- a/libminifi/include/c2/C2Client.h
+++ b/libminifi/include/c2/C2Client.h
@@ -61,6 +61,7 @@ class C2Client : public core::Flow, public state::response::NodeReporter {
  protected:
   bool isC2Enabled() const;
   std::optional<std::string> fetchFlow(const std::string& uri) const;
+  void updateResponseNodeConnections();
 
  private:
   void initializeComponentMetrics();
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index 8364140..a4c9ad5 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -139,18 +139,13 @@ class StateController : public Pausable {
  */
 class StateMonitor : public StateController {
  public:
-  virtual ~StateMonitor() = default;
+  ~StateMonitor() override = default;
 
-  std::atomic<bool> &isStateMonitorRunning() {
-    return controller_running_;
-  }
-
-  virtual std::vector<StateController*> getComponents(const std::string &name) = 0;
+  // Execute callback func on the named component. Thread safe, locking mutex_, preventing concurrent flow update
+  virtual void executeOnComponent(const std::string &name, std::function<void(state::StateController&)> func) = 0;
 
-  virtual std::vector<StateController*> getAllComponents() = 0;
-  /**
-   * Operational controllers
-   */
+  // Execute callback func on the all components. Thread safe, locking mutex_, preventing concurrent flow update
+  virtual void executeOnAllComponents(std::function<void(state::StateController&)> func) = 0;
 
   /**
    * Drain repositories
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index 49c3129..904ada1 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -502,24 +502,22 @@ class AgentStatus : public StateMonitorNode {
     SerializedResponseNode components_node(false);
     components_node.name = "components";
     if (monitor_ != nullptr) {
-      auto components = monitor_->getAllComponents();
-
-      for (const auto& component : components) {
+      monitor_->executeOnAllComponents([&components_node](StateController& component){
         SerializedResponseNode component_node(false);
-        component_node.name = component->getComponentName();
+        component_node.name = component.getComponentName();
 
         SerializedResponseNode uuid_node;
         uuid_node.name = "uuid";
-        uuid_node.value = std::string{component->getComponentUUID().to_string()};
+        uuid_node.value = std::string{component.getComponentUUID().to_string()};
 
         SerializedResponseNode component_status_node;
         component_status_node.name = "running";
-        component_status_node.value = component->isRunning();
+        component_status_node.value = component.isRunning();
 
         component_node.children.push_back(component_status_node);
         component_node.children.push_back(uuid_node);
         components_node.children.push_back(component_node);
-      }
+      });
     }
     return components_node;
   }
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h
index 13bc5f8..b10e504 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -141,15 +141,20 @@ class FlowMonitor : public StateMonitorNode {
       : StateMonitorNode(name) {
   }
 
-  void addConnection(minifi::Connection* connection) {
+  void updateConnection(minifi::Connection* connection) {
     if (nullptr != connection) {
-      connections_.insert(std::make_pair(connection->getUUIDStr(), connection));
+      connections_[connection->getUUIDStr()] = connection;
     }
   }
 
+  void clearConnections() {
+    connections_.clear();
+  }
+
   void setFlowVersion(std::shared_ptr<state::response::FlowVersion> flow_version) {
-    flow_version_ = flow_version;
+    flow_version_ = std::move(flow_version);
   }
+
  protected:
   std::shared_ptr<state::response::FlowVersion> flow_version_;
   std::map<std::string, minifi::Connection*> connections_;
@@ -228,26 +233,25 @@ class FlowInformation : public FlowMonitor {
     }
 
     if (nullptr != monitor_) {
-      auto components = monitor_->getAllComponents();
       SerializedResponseNode componentsNode(false);
       componentsNode.name = "components";
 
-      for (auto component : components) {
+      monitor_->executeOnAllComponents([&componentsNode](StateController& component){
         SerializedResponseNode componentNode(false);
-        componentNode.name = component->getComponentName();
+        componentNode.name = component.getComponentName();
 
         SerializedResponseNode uuidNode;
         uuidNode.name = "uuid";
-        uuidNode.value = std::string{component->getComponentUUID().to_string()};
+        uuidNode.value = std::string{component.getComponentUUID().to_string()};
 
         SerializedResponseNode componentStatusNode;
         componentStatusNode.name = "running";
-        componentStatusNode.value = component->isRunning();
+        componentStatusNode.value = component.isRunning();
 
         componentNode.children.push_back(componentStatusNode);
         componentNode.children.push_back(uuidNode);
         componentsNode.children.push_back(componentNode);
-      }
+      });
       serialized.push_back(componentsNode);
     }
 
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 516ff8f..f518572 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -17,7 +17,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <time.h>
 #include <vector>
 #include <map>
 #include <chrono>
@@ -29,18 +28,12 @@
 
 #include "FlowController.h"
 #include "core/state/nodes/AgentInformation.h"
-#include "core/state/nodes/BuildInformation.h"
-#include "core/state/nodes/DeviceInformation.h"
 #include "core/state/nodes/FlowInformation.h"
-#include "core/state/nodes/ProcessMetrics.h"
-#include "core/state/nodes/QueueMetrics.h"
 #include "core/state/nodes/RepositoryMetrics.h"
-#include "core/state/nodes/SystemMetrics.h"
 #include "core/state/ProcessorController.h"
 #include "c2/C2Agent.h"
 #include "core/ProcessGroup.h"
 #include "core/Core.h"
-#include "core/ClassLoader.h"
 #include "SchedulingAgent.h"
 #include "core/controller/ControllerServiceProvider.h"
 #include "core/controller/ForwardingControllerServiceProvider.h"
@@ -50,7 +43,6 @@
 #include "utils/file/FileSystem.h"
 #include "utils/HTTPClient.h"
 #include "io/NetworkPrioritizer.h"
-#include "io/validation.h"
 #include "io/FileStream.h"
 
 namespace org {
@@ -134,6 +126,8 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st
   controller_map_->clear();
   auto prevRoot = std::move(this->root_);
   this->root_ = std::move(newRoot);
+  processor_to_controller_.clear();
+  updateResponseNodeConnections();
   initialized_ = false;
   bool started = false;
   try {
@@ -191,7 +185,7 @@ int16_t FlowController::stop() {
     thread_pool_.shutdown();
     /* STOP! Before you change it, consider the following:
      * -Stopping the schedulers doesn't actually quit the onTrigger functions of processors
-     * -They only guarantee that the processors are not scheduled any more
+     * -They only guarantee that the processors are not scheduled anymore
      * -After the threadpool is stopped we can make sure that processors don't need repos and controllers anymore */
     if (this->root_) {
       this->root_->drainConnections();
@@ -284,6 +278,8 @@ void FlowController::load(std::unique_ptr<core::ProcessGroup> root, bool reload)
     if (root) {
       logger_->log_info("Load Flow Controller from provided root");
       this->root_ = std::move(root);
+      processor_to_controller_.clear();
+      updateResponseNodeConnections();
     } else {
       logger_->log_info("Instantiating new flow");
       this->root_ = loadInitialFlow();
@@ -441,30 +437,44 @@ std::shared_ptr<state::response::ResponseNode> FlowController::getAgentManifest(
   return agentInfo;
 }
 
+void FlowController::executeOnAllComponents(std::function<void(state::StateController&)> func) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  for (auto* component: getAllComponents()) {
+    func(*component);
+  }
+}
+
+void FlowController::executeOnComponent(const std::string &name, std::function<void(state::StateController&)> func) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  if (auto* component = getComponent(name); component != nullptr) {
+    func(*component);
+  } else {
+    logger_->log_error("Could not get execute requested callback for component \"%s\", because component was not found", name);
+  }
+}
+
 std::vector<state::StateController*> FlowController::getAllComponents() {
-  std::vector<state::StateController*> vec{this};
   if (root_) {
     auto controllerFactory = [this] (core::Processor& p) {
       return createController(p);
     };
-    getAllProcessorControllers(vec, controllerFactory);
+    return getAllProcessorControllers(controllerFactory);
   }
-  return vec;
-}
 
-std::vector<state::StateController*> FlowController::getComponents(const std::string& name) {
-  std::vector<state::StateController*> vec;
+  return {this};
+}
 
+state::StateController* FlowController::getComponent(const std::string& name) {
   if (name == "FlowController") {
-    vec.push_back(this);
+    return this;
   } else if (root_) {
     auto controllerFactory = [this] (core::Processor& p) {
       return createController(p);
     };
-    getProcessorController(name, vec, controllerFactory);
+    return getProcessorController(name, controllerFactory);
   }
 
-  return vec;
+  return nullptr;
 }
 
 std::unique_ptr<state::ProcessorController> FlowController::createController(core::Processor& processor) {
@@ -509,31 +519,37 @@ std::map<std::string, std::unique_ptr<io::InputStream>> FlowController::getDebug
   return debug_info;
 }
 
-void FlowController::getAllProcessorControllers(std::vector<state::StateController*>& controllerVec,
-                                              const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-
+std::vector<state::StateController*> FlowController::getAllProcessorControllers(
+        const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory) {
+  std::vector<state::StateController*> controllerVec{this};
   std::vector<core::Processor*> processorVec;
   root_->getAllProcessors(processorVec);
 
   for (const auto& processor : processorVec) {
-    // find controller for processor, if it doesn't exist, create one
+    // reference to the existing or newly created controller
     auto& controller = processor_to_controller_[processor->getUUID()];
     if (!controller) {
       controller = controllerFactory(*processor);
     }
     controllerVec.push_back(controller.get());
   }
+
+  return controllerVec;
 }
 
-void FlowController::getProcessorController(const std::string& name, std::vector<state::StateController*>& controllerVec,
-                                          const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory) {
+state::StateController* FlowController::getProcessorController(const std::string& name, const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory) {
   auto* processor = root_->findProcessorByName(name);
-  auto& controller = processor_to_controller_[processor->getUUID()];
-  if (!controller) {
-    controller = controllerFactory(*processor);
+  if (processor == nullptr) {
+    logger_->log_error("Could not get processor controller for requested name \"%s\", because processor was not found either", name);
+    return nullptr;
+  }
+
+  // reference to the existing or newly created controller
+  auto& foundController = processor_to_controller_[processor->getUUID()];
+  if (!foundController) {
+    foundController = controllerFactory(*processor);
   }
-  controllerVec.push_back(controller.get());
+  return foundController.get();
 }
 
 }  // namespace minifi
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index c39bebc..b7cd0e4 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -358,15 +358,15 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
         raise(SIGTERM);
       }
 
-      const auto components = update_sink_->getComponents(resp.name);
       // stop all referenced components.
-      for (auto &component : components) {
-        logger_->log_debug("Stopping component %s", component->getComponentName());
-        if (resp.op == Operation::STOP)
-          component->stop();
-        else
-          component->start();
-      }
+      update_sink_->executeOnComponent(resp.name, [this, &resp] (state::StateController& component) {
+        logger_->log_debug("Stopping component %s", component.getComponentName());
+        if (resp.op == Operation::STOP) {
+          component.stop();
+        } else {
+          component.start();
+        }
+      });
 
       if (resp.ident.length() > 0) {
         C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true);
@@ -452,21 +452,20 @@ void C2Agent::handle_clear(const C2ContentResponse &resp) {
     }
     case ClearOperand::CORECOMPONENTSTATE: {
       for (const auto& corecomponent : resp.operation_arguments) {
-        std::vector<state::StateController*> components = update_sink_->getComponents(corecomponent.second.to_string());
         auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_);
         if (state_manager_provider != nullptr) {
-          for (auto* component : components) {
-            logger_->log_debug("Clearing state for component %s", component->getComponentName());
-            auto state_manager = state_manager_provider->getCoreComponentStateManager(component->getComponentUUID());
+          update_sink_->executeOnComponent(corecomponent.second.to_string(), [this, &state_manager_provider] (state::StateController& component) {
+            logger_->log_debug("Clearing state for component %s", component.getComponentName());
+            auto state_manager = state_manager_provider->getCoreComponentStateManager(component.getComponentUUID());
             if (state_manager != nullptr) {
-              component->stop();
+              component.stop();
               state_manager->clear();
               state_manager->persist();
-              component->start();
+              component.start();
             } else {
-              logger_->log_warn("Failed to get StateManager for component %s", component->getComponentUUID().to_string());
+              logger_->log_warn("Failed to get StateManager for component %s", component.getComponentUUID().to_string());
             }
-          }
+          });
         } else {
           logger_->log_error("Failed to get StateManagerProvider");
         }
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index a4d51fc..3a195ef 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -122,7 +122,7 @@ void C2Client::initialize(core::controller::ControllerServiceProvider *controlle
       auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(response_node.get());
       if (flowMonitor != nullptr) {
         for (auto &con : connections) {
-          flowMonitor->addConnection(con.second);
+          flowMonitor->updateConnection(con.second);
         }
         flowMonitor->setStateMonitor(update_sink);
         flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion());
@@ -325,6 +325,7 @@ std::vector<std::shared_ptr<state::response::ResponseNode>> C2Client::getHeartbe
   const bool include = include_manifest || fullHb == "true";
 
   std::vector<std::shared_ptr<state::response::ResponseNode>> nodes;
+  nodes.reserve(root_response_nodes_.size());
   std::lock_guard<std::mutex> lock(metrics_mutex_);
   for (const auto &entry : root_response_nodes_) {
     auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(entry.second);
@@ -336,6 +337,24 @@ std::vector<std::shared_ptr<state::response::ResponseNode>> C2Client::getHeartbe
   return nodes;
 }
 
+void C2Client::updateResponseNodeConnections() {
+  std::map<std::string, Connection*> connections;
+  if (root_ != nullptr) {
+    root_->getConnections(connections);
+  }
+
+  std::lock_guard<std::mutex> lock(metrics_mutex_);
+  for (auto& [_, responseNode] : root_response_nodes_) {
+    auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(responseNode.get());
+    if (flowMonitor != nullptr) {
+      flowMonitor->clearConnections();
+      for (const auto &con: connections) {
+        flowMonitor->updateConnection(con.second);
+      }
+    }
+  }
+}
+
 }  // namespace c2
 }  // namespace minifi
 }  // namespace nifi
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 0afa8e0..4949893 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -97,10 +97,9 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
           std::string componentStr;
           const auto size = stream->read(componentStr);
           if (!io::isError(size)) {
-            auto components = update_sink_->getComponents(componentStr);
-            for (const auto& component : components) {
-              component->start();
-            }
+            update_sink_->executeOnComponent(componentStr, [](state::StateController& component) {
+              component.start();
+            });
           } else {
             logger_->log_debug("Connection broke");
           }
@@ -111,10 +110,9 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
           std::string componentStr;
           const auto size = stream->read(componentStr);
           if (!io::isError(size)) {
-            auto components = update_sink_->getComponents(componentStr);
-            for (const auto& component : components) {
-              component->stop();
-            }
+            update_sink_->executeOnComponent(componentStr, [](state::StateController& component) {
+              component.stop();
+            });
           } else {
             logger_->log_debug("Connection broke");
           }
@@ -180,14 +178,18 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
             resp.write(response.str());
             stream->write(resp.getBuffer());
           } else if (what == "components") {
+            std::vector<std::pair<std::string, bool>> components;
+            update_sink_->executeOnAllComponents([&components](state::StateController& component){
+              components.emplace_back(component.getComponentName(), component.isRunning());
+            });
             io::BufferStream resp;
             resp.write(&head, 1);
-            const auto size_ = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size());
-            resp.write(size_);
-            for (const auto &component : update_sink_->getAllComponents()) {
-              resp.write(component->getComponentName());
-              resp.write(component->isRunning() ? "true" : "false");
+            resp.write(gsl::narrow<uint16_t>(components.size()));
+            for (const auto& [name, isRunning] : components) {
+              resp.write(name);
+              resp.write(isRunning ? "true" : "false");
             }
+
             stream->write(resp.getBuffer());
           } else if (what == "jstack") {
             io::BufferStream resp;
diff --git a/libminifi/src/core/state/nodes/SupportedOperations.cpp b/libminifi/src/core/state/nodes/SupportedOperations.cpp
index 8c893c7..e7ff1f6 100644
--- a/libminifi/src/core/state/nodes/SupportedOperations.cpp
+++ b/libminifi/src/core/state/nodes/SupportedOperations.cpp
@@ -107,9 +107,9 @@ void SupportedOperations::fillProperties(SerializedResponseNode& properties, min
     case minifi::c2::Operation::STOP: {
       addProperty(properties, "c2");
       if (monitor_) {
-        for (const auto& component: monitor_->getAllComponents()) {
-          addProperty(properties, component->getComponentName());
-        }
+        monitor_->executeOnAllComponents([&properties](StateController& component){
+          addProperty(properties, component.getComponentName());
+        });
       }
       break;
     }
diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
index 575cf71..3b79fce 100644
--- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
+++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
@@ -76,21 +76,31 @@ class KamikazeErrorHandlingTests : public IntegrationBase {
 class EventDriverScheduleErrorHandlingTests: public IntegrationBase {
  public:
   void updateProperties(minifi::FlowController& fc) override {
-    auto controller_vec = fc.getAllComponents();
     /* This tests depends on a configuration that contains only one KamikazeProcessor named kamikaze
      * (See testOnScheduleRetry.yml)
      * In this case there are two components in the flowcontroller: first is the controller itself,
      * second is the processor that the test uses.
      * Added here some assertions to make it clear. In case any of these fail without changing the corresponding yml file,
      * that most probably means a breaking change. */
-    assert(controller_vec.size() == 2);
-    assert(controller_vec[0]->getComponentName() == "FlowController");
-    assert(controller_vec[1]->getComponentName() == "kamikaze");
+    size_t controllerVecIdx = 0;
 
-    auto process_controller = dynamic_cast<org::apache::nifi::minifi::state::ProcessorController*>(controller_vec[1]);
-    assert(process_controller != nullptr);
+    fc.executeOnAllComponents([&controllerVecIdx](org::apache::nifi::minifi::state::StateController& component){
+      if (controllerVecIdx == 0) {
+        assert(component.getComponentName() == "FlowController");
+      } else if (controllerVecIdx == 1) {
+        assert(component.getComponentName() == "kamikaze");
 
-    process_controller->getProcessor()->setSchedulingStrategy(org::apache::nifi::minifi::core::SchedulingStrategy::EVENT_DRIVEN);
+        auto process_controller = dynamic_cast<org::apache::nifi::minifi::state::ProcessorController*>(&component);
+        assert(process_controller != nullptr);
+
+        process_controller->getProcessor()->setSchedulingStrategy(org::apache::nifi::minifi::core::SchedulingStrategy::EVENT_DRIVEN);
+      }
+
+      ++controllerVecIdx;
+    });
+
+    // check controller vector size
+    assert(controllerVecIdx == 2);
   }
 
   void runAssertions() override {
diff --git a/libminifi/test/integration/StateTransactionalityTests.cpp b/libminifi/test/integration/StateTransactionalityTests.cpp
index 66671dd..18bba7c 100644
--- a/libminifi/test/integration/StateTransactionalityTests.cpp
+++ b/libminifi/test/integration/StateTransactionalityTests.cpp
@@ -56,23 +56,32 @@ class StatefulIntegrationTest : public IntegrationBase {
   }
 
   void updateProperties(minifi::FlowController& fc) override {
-    const auto controllerVec = fc.getAllComponents();
     /* This tests depends on a configuration that contains only one StatefulProcessor named statefulProcessor
      * (See TestStateTransactionality.yml)
      * In this case there are two components in the flowcontroller: first is the controller itself,
      * second is the processor that the test uses.
      * Added here some assertions to make it clear. In case any of these fail without changing the corresponding yml file,
      * that most probably means a breaking change. */
-    assert(controllerVec.size() == 2);
-    assert(controllerVec[0]->getComponentName() == "FlowController");
-    assert(controllerVec[1]->getComponentName() == "statefulProcessor");
+    size_t controllerVecIdx = 0;
 
-    // set hooks
-    const auto processController = dynamic_cast<ProcessorController*>(controllerVec[1]);
-    assert(processController != nullptr);
-    stateful_processor_ = dynamic_cast<StatefulProcessor*>(processController->getProcessor());
-    assert(stateful_processor_ != nullptr);
-    stateful_processor_->setHooks(on_schedule_hook_, on_trigger_hooks_);
+    fc.executeOnAllComponents([this, &controllerVecIdx](org::apache::nifi::minifi::state::StateController& component){
+      if (controllerVecIdx == 0) {
+        assert(component.getComponentName() == "FlowController");
+      } else if (controllerVecIdx == 1) {
+        assert(component.getComponentName() == "statefulProcessor");
+        // set hooks
+        const auto processController = dynamic_cast<ProcessorController*>(&component);
+        assert(processController != nullptr);
+        stateful_processor_ = dynamic_cast<StatefulProcessor*>(processController->getProcessor());
+        assert(stateful_processor_ != nullptr);
+        stateful_processor_->setHooks(on_schedule_hook_, on_trigger_hooks_);
+      }
+
+      ++controllerVecIdx;
+    });
+
+    // check controller vector size
+    assert(controllerVecIdx == 2);
   }
 
   void runAssertions() override {
diff --git a/libminifi/test/pcap-tests/PcapTest.cpp b/libminifi/test/pcap-tests/PcapTest.cpp
index 236e0eb..4e408f4 100644
--- a/libminifi/test/pcap-tests/PcapTest.cpp
+++ b/libminifi/test/pcap-tests/PcapTest.cpp
@@ -78,15 +78,14 @@ class PcapTestHarness : public IntegrationBase {
   }
 
   void updateProperties(minifi::FlowController& fc) override {
-    auto components = fc.getComponents("pcap");
-    for (const auto& component : components) {
-      auto proccontroller = dynamic_cast<minifi::state::ProcessorController*>(component);
+    fc.executeOnComponent("pcap", [this] (minifi::state::StateController& component) {
+      auto proccontroller = dynamic_cast<minifi::state::ProcessorController*>(&component);
       if (proccontroller) {
         auto processor = proccontroller->getProcessor();
         processor->setProperty(minifi::processors::CapturePacket::BaseDir.getName(), dir);
         processor->setProperty(minifi::processors::CapturePacket::NetworkControllers.getName(), ".*");
       }
-    }
+    });
   }
 
  protected:
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
index bd7cf9b..0ab8877 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -79,15 +79,16 @@ class TestUpdateSink : public minifi::state::StateMonitor {
   explicit TestUpdateSink(std::shared_ptr<StateController> controller)
       : is_running(true),
         clear_calls(0),
-        controller(controller),
+        controller(std::move(controller)),
         update_calls(0) {
   }
-  std::vector<StateController*> getComponents(const std::string& /*name*/) override {
-    return std::vector<StateController*>{ controller.get() };
+
+  void executeOnComponent(const std::string&, std::function<void(minifi::state::StateController&)> func) override {
+    func(*controller);
   }
 
-  std::vector<StateController*> getAllComponents() override {
-    return std::vector<StateController*>{ controller.get() };
+  void executeOnAllComponents(std::function<void(minifi::state::StateController&)> func) override {
+    func(*controller);
   }
 
   std::string getComponentName() const override {