You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2023/01/18 17:09:08 UTC

[nifi-minifi-cpp] 01/02: MINIFICPP-1991 - Remove unused ControllerServiceProvider methods

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 6c57decc7ce380af863dd621eb457f958b6698b0
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Tue Nov 22 14:04:02 2022 +0100

    MINIFICPP-1991 - Remove unused ControllerServiceProvider methods
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1458
---
 .../tests/ControllerServiceIntegrationTests.cpp    |  43 +++----
 libminifi/include/FlowController.h                 |  37 ++----
 libminifi/include/SchedulingAgent.h                |  17 +--
 libminifi/include/core/ProcessContext.h            |   2 +-
 libminifi/include/core/ProcessGroup.h              |  14 +--
 .../core/controller/ControllerServiceProvider.h    |  88 +-------------
 .../ForwardingControllerServiceProvider.h          |  62 +---------
 .../controller/StandardControllerServiceProvider.h | 131 +++------------------
 libminifi/include/core/state/ProcessorController.h |   4 +-
 libminifi/include/utils/ThreadPool.h               |  23 ++--
 libminifi/src/FlowController.cpp                   |  29 ++---
 libminifi/src/SchedulingAgent.cpp                  |  41 -------
 libminifi/src/core/FlowConfiguration.cpp           |   4 +-
 libminifi/src/core/ProcessGroup.cpp                |  28 ++---
 libminifi/src/core/state/ProcessorController.cpp   |   4 +-
 libminifi/src/utils/ThreadPool.cpp                 |  41 +++++--
 libminifi/test/TestBase.cpp                        |   2 +-
 libminifi/test/unit/SchedulingAgentTests.cpp       |   2 +-
 18 files changed, 142 insertions(+), 430 deletions(-)

diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index ccd910b1b..9e92c8270 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -83,7 +83,7 @@ int main(int argc, char **argv) {
 
   auto pg = yaml_config.getRoot();
 
-  auto provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, pg.get(), std::make_shared<minifi::Configure>());
+  auto provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, std::make_shared<minifi::Configure>());
   std::shared_ptr<core::controller::ControllerServiceNode> mockNode = pg->findControllerService("MockItLikeIts1995");
   assert(mockNode != nullptr);
   mockNode->enable();
@@ -108,27 +108,28 @@ int main(int argc, char **argv) {
   assert(!ssl_client->getCACertificate().empty());
   // now let's disable one of the controller services.
   std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID");
-  const auto checkCsIdEnabledMatchesDisabledFlag = [&cs_id] { return !disabled == cs_id->enabled(); };
   assert(cs_id != nullptr);
-  {
-    std::lock_guard<std::mutex> lock(control_mutex);
-    controller->enableControllerService(cs_id);
-    disabled = false;
-  }
-  std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995");
-  assert(verifyEventHappenedInPollTime(std::chrono::seconds(4), checkCsIdEnabledMatchesDisabledFlag));
-  {
-    std::lock_guard<std::mutex> lock(control_mutex);
-    controller->disableReferencingServices(mock_cont);
-    disabled = true;
-  }
-  assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
-  {
-    std::lock_guard<std::mutex> lock(control_mutex);
-    controller->enableReferencingServices(mock_cont);
-    disabled = false;
-  }
-  assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
+  // TODO(adebreceni): MINIFICPP-1992
+//  const auto checkCsIdEnabledMatchesDisabledFlag = [&cs_id] { return !disabled == cs_id->enabled(); };
+//  {
+//    std::lock_guard<std::mutex> lock(control_mutex);
+//    controller->enableControllerService(cs_id);
+//    disabled = false;
+//  }
+//  std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995");
+//  assert(verifyEventHappenedInPollTime(std::chrono::seconds(4), checkCsIdEnabledMatchesDisabledFlag));
+//  {
+//    std::lock_guard<std::mutex> lock(control_mutex);
+//    controller->disableReferencingServices(mock_cont);
+//    disabled = true;
+//  }
+//  assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
+//  {
+//    std::lock_guard<std::mutex> lock(control_mutex);
+//    controller->enableReferencingServices(mock_cont);
+//    disabled = false;
+//  }
+//  assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
 
   controller->waitUnload(60000);
   return 0;
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 97710c0d0..b398191d1 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -65,7 +65,6 @@ namespace state {
 class ProcessorController;
 }  // namespace state
 
-// Default NiFi Root Group Name
 #define DEFAULT_ROOT_GROUP_NAME ""
 
 /**
@@ -87,28 +86,24 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
 
   ~FlowController() override;
 
-  // Get the provenance repository
   virtual std::shared_ptr<core::Repository> getProvenanceRepository() {
     return this->provenance_repo_;
   }
 
-  // Load flow xml from disk, after that, create the root process group and its children, initialize the flows
   virtual void load(std::unique_ptr<core::ProcessGroup> root = nullptr, bool reload = false);
 
-  // Whether the Flow Controller is start running
   bool isRunning() override {
     return running_.load() || updating_.load();
   }
 
-  // Whether the Flow Controller has already been initialized (loaded flow XML)
   virtual bool isInitialized() {
     return initialized_.load();
   }
-  // Start to run the Flow Controller which internally start the root process group and all its children
+  // Start the Flow Controller which internally starts the root process group and all its children
   int16_t start() override;
   int16_t pause() override;
   int16_t resume() override;
-  // Unload the current flow YAML, clean the root process group and all its children
+  // Unload the current flow, clean the root process group and all its children
   int16_t stop() override;
   int16_t applyUpdate(const std::string &source, const std::string &configuration, bool persist, const std::optional<std::string>& flow_id) override;
   int16_t drainRepositories() override {
@@ -123,31 +118,27 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
   int16_t applyUpdate(const std::string& /*source*/, const std::shared_ptr<state::Update>&) override { return -1; }
   // Asynchronous function trigger unloading and wait for a period of time
   virtual void waitUnload(uint64_t timeToWaitMs);
-  // Unload the current flow xml, clean the root process group and all its children
+  // Unload the current flow, clean the root process group and all its children
   virtual void unload();
-  // update property value
   void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
     if (root_ != nullptr)
       root_->updatePropertyValue(std::move(processorName), std::move(propertyName), std::move(propertyValue));
   }
 
-  // set SerialNumber
   void setSerialNumber(std::string number) {
     serial_number_ = std::move(number);
   }
 
-  // get serial number as string
   std::string getSerialNumber() {
     return serial_number_;
   }
 
-  // validate and apply passing yaml configuration payload
+  // validate and apply passing configuration payload
   // first it will validate the payload with the current root node config for flowController
   // like FlowController id/name is the same and new version is greater than the current version
   // after that, it will apply the configuration
   bool applyConfiguration(const std::string &source, const std::string &configurePayload, const std::optional<std::string>& flow_id = std::nullopt);
 
-  // get name
   std::string getName() const override {
     if (root_ != nullptr)
       return root_->getName();
@@ -166,7 +157,6 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
     return root_->getUUID();
   }
 
-  // get version
   virtual std::string getVersion() {
     if (root_ != nullptr)
       return std::to_string(root_->getVersion());
@@ -201,38 +191,29 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
   void loadMetricsPublisher();
 
  protected:
-  // function to load the flow file repo.
   void loadFlowRepo();
 
   std::optional<std::chrono::milliseconds> loadShutdownTimeoutFromConfiguration();
 
  private:
   template <typename T, typename = typename std::enable_if<std::is_base_of<SchedulingAgent, T>::value>::type>
-  void conditionalReloadScheduler(std::shared_ptr<T>& scheduler, const bool condition) {
+  void conditionalReloadScheduler(std::unique_ptr<T>& scheduler, const bool condition) {
     if (condition) {
-      scheduler = std::make_shared<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this), provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
+      scheduler = std::make_unique<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this), provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
     }
   }
 
  protected:
-  // flow controller mutex
   std::recursive_mutex mutex_;
 
-  // Whether it is running
   std::atomic<bool> running_;
   std::atomic<bool> updating_;
 
-  // Whether it has already been initialized (load the flow XML already)
   std::atomic<bool> initialized_;
-  // Flow Timer Scheduler
-  std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
-  // Flow Event Scheduler
-  std::shared_ptr<EventDrivenSchedulingAgent> event_scheduler_;
-  // Cron Schedule
-  std::shared_ptr<CronDrivenSchedulingAgent> cron_scheduler_;
-  // FlowControl Protocol
+  std::unique_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
+  std::unique_ptr<EventDrivenSchedulingAgent> event_scheduler_;
+  std::unique_ptr<CronDrivenSchedulingAgent> cron_scheduler_;
   std::unique_ptr<FlowControlProtocol> protocol_;
-  // metrics information
   std::chrono::steady_clock::time_point start_time_;
 
  private:
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 3ec3ad892..24992c928 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -17,8 +17,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_SCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_SCHEDULINGAGENT_H_
+
+#pragma once
 
 #include <memory>
 #include <string>
@@ -47,10 +47,7 @@
 #define SCHEDULING_WATCHDOG_CHECK_PERIOD_MS 1000  // msec
 #define SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD_MS 5000  // msec
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 // SchedulingAgent Class
 class SchedulingAgent {
@@ -104,8 +101,6 @@ class SchedulingAgent {
 
   void watchDogFunc();
 
-  virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
-  virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
   // schedule, overwritten by different DrivenSchedulingAgent
   virtual void schedule(core::Processor* processor) = 0;
   // unschedule, overwritten by different DrivenSchedulingAgent
@@ -161,8 +156,4 @@ class SchedulingAgent {
   std::chrono::milliseconds alert_time_;
 };
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-#endif  // LIBMINIFI_INCLUDE_SCHEDULINGAGENT_H_
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index f008e6669..0e0a9b5a1 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -194,7 +194,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
    * identifier
    */
   std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) const override {
-    return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUID());
+    return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerService(identifier);
   }
 
   /**
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index 96dd68438..af43f8e02 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -146,13 +146,13 @@ class ProcessGroup : public CoreComponent {
     return config_version_;
   }
 
-  void startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler,
-                       const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
-                       const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler);
+  void startProcessing(TimerDrivenSchedulingAgent& timeScheduler,
+                       EventDrivenSchedulingAgent& eventScheduler,
+                       CronDrivenSchedulingAgent& cronScheduler);
 
-  void stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler,
-                      const std::shared_ptr<EventDrivenSchedulingAgent>& eventScheduler,
-                      const std::shared_ptr<CronDrivenSchedulingAgent>& cronScheduler,
+  void stopProcessing(TimerDrivenSchedulingAgent& timeScheduler,
+                      EventDrivenSchedulingAgent& eventScheduler,
+                      CronDrivenSchedulingAgent& cronScheduler,
                       const std::function<bool(const Processor*)>& filter = nullptr);
 
   bool isRemoteProcessGroup();
@@ -231,7 +231,7 @@ class ProcessGroup : public CoreComponent {
   void verify() const;
 
  protected:
-  void startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT
+  void startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler);
 
   // version
   int config_version_;
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index bb1f826a8..55f4d0ca2 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
-#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
+
+#pragma once
 
 #include <memory>
 #include <string>
@@ -78,32 +78,6 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
     return controller_map_->getControllerServiceNode(id);
   }
 
-  /**
-   * Removes a controller service.
-   * @param serviceNode controller service node.
-   */
-  virtual void removeControllerService(const std::shared_ptr<ControllerServiceNode> &serviceNode) {
-    controller_map_->removeControllerService(serviceNode);
-  }
-
-  /**
-   * Enables the provided controller service
-   * @param serviceNode controller service node.
-   */
-  virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
-
-  /**
-   * Enables the provided controller service nodes
-   * @param serviceNode controller service node.
-   */
-  virtual void enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) = 0;
-
-  /**
-   * Disables the provided controller service node
-   * @param serviceNode controller service node.
-   */
-  virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
   /**
    * Removes all controller services.
    */
@@ -116,62 +90,6 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
     return controller_map_->getAllControllerServices();
   }
 
-  /**
-   * Verifies that referencing components can be stopped for the controller service
-   */
-  virtual void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
-  /**
-   *  Unschedules referencing components.
-   */
-  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
-  /**
-   * Verifies referencing components for <code>serviceNode</code> can be disabled.
-   * @param serviceNode shared pointer to a controller service node.
-   */
-  virtual void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
-  /**
-   * Disables referencing components for <code>serviceNode</code> can be disabled.
-   * @param serviceNode shared pointer to a controller service node.
-   */
-  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>& /*serviceNode*/) {
-    return std::vector<std::shared_ptr<core::controller::ControllerServiceNode>>();
-  }
-
-  /**
-   * Verifies referencing components for <code>serviceNode</code> can be enabled.
-   * @param serviceNode shared pointer to a controller service node.
-   */
-  virtual void verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
-    for (auto ref : references) {
-      ref->canEnable();
-    }
-  }
-
-  /**
-   * Enables referencing components for <code>serviceNode</code> can be Enabled.
-   * @param serviceNode shared pointer to a controller service node.
-   */
-  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
-  /**
-   * Schedules the service node and referencing components.
-   * @param serviceNode shared pointer to a controller service node.
-   */
-  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
-  /**
-   * Returns a controller service for the service identifier and componentID
-   * @param service Identifier service identifier.
-   */
-  virtual std::shared_ptr<ControllerService> getControllerServiceForComponent(const std::string& serviceIdentifier, const utils::Identifier& /*componentId*/) const {
-    std::shared_ptr<ControllerService> node = getControllerService(serviceIdentifier);
-    return node;
-  }
-
   /**
    * Gets the controller service for the provided identifier
    * @param identifier service identifier.
@@ -272,5 +190,3 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
 };
 
 }  // namespace org::apache::nifi::minifi::core::controller
-
-#endif  // LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
diff --git a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
index 960307be5..d70d6d324 100644
--- a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
@@ -25,12 +25,7 @@
 #include "ControllerServiceProvider.h"
 #include "ControllerServiceNode.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace controller {
+namespace org::apache::nifi::minifi::core::controller {
 
 class ForwardingControllerServiceProvider : public ControllerServiceProvider {
  public:
@@ -44,22 +39,6 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider {
     return controller_service_provider_impl_->getControllerServiceNode(id);
   }
 
-  void removeControllerService(const std::shared_ptr<ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->removeControllerService(serviceNode);
-  }
-
-  std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->enableControllerService(serviceNode);
-  }
-
-  void enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) override {
-    return controller_service_provider_impl_->enableControllerServices(serviceNodes);
-  }
-
-  std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->disableControllerService(serviceNode);
-  }
-
   void clearControllerServices() override {
     return controller_service_provider_impl_->clearControllerServices();
   }
@@ -72,38 +51,6 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider {
     return controller_service_provider_impl_->getAllControllerServices();
   }
 
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->unscheduleReferencingComponents(serviceNode);
-  }
-
-  void verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->verifyCanEnableReferencingServices(serviceNode);
-  }
-
-  void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->verifyCanDisableReferencingServices(serviceNode);
-  }
-
-  void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->verifyCanStopReferencingComponents(serviceNode);
-  }
-
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->disableReferencingServices(serviceNode);
-  }
-
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->enableReferencingServices(serviceNode);
-  }
-
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
-    return controller_service_provider_impl_->scheduleReferencingComponents(serviceNode);
-  }
-
-  std::shared_ptr<ControllerService> getControllerServiceForComponent(const std::string &serviceIdentifier, const utils::Identifier &componentId) const override {
-    return controller_service_provider_impl_->getControllerServiceForComponent(serviceIdentifier, componentId);
-  }
-
   bool isControllerServiceEnabled(const std::string &identifier) override {
     return controller_service_provider_impl_->isControllerServiceEnabled(identifier);
   }
@@ -128,9 +75,4 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider {
   std::shared_ptr<ControllerServiceProvider> controller_service_provider_impl_;
 };
 
-}  // namespace controller
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::core::controller
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index 92b12890c..8730133fd 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICEPROVIDER_H_
-#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICEPROVIDER_H_
+
+#pragma once
 
 #include <string>
 #include <utility>
@@ -32,32 +32,16 @@
 #include "StandardControllerServiceNode.h"
 #include "ControllerServiceProvider.h"
 #include "core/logging/LoggerFactory.h"
+#include "SchedulingAgent.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace controller {
+namespace org::apache::nifi::minifi::core::controller {
 
 class StandardControllerServiceProvider : public ControllerServiceProvider, public std::enable_shared_from_this<StandardControllerServiceProvider> {
  public:
-  explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, ProcessGroup* root_group, std::shared_ptr<Configure> configuration,
-                                             std::shared_ptr<minifi::SchedulingAgent> agent, ClassLoader &loader = ClassLoader::getDefaultClassLoader())
-      : ControllerServiceProvider(services),
-        agent_(agent),
-        extension_loader_(loader),
-        root_group_(root_group),
-        configuration_(configuration),
-        logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) {
-  }
-
-  explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, ProcessGroup* root_group, std::shared_ptr<Configure> configuration, ClassLoader &loader =
+  explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, std::shared_ptr<Configure> configuration, ClassLoader &loader =
                                                  ClassLoader::getDefaultClassLoader())
       : ControllerServiceProvider(services),
-        agent_(nullptr),
         extension_loader_(loader),
-        root_group_(root_group),
         configuration_(configuration),
         logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) {
   }
@@ -68,14 +52,6 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
   StandardControllerServiceProvider& operator=(const StandardControllerServiceProvider &other) = delete;
   StandardControllerServiceProvider& operator=(StandardControllerServiceProvider &&other) = delete;
 
-  void setRootGroup(ProcessGroup* rg) {
-    root_group_ = rg;
-  }
-
-  void setSchedulingAgent(std::shared_ptr<minifi::SchedulingAgent> agent) {
-    agent_ = agent;
-  }
-
   std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &fullType, const std::string &id, bool /*firstTimeAdded*/) {
     std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id);
 
@@ -97,22 +73,15 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
     return new_service_node;
   }
 
-  std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
-    if (serviceNode->canEnable()) {
-      return agent_->enableControllerService(serviceNode);
-    } else {
-      std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done);
-      return no_run;
-    }
-  }
-
   virtual void enableAllControllerServices() {
     logger_->log_info("Enabling %u controller services", controller_map_->getAllControllerServices().size());
     for (auto service : controller_map_->getAllControllerServices()) {
-      if (service->canEnable()) {
-        logger_->log_info("Enabling %s", service->getName());
-        agent_->enableControllerService(service);
-      } else {
+      logger_->log_info("Enabling %s", service->getName());
+      if (!service->canEnable()) {
+        logger_->log_warn("Service %s cannot be enabled", service->getName());
+        continue;
+      }
+      if (!service->enable()) {
         logger_->log_warn("Could not enable %s", service->getName());
       }
     }
@@ -121,98 +90,32 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
   virtual void disableAllControllerServices() {
     logger_->log_info("Disabling %u controller services", controller_map_->getAllControllerServices().size());
     for (auto service : controller_map_->getAllControllerServices()) {
+      logger_->log_info("Disabling %s", service->getName());
+      if (!service->enabled()) {
+        logger_->log_warn("Service %s is not enabled", service->getName());
+        continue;
+      }
       if (!service->disable()) {
         logger_->log_warn("Could not disable %s", service->getName());
       }
     }
   }
 
-  void enableControllerServices(std::vector<std::shared_ptr<ControllerServiceNode>> serviceNodes) {
-    for (auto node : serviceNodes) {
-      enableControllerService(node);
-    }
-  }
-
-  std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
-    if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) {
-      return agent_->disableControllerService(serviceNode);
-    } else {
-      std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done);
-      return no_run;
-    }
-  }
-
   void clearControllerServices() {
     controller_map_->clear();
   }
 
-  void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>& /*serviceNode*/) {
-  }
-
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
-    for (auto ref : references) {
-      agent_->disableControllerService(ref);
-    }
-    return references;
-  }
-
-  void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
-    for (auto ref : references) {
-      if (!ref->canEnable()) {
-        logger_->log_info("Cannot disable %s", ref->getName());
-      }
-    }
-  }
-
-  virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
-    for (auto ref : references) {
-      agent_->disableControllerService(ref);
-    }
-
-    return references;
-  }
-
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
-    for (auto ref : references) {
-      agent_->enableControllerService(ref);
-    }
-    return references;
-  }
-
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-    std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
-    for (auto ref : references) {
-      agent_->enableControllerService(ref);
-    }
-    return references;
-  }
-
  protected:
   bool canEdit() {
     return false;
   }
 
-  std::shared_ptr<minifi::SchedulingAgent> agent_;
-
   ClassLoader &extension_loader_;
 
-  ProcessGroup* root_group_ = nullptr;
-
   std::shared_ptr<Configure> configuration_;
 
  private:
   std::shared_ptr<logging::Logger> logger_;
 };
 
-}  // namespace controller
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICEPROVIDER_H_
+}  // namespace org::apache::nifi::minifi::core::controller
diff --git a/libminifi/include/core/state/ProcessorController.h b/libminifi/include/core/state/ProcessorController.h
index 109d5c490..e3546da2a 100644
--- a/libminifi/include/core/state/ProcessorController.h
+++ b/libminifi/include/core/state/ProcessorController.h
@@ -32,7 +32,7 @@ namespace org::apache::nifi::minifi::state {
  */
 class ProcessorController : public StateController {
  public:
-  ProcessorController(core::Processor& processor, std::shared_ptr<SchedulingAgent> scheduler);
+  ProcessorController(core::Processor& processor, SchedulingAgent& scheduler);
 
   ~ProcessorController() override;
 
@@ -64,7 +64,7 @@ class ProcessorController : public StateController {
 
  protected:
   gsl::not_null<core::Processor*> processor_;
-  std::shared_ptr<SchedulingAgent> scheduler_;
+  gsl::not_null<SchedulingAgent*> scheduler_;
 };
 
 }  // namespace org::apache::nifi::minifi::state
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 37aa27fdd..68c2663ec 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -40,6 +40,7 @@
 #include "controllers/ThreadManagementService.h"
 #include "core/controller/ControllerService.h"
 #include "core/controller/ControllerServiceProvider.h"
+
 namespace org::apache::nifi::minifi::utils {
 
 using TaskId = std::string;
@@ -162,18 +163,8 @@ class WorkerThread {
 template<typename T>
 class ThreadPool {
  public:
-  ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, core::controller::ControllerServiceProvider* controller_service_provider = nullptr,
-             std::string name = "NamelessPool")
-      : daemon_threads_(daemon_threads),
-        thread_reduction_count_(0),
-        max_worker_threads_(max_worker_threads),
-        adjust_threads_(false),
-        running_(false),
-        controller_service_provider_(controller_service_provider),
-        name_(std::move(name)) {
-    current_workers_ = 0;
-    thread_manager_ = nullptr;
-  }
+  ThreadPool(int max_worker_threads = 2, bool daemon_threads = false,
+             core::controller::ControllerServiceProvider* controller_service_provider = nullptr, std::string name = "NamelessPool");
 
   ThreadPool(const ThreadPool<T> &other) = delete;
   ThreadPool<T>& operator=(const ThreadPool<T> &other) = delete;
@@ -277,6 +268,9 @@ class ThreadPool {
       start();
   }
 
+ private:
+  std::shared_ptr<controllers::ThreadManagementService> createThreadManager() const;
+
  protected:
   std::thread createThread(std::function<void()> &&functor) {
     return std::thread([ functor ]() mutable {
@@ -296,6 +290,7 @@ class ThreadPool {
       std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
   }
+
 // determines if threads are detached
   bool daemon_threads_;
   std::atomic<int> thread_reduction_count_;
@@ -337,6 +332,8 @@ class ThreadPool {
   // variable to signal task running completion
   std::condition_variable task_run_complete_;
 
+  std::shared_ptr<core::logging::Logger> logger_;
+
   /**
    * Call for the manager to start worker threads
    */
@@ -345,7 +342,7 @@ class ThreadPool {
   /**
    * Runs worker tasks
    */
-  void run_tasks(std::shared_ptr<WorkerThread> thread);
+  void run_tasks(const std::shared_ptr<WorkerThread>& thread);
 
   void manage_delayed_queue();
 };
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 87ea88853..9d0d62e15 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -182,7 +182,7 @@ int16_t FlowController::stop() {
     logger_->log_info("Stop Flow Controller");
     if (this->root_) {
       // stop source processors first
-      this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_, [] (const core::Processor* proc) -> bool {
+      this->root_->stopProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_, [] (const core::Processor* proc) -> bool {
         return !proc->hasIncomingConnections();
       });
       // we enable C2 to progressively increase the timeout
@@ -194,7 +194,7 @@ int16_t FlowController::stop() {
         std::this_thread::sleep_for(shutdown_check_interval_);
       }
       // shutdown all other processors as well
-      this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_);
+      this->root_->stopProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_);
     }
     // stop after we've attempted to stop the processors.
     timer_scheduler_->stop();
@@ -211,7 +211,7 @@ int16_t FlowController::stop() {
     this->flow_file_repo_->stop();
     this->provenance_repo_->stop();
     // stop the ControllerServices
-    this->controller_service_provider_impl_->disableAllControllerServices();
+    disableAllControllerServices();
     running_ = false;
   }
   return 0;
@@ -324,10 +324,6 @@ void FlowController::load(std::unique_ptr<core::ProcessGroup> root, bool reload)
     conditionalReloadScheduler<EventDrivenSchedulingAgent>(event_scheduler_, !event_scheduler_ || reload);
     conditionalReloadScheduler<CronDrivenSchedulingAgent>(cron_scheduler_, !cron_scheduler_ || reload);
 
-    std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_impl_)->setRootGroup(root_.get());
-    std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_impl_)->setSchedulingAgent(
-        std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_));
-
     logger_->log_info("Loaded controller service provider");
 
     /*
@@ -374,16 +370,16 @@ int16_t FlowController::start() {
   } else {
     if (!running_) {
       logger_->log_info("Starting Flow Controller");
-      controller_service_provider_impl_->enableAllControllerServices();
-      this->timer_scheduler_->start();
-      this->event_scheduler_->start();
-      this->cron_scheduler_->start();
+      enableAllControllerServices();
+      timer_scheduler_->start();
+      event_scheduler_->start();
+      cron_scheduler_->start();
 
       if (this->root_ != nullptr) {
         start_time_ = std::chrono::steady_clock::now();
         // watch out, this might immediately start the processors
         // as the thread_pool_ is started in load()
-        this->root_->startProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_);
+        this->root_->startProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_);
       }
       C2Client::initialize(this, this, this);
       core::logging::LoggerConfiguration::getConfiguration().initializeAlertSinks(this, configuration_);
@@ -391,7 +387,6 @@ int16_t FlowController::start() {
       this->protocol_->start();
       this->provenance_repo_->start();
       this->flow_file_repo_->start();
-      thread_pool_.start();
       logger_->log_info("Started Flow Controller");
     }
     return 0;
@@ -504,11 +499,11 @@ state::StateController* FlowController::getComponent(const std::string& id_or_na
 }
 
 gsl::not_null<std::unique_ptr<state::ProcessorController>> FlowController::createController(core::Processor& processor) {
-  const auto scheduler = [this, &processor]() -> std::shared_ptr<SchedulingAgent> {
+  const auto scheduler = [this, &processor]() -> SchedulingAgent& {
     switch (processor.getSchedulingStrategy()) {
-      case core::SchedulingStrategy::TIMER_DRIVEN: return timer_scheduler_;
-      case core::SchedulingStrategy::EVENT_DRIVEN: return event_scheduler_;
-      case core::SchedulingStrategy::CRON_DRIVEN: return cron_scheduler_;
+      case core::SchedulingStrategy::TIMER_DRIVEN: return *timer_scheduler_;
+      case core::SchedulingStrategy::EVENT_DRIVEN: return *event_scheduler_;
+      case core::SchedulingStrategy::CRON_DRIVEN: return *cron_scheduler_;
     }
     gsl_Assert(false);
   };
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 5b59faa31..9dfca5c58 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -34,47 +34,6 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
 
 namespace org::apache::nifi::minifi {
 
-std::future<utils::TaskRescheduleInfo> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-  logger_->log_info("Enabling CSN in SchedulingAgent %s", serviceNode->getName());
-  // reference the enable function from serviceNode
-  std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] {
-      serviceNode->enable();
-      return utils::TaskRescheduleInfo::Done();
-    };
-
-  // only need to run this once.
-  auto monitor = std::make_unique<utils::ComplexMonitor>();
-  utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
-  // move the functor into the thread pool. While a future is returned
-  // we aren't terribly concerned with the result.
-  std::future<utils::TaskRescheduleInfo> future;
-  thread_pool_.execute(std::move(functor), future);
-  if (future.valid())
-    future.wait();
-  return future;
-}
-
-std::future<utils::TaskRescheduleInfo> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-  logger_->log_info("Disabling CSN in SchedulingAgent %s", serviceNode->getName());
-  // reference the disable function from serviceNode
-  std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] {
-    serviceNode->disable();
-    return utils::TaskRescheduleInfo::Done();
-  };
-
-  // only need to run this once.
-  auto monitor = std::make_unique<utils::ComplexMonitor>();
-  utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
-
-  // move the functor into the thread pool. While a future is returned
-  // we aren't terribly concerned with the result.
-  std::future<utils::TaskRescheduleInfo> future;
-  thread_pool_.execute(std::move(functor), future);
-  if (future.valid())
-    future.wait();
-  return future;
-}
-
 bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                 const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   gsl_Expects(processor);
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index c7981b86f..b53d3fe90 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -37,7 +37,7 @@ FlowConfiguration::FlowConfiguration(ConfigurationContext ctx)
       filesystem_(std::move(ctx.filesystem)),
       logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
   controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
-  service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
+  service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, configuration_);
   std::string flowUrl;
   std::string bucket_id = "default";
   std::string flowId;
@@ -97,7 +97,7 @@ std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const s
   auto old_services = controller_services_;
   auto old_provider = service_provider_;
   controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
-  service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
+  service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, configuration_);
   auto payload = getRootFromPayload(yamlConfigPayload);
   if (!url.empty() && payload != nullptr) {
     std::string payload_flow_id;
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 7922717fb..7300d2129 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -122,8 +122,8 @@ void ProcessGroup::addProcessGroup(std::unique_ptr<ProcessGroup> child) {
   }
 }
 
-void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler,
-    const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
+void ProcessGroup::startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler,
+    EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler) {
   std::unique_lock<std::recursive_mutex> lock(mutex_);
 
   std::set<Processor*> failed_processors;
@@ -133,13 +133,13 @@ void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSc
       logger_->log_debug("Starting %s", processor->getName());
       switch (processor->getSchedulingStrategy()) {
         case TIMER_DRIVEN:
-          timeScheduler->schedule(processor);
+          timeScheduler.schedule(processor);
           break;
         case EVENT_DRIVEN:
-          eventScheduler->schedule(processor);
+          eventScheduler.schedule(processor);
           break;
         case CRON_DRIVEN:
-          cronScheduler->schedule(processor);
+          cronScheduler.schedule(processor);
           break;
       }
     }
@@ -166,8 +166,8 @@ void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSc
 
   if (!onScheduleTimer_ && !failed_processors_.empty() && onschedule_retry_msec_ > 0) {
     logger_->log_info("Retrying failed processors in %lld msec", onschedule_retry_msec_.load());
-    auto func = [this, eventScheduler, cronScheduler, timeScheduler]() {
-      this->startProcessingProcessors(timeScheduler, eventScheduler, cronScheduler);
+    auto func = [this, eventScheduler = &eventScheduler, cronScheduler = &cronScheduler, timeScheduler = &timeScheduler]() {
+      this->startProcessingProcessors(*timeScheduler, *eventScheduler, *cronScheduler);
     };
     onScheduleTimer_ = std::make_unique<utils::CallBackTimer>(std::chrono::milliseconds(onschedule_retry_msec_), func);
     onScheduleTimer_->start();
@@ -176,8 +176,8 @@ void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSc
   }
 }
 
-void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
-                                   const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler,
+                                   CronDrivenSchedulingAgent& cronScheduler) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   try {
@@ -202,8 +202,8 @@ void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAg
   }
 }
 
-void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
-                                  const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler, const std::function<bool(const Processor*)>& filter) {
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler,
+                                  CronDrivenSchedulingAgent& cronScheduler, const std::function<bool(const Processor*)>& filter) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (onScheduleTimer_) {
@@ -221,13 +221,13 @@ void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAge
       logger_->log_debug("Stopping %s", processor->getName());
       switch (processor->getSchedulingStrategy()) {
         case TIMER_DRIVEN:
-          timeScheduler->unschedule(processor.get());
+          timeScheduler.unschedule(processor.get());
           break;
         case EVENT_DRIVEN:
-          eventScheduler->unschedule(processor.get());
+          eventScheduler.unschedule(processor.get());
           break;
         case CRON_DRIVEN:
-          cronScheduler->unschedule(processor.get());
+          cronScheduler.unschedule(processor.get());
           break;
       }
     }
diff --git a/libminifi/src/core/state/ProcessorController.cpp b/libminifi/src/core/state/ProcessorController.cpp
index c5686de77..b496387d9 100644
--- a/libminifi/src/core/state/ProcessorController.cpp
+++ b/libminifi/src/core/state/ProcessorController.cpp
@@ -22,9 +22,9 @@
 
 namespace org::apache::nifi::minifi::state {
 
-ProcessorController::ProcessorController(core::Processor& processor, std::shared_ptr<SchedulingAgent> scheduler)
+ProcessorController::ProcessorController(core::Processor& processor, SchedulingAgent& scheduler)
     : processor_(&processor),
-      scheduler_(std::move(scheduler)) {
+      scheduler_(&scheduler) {
 }
 
 ProcessorController::~ProcessorController() = default;
diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp
index 8612f0dbf..baadb59ec 100644
--- a/libminifi/src/utils/ThreadPool.cpp
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -21,7 +21,21 @@
 namespace org::apache::nifi::minifi::utils {
 
 template<typename T>
-void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+ThreadPool<T>::ThreadPool(int max_worker_threads, bool daemon_threads, core::controller::ControllerServiceProvider* controller_service_provider, std::string name)
+    : daemon_threads_(daemon_threads),
+      thread_reduction_count_(0),
+      max_worker_threads_(max_worker_threads),
+      adjust_threads_(false),
+      running_(false),
+      controller_service_provider_(controller_service_provider),
+      name_(std::move(name)),
+      logger_(core::logging::LoggerFactory<ThreadPool<T>>::getLogger()) {
+  current_workers_ = 0;
+  thread_manager_ = nullptr;
+}
+
+template<typename T>
+void ThreadPool<T>::run_tasks(const std::shared_ptr<WorkerThread>& thread) {
   thread->is_running_ = true;
   while (running_.load()) {
     if (UNLIKELY(thread_reduction_count_ > 0)) {
@@ -181,16 +195,29 @@ void ThreadPool<T>::manageWorkers() {
 }
 
 template<typename T>
-void ThreadPool<T>::start() {
-  if (nullptr != controller_service_provider_) {
-    auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
-    thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
-  } else {
-    thread_manager_ = nullptr;
+std::shared_ptr<controllers::ThreadManagementService> ThreadPool<T>::createThreadManager() const {
+  if (!controller_service_provider_) {
+    return nullptr;
+  }
+  auto service = controller_service_provider_->getControllerService("ThreadPoolManager");
+  if (!service) {
+    logger_->log_info("Could not find a ThreadPoolManager service");
+    return nullptr;
   }
+  auto thread_manager_service = std::dynamic_pointer_cast<controllers::ThreadManagementService>(service);
+  if (!thread_manager_service) {
+    logger_->log_error("Found ThreadPoolManager, but it is not a ThreadManagementService");
+    return nullptr;
+  }
+  return thread_manager_service;
+}
 
+template<typename T>
+void ThreadPool<T>::start() {
   std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
   if (!running_) {
+    thread_manager_ = createThreadManager();
+
     running_ = true;
     worker_queue_.start();
     manager_thread_ = std::thread(&ThreadPool::manageWorkers, this);
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index b5c48dc1d..bc34d239e 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -197,7 +197,7 @@ TestPlan::TestPlan(std::shared_ptr<minifi::core::ContentRepository> content_repo
       logger_(logging::LoggerFactory<TestPlan>::getLogger()) {
   stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
   controller_services_ = std::make_shared<minifi::core::controller::ControllerServiceMap>();
-  controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
+  controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, configuration_);
   /* Inject the default state storage ahead of ProcessContext to make sure we have a unique state directory */
   if (state_dir == nullptr) {
     state_dir_ = std::make_unique<TempDirectory>();
diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp b/libminifi/test/unit/SchedulingAgentTests.cpp
index b80c7563c..36a74046e 100644
--- a/libminifi/test/unit/SchedulingAgentTests.cpp
+++ b/libminifi/test/unit/SchedulingAgentTests.cpp
@@ -56,7 +56,7 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
   auto test_plan = testController.createPlan();
   auto controller_services_ = std::make_shared<minifi::core::controller::ControllerServiceMap>();
   auto configuration = std::make_shared<minifi::Configure>();
-  auto controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration);
+  auto controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, configuration);
   utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool;
   auto count_proc = std::make_shared<CountOnTriggersProcessor>("count_proc");
   count_proc->incrementActiveTasks();