You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2023/01/18 17:09:07 UTC

[nifi-minifi-cpp] branch main updated (00d145e08 -> 8d18ef1ff)

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from 00d145e08 MINIFICPP-1972 - Refactor State Manager code
     new 6c57decc7 MINIFICPP-1991 - Remove unused ControllerServiceProvider methods
     new 8d18ef1ff MINIFICPP-2026 Make isRunning member functions const

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../aws/controllerservices/AWSCredentialsService.h |   2 +-
 .../AzureStorageCredentialsService.h               |   2 +-
 extensions/coap/controllerservice/CoapConnector.h  |   2 +-
 extensions/coap/server/CoapServer.h                |   6 +-
 .../ElasticsearchCredentialsControllerService.h    |   2 +-
 .../GCPCredentialsControllerService.h              |   2 +-
 extensions/http-curl/client/HTTPClient.h           |   2 +-
 .../tests/ControllerServiceIntegrationTests.cpp    |  43 +++----
 extensions/jni/ExecuteJavaControllerService.h      |   2 +-
 extensions/jni/jvm/JavaControllerService.h         |   2 +-
 extensions/libarchive/FocusArchiveEntry.h          |   1 -
 .../rocksdb-repos/DatabaseContentRepository.h      |   2 +-
 extensions/sql/services/DatabaseService.h          |   2 +-
 libminifi/include/Connection.h                     |   2 +-
 libminifi/include/FlowController.h                 |  39 ++----
 libminifi/include/SchedulingAgent.h                |  17 +--
 libminifi/include/c2/C2Protocol.h                  |   6 +-
 libminifi/include/c2/HeartbeatReporter.h           |   2 +-
 libminifi/include/c2/triggers/FileUpdateTrigger.h  |   2 +-
 .../include/controllers/AttributeProviderService.h |   2 +-
 .../controllers/LinuxPowerManagementService.h      |   2 +-
 .../controllers/NetworkPrioritizerService.h        |   2 +-
 libminifi/include/controllers/SSLContextService.h  |   2 +-
 .../include/controllers/ThreadManagementService.h  |  10 +-
 .../controllers/UpdatePolicyControllerService.h    |   2 +-
 .../controllers/keyvalue/KeyValueStateStorage.h    |   2 +-
 libminifi/include/core/Connectable.h               |   2 +-
 libminifi/include/core/ProcessContext.h            |   2 +-
 libminifi/include/core/ProcessGroup.h              |  14 +--
 libminifi/include/core/Processor.h                 |   2 +-
 libminifi/include/core/ProcessorNode.h             |   2 +-
 libminifi/include/core/SerializableComponent.h     |   2 +-
 libminifi/include/core/ThreadedRepository.h        |   2 +-
 .../include/core/controller/ControllerService.h    |   4 +-
 .../core/controller/ControllerServiceProvider.h    |  88 +-------------
 .../ForwardingControllerServiceProvider.h          |  62 +---------
 .../controller/StandardControllerServiceProvider.h | 131 +++------------------
 libminifi/include/core/state/ProcessorController.h |   6 +-
 libminifi/include/core/state/UpdateController.h    |   2 +-
 libminifi/include/core/state/nodes/MetricsBase.h   |   2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    |   2 +-
 libminifi/include/utils/ThreadPool.h               |  23 ++--
 libminifi/src/FlowController.cpp                   |  29 ++---
 libminifi/src/SchedulingAgent.cpp                  |  41 -------
 .../controllers/LinuxPowerManagementService.cpp    |   2 +-
 .../src/controllers/NetworkPrioritizerService.cpp  |   2 +-
 .../controllers/UpdatePolicyControllerService.cpp  |   2 +-
 libminifi/src/core/FlowConfiguration.cpp           |   4 +-
 libminifi/src/core/ProcessGroup.cpp                |  28 ++---
 libminifi/src/core/Processor.cpp                   |   2 +-
 libminifi/src/core/ProcessorNode.cpp               |   2 +-
 libminifi/src/core/state/ProcessorController.cpp   |   6 +-
 libminifi/src/utils/ThreadPool.cpp                 |  41 +++++--
 libminifi/test/TestBase.cpp                        |   2 +-
 libminifi/test/unit/ComponentManifestTests.cpp     |   2 +-
 libminifi/test/unit/ControllerTests.cpp            |   4 +-
 libminifi/test/unit/MockClasses.h                  |   2 +-
 libminifi/test/unit/ProvenanceTestHelper.h         |   2 +-
 libminifi/test/unit/SchedulingAgentTests.cpp       |   2 +-
 59 files changed, 195 insertions(+), 484 deletions(-)


[nifi-minifi-cpp] 01/02: MINIFICPP-1991 - Remove unused ControllerServiceProvider methods

Posted by fg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 6c57decc7ce380af863dd621eb457f958b6698b0
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Tue Nov 22 14:04:02 2022 +0100

    MINIFICPP-1991 - Remove unused ControllerServiceProvider methods
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1458
---
 .../tests/ControllerServiceIntegrationTests.cpp    |  43 +++----
 libminifi/include/FlowController.h                 |  37 ++----
 libminifi/include/SchedulingAgent.h                |  17 +--
 libminifi/include/core/ProcessContext.h            |   2 +-
 libminifi/include/core/ProcessGroup.h              |  14 +--
 .../core/controller/ControllerServiceProvider.h    |  88 +-------------
 .../ForwardingControllerServiceProvider.h          |  62 +---------
 .../controller/StandardControllerServiceProvider.h | 131 +++------------------
 libminifi/include/core/state/ProcessorController.h |   4 +-
 libminifi/include/utils/ThreadPool.h               |  23 ++--
 libminifi/src/FlowController.cpp                   |  29 ++---
 libminifi/src/SchedulingAgent.cpp                  |  41 -------
 libminifi/src/core/FlowConfiguration.cpp           |   4 +-
 libminifi/src/core/ProcessGroup.cpp                |  28 ++---
 libminifi/src/core/state/ProcessorController.cpp   |   4 +-
 libminifi/src/utils/ThreadPool.cpp                 |  41 +++++--
 libminifi/test/TestBase.cpp                        |   2 +-
 libminifi/test/unit/SchedulingAgentTests.cpp       |   2 +-
 18 files changed, 142 insertions(+), 430 deletions(-)

diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index ccd910b1b..9e92c8270 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -83,7 +83,7 @@ int main(int argc, char **argv) {
 
   auto pg = yaml_config.getRoot();
 
-  auto provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, pg.get(), std::make_shared<minifi::Configure>());
+  auto provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, std::make_shared<minifi::Configure>());
   std::shared_ptr<core::controller::ControllerServiceNode> mockNode = pg->findControllerService("MockItLikeIts1995");
   assert(mockNode != nullptr);
   mockNode->enable();
@@ -108,27 +108,28 @@ int main(int argc, char **argv) {
   assert(!ssl_client->getCACertificate().empty());
   // now let's disable one of the controller services.
   std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID");
-  const auto checkCsIdEnabledMatchesDisabledFlag = [&cs_id] { return !disabled == cs_id->enabled(); };
   assert(cs_id != nullptr);
-  {
-    std::lock_guard<std::mutex> lock(control_mutex);
-    controller->enableControllerService(cs_id);
-    disabled = false;
-  }
-  std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995");
-  assert(verifyEventHappenedInPollTime(std::chrono::seconds(4), checkCsIdEnabledMatchesDisabledFlag));
-  {
-    std::lock_guard<std::mutex> lock(control_mutex);
-    controller->disableReferencingServices(mock_cont);
-    disabled = true;
-  }
-  assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
-  {
-    std::lock_guard<std::mutex> lock(control_mutex);
-    controller->enableReferencingServices(mock_cont);
-    disabled = false;
-  }
-  assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
+  // TODO(adebreceni): MINIFICPP-1992
+//  const auto checkCsIdEnabledMatchesDisabledFlag = [&cs_id] { return !disabled == cs_id->enabled(); };
+//  {
+//    std::lock_guard<std::mutex> lock(control_mutex);
+//    controller->enableControllerService(cs_id);
+//    disabled = false;
+//  }
+//  std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995");
+//  assert(verifyEventHappenedInPollTime(std::chrono::seconds(4), checkCsIdEnabledMatchesDisabledFlag));
+//  {
+//    std::lock_guard<std::mutex> lock(control_mutex);
+//    controller->disableReferencingServices(mock_cont);
+//    disabled = true;
+//  }
+//  assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
+//  {
+//    std::lock_guard<std::mutex> lock(control_mutex);
+//    controller->enableReferencingServices(mock_cont);
+//    disabled = false;
+//  }
+//  assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
 
   controller->waitUnload(60000);
   return 0;
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 97710c0d0..b398191d1 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -65,7 +65,6 @@ namespace state {
 class ProcessorController;
 }  // namespace state
 
-// Default NiFi Root Group Name
 #define DEFAULT_ROOT_GROUP_NAME ""
 
 /**
@@ -87,28 +86,24 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
 
   ~FlowController() override;
 
-  // Get the provenance repository
   virtual std::shared_ptr<core::Repository> getProvenanceRepository() {
     return this->provenance_repo_;
   }
 
-  // Load flow xml from disk, after that, create the root process group and its children, initialize the flows
   virtual void load(std::unique_ptr<core::ProcessGroup> root = nullptr, bool reload = false);
 
-  // Whether the Flow Controller is start running
   bool isRunning() override {
     return running_.load() || updating_.load();
   }
 
-  // Whether the Flow Controller has already been initialized (loaded flow XML)
   virtual bool isInitialized() {
     return initialized_.load();
   }
-  // Start to run the Flow Controller which internally start the root process group and all its children
+  // Start the Flow Controller which internally starts the root process group and all its children
   int16_t start() override;
   int16_t pause() override;
   int16_t resume() override;
-  // Unload the current flow YAML, clean the root process group and all its children
+  // Unload the current flow, clean the root process group and all its children
   int16_t stop() override;
   int16_t applyUpdate(const std::string &source, const std::string &configuration, bool persist, const std::optional<std::string>& flow_id) override;
   int16_t drainRepositories() override {
@@ -123,31 +118,27 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
   int16_t applyUpdate(const std::string& /*source*/, const std::shared_ptr<state::Update>&) override { return -1; }
   // Asynchronous function trigger unloading and wait for a period of time
   virtual void waitUnload(uint64_t timeToWaitMs);
-  // Unload the current flow xml, clean the root process group and all its children
+  // Unload the current flow, clean the root process group and all its children
   virtual void unload();
-  // update property value
   void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
     if (root_ != nullptr)
       root_->updatePropertyValue(std::move(processorName), std::move(propertyName), std::move(propertyValue));
   }
 
-  // set SerialNumber
   void setSerialNumber(std::string number) {
     serial_number_ = std::move(number);
   }
 
-  // get serial number as string
   std::string getSerialNumber() {
     return serial_number_;
   }
 
-  // validate and apply passing yaml configuration payload
+  // validate and apply passing configuration payload
   // first it will validate the payload with the current root node config for flowController
   // like FlowController id/name is the same and new version is greater than the current version
   // after that, it will apply the configuration
   bool applyConfiguration(const std::string &source, const std::string &configurePayload, const std::optional<std::string>& flow_id = std::nullopt);
 
-  // get name
   std::string getName() const override {
     if (root_ != nullptr)
       return root_->getName();
@@ -166,7 +157,6 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
     return root_->getUUID();
   }
 
-  // get version
   virtual std::string getVersion() {
     if (root_ != nullptr)
       return std::to_string(root_->getVersion());
@@ -201,38 +191,29 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
   void loadMetricsPublisher();
 
  protected:
-  // function to load the flow file repo.
   void loadFlowRepo();
 
   std::optional<std::chrono::milliseconds> loadShutdownTimeoutFromConfiguration();
 
  private:
   template <typename T, typename = typename std::enable_if<std::is_base_of<SchedulingAgent, T>::value>::type>
-  void conditionalReloadScheduler(std::shared_ptr<T>& scheduler, const bool condition) {
+  void conditionalReloadScheduler(std::unique_ptr<T>& scheduler, const bool condition) {
     if (condition) {
-      scheduler = std::make_shared<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this), provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
+      scheduler = std::make_unique<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this), provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
     }
   }
 
  protected:
-  // flow controller mutex
   std::recursive_mutex mutex_;
 
-  // Whether it is running
   std::atomic<bool> running_;
   std::atomic<bool> updating_;
 
-  // Whether it has already been initialized (load the flow XML already)
   std::atomic<bool> initialized_;
-  // Flow Timer Scheduler
-  std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
-  // Flow Event Scheduler
-  std::shared_ptr<EventDrivenSchedulingAgent> event_scheduler_;
-  // Cron Schedule
-  std::shared_ptr<CronDrivenSchedulingAgent> cron_scheduler_;
-  // FlowControl Protocol
+  std::unique_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
+  std::unique_ptr<EventDrivenSchedulingAgent> event_scheduler_;
+  std::unique_ptr<CronDrivenSchedulingAgent> cron_scheduler_;
   std::unique_ptr<FlowControlProtocol> protocol_;
-  // metrics information
   std::chrono::steady_clock::time_point start_time_;
 
  private:
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 3ec3ad892..24992c928 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -17,8 +17,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_SCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_SCHEDULINGAGENT_H_
+
+#pragma once
 
 #include <memory>
 #include <string>
@@ -47,10 +47,7 @@
 #define SCHEDULING_WATCHDOG_CHECK_PERIOD_MS 1000  // msec
 #define SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD_MS 5000  // msec
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 // SchedulingAgent Class
 class SchedulingAgent {
@@ -104,8 +101,6 @@ class SchedulingAgent {
 
   void watchDogFunc();
 
-  virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
-  virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
   // schedule, overwritten by different DrivenSchedulingAgent
   virtual void schedule(core::Processor* processor) = 0;
   // unschedule, overwritten by different DrivenSchedulingAgent
@@ -161,8 +156,4 @@ class SchedulingAgent {
   std::chrono::milliseconds alert_time_;
 };
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-#endif  // LIBMINIFI_INCLUDE_SCHEDULINGAGENT_H_
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index f008e6669..0e0a9b5a1 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -194,7 +194,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
    * identifier
    */
   std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) const override {
-    return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUID());
+    return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerService(identifier);
   }
 
   /**
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index 96dd68438..af43f8e02 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -146,13 +146,13 @@ class ProcessGroup : public CoreComponent {
     return config_version_;
   }
 
-  void startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler,
-                       const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
-                       const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler);
+  void startProcessing(TimerDrivenSchedulingAgent& timeScheduler,
+                       EventDrivenSchedulingAgent& eventScheduler,
+                       CronDrivenSchedulingAgent& cronScheduler);
 
-  void stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler,
-                      const std::shared_ptr<EventDrivenSchedulingAgent>& eventScheduler,
-                      const std::shared_ptr<CronDrivenSchedulingAgent>& cronScheduler,
+  void stopProcessing(TimerDrivenSchedulingAgent& timeScheduler,
+                      EventDrivenSchedulingAgent& eventScheduler,
+                      CronDrivenSchedulingAgent& cronScheduler,
                       const std::function<bool(const Processor*)>& filter = nullptr);
 
   bool isRemoteProcessGroup();
@@ -231,7 +231,7 @@ class ProcessGroup : public CoreComponent {
   void verify() const;
 
  protected:
-  void startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT
+  void startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler);
 
   // version
   int config_version_;
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index bb1f826a8..55f4d0ca2 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
-#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
+
+#pragma once
 
 #include <memory>
 #include <string>
@@ -78,32 +78,6 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
     return controller_map_->getControllerServiceNode(id);
   }
 
-  /**
-   * Removes a controller service.
-   * @param serviceNode controller service node.
-   */
-  virtual void removeControllerService(const std::shared_ptr<ControllerServiceNode> &serviceNode) {
-    controller_map_->removeControllerService(serviceNode);
-  }
-
-  /**
-   * Enables the provided controller service
-   * @param serviceNode controller service node.
-   */
-  virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
-
-  /**
-   * Enables the provided controller service nodes
-   * @param serviceNode controller service node.
-   */
-  virtual void enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) = 0;
-
-  /**
-   * Disables the provided controller service node
-   * @param serviceNode controller service node.
-   */
-  virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
   /**
    * Removes all controller services.
    */
@@ -116,62 +90,6 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
     return controller_map_->getAllControllerServices();
   }
 
-  /**
-   * Verifies that referencing components can be stopped for the controller service
-   */
-  virtual void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
-  /**
-   *  Unschedules referencing components.
-   */
-  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
-  /**
-   * Verifies referencing components for <code>serviceNode</code> can be disabled.
-   * @param serviceNode shared pointer to a controller service node.
-   */
-  virtual void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
-  /**
-   * Disables referencing components for <code>serviceNode</code> can be disabled.
-   * @param serviceNode shared pointer to a controller service node.
-   */
-  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>& /*serviceNode*/) {
-    return std::vector<std::shared_ptr<core::controller::ControllerServiceNode>>();
-  }
-
-  /**
-   * Verifies referencing components for <code>serviceNode</code> can be enabled.
-   * @param serviceNode shared pointer to a controller service node.
-   */
-  virtual void verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
-    for (auto ref : references) {
-      ref->canEnable();
-    }
-  }
-
-  /**
-   * Enables referencing components for <code>serviceNode</code> can be Enabled.
-   * @param serviceNode shared pointer to a controller service node.
-   */
-  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
-  /**
-   * Schedules the service node and referencing components.
-   * @param serviceNode shared pointer to a controller service node.
-   */
-  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
-  /**
-   * Returns a controller service for the service identifier and componentID
-   * @param service Identifier service identifier.
-   */
-  virtual std::shared_ptr<ControllerService> getControllerServiceForComponent(const std::string& serviceIdentifier, const utils::Identifier& /*componentId*/) const {
-    std::shared_ptr<ControllerService> node = getControllerService(serviceIdentifier);
-    return node;
-  }
-
   /**
    * Gets the controller service for the provided identifier
    * @param identifier service identifier.
@@ -272,5 +190,3 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
 };
 
 }  // namespace org::apache::nifi::minifi::core::controller
-
-#endif  // LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
diff --git a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
index 960307be5..d70d6d324 100644
--- a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
@@ -25,12 +25,7 @@
 #include "ControllerServiceProvider.h"
 #include "ControllerServiceNode.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace controller {
+namespace org::apache::nifi::minifi::core::controller {
 
 class ForwardingControllerServiceProvider : public ControllerServiceProvider {
  public:
@@ -44,22 +39,6 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider {
     return controller_service_provider_impl_->getControllerServiceNode(id);
   }
 
-  void removeControllerService(const std::shared_ptr<ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->removeControllerService(serviceNode);
-  }
-
-  std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->enableControllerService(serviceNode);
-  }
-
-  void enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) override {
-    return controller_service_provider_impl_->enableControllerServices(serviceNodes);
-  }
-
-  std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->disableControllerService(serviceNode);
-  }
-
   void clearControllerServices() override {
     return controller_service_provider_impl_->clearControllerServices();
   }
@@ -72,38 +51,6 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider {
     return controller_service_provider_impl_->getAllControllerServices();
   }
 
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->unscheduleReferencingComponents(serviceNode);
-  }
-
-  void verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->verifyCanEnableReferencingServices(serviceNode);
-  }
-
-  void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->verifyCanDisableReferencingServices(serviceNode);
-  }
-
-  void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->verifyCanStopReferencingComponents(serviceNode);
-  }
-
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->disableReferencingServices(serviceNode);
-  }
-
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->enableReferencingServices(serviceNode);
-  }
-
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->scheduleReferencingComponents(serviceNode);
-  }
-
-  std::shared_ptr<ControllerService> getControllerServiceForComponent(const std::string &serviceIdentifier, const utils::Identifier &componentId) const override {
-    return controller_service_provider_impl_->getControllerServiceForComponent(serviceIdentifier, componentId);
-  }
-
   bool isControllerServiceEnabled(const std::string &identifier) override {
     return controller_service_provider_impl_->isControllerServiceEnabled(identifier);
   }
@@ -128,9 +75,4 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider {
   std::shared_ptr<ControllerServiceProvider> controller_service_provider_impl_;
 };
 
-}  // namespace controller
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::core::controller
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index 92b12890c..8730133fd 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICEPROVIDER_H_
-#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICEPROVIDER_H_
+
+#pragma once
 
 #include <string>
 #include <utility>
@@ -32,32 +32,16 @@
 #include "StandardControllerServiceNode.h"
 #include "ControllerServiceProvider.h"
 #include "core/logging/LoggerFactory.h"
+#include "SchedulingAgent.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace controller {
+namespace org::apache::nifi::minifi::core::controller {
 
 class StandardControllerServiceProvider : public ControllerServiceProvider, public std::enable_shared_from_this<StandardControllerServiceProvider> {
  public:
-  explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, ProcessGroup* root_group, std::shared_ptr<Configure> configuration,
-                                             std::shared_ptr<minifi::SchedulingAgent> agent, ClassLoader &loader = ClassLoader::getDefaultClassLoader())
-      : ControllerServiceProvider(services),
-        agent_(agent),
-        extension_loader_(loader),
-        root_group_(root_group),
-        configuration_(configuration),
-        logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) {
-  }
-
-  explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, ProcessGroup* root_group, std::shared_ptr<Configure> configuration, ClassLoader &loader =
+  explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, std::shared_ptr<Configure> configuration, ClassLoader &loader =
                                                  ClassLoader::getDefaultClassLoader())
       : ControllerServiceProvider(services),
-        agent_(nullptr),
         extension_loader_(loader),
-        root_group_(root_group),
         configuration_(configuration),
         logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) {
   }
@@ -68,14 +52,6 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
   StandardControllerServiceProvider& operator=(const StandardControllerServiceProvider &other) = delete;
   StandardControllerServiceProvider& operator=(StandardControllerServiceProvider &&other) = delete;
 
-  void setRootGroup(ProcessGroup* rg) {
-    root_group_ = rg;
-  }
-
-  void setSchedulingAgent(std::shared_ptr<minifi::SchedulingAgent> agent) {
-    agent_ = agent;
-  }
-
   std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &fullType, const std::string &id, bool /*firstTimeAdded*/) {
     std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id);
 
@@ -97,22 +73,15 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
     return new_service_node;
   }
 
-  std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
-    if (serviceNode->canEnable()) {
-      return agent_->enableControllerService(serviceNode);
-    } else {
-      std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done);
-      return no_run;
-    }
-  }
-
   virtual void enableAllControllerServices() {
     logger_->log_info("Enabling %u controller services", controller_map_->getAllControllerServices().size());
     for (auto service : controller_map_->getAllControllerServices()) {
-      if (service->canEnable()) {
-        logger_->log_info("Enabling %s", service->getName());
-        agent_->enableControllerService(service);
-      } else {
+      logger_->log_info("Enabling %s", service->getName());
+      if (!service->canEnable()) {
+        logger_->log_warn("Service %s cannot be enabled", service->getName());
+        continue;
+      }
+      if (!service->enable()) {
         logger_->log_warn("Could not enable %s", service->getName());
       }
     }
@@ -121,98 +90,32 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
   virtual void disableAllControllerServices() {
     logger_->log_info("Disabling %u controller services", controller_map_->getAllControllerServices().size());
     for (auto service : controller_map_->getAllControllerServices()) {
+      logger_->log_info("Disabling %s", service->getName());
+      if (!service->enabled()) {
+        logger_->log_warn("Service %s is not enabled", service->getName());
+        continue;
+      }
       if (!service->disable()) {
         logger_->log_warn("Could not disable %s", service->getName());
       }
     }
   }
 
-  void enableControllerServices(std::vector<std::shared_ptr<ControllerServiceNode>> serviceNodes) {
-    for (auto node : serviceNodes) {
-      enableControllerService(node);
-    }
-  }
-
-  std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
-    if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) {
-      return agent_->disableControllerService(serviceNode);
-    } else {
-      std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done);
-      return no_run;
-    }
-  }
-
   void clearControllerServices() {
     controller_map_->clear();
   }
 
-  void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>& /*serviceNode*/) {
-  }
-
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
-    for (auto ref : references) {
-      agent_->disableControllerService(ref);
-    }
-    return references;
-  }
-
-  void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
-    for (auto ref : references) {
-      if (!ref->canEnable()) {
-        logger_->log_info("Cannot disable %s", ref->getName());
-      }
-    }
-  }
-
-  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
-    for (auto ref : references) {
-      agent_->disableControllerService(ref);
-    }
-
-    return references;
-  }
-
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
-    for (auto ref : references) {
-      agent_->enableControllerService(ref);
-    }
-    return references;
-  }
-
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
-    for (auto ref : references) {
-      agent_->enableControllerService(ref);
-    }
-    return references;
-  }
-
  protected:
   bool canEdit() {
     return false;
   }
 
-  std::shared_ptr<minifi::SchedulingAgent> agent_;
-
   ClassLoader &extension_loader_;
 
-  ProcessGroup* root_group_ = nullptr;
-
   std::shared_ptr<Configure> configuration_;
 
  private:
   std::shared_ptr<logging::Logger> logger_;
 };
 
-}  // namespace controller
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICEPROVIDER_H_
+}  // namespace org::apache::nifi::minifi::core::controller
diff --git a/libminifi/include/core/state/ProcessorController.h b/libminifi/include/core/state/ProcessorController.h
index 109d5c490..e3546da2a 100644
--- a/libminifi/include/core/state/ProcessorController.h
+++ b/libminifi/include/core/state/ProcessorController.h
@@ -32,7 +32,7 @@ namespace org::apache::nifi::minifi::state {
  */
 class ProcessorController : public StateController {
  public:
-  ProcessorController(core::Processor& processor, std::shared_ptr<SchedulingAgent> scheduler);
+  ProcessorController(core::Processor& processor, SchedulingAgent& scheduler);
 
   ~ProcessorController() override;
 
@@ -64,7 +64,7 @@ class ProcessorController : public StateController {
 
  protected:
   gsl::not_null<core::Processor*> processor_;
-  std::shared_ptr<SchedulingAgent> scheduler_;
+  gsl::not_null<SchedulingAgent*> scheduler_;
 };
 
 }  // namespace org::apache::nifi::minifi::state
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 37aa27fdd..68c2663ec 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -40,6 +40,7 @@
 #include "controllers/ThreadManagementService.h"
 #include "core/controller/ControllerService.h"
 #include "core/controller/ControllerServiceProvider.h"
+
 namespace org::apache::nifi::minifi::utils {
 
 using TaskId = std::string;
@@ -162,18 +163,8 @@ class WorkerThread {
 template<typename T>
 class ThreadPool {
  public:
-  ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, core::controller::ControllerServiceProvider* controller_service_provider = nullptr,
-             std::string name = "NamelessPool")
-      : daemon_threads_(daemon_threads),
-        thread_reduction_count_(0),
-        max_worker_threads_(max_worker_threads),
-        adjust_threads_(false),
-        running_(false),
-        controller_service_provider_(controller_service_provider),
-        name_(std::move(name)) {
-    current_workers_ = 0;
-    thread_manager_ = nullptr;
-  }
+  ThreadPool(int max_worker_threads = 2, bool daemon_threads = false,
+             core::controller::ControllerServiceProvider* controller_service_provider = nullptr, std::string name = "NamelessPool");
 
   ThreadPool(const ThreadPool<T> &other) = delete;
   ThreadPool<T>& operator=(const ThreadPool<T> &other) = delete;
@@ -277,6 +268,9 @@ class ThreadPool {
       start();
   }
 
+ private:
+  std::shared_ptr<controllers::ThreadManagementService> createThreadManager() const;
+
  protected:
   std::thread createThread(std::function<void()> &&functor) {
     return std::thread([ functor ]() mutable {
@@ -296,6 +290,7 @@ class ThreadPool {
       std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
   }
+
 // determines if threads are detached
   bool daemon_threads_;
   std::atomic<int> thread_reduction_count_;
@@ -337,6 +332,8 @@ class ThreadPool {
   // variable to signal task running completion
   std::condition_variable task_run_complete_;
 
+  std::shared_ptr<core::logging::Logger> logger_;
+
   /**
    * Call for the manager to start worker threads
    */
@@ -345,7 +342,7 @@ class ThreadPool {
   /**
    * Runs worker tasks
    */
-  void run_tasks(std::shared_ptr<WorkerThread> thread);
+  void run_tasks(const std::shared_ptr<WorkerThread>& thread);
 
   void manage_delayed_queue();
 };
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 87ea88853..9d0d62e15 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -182,7 +182,7 @@ int16_t FlowController::stop() {
     logger_->log_info("Stop Flow Controller");
     if (this->root_) {
       // stop source processors first
-      this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_, [] (const core::Processor* proc) -> bool {
+      this->root_->stopProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_, [] (const core::Processor* proc) -> bool {
         return !proc->hasIncomingConnections();
       });
       // we enable C2 to progressively increase the timeout
@@ -194,7 +194,7 @@ int16_t FlowController::stop() {
         std::this_thread::sleep_for(shutdown_check_interval_);
       }
       // shutdown all other processors as well
-      this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_);
+      this->root_->stopProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_);
     }
     // stop after we've attempted to stop the processors.
     timer_scheduler_->stop();
@@ -211,7 +211,7 @@ int16_t FlowController::stop() {
     this->flow_file_repo_->stop();
     this->provenance_repo_->stop();
     // stop the ControllerServices
-    this->controller_service_provider_impl_->disableAllControllerServices();
+    disableAllControllerServices();
     running_ = false;
   }
   return 0;
@@ -324,10 +324,6 @@ void FlowController::load(std::unique_ptr<core::ProcessGroup> root, bool reload)
     conditionalReloadScheduler<EventDrivenSchedulingAgent>(event_scheduler_, !event_scheduler_ || reload);
     conditionalReloadScheduler<CronDrivenSchedulingAgent>(cron_scheduler_, !cron_scheduler_ || reload);
 
-    std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_impl_)->setRootGroup(root_.get());
-    std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_impl_)->setSchedulingAgent(
-        std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_));
-
     logger_->log_info("Loaded controller service provider");
 
     /*
@@ -374,16 +370,16 @@ int16_t FlowController::start() {
   } else {
     if (!running_) {
       logger_->log_info("Starting Flow Controller");
-      controller_service_provider_impl_->enableAllControllerServices();
-      this->timer_scheduler_->start();
-      this->event_scheduler_->start();
-      this->cron_scheduler_->start();
+      enableAllControllerServices();
+      timer_scheduler_->start();
+      event_scheduler_->start();
+      cron_scheduler_->start();
 
       if (this->root_ != nullptr) {
         start_time_ = std::chrono::steady_clock::now();
         // watch out, this might immediately start the processors
         // as the thread_pool_ is started in load()
-        this->root_->startProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_);
+        this->root_->startProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_);
       }
       C2Client::initialize(this, this, this);
       core::logging::LoggerConfiguration::getConfiguration().initializeAlertSinks(this, configuration_);
@@ -391,7 +387,6 @@ int16_t FlowController::start() {
       this->protocol_->start();
       this->provenance_repo_->start();
       this->flow_file_repo_->start();
-      thread_pool_.start();
       logger_->log_info("Started Flow Controller");
     }
     return 0;
@@ -504,11 +499,11 @@ state::StateController* FlowController::getComponent(const std::string& id_or_na
 }
 
 gsl::not_null<std::unique_ptr<state::ProcessorController>> FlowController::createController(core::Processor& processor) {
-  const auto scheduler = [this, &processor]() -> std::shared_ptr<SchedulingAgent> {
+  const auto scheduler = [this, &processor]() -> SchedulingAgent& {
     switch (processor.getSchedulingStrategy()) {
-      case core::SchedulingStrategy::TIMER_DRIVEN: return timer_scheduler_;
-      case core::SchedulingStrategy::EVENT_DRIVEN: return event_scheduler_;
-      case core::SchedulingStrategy::CRON_DRIVEN: return cron_scheduler_;
+      case core::SchedulingStrategy::TIMER_DRIVEN: return *timer_scheduler_;
+      case core::SchedulingStrategy::EVENT_DRIVEN: return *event_scheduler_;
+      case core::SchedulingStrategy::CRON_DRIVEN: return *cron_scheduler_;
     }
     gsl_Assert(false);
   };
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 5b59faa31..9dfca5c58 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -34,47 +34,6 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
 
 namespace org::apache::nifi::minifi {
 
-std::future<utils::TaskRescheduleInfo> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-  logger_->log_info("Enabling CSN in SchedulingAgent %s", serviceNode->getName());
-  // reference the enable function from serviceNode
-  std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] {
-      serviceNode->enable();
-      return utils::TaskRescheduleInfo::Done();
-    };
-
-  // only need to run this once.
-  auto monitor = std::make_unique<utils::ComplexMonitor>();
-  utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
-  // move the functor into the thread pool. While a future is returned
-  // we aren't terribly concerned with the result.
-  std::future<utils::TaskRescheduleInfo> future;
-  thread_pool_.execute(std::move(functor), future);
-  if (future.valid())
-    future.wait();
-  return future;
-}
-
-std::future<utils::TaskRescheduleInfo> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-  logger_->log_info("Disabling CSN in SchedulingAgent %s", serviceNode->getName());
-  // reference the disable function from serviceNode
-  std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] {
-    serviceNode->disable();
-    return utils::TaskRescheduleInfo::Done();
-  };
-
-  // only need to run this once.
-  auto monitor = std::make_unique<utils::ComplexMonitor>();
-  utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
-
-  // move the functor into the thread pool. While a future is returned
-  // we aren't terribly concerned with the result.
-  std::future<utils::TaskRescheduleInfo> future;
-  thread_pool_.execute(std::move(functor), future);
-  if (future.valid())
-    future.wait();
-  return future;
-}
-
 bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                 const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   gsl_Expects(processor);
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index c7981b86f..b53d3fe90 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -37,7 +37,7 @@ FlowConfiguration::FlowConfiguration(ConfigurationContext ctx)
       filesystem_(std::move(ctx.filesystem)),
       logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
   controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
-  service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
+  service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, configuration_);
   std::string flowUrl;
   std::string bucket_id = "default";
   std::string flowId;
@@ -97,7 +97,7 @@ std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const s
   auto old_services = controller_services_;
   auto old_provider = service_provider_;
   controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
-  service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
+  service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, configuration_);
   auto payload = getRootFromPayload(yamlConfigPayload);
   if (!url.empty() && payload != nullptr) {
     std::string payload_flow_id;
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 7922717fb..7300d2129 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -122,8 +122,8 @@ void ProcessGroup::addProcessGroup(std::unique_ptr<ProcessGroup> child) {
   }
 }
 
-void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler,
-    const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
+void ProcessGroup::startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler,
+    EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler) {
   std::unique_lock<std::recursive_mutex> lock(mutex_);
 
   std::set<Processor*> failed_processors;
@@ -133,13 +133,13 @@ void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSc
       logger_->log_debug("Starting %s", processor->getName());
       switch (processor->getSchedulingStrategy()) {
         case TIMER_DRIVEN:
-          timeScheduler->schedule(processor);
+          timeScheduler.schedule(processor);
           break;
         case EVENT_DRIVEN:
-          eventScheduler->schedule(processor);
+          eventScheduler.schedule(processor);
           break;
         case CRON_DRIVEN:
-          cronScheduler->schedule(processor);
+          cronScheduler.schedule(processor);
           break;
       }
     }
@@ -166,8 +166,8 @@ void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSc
 
   if (!onScheduleTimer_ && !failed_processors_.empty() && onschedule_retry_msec_ > 0) {
     logger_->log_info("Retrying failed processors in %lld msec", onschedule_retry_msec_.load());
-    auto func = [this, eventScheduler, cronScheduler, timeScheduler]() {
-      this->startProcessingProcessors(timeScheduler, eventScheduler, cronScheduler);
+    auto func = [this, eventScheduler = &eventScheduler, cronScheduler = &cronScheduler, timeScheduler = &timeScheduler]() {
+      this->startProcessingProcessors(*timeScheduler, *eventScheduler, *cronScheduler);
     };
     onScheduleTimer_ = std::make_unique<utils::CallBackTimer>(std::chrono::milliseconds(onschedule_retry_msec_), func);
     onScheduleTimer_->start();
@@ -176,8 +176,8 @@ void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSc
   }
 }
 
-void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
-                                   const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler,
+                                   CronDrivenSchedulingAgent& cronScheduler) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   try {
@@ -202,8 +202,8 @@ void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAg
   }
 }
 
-void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
-                                  const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler, const std::function<bool(const Processor*)>& filter) {
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler,
+                                  CronDrivenSchedulingAgent& cronScheduler, const std::function<bool(const Processor*)>& filter) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (onScheduleTimer_) {
@@ -221,13 +221,13 @@ void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAge
       logger_->log_debug("Stopping %s", processor->getName());
       switch (processor->getSchedulingStrategy()) {
         case TIMER_DRIVEN:
-          timeScheduler->unschedule(processor.get());
+          timeScheduler.unschedule(processor.get());
           break;
         case EVENT_DRIVEN:
-          eventScheduler->unschedule(processor.get());
+          eventScheduler.unschedule(processor.get());
           break;
         case CRON_DRIVEN:
-          cronScheduler->unschedule(processor.get());
+          cronScheduler.unschedule(processor.get());
           break;
       }
     }
diff --git a/libminifi/src/core/state/ProcessorController.cpp b/libminifi/src/core/state/ProcessorController.cpp
index c5686de77..b496387d9 100644
--- a/libminifi/src/core/state/ProcessorController.cpp
+++ b/libminifi/src/core/state/ProcessorController.cpp
@@ -22,9 +22,9 @@
 
 namespace org::apache::nifi::minifi::state {
 
-ProcessorController::ProcessorController(core::Processor& processor, std::shared_ptr<SchedulingAgent> scheduler)
+ProcessorController::ProcessorController(core::Processor& processor, SchedulingAgent& scheduler)
     : processor_(&processor),
-      scheduler_(std::move(scheduler)) {
+      scheduler_(&scheduler) {
 }
 
 ProcessorController::~ProcessorController() = default;
diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp
index 8612f0dbf..baadb59ec 100644
--- a/libminifi/src/utils/ThreadPool.cpp
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -21,7 +21,21 @@
 namespace org::apache::nifi::minifi::utils {
 
 template<typename T>
-void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+ThreadPool<T>::ThreadPool(int max_worker_threads, bool daemon_threads, core::controller::ControllerServiceProvider* controller_service_provider, std::string name)
+    : daemon_threads_(daemon_threads),
+      thread_reduction_count_(0),
+      max_worker_threads_(max_worker_threads),
+      adjust_threads_(false),
+      running_(false),
+      controller_service_provider_(controller_service_provider),
+      name_(std::move(name)),
+      logger_(core::logging::LoggerFactory<ThreadPool<T>>::getLogger()) {
+  current_workers_ = 0;
+  thread_manager_ = nullptr;
+}
+
+template<typename T>
+void ThreadPool<T>::run_tasks(const std::shared_ptr<WorkerThread>& thread) {
   thread->is_running_ = true;
   while (running_.load()) {
     if (UNLIKELY(thread_reduction_count_ > 0)) {
@@ -181,16 +195,29 @@ void ThreadPool<T>::manageWorkers() {
 }
 
 template<typename T>
-void ThreadPool<T>::start() {
-  if (nullptr != controller_service_provider_) {
-    auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
-    thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
-  } else {
-    thread_manager_ = nullptr;
+std::shared_ptr<controllers::ThreadManagementService> ThreadPool<T>::createThreadManager() const {
+  if (!controller_service_provider_) {
+    return nullptr;
+  }
+  auto service = controller_service_provider_->getControllerService("ThreadPoolManager");
+  if (!service) {
+    logger_->log_info("Could not find a ThreadPoolManager service");
+    return nullptr;
   }
+  auto thread_manager_service = std::dynamic_pointer_cast<controllers::ThreadManagementService>(service);
+  if (!thread_manager_service) {
+    logger_->log_error("Found ThreadPoolManager, but it is not a ThreadManagementService");
+    return nullptr;
+  }
+  return thread_manager_service;
+}
 
+template<typename T>
+void ThreadPool<T>::start() {
   std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
   if (!running_) {
+    thread_manager_ = createThreadManager();
+
     running_ = true;
     worker_queue_.start();
     manager_thread_ = std::thread(&ThreadPool::manageWorkers, this);
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index b5c48dc1d..bc34d239e 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -197,7 +197,7 @@ TestPlan::TestPlan(std::shared_ptr<minifi::core::ContentRepository> content_repo
       logger_(logging::LoggerFactory<TestPlan>::getLogger()) {
   stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
   controller_services_ = std::make_shared<minifi::core::controller::ControllerServiceMap>();
-  controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
+  controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, configuration_);
   /* Inject the default state storage ahead of ProcessContext to make sure we have a unique state directory */
   if (state_dir == nullptr) {
     state_dir_ = std::make_unique<TempDirectory>();
diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp b/libminifi/test/unit/SchedulingAgentTests.cpp
index b80c7563c..36a74046e 100644
--- a/libminifi/test/unit/SchedulingAgentTests.cpp
+++ b/libminifi/test/unit/SchedulingAgentTests.cpp
@@ -56,7 +56,7 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
   auto test_plan = testController.createPlan();
   auto controller_services_ = std::make_shared<minifi::core::controller::ControllerServiceMap>();
   auto configuration = std::make_shared<minifi::Configure>();
-  auto controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration);
+  auto controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, configuration);
   utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool;
   auto count_proc = std::make_shared<CountOnTriggersProcessor>("count_proc");
   count_proc->incrementActiveTasks();


[nifi-minifi-cpp] 02/02: MINIFICPP-2026 Make isRunning member functions const

Posted by fg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 8d18ef1ffbb48752909fe5c513c05a1500fbae64
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Fri Jan 6 11:53:50 2023 +0100

    MINIFICPP-2026 Make isRunning member functions const
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1486
---
 extensions/aws/controllerservices/AWSCredentialsService.h      |  2 +-
 .../azure/controllerservices/AzureStorageCredentialsService.h  |  2 +-
 extensions/coap/controllerservice/CoapConnector.h              |  2 +-
 extensions/coap/server/CoapServer.h                            |  6 +++---
 .../elasticsearch/ElasticsearchCredentialsControllerService.h  |  2 +-
 .../gcp/controllerservices/GCPCredentialsControllerService.h   |  2 +-
 extensions/http-curl/client/HTTPClient.h                       |  2 +-
 extensions/jni/ExecuteJavaControllerService.h                  |  2 +-
 extensions/jni/jvm/JavaControllerService.h                     |  2 +-
 extensions/libarchive/FocusArchiveEntry.h                      |  1 -
 extensions/rocksdb-repos/DatabaseContentRepository.h           |  2 +-
 extensions/sql/services/DatabaseService.h                      |  2 +-
 libminifi/include/Connection.h                                 |  2 +-
 libminifi/include/FlowController.h                             |  2 +-
 libminifi/include/c2/C2Protocol.h                              |  6 +++---
 libminifi/include/c2/HeartbeatReporter.h                       |  2 +-
 libminifi/include/c2/triggers/FileUpdateTrigger.h              |  2 +-
 libminifi/include/controllers/AttributeProviderService.h       |  2 +-
 libminifi/include/controllers/LinuxPowerManagementService.h    |  2 +-
 libminifi/include/controllers/NetworkPrioritizerService.h      |  2 +-
 libminifi/include/controllers/SSLContextService.h              |  2 +-
 libminifi/include/controllers/ThreadManagementService.h        | 10 +++++-----
 libminifi/include/controllers/UpdatePolicyControllerService.h  |  2 +-
 libminifi/include/controllers/keyvalue/KeyValueStateStorage.h  |  2 +-
 libminifi/include/core/Connectable.h                           |  2 +-
 libminifi/include/core/Processor.h                             |  2 +-
 libminifi/include/core/ProcessorNode.h                         |  2 +-
 libminifi/include/core/SerializableComponent.h                 |  2 +-
 libminifi/include/core/ThreadedRepository.h                    |  2 +-
 libminifi/include/core/controller/ControllerService.h          |  4 ++--
 libminifi/include/core/state/ProcessorController.h             |  2 +-
 libminifi/include/core/state/UpdateController.h                |  2 +-
 libminifi/include/core/state/nodes/MetricsBase.h               |  2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h                |  2 +-
 libminifi/src/controllers/LinuxPowerManagementService.cpp      |  2 +-
 libminifi/src/controllers/NetworkPrioritizerService.cpp        |  2 +-
 libminifi/src/controllers/UpdatePolicyControllerService.cpp    |  2 +-
 libminifi/src/core/Processor.cpp                               |  2 +-
 libminifi/src/core/ProcessorNode.cpp                           |  2 +-
 libminifi/src/core/state/ProcessorController.cpp               |  2 +-
 libminifi/test/unit/ComponentManifestTests.cpp                 |  2 +-
 libminifi/test/unit/ControllerTests.cpp                        |  4 ++--
 libminifi/test/unit/MockClasses.h                              |  2 +-
 libminifi/test/unit/ProvenanceTestHelper.h                     |  2 +-
 44 files changed, 53 insertions(+), 54 deletions(-)

diff --git a/extensions/aws/controllerservices/AWSCredentialsService.h b/extensions/aws/controllerservices/AWSCredentialsService.h
index dd9846e31..63f747ac9 100644
--- a/extensions/aws/controllerservices/AWSCredentialsService.h
+++ b/extensions/aws/controllerservices/AWSCredentialsService.h
@@ -71,7 +71,7 @@ class AWSCredentialsService : public core::controller::ControllerService {
     return false;
   };
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return getState() == core::controller::ControllerServiceState::ENABLED;
   }
 
diff --git a/extensions/azure/controllerservices/AzureStorageCredentialsService.h b/extensions/azure/controllerservices/AzureStorageCredentialsService.h
index 2bc95526b..6de49dc1a 100644
--- a/extensions/azure/controllerservices/AzureStorageCredentialsService.h
+++ b/extensions/azure/controllerservices/AzureStorageCredentialsService.h
@@ -70,7 +70,7 @@ class AzureStorageCredentialsService : public core::controller::ControllerServic
     return false;
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return getState() == core::controller::ControllerServiceState::ENABLED;
   }
 
diff --git a/extensions/coap/controllerservice/CoapConnector.h b/extensions/coap/controllerservice/CoapConnector.h
index 84d5bf1a8..1b6e56685 100644
--- a/extensions/coap/controllerservice/CoapConnector.h
+++ b/extensions/coap/controllerservice/CoapConnector.h
@@ -68,7 +68,7 @@ class CoapConnectorService : public core::controller::ControllerService {
 
   void yield() override { }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return getState() == core::controller::ControllerServiceState::ENABLED;
   }
 
diff --git a/extensions/coap/server/CoapServer.h b/extensions/coap/server/CoapServer.h
index dc0fe6a99..fc3c067a2 100644
--- a/extensions/coap/server/CoapServer.h
+++ b/extensions/coap/server/CoapServer.h
@@ -183,7 +183,7 @@ class CoapServer : public core::Connectable {
   /**
    * Determines if we are connected and operating
    */
-  virtual bool isRunning() {
+  bool isRunning() const override {
     return running_.load();
   }
 
@@ -193,14 +193,14 @@ class CoapServer : public core::Connectable {
    */
   void waitForWork(uint64_t timeoutMs);
 
-  virtual void yield() {
+  void yield() override {
   }
 
   /**
    * Determines if work is available by this connectable
    * @return boolean if work is available.
    */
-  virtual bool isWorkAvailable() {
+  bool isWorkAvailable() override {
     return true;
   }
 
diff --git a/extensions/elasticsearch/ElasticsearchCredentialsControllerService.h b/extensions/elasticsearch/ElasticsearchCredentialsControllerService.h
index ebd80e6d9..e0b9413fe 100644
--- a/extensions/elasticsearch/ElasticsearchCredentialsControllerService.h
+++ b/extensions/elasticsearch/ElasticsearchCredentialsControllerService.h
@@ -57,7 +57,7 @@ class ElasticsearchCredentialsControllerService : public core::controller::Contr
     return false;
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return getState() == core::controller::ControllerServiceState::ENABLED;
   }
 
diff --git a/extensions/gcp/controllerservices/GCPCredentialsControllerService.h b/extensions/gcp/controllerservices/GCPCredentialsControllerService.h
index ae51fa472..b70f04a90 100644
--- a/extensions/gcp/controllerservices/GCPCredentialsControllerService.h
+++ b/extensions/gcp/controllerservices/GCPCredentialsControllerService.h
@@ -65,7 +65,7 @@ class GCPCredentialsControllerService : public core::controller::ControllerServi
     return false;
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return getState() == core::controller::ControllerServiceState::ENABLED;
   }
 
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index a6c894073..acc650380 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -171,7 +171,7 @@ class HTTPClient : public utils::BaseHTTPClient, public core::Connectable {
   /**
    * Determines if we are connected and operating
    */
-  bool isRunning() override {
+  bool isRunning() const override {
     return true;
   }
 
diff --git a/extensions/jni/ExecuteJavaControllerService.h b/extensions/jni/ExecuteJavaControllerService.h
index 43e5102b2..9e968ea5f 100644
--- a/extensions/jni/ExecuteJavaControllerService.h
+++ b/extensions/jni/ExecuteJavaControllerService.h
@@ -66,7 +66,7 @@ class ExecuteJavaControllerService : public ConfigurationContext, public std::en
   void yield() override {
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return getState() == core::controller::ControllerServiceState::ENABLED;
   }
 
diff --git a/extensions/jni/jvm/JavaControllerService.h b/extensions/jni/jvm/JavaControllerService.h
index 854240094..497eebada 100644
--- a/extensions/jni/jvm/JavaControllerService.h
+++ b/extensions/jni/jvm/JavaControllerService.h
@@ -71,7 +71,7 @@ class JavaControllerService : public core::controller::ControllerService, public
   void yield() override {
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return getState() == core::controller::ControllerServiceState::ENABLED;
   }
 
diff --git a/extensions/libarchive/FocusArchiveEntry.h b/extensions/libarchive/FocusArchiveEntry.h
index 874b3bdc0..f4e2c033b 100644
--- a/extensions/libarchive/FocusArchiveEntry.h
+++ b/extensions/libarchive/FocusArchiveEntry.h
@@ -64,7 +64,6 @@ class FocusArchiveEntry : public core::Processor {
    public:
     explicit ReadCallback(core::Processor*, utils::file::FileManager *file_man, ArchiveMetadata *archiveMetadata);
     int64_t operator()(const std::shared_ptr<io::InputStream>& stream) const;
-    bool isRunning() {return proc_->isRunning();}
 
    private:
     utils::file::FileManager *file_man_;
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index fcf943b11..f00bc7905 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -83,7 +83,7 @@ class DatabaseContentRepository : public core::ContentRepository, public core::C
   /**
    * Determines if we are connected and operating
    */
-  bool isRunning() override {
+  bool isRunning() const override {
     return true;
   }
 
diff --git a/extensions/sql/services/DatabaseService.h b/extensions/sql/services/DatabaseService.h
index f82acb092..45e2edd91 100644
--- a/extensions/sql/services/DatabaseService.h
+++ b/extensions/sql/services/DatabaseService.h
@@ -58,7 +58,7 @@ class DatabaseService : public core::controller::ControllerService {
   void yield() override {
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return getState() == core::controller::ControllerServiceState::ENABLED;
   }
 
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 40c5611a2..81e1a7a25 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -174,7 +174,7 @@ class Connection : public core::Connectable {
     return queue_.isWorkAvailable();
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return true;
   }
 
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index b398191d1..84ac20af8 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -92,7 +92,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
 
   virtual void load(std::unique_ptr<core::ProcessGroup> root = nullptr, bool reload = false);
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return running_.load() || updating_.load();
   }
 
diff --git a/libminifi/include/c2/C2Protocol.h b/libminifi/include/c2/C2Protocol.h
index 5db0dcdbd..ce248a4d7 100644
--- a/libminifi/include/c2/C2Protocol.h
+++ b/libminifi/include/c2/C2Protocol.h
@@ -75,7 +75,7 @@ class C2Protocol : public core::Connectable {
   /**
    * Determines if we are connected and operating
    */
-  virtual bool isRunning() {
+  bool isRunning() const override {
     return running_.load();
   }
 
@@ -85,14 +85,14 @@ class C2Protocol : public core::Connectable {
    */
   void waitForWork(uint64_t timeoutMs);
 
-  virtual void yield() {
+  void yield() override {
   }
 
   /**
    * Determines if work is available by this connectable
    * @return boolean if work is available.
    */
-  virtual bool isWorkAvailable() {
+  bool isWorkAvailable() override {
     return true;
   }
 
diff --git a/libminifi/include/c2/HeartbeatReporter.h b/libminifi/include/c2/HeartbeatReporter.h
index 9ae00859c..5b4435ddd 100644
--- a/libminifi/include/c2/HeartbeatReporter.h
+++ b/libminifi/include/c2/HeartbeatReporter.h
@@ -67,7 +67,7 @@ class HeartbeatReporter : public core::Connectable {
   /**
    * Determines if we are connected and operating
    */
-  bool isRunning() override {
+  bool isRunning() const override {
     return true;
   }
 
diff --git a/libminifi/include/c2/triggers/FileUpdateTrigger.h b/libminifi/include/c2/triggers/FileUpdateTrigger.h
index a4b8a4581..745841055 100644
--- a/libminifi/include/c2/triggers/FileUpdateTrigger.h
+++ b/libminifi/include/c2/triggers/FileUpdateTrigger.h
@@ -83,7 +83,7 @@ class FileUpdateTrigger : public C2Trigger {
   /**
    * Determines if we are connected and operating
    */
-  bool isRunning() override {
+  bool isRunning() const override {
     return true;
   }
 
diff --git a/libminifi/include/controllers/AttributeProviderService.h b/libminifi/include/controllers/AttributeProviderService.h
index 3aee6a525..b3a59be87 100644
--- a/libminifi/include/controllers/AttributeProviderService.h
+++ b/libminifi/include/controllers/AttributeProviderService.h
@@ -30,7 +30,7 @@ class AttributeProviderService : public core::controller::ControllerService {
   using ControllerService::ControllerService;
 
   void yield() override {}
-  bool isRunning() override { return getState() == core::controller::ControllerServiceState::ENABLED; }
+  bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; }
   bool isWorkAvailable() override { return false; }
 
   using AttributeMap = std::unordered_map<std::string, std::string>;
diff --git a/libminifi/include/controllers/LinuxPowerManagementService.h b/libminifi/include/controllers/LinuxPowerManagementService.h
index 01bff6f47..4117e19d7 100644
--- a/libminifi/include/controllers/LinuxPowerManagementService.h
+++ b/libminifi/include/controllers/LinuxPowerManagementService.h
@@ -109,7 +109,7 @@ class LinuxPowerManagerService : public ThreadManagementService {
 
   void yield() override;
 
-  bool isRunning() override;
+  bool isRunning() const override;
 
   bool isWorkAvailable() override;
 
diff --git a/libminifi/include/controllers/NetworkPrioritizerService.h b/libminifi/include/controllers/NetworkPrioritizerService.h
index 68519b836..87924a716 100644
--- a/libminifi/include/controllers/NetworkPrioritizerService.h
+++ b/libminifi/include/controllers/NetworkPrioritizerService.h
@@ -84,7 +84,7 @@ class NetworkPrioritizerService : public core::controller::ControllerService, pu
 
   void yield() override;
 
-  bool isRunning() override;
+  bool isRunning() const override;
 
   bool isWorkAvailable() override;
 
diff --git a/libminifi/include/controllers/SSLContextService.h b/libminifi/include/controllers/SSLContextService.h
index f61ecef2b..a9c7a8c55 100644
--- a/libminifi/include/controllers/SSLContextService.h
+++ b/libminifi/include/controllers/SSLContextService.h
@@ -152,7 +152,7 @@ class SSLContextService : public core::controller::ControllerService {
   void yield() override {
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return getState() == core::controller::ControllerServiceState::ENABLED;
   }
 
diff --git a/libminifi/include/controllers/ThreadManagementService.h b/libminifi/include/controllers/ThreadManagementService.h
index c60bdf3ce..d41c3e830 100644
--- a/libminifi/include/controllers/ThreadManagementService.h
+++ b/libminifi/include/controllers/ThreadManagementService.h
@@ -84,22 +84,22 @@ class ThreadManagementService : public core::controller::ControllerService {
    */
   virtual bool canIncrease() = 0;
 
-  virtual void initialize() {
+  void initialize() override {
     ControllerService::initialize();
   }
 
-  void yield() {
+  void yield() override {
   }
 
-  bool isRunning() {
+  bool isRunning() const override {
     return getState() == core::controller::ControllerServiceState::ENABLED;
   }
 
-  bool isWorkAvailable() {
+  bool isWorkAvailable() override {
     return false;
   }
 
-  virtual void onEnable() {
+  void onEnable() override {
   }
 
  protected:
diff --git a/libminifi/include/controllers/UpdatePolicyControllerService.h b/libminifi/include/controllers/UpdatePolicyControllerService.h
index 9c75e4d7e..e1b67f39a 100644
--- a/libminifi/include/controllers/UpdatePolicyControllerService.h
+++ b/libminifi/include/controllers/UpdatePolicyControllerService.h
@@ -70,7 +70,7 @@ class UpdatePolicyControllerService : public core::controller::ControllerService
 
   void yield() override;
 
-  bool isRunning() override;
+  bool isRunning() const override;
 
   bool isWorkAvailable() override;
 
diff --git a/libminifi/include/controllers/keyvalue/KeyValueStateStorage.h b/libminifi/include/controllers/keyvalue/KeyValueStateStorage.h
index c0f2e5109..9fa3fca20 100644
--- a/libminifi/include/controllers/keyvalue/KeyValueStateStorage.h
+++ b/libminifi/include/controllers/keyvalue/KeyValueStateStorage.h
@@ -45,7 +45,7 @@ class KeyValueStateStorage : public core::StateStorage, public core::controller:
   void yield() override {
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return getState() == core::controller::ControllerServiceState::ENABLED;
   }
 
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
index 4b1ac2cd7..f4a3e305a 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -113,7 +113,7 @@ class Connectable : public CoreComponent {
   /**
    * Determines if we are connected and operating
    */
-  virtual bool isRunning() = 0;
+  virtual bool isRunning() const = 0;
 
   /**
    * Block until work is available on any input connection, or the given duration elapses
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 52a5f12da..b4a72596b 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -82,7 +82,7 @@ class Processor : public Connectable, public ConfigurableComponent, public state
   Processor(const Processor& parent) = delete;
   Processor& operator=(const Processor& parent) = delete;
 
-  bool isRunning() override;
+  bool isRunning() const override;
 
   ~Processor() override;
 
diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h
index 96014ed9d..b39bef83d 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -261,7 +261,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
     return false;
   }
 
-  bool isRunning() override;
+  bool isRunning() const override;
 
   bool isWorkAvailable() override;
 
diff --git a/libminifi/include/core/SerializableComponent.h b/libminifi/include/core/SerializableComponent.h
index d5b0b037b..4d3ee9bcc 100644
--- a/libminifi/include/core/SerializableComponent.h
+++ b/libminifi/include/core/SerializableComponent.h
@@ -81,7 +81,7 @@ class SerializableComponent : public core::Connectable {
   /**
    * Determines if we are connected and operating
    */
-  bool isRunning() override {
+  bool isRunning() const override {
     return true;
   }
 
diff --git a/libminifi/include/core/ThreadedRepository.h b/libminifi/include/core/ThreadedRepository.h
index c991e2d57..21a4515c5 100644
--- a/libminifi/include/core/ThreadedRepository.h
+++ b/libminifi/include/core/ThreadedRepository.h
@@ -75,7 +75,7 @@ class ThreadedRepository : public core::Repository, public core::TraceableResour
     return true;
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return running_state_.load() == RunningState::Running;
   }
 
diff --git a/libminifi/include/core/controller/ControllerService.h b/libminifi/include/core/controller/ControllerService.h
index 5aae23f8c..60823608b 100644
--- a/libminifi/include/core/controller/ControllerService.h
+++ b/libminifi/include/core/controller/ControllerService.h
@@ -97,7 +97,7 @@ class ControllerService : public ConfigurableComponent, public Connectable {
     configuration_ = configuration;
   }
 
-  ControllerServiceState getState() {
+  ControllerServiceState getState() const {
     return current_state_.load();
   }
 
@@ -127,7 +127,7 @@ class ControllerService : public ConfigurableComponent, public Connectable {
  protected:
   std::vector<std::shared_ptr<controller::ControllerService> > linked_services_;
   std::shared_ptr<Configure> configuration_;
-  std::atomic<ControllerServiceState> current_state_;
+  mutable std::atomic<ControllerServiceState> current_state_;
   bool canEdit() override {
     return true;
   }
diff --git a/libminifi/include/core/state/ProcessorController.h b/libminifi/include/core/state/ProcessorController.h
index e3546da2a..d1f910331 100644
--- a/libminifi/include/core/state/ProcessorController.h
+++ b/libminifi/include/core/state/ProcessorController.h
@@ -56,7 +56,7 @@ class ProcessorController : public StateController {
    */
   int16_t stop() override;
 
-  bool isRunning() override;
+  bool isRunning() const override;
 
   int16_t pause() override;
 
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index f87cc7a56..4ee42bc3e 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -129,7 +129,7 @@ class StateController : public Pausable {
    */
   virtual int16_t stop() = 0;
 
-  virtual bool isRunning() = 0;
+  virtual bool isRunning() const = 0;
 };
 
 /**
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h b/libminifi/include/core/state/nodes/MetricsBase.h
index ff2e20d10..062aa57b7 100644
--- a/libminifi/include/core/state/nodes/MetricsBase.h
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -71,7 +71,7 @@ class ResponseNode : public core::Connectable, public PublishedMetricProvider {
   void yield() override {
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return true;
   }
 
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index 8d57558f1..ace583c18 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -178,7 +178,7 @@ class SiteToSiteClient : public core::Connectable {
   /**
    * Determines if we are connected and operating
    */
-  bool isRunning() override {
+  bool isRunning() const override {
     return running_;
   }
 
diff --git a/libminifi/src/controllers/LinuxPowerManagementService.cpp b/libminifi/src/controllers/LinuxPowerManagementService.cpp
index 64197b14a..881aa82a1 100644
--- a/libminifi/src/controllers/LinuxPowerManagementService.cpp
+++ b/libminifi/src/controllers/LinuxPowerManagementService.cpp
@@ -167,7 +167,7 @@ void LinuxPowerManagerService::initialize() {
 void LinuxPowerManagerService::yield() {
 }
 
-bool LinuxPowerManagerService::isRunning() {
+bool LinuxPowerManagerService::isRunning() const {
   return getState() == core::controller::ControllerServiceState::ENABLED;
 }
 
diff --git a/libminifi/src/controllers/NetworkPrioritizerService.cpp b/libminifi/src/controllers/NetworkPrioritizerService.cpp
index 8770b4fa8..564193032 100644
--- a/libminifi/src/controllers/NetworkPrioritizerService.cpp
+++ b/libminifi/src/controllers/NetworkPrioritizerService.cpp
@@ -166,7 +166,7 @@ void NetworkPrioritizerService::reduce_tokens(uint32_t size) {
   }
 }
 
-bool NetworkPrioritizerService::isRunning() {
+bool NetworkPrioritizerService::isRunning() const {
   return getState() == core::controller::ControllerServiceState::ENABLED;
 }
 
diff --git a/libminifi/src/controllers/UpdatePolicyControllerService.cpp b/libminifi/src/controllers/UpdatePolicyControllerService.cpp
index 7a36adde6..3a6a4b686 100644
--- a/libminifi/src/controllers/UpdatePolicyControllerService.cpp
+++ b/libminifi/src/controllers/UpdatePolicyControllerService.cpp
@@ -58,7 +58,7 @@ void UpdatePolicyControllerService::initialize() {
 void UpdatePolicyControllerService::yield() {
 }
 
-bool UpdatePolicyControllerService::isRunning() {
+bool UpdatePolicyControllerService::isRunning() const {
   return getState() == core::controller::ControllerServiceState::ENABLED;
 }
 
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index ed40dc3b8..bd4705675 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -83,7 +83,7 @@ Processor::~Processor() {
   logger_->log_debug("Destroying processor %s with uuid %s", name_, getUUIDStr());
 }
 
-bool Processor::isRunning() {
+bool Processor::isRunning() const {
   return (state_ == RUNNING && active_tasks_ > 0);
 }
 
diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp
index ee4162bf8..026ad86cb 100644
--- a/libminifi/src/core/ProcessorNode.cpp
+++ b/libminifi/src/core/ProcessorNode.cpp
@@ -32,7 +32,7 @@ bool ProcessorNode::isWorkAvailable() {
   return processor_->isWorkAvailable();
 }
 
-bool ProcessorNode::isRunning() {
+bool ProcessorNode::isRunning() const {
   return processor_->isRunning();
 }
 
diff --git a/libminifi/src/core/state/ProcessorController.cpp b/libminifi/src/core/state/ProcessorController.cpp
index b496387d9..6b4d135b1 100644
--- a/libminifi/src/core/state/ProcessorController.cpp
+++ b/libminifi/src/core/state/ProcessorController.cpp
@@ -44,7 +44,7 @@ int16_t ProcessorController::stop() {
   return 0;
 }
 
-bool ProcessorController::isRunning() {
+bool ProcessorController::isRunning() const {
   return processor_->isRunning();
 }
 
diff --git a/libminifi/test/unit/ComponentManifestTests.cpp b/libminifi/test/unit/ComponentManifestTests.cpp
index 093aad2a9..9dacb8851 100644
--- a/libminifi/test/unit/ComponentManifestTests.cpp
+++ b/libminifi/test/unit/ComponentManifestTests.cpp
@@ -48,7 +48,7 @@ class ExampleService : public core::controller::ControllerService {
 
   bool canEdit() override { return false; }
   void yield() override {}
-  bool isRunning() override { return false; }
+  bool isRunning() const override { return false; }
   bool isWorkAvailable() override { return false; }
 };
 
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
index 8b7a2bec1..796d1eae2 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -58,7 +58,7 @@ class TestStateController : public minifi::state::StateController {
     return 0;
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return is_running;
   }
 
@@ -113,7 +113,7 @@ class TestUpdateSink : public minifi::state::StateMonitor {
     return 0;
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return is_running;
   }
 
diff --git a/libminifi/test/unit/MockClasses.h b/libminifi/test/unit/MockClasses.h
index 042833314..dabd5da41 100644
--- a/libminifi/test/unit/MockClasses.h
+++ b/libminifi/test/unit/MockClasses.h
@@ -64,7 +64,7 @@ class MockControllerService : public minifi::core::controller::ControllerService
   void yield() override {
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return true;
   }
 
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 194db5c96..d51ad27e7 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -266,7 +266,7 @@ class TestFlowController : public org::apache::nifi::minifi::FlowController {
     stop();
   }
 
-  bool isRunning() override {
+  bool isRunning() const override {
     return true;
   }