You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2023/01/18 17:09:08 UTC
[nifi-minifi-cpp] 01/02: MINIFICPP-1991 - Remove unused ControllerServiceProvider methods
This is an automated email from the ASF dual-hosted git repository.
fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 6c57decc7ce380af863dd621eb457f958b6698b0
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Tue Nov 22 14:04:02 2022 +0100
MINIFICPP-1991 - Remove unused ControllerServiceProvider methods
Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
This closes #1458
---
.../tests/ControllerServiceIntegrationTests.cpp | 43 +++----
libminifi/include/FlowController.h | 37 ++----
libminifi/include/SchedulingAgent.h | 17 +--
libminifi/include/core/ProcessContext.h | 2 +-
libminifi/include/core/ProcessGroup.h | 14 +--
.../core/controller/ControllerServiceProvider.h | 88 +-------------
.../ForwardingControllerServiceProvider.h | 62 +---------
.../controller/StandardControllerServiceProvider.h | 131 +++------------------
libminifi/include/core/state/ProcessorController.h | 4 +-
libminifi/include/utils/ThreadPool.h | 23 ++--
libminifi/src/FlowController.cpp | 29 ++---
libminifi/src/SchedulingAgent.cpp | 41 -------
libminifi/src/core/FlowConfiguration.cpp | 4 +-
libminifi/src/core/ProcessGroup.cpp | 28 ++---
libminifi/src/core/state/ProcessorController.cpp | 4 +-
libminifi/src/utils/ThreadPool.cpp | 41 +++++--
libminifi/test/TestBase.cpp | 2 +-
libminifi/test/unit/SchedulingAgentTests.cpp | 2 +-
18 files changed, 142 insertions(+), 430 deletions(-)
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index ccd910b1b..9e92c8270 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -83,7 +83,7 @@ int main(int argc, char **argv) {
auto pg = yaml_config.getRoot();
- auto provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, pg.get(), std::make_shared<minifi::Configure>());
+ auto provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, std::make_shared<minifi::Configure>());
std::shared_ptr<core::controller::ControllerServiceNode> mockNode = pg->findControllerService("MockItLikeIts1995");
assert(mockNode != nullptr);
mockNode->enable();
@@ -108,27 +108,28 @@ int main(int argc, char **argv) {
assert(!ssl_client->getCACertificate().empty());
// now let's disable one of the controller services.
std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID");
- const auto checkCsIdEnabledMatchesDisabledFlag = [&cs_id] { return !disabled == cs_id->enabled(); };
assert(cs_id != nullptr);
- {
- std::lock_guard<std::mutex> lock(control_mutex);
- controller->enableControllerService(cs_id);
- disabled = false;
- }
- std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995");
- assert(verifyEventHappenedInPollTime(std::chrono::seconds(4), checkCsIdEnabledMatchesDisabledFlag));
- {
- std::lock_guard<std::mutex> lock(control_mutex);
- controller->disableReferencingServices(mock_cont);
- disabled = true;
- }
- assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
- {
- std::lock_guard<std::mutex> lock(control_mutex);
- controller->enableReferencingServices(mock_cont);
- disabled = false;
- }
- assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
+ // TODO(adebreceni): MINIFICPP-1992
+// const auto checkCsIdEnabledMatchesDisabledFlag = [&cs_id] { return !disabled == cs_id->enabled(); };
+// {
+// std::lock_guard<std::mutex> lock(control_mutex);
+// controller->enableControllerService(cs_id);
+// disabled = false;
+// }
+// std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995");
+// assert(verifyEventHappenedInPollTime(std::chrono::seconds(4), checkCsIdEnabledMatchesDisabledFlag));
+// {
+// std::lock_guard<std::mutex> lock(control_mutex);
+// controller->disableReferencingServices(mock_cont);
+// disabled = true;
+// }
+// assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
+// {
+// std::lock_guard<std::mutex> lock(control_mutex);
+// controller->enableReferencingServices(mock_cont);
+// disabled = false;
+// }
+// assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
controller->waitUnload(60000);
return 0;
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 97710c0d0..b398191d1 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -65,7 +65,6 @@ namespace state {
class ProcessorController;
} // namespace state
-// Default NiFi Root Group Name
#define DEFAULT_ROOT_GROUP_NAME ""
/**
@@ -87,28 +86,24 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
~FlowController() override;
- // Get the provenance repository
virtual std::shared_ptr<core::Repository> getProvenanceRepository() {
return this->provenance_repo_;
}
- // Load flow xml from disk, after that, create the root process group and its children, initialize the flows
virtual void load(std::unique_ptr<core::ProcessGroup> root = nullptr, bool reload = false);
- // Whether the Flow Controller is start running
bool isRunning() override {
return running_.load() || updating_.load();
}
- // Whether the Flow Controller has already been initialized (loaded flow XML)
virtual bool isInitialized() {
return initialized_.load();
}
- // Start to run the Flow Controller which internally start the root process group and all its children
+ // Start the Flow Controller which internally starts the root process group and all its children
int16_t start() override;
int16_t pause() override;
int16_t resume() override;
- // Unload the current flow YAML, clean the root process group and all its children
+ // Unload the current flow, clean the root process group and all its children
int16_t stop() override;
int16_t applyUpdate(const std::string &source, const std::string &configuration, bool persist, const std::optional<std::string>& flow_id) override;
int16_t drainRepositories() override {
@@ -123,31 +118,27 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
int16_t applyUpdate(const std::string& /*source*/, const std::shared_ptr<state::Update>&) override { return -1; }
// Asynchronous function trigger unloading and wait for a period of time
virtual void waitUnload(uint64_t timeToWaitMs);
- // Unload the current flow xml, clean the root process group and all its children
+ // Unload the current flow, clean the root process group and all its children
virtual void unload();
- // update property value
void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
if (root_ != nullptr)
root_->updatePropertyValue(std::move(processorName), std::move(propertyName), std::move(propertyValue));
}
- // set SerialNumber
void setSerialNumber(std::string number) {
serial_number_ = std::move(number);
}
- // get serial number as string
std::string getSerialNumber() {
return serial_number_;
}
- // validate and apply passing yaml configuration payload
+ // validate and apply passing configuration payload
// first it will validate the payload with the current root node config for flowController
// like FlowController id/name is the same and new version is greater than the current version
// after that, it will apply the configuration
bool applyConfiguration(const std::string &source, const std::string &configurePayload, const std::optional<std::string>& flow_id = std::nullopt);
- // get name
std::string getName() const override {
if (root_ != nullptr)
return root_->getName();
@@ -166,7 +157,6 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
return root_->getUUID();
}
- // get version
virtual std::string getVersion() {
if (root_ != nullptr)
return std::to_string(root_->getVersion());
@@ -201,38 +191,29 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
void loadMetricsPublisher();
protected:
- // function to load the flow file repo.
void loadFlowRepo();
std::optional<std::chrono::milliseconds> loadShutdownTimeoutFromConfiguration();
private:
template <typename T, typename = typename std::enable_if<std::is_base_of<SchedulingAgent, T>::value>::type>
- void conditionalReloadScheduler(std::shared_ptr<T>& scheduler, const bool condition) {
+ void conditionalReloadScheduler(std::unique_ptr<T>& scheduler, const bool condition) {
if (condition) {
- scheduler = std::make_shared<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this), provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
+ scheduler = std::make_unique<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this), provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
}
}
protected:
- // flow controller mutex
std::recursive_mutex mutex_;
- // Whether it is running
std::atomic<bool> running_;
std::atomic<bool> updating_;
- // Whether it has already been initialized (load the flow XML already)
std::atomic<bool> initialized_;
- // Flow Timer Scheduler
- std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
- // Flow Event Scheduler
- std::shared_ptr<EventDrivenSchedulingAgent> event_scheduler_;
- // Cron Schedule
- std::shared_ptr<CronDrivenSchedulingAgent> cron_scheduler_;
- // FlowControl Protocol
+ std::unique_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
+ std::unique_ptr<EventDrivenSchedulingAgent> event_scheduler_;
+ std::unique_ptr<CronDrivenSchedulingAgent> cron_scheduler_;
std::unique_ptr<FlowControlProtocol> protocol_;
- // metrics information
std::chrono::steady_clock::time_point start_time_;
private:
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 3ec3ad892..24992c928 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -17,8 +17,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_SCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_SCHEDULINGAGENT_H_
+
+#pragma once
#include <memory>
#include <string>
@@ -47,10 +47,7 @@
#define SCHEDULING_WATCHDOG_CHECK_PERIOD_MS 1000 // msec
#define SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD_MS 5000 // msec
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
// SchedulingAgent Class
class SchedulingAgent {
@@ -104,8 +101,6 @@ class SchedulingAgent {
void watchDogFunc();
- virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
- virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
// schedule, overwritten by different DrivenSchedulingAgent
virtual void schedule(core::Processor* processor) = 0;
// unschedule, overwritten by different DrivenSchedulingAgent
@@ -161,8 +156,4 @@ class SchedulingAgent {
std::chrono::milliseconds alert_time_;
};
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-#endif // LIBMINIFI_INCLUDE_SCHEDULINGAGENT_H_
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index f008e6669..0e0a9b5a1 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -194,7 +194,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
* identifier
*/
std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) const override {
- return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUID());
+ return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerService(identifier);
}
/**
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index 96dd68438..af43f8e02 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -146,13 +146,13 @@ class ProcessGroup : public CoreComponent {
return config_version_;
}
- void startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler,
- const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
- const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler);
+ void startProcessing(TimerDrivenSchedulingAgent& timeScheduler,
+ EventDrivenSchedulingAgent& eventScheduler,
+ CronDrivenSchedulingAgent& cronScheduler);
- void stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler,
- const std::shared_ptr<EventDrivenSchedulingAgent>& eventScheduler,
- const std::shared_ptr<CronDrivenSchedulingAgent>& cronScheduler,
+ void stopProcessing(TimerDrivenSchedulingAgent& timeScheduler,
+ EventDrivenSchedulingAgent& eventScheduler,
+ CronDrivenSchedulingAgent& cronScheduler,
const std::function<bool(const Processor*)>& filter = nullptr);
bool isRemoteProcessGroup();
@@ -231,7 +231,7 @@ class ProcessGroup : public CoreComponent {
void verify() const;
protected:
- void startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT
+ void startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler);
// version
int config_version_;
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index bb1f826a8..55f4d0ca2 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
-#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
+
+#pragma once
#include <memory>
#include <string>
@@ -78,32 +78,6 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
return controller_map_->getControllerServiceNode(id);
}
- /**
- * Removes a controller service.
- * @param serviceNode controller service node.
- */
- virtual void removeControllerService(const std::shared_ptr<ControllerServiceNode> &serviceNode) {
- controller_map_->removeControllerService(serviceNode);
- }
-
- /**
- * Enables the provided controller service
- * @param serviceNode controller service node.
- */
- virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
-
- /**
- * Enables the provided controller service nodes
- * @param serviceNode controller service node.
- */
- virtual void enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) = 0;
-
- /**
- * Disables the provided controller service node
- * @param serviceNode controller service node.
- */
- virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
/**
* Removes all controller services.
*/
@@ -116,62 +90,6 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
return controller_map_->getAllControllerServices();
}
- /**
- * Verifies that referencing components can be stopped for the controller service
- */
- virtual void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
- /**
- * Unschedules referencing components.
- */
- virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
- /**
- * Verifies referencing components for <code>serviceNode</code> can be disabled.
- * @param serviceNode shared pointer to a controller service node.
- */
- virtual void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
- /**
- * Disables referencing components for <code>serviceNode</code> can be disabled.
- * @param serviceNode shared pointer to a controller service node.
- */
- virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>& /*serviceNode*/) {
- return std::vector<std::shared_ptr<core::controller::ControllerServiceNode>>();
- }
-
- /**
- * Verifies referencing components for <code>serviceNode</code> can be enabled.
- * @param serviceNode shared pointer to a controller service node.
- */
- virtual void verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
- for (auto ref : references) {
- ref->canEnable();
- }
- }
-
- /**
- * Enables referencing components for <code>serviceNode</code> can be Enabled.
- * @param serviceNode shared pointer to a controller service node.
- */
- virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
- /**
- * Schedules the service node and referencing components.
- * @param serviceNode shared pointer to a controller service node.
- */
- virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
-
- /**
- * Returns a controller service for the service identifier and componentID
- * @param service Identifier service identifier.
- */
- virtual std::shared_ptr<ControllerService> getControllerServiceForComponent(const std::string& serviceIdentifier, const utils::Identifier& /*componentId*/) const {
- std::shared_ptr<ControllerService> node = getControllerService(serviceIdentifier);
- return node;
- }
-
/**
* Gets the controller service for the provided identifier
* @param identifier service identifier.
@@ -272,5 +190,3 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
};
} // namespace org::apache::nifi::minifi::core::controller
-
-#endif // LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_
diff --git a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
index 960307be5..d70d6d324 100644
--- a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
@@ -25,12 +25,7 @@
#include "ControllerServiceProvider.h"
#include "ControllerServiceNode.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace controller {
+namespace org::apache::nifi::minifi::core::controller {
class ForwardingControllerServiceProvider : public ControllerServiceProvider {
public:
@@ -44,22 +39,6 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider {
return controller_service_provider_impl_->getControllerServiceNode(id);
}
- void removeControllerService(const std::shared_ptr<ControllerServiceNode> &serviceNode) override {
- return controller_service_provider_impl_->removeControllerService(serviceNode);
- }
-
- std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) override {
- return controller_service_provider_impl_->enableControllerService(serviceNode);
- }
-
- void enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) override {
- return controller_service_provider_impl_->enableControllerServices(serviceNodes);
- }
-
- std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
- return controller_service_provider_impl_->disableControllerService(serviceNode);
- }
-
void clearControllerServices() override {
return controller_service_provider_impl_->clearControllerServices();
}
@@ -72,38 +51,6 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider {
return controller_service_provider_impl_->getAllControllerServices();
}
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
- return controller_service_provider_impl_->unscheduleReferencingComponents(serviceNode);
- }
-
- void verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
- return controller_service_provider_impl_->verifyCanEnableReferencingServices(serviceNode);
- }
-
- void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
- return controller_service_provider_impl_->verifyCanDisableReferencingServices(serviceNode);
- }
-
- void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
- return controller_service_provider_impl_->verifyCanStopReferencingComponents(serviceNode);
- }
-
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
- return controller_service_provider_impl_->disableReferencingServices(serviceNode);
- }
-
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
- return controller_service_provider_impl_->enableReferencingServices(serviceNode);
- }
-
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override {
- return controller_service_provider_impl_->scheduleReferencingComponents(serviceNode);
- }
-
- std::shared_ptr<ControllerService> getControllerServiceForComponent(const std::string &serviceIdentifier, const utils::Identifier &componentId) const override {
- return controller_service_provider_impl_->getControllerServiceForComponent(serviceIdentifier, componentId);
- }
-
bool isControllerServiceEnabled(const std::string &identifier) override {
return controller_service_provider_impl_->isControllerServiceEnabled(identifier);
}
@@ -128,9 +75,4 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider {
std::shared_ptr<ControllerServiceProvider> controller_service_provider_impl_;
};
-} // namespace controller
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::core::controller
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index 92b12890c..8730133fd 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICEPROVIDER_H_
-#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICEPROVIDER_H_
+
+#pragma once
#include <string>
#include <utility>
@@ -32,32 +32,16 @@
#include "StandardControllerServiceNode.h"
#include "ControllerServiceProvider.h"
#include "core/logging/LoggerFactory.h"
+#include "SchedulingAgent.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace controller {
+namespace org::apache::nifi::minifi::core::controller {
class StandardControllerServiceProvider : public ControllerServiceProvider, public std::enable_shared_from_this<StandardControllerServiceProvider> {
public:
- explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, ProcessGroup* root_group, std::shared_ptr<Configure> configuration,
- std::shared_ptr<minifi::SchedulingAgent> agent, ClassLoader &loader = ClassLoader::getDefaultClassLoader())
- : ControllerServiceProvider(services),
- agent_(agent),
- extension_loader_(loader),
- root_group_(root_group),
- configuration_(configuration),
- logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) {
- }
-
- explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, ProcessGroup* root_group, std::shared_ptr<Configure> configuration, ClassLoader &loader =
+ explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, std::shared_ptr<Configure> configuration, ClassLoader &loader =
ClassLoader::getDefaultClassLoader())
: ControllerServiceProvider(services),
- agent_(nullptr),
extension_loader_(loader),
- root_group_(root_group),
configuration_(configuration),
logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) {
}
@@ -68,14 +52,6 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
StandardControllerServiceProvider& operator=(const StandardControllerServiceProvider &other) = delete;
StandardControllerServiceProvider& operator=(StandardControllerServiceProvider &&other) = delete;
- void setRootGroup(ProcessGroup* rg) {
- root_group_ = rg;
- }
-
- void setSchedulingAgent(std::shared_ptr<minifi::SchedulingAgent> agent) {
- agent_ = agent;
- }
-
std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &fullType, const std::string &id, bool /*firstTimeAdded*/) {
std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id);
@@ -97,22 +73,15 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
return new_service_node;
}
- std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
- if (serviceNode->canEnable()) {
- return agent_->enableControllerService(serviceNode);
- } else {
- std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done);
- return no_run;
- }
- }
-
virtual void enableAllControllerServices() {
logger_->log_info("Enabling %u controller services", controller_map_->getAllControllerServices().size());
for (auto service : controller_map_->getAllControllerServices()) {
- if (service->canEnable()) {
- logger_->log_info("Enabling %s", service->getName());
- agent_->enableControllerService(service);
- } else {
+ logger_->log_info("Enabling %s", service->getName());
+ if (!service->canEnable()) {
+ logger_->log_warn("Service %s cannot be enabled", service->getName());
+ continue;
+ }
+ if (!service->enable()) {
logger_->log_warn("Could not enable %s", service->getName());
}
}
@@ -121,98 +90,32 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
virtual void disableAllControllerServices() {
logger_->log_info("Disabling %u controller services", controller_map_->getAllControllerServices().size());
for (auto service : controller_map_->getAllControllerServices()) {
+ logger_->log_info("Disabling %s", service->getName());
+ if (!service->enabled()) {
+ logger_->log_warn("Service %s is not enabled", service->getName());
+ continue;
+ }
if (!service->disable()) {
logger_->log_warn("Could not disable %s", service->getName());
}
}
}
- void enableControllerServices(std::vector<std::shared_ptr<ControllerServiceNode>> serviceNodes) {
- for (auto node : serviceNodes) {
- enableControllerService(node);
- }
- }
-
- std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
- if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) {
- return agent_->disableControllerService(serviceNode);
- } else {
- std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done);
- return no_run;
- }
- }
-
void clearControllerServices() {
controller_map_->clear();
}
- void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>& /*serviceNode*/) {
- }
-
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
- for (auto ref : references) {
- agent_->disableControllerService(ref);
- }
- return references;
- }
-
- void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
- for (auto ref : references) {
- if (!ref->canEnable()) {
- logger_->log_info("Cannot disable %s", ref->getName());
- }
- }
- }
-
- virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
- for (auto ref : references) {
- agent_->disableControllerService(ref);
- }
-
- return references;
- }
-
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
- for (auto ref : references) {
- agent_->enableControllerService(ref);
- }
- return references;
- }
-
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode);
- for (auto ref : references) {
- agent_->enableControllerService(ref);
- }
- return references;
- }
-
protected:
bool canEdit() {
return false;
}
- std::shared_ptr<minifi::SchedulingAgent> agent_;
-
ClassLoader &extension_loader_;
- ProcessGroup* root_group_ = nullptr;
-
std::shared_ptr<Configure> configuration_;
private:
std::shared_ptr<logging::Logger> logger_;
};
-} // namespace controller
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-
-#endif // LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICEPROVIDER_H_
+} // namespace org::apache::nifi::minifi::core::controller
diff --git a/libminifi/include/core/state/ProcessorController.h b/libminifi/include/core/state/ProcessorController.h
index 109d5c490..e3546da2a 100644
--- a/libminifi/include/core/state/ProcessorController.h
+++ b/libminifi/include/core/state/ProcessorController.h
@@ -32,7 +32,7 @@ namespace org::apache::nifi::minifi::state {
*/
class ProcessorController : public StateController {
public:
- ProcessorController(core::Processor& processor, std::shared_ptr<SchedulingAgent> scheduler);
+ ProcessorController(core::Processor& processor, SchedulingAgent& scheduler);
~ProcessorController() override;
@@ -64,7 +64,7 @@ class ProcessorController : public StateController {
protected:
gsl::not_null<core::Processor*> processor_;
- std::shared_ptr<SchedulingAgent> scheduler_;
+ gsl::not_null<SchedulingAgent*> scheduler_;
};
} // namespace org::apache::nifi::minifi::state
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 37aa27fdd..68c2663ec 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -40,6 +40,7 @@
#include "controllers/ThreadManagementService.h"
#include "core/controller/ControllerService.h"
#include "core/controller/ControllerServiceProvider.h"
+
namespace org::apache::nifi::minifi::utils {
using TaskId = std::string;
@@ -162,18 +163,8 @@ class WorkerThread {
template<typename T>
class ThreadPool {
public:
- ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, core::controller::ControllerServiceProvider* controller_service_provider = nullptr,
- std::string name = "NamelessPool")
- : daemon_threads_(daemon_threads),
- thread_reduction_count_(0),
- max_worker_threads_(max_worker_threads),
- adjust_threads_(false),
- running_(false),
- controller_service_provider_(controller_service_provider),
- name_(std::move(name)) {
- current_workers_ = 0;
- thread_manager_ = nullptr;
- }
+ ThreadPool(int max_worker_threads = 2, bool daemon_threads = false,
+ core::controller::ControllerServiceProvider* controller_service_provider = nullptr, std::string name = "NamelessPool");
ThreadPool(const ThreadPool<T> &other) = delete;
ThreadPool<T>& operator=(const ThreadPool<T> &other) = delete;
@@ -277,6 +268,9 @@ class ThreadPool {
start();
}
+ private:
+ std::shared_ptr<controllers::ThreadManagementService> createThreadManager() const;
+
protected:
std::thread createThread(std::function<void()> &&functor) {
return std::thread([ functor ]() mutable {
@@ -296,6 +290,7 @@ class ThreadPool {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
+
// determines if threads are detached
bool daemon_threads_;
std::atomic<int> thread_reduction_count_;
@@ -337,6 +332,8 @@ class ThreadPool {
// variable to signal task running completion
std::condition_variable task_run_complete_;
+ std::shared_ptr<core::logging::Logger> logger_;
+
/**
* Call for the manager to start worker threads
*/
@@ -345,7 +342,7 @@ class ThreadPool {
/**
* Runs worker tasks
*/
- void run_tasks(std::shared_ptr<WorkerThread> thread);
+ void run_tasks(const std::shared_ptr<WorkerThread>& thread);
void manage_delayed_queue();
};
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 87ea88853..9d0d62e15 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -182,7 +182,7 @@ int16_t FlowController::stop() {
logger_->log_info("Stop Flow Controller");
if (this->root_) {
// stop source processors first
- this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_, [] (const core::Processor* proc) -> bool {
+ this->root_->stopProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_, [] (const core::Processor* proc) -> bool {
return !proc->hasIncomingConnections();
});
// we enable C2 to progressively increase the timeout
@@ -194,7 +194,7 @@ int16_t FlowController::stop() {
std::this_thread::sleep_for(shutdown_check_interval_);
}
// shutdown all other processors as well
- this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_);
+ this->root_->stopProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_);
}
// stop after we've attempted to stop the processors.
timer_scheduler_->stop();
@@ -211,7 +211,7 @@ int16_t FlowController::stop() {
this->flow_file_repo_->stop();
this->provenance_repo_->stop();
// stop the ControllerServices
- this->controller_service_provider_impl_->disableAllControllerServices();
+ disableAllControllerServices();
running_ = false;
}
return 0;
@@ -324,10 +324,6 @@ void FlowController::load(std::unique_ptr<core::ProcessGroup> root, bool reload)
conditionalReloadScheduler<EventDrivenSchedulingAgent>(event_scheduler_, !event_scheduler_ || reload);
conditionalReloadScheduler<CronDrivenSchedulingAgent>(cron_scheduler_, !cron_scheduler_ || reload);
- std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_impl_)->setRootGroup(root_.get());
- std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_impl_)->setSchedulingAgent(
- std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_));
-
logger_->log_info("Loaded controller service provider");
/*
@@ -374,16 +370,16 @@ int16_t FlowController::start() {
} else {
if (!running_) {
logger_->log_info("Starting Flow Controller");
- controller_service_provider_impl_->enableAllControllerServices();
- this->timer_scheduler_->start();
- this->event_scheduler_->start();
- this->cron_scheduler_->start();
+ enableAllControllerServices();
+ timer_scheduler_->start();
+ event_scheduler_->start();
+ cron_scheduler_->start();
if (this->root_ != nullptr) {
start_time_ = std::chrono::steady_clock::now();
// watch out, this might immediately start the processors
// as the thread_pool_ is started in load()
- this->root_->startProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_);
+ this->root_->startProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_);
}
C2Client::initialize(this, this, this);
core::logging::LoggerConfiguration::getConfiguration().initializeAlertSinks(this, configuration_);
@@ -391,7 +387,6 @@ int16_t FlowController::start() {
this->protocol_->start();
this->provenance_repo_->start();
this->flow_file_repo_->start();
- thread_pool_.start();
logger_->log_info("Started Flow Controller");
}
return 0;
@@ -504,11 +499,11 @@ state::StateController* FlowController::getComponent(const std::string& id_or_na
}
gsl::not_null<std::unique_ptr<state::ProcessorController>> FlowController::createController(core::Processor& processor) {
- const auto scheduler = [this, &processor]() -> std::shared_ptr<SchedulingAgent> {
+ const auto scheduler = [this, &processor]() -> SchedulingAgent& {
switch (processor.getSchedulingStrategy()) {
- case core::SchedulingStrategy::TIMER_DRIVEN: return timer_scheduler_;
- case core::SchedulingStrategy::EVENT_DRIVEN: return event_scheduler_;
- case core::SchedulingStrategy::CRON_DRIVEN: return cron_scheduler_;
+ case core::SchedulingStrategy::TIMER_DRIVEN: return *timer_scheduler_;
+ case core::SchedulingStrategy::EVENT_DRIVEN: return *event_scheduler_;
+ case core::SchedulingStrategy::CRON_DRIVEN: return *cron_scheduler_;
}
gsl_Assert(false);
};
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 5b59faa31..9dfca5c58 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -34,47 +34,6 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
namespace org::apache::nifi::minifi {
-std::future<utils::TaskRescheduleInfo> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
- logger_->log_info("Enabling CSN in SchedulingAgent %s", serviceNode->getName());
- // reference the enable function from serviceNode
- std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] {
- serviceNode->enable();
- return utils::TaskRescheduleInfo::Done();
- };
-
- // only need to run this once.
- auto monitor = std::make_unique<utils::ComplexMonitor>();
- utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
- // move the functor into the thread pool. While a future is returned
- // we aren't terribly concerned with the result.
- std::future<utils::TaskRescheduleInfo> future;
- thread_pool_.execute(std::move(functor), future);
- if (future.valid())
- future.wait();
- return future;
-}
-
-std::future<utils::TaskRescheduleInfo> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
- logger_->log_info("Disabling CSN in SchedulingAgent %s", serviceNode->getName());
- // reference the disable function from serviceNode
- std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] {
- serviceNode->disable();
- return utils::TaskRescheduleInfo::Done();
- };
-
- // only need to run this once.
- auto monitor = std::make_unique<utils::ComplexMonitor>();
- utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
-
- // move the functor into the thread pool. While a future is returned
- // we aren't terribly concerned with the result.
- std::future<utils::TaskRescheduleInfo> future;
- thread_pool_.execute(std::move(functor), future);
- if (future.valid())
- future.wait();
- return future;
-}
-
bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
gsl_Expects(processor);
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index c7981b86f..b53d3fe90 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -37,7 +37,7 @@ FlowConfiguration::FlowConfiguration(ConfigurationContext ctx)
filesystem_(std::move(ctx.filesystem)),
logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
- service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
+ service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, configuration_);
std::string flowUrl;
std::string bucket_id = "default";
std::string flowId;
@@ -97,7 +97,7 @@ std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const s
auto old_services = controller_services_;
auto old_provider = service_provider_;
controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
- service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
+ service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, configuration_);
auto payload = getRootFromPayload(yamlConfigPayload);
if (!url.empty() && payload != nullptr) {
std::string payload_flow_id;
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 7922717fb..7300d2129 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -122,8 +122,8 @@ void ProcessGroup::addProcessGroup(std::unique_ptr<ProcessGroup> child) {
}
}
-void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler,
- const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
+void ProcessGroup::startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler,
+ EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler) {
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::set<Processor*> failed_processors;
@@ -133,13 +133,13 @@ void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSc
logger_->log_debug("Starting %s", processor->getName());
switch (processor->getSchedulingStrategy()) {
case TIMER_DRIVEN:
- timeScheduler->schedule(processor);
+ timeScheduler.schedule(processor);
break;
case EVENT_DRIVEN:
- eventScheduler->schedule(processor);
+ eventScheduler.schedule(processor);
break;
case CRON_DRIVEN:
- cronScheduler->schedule(processor);
+ cronScheduler.schedule(processor);
break;
}
}
@@ -166,8 +166,8 @@ void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSc
if (!onScheduleTimer_ && !failed_processors_.empty() && onschedule_retry_msec_ > 0) {
logger_->log_info("Retrying failed processors in %lld msec", onschedule_retry_msec_.load());
- auto func = [this, eventScheduler, cronScheduler, timeScheduler]() {
- this->startProcessingProcessors(timeScheduler, eventScheduler, cronScheduler);
+ auto func = [this, eventScheduler = &eventScheduler, cronScheduler = &cronScheduler, timeScheduler = &timeScheduler]() {
+ this->startProcessingProcessors(*timeScheduler, *eventScheduler, *cronScheduler);
};
onScheduleTimer_ = std::make_unique<utils::CallBackTimer>(std::chrono::milliseconds(onschedule_retry_msec_), func);
onScheduleTimer_->start();
@@ -176,8 +176,8 @@ void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSc
}
}
-void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
- const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler,
+ CronDrivenSchedulingAgent& cronScheduler) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
try {
@@ -202,8 +202,8 @@ void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAg
}
}
-void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
- const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler, const std::function<bool(const Processor*)>& filter) {
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler,
+ CronDrivenSchedulingAgent& cronScheduler, const std::function<bool(const Processor*)>& filter) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (onScheduleTimer_) {
@@ -221,13 +221,13 @@ void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAge
logger_->log_debug("Stopping %s", processor->getName());
switch (processor->getSchedulingStrategy()) {
case TIMER_DRIVEN:
- timeScheduler->unschedule(processor.get());
+ timeScheduler.unschedule(processor.get());
break;
case EVENT_DRIVEN:
- eventScheduler->unschedule(processor.get());
+ eventScheduler.unschedule(processor.get());
break;
case CRON_DRIVEN:
- cronScheduler->unschedule(processor.get());
+ cronScheduler.unschedule(processor.get());
break;
}
}
diff --git a/libminifi/src/core/state/ProcessorController.cpp b/libminifi/src/core/state/ProcessorController.cpp
index c5686de77..b496387d9 100644
--- a/libminifi/src/core/state/ProcessorController.cpp
+++ b/libminifi/src/core/state/ProcessorController.cpp
@@ -22,9 +22,9 @@
namespace org::apache::nifi::minifi::state {
-ProcessorController::ProcessorController(core::Processor& processor, std::shared_ptr<SchedulingAgent> scheduler)
+ProcessorController::ProcessorController(core::Processor& processor, SchedulingAgent& scheduler)
: processor_(&processor),
- scheduler_(std::move(scheduler)) {
+ scheduler_(&scheduler) {
}
ProcessorController::~ProcessorController() = default;
diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp
index 8612f0dbf..baadb59ec 100644
--- a/libminifi/src/utils/ThreadPool.cpp
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -21,7 +21,21 @@
namespace org::apache::nifi::minifi::utils {
template<typename T>
-void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+ThreadPool<T>::ThreadPool(int max_worker_threads, bool daemon_threads, core::controller::ControllerServiceProvider* controller_service_provider, std::string name)
+ : daemon_threads_(daemon_threads),
+ thread_reduction_count_(0),
+ max_worker_threads_(max_worker_threads),
+ adjust_threads_(false),
+ running_(false),
+ controller_service_provider_(controller_service_provider),
+ name_(std::move(name)),
+ logger_(core::logging::LoggerFactory<ThreadPool<T>>::getLogger()) {
+ current_workers_ = 0;
+ thread_manager_ = nullptr;
+}
+
+template<typename T>
+void ThreadPool<T>::run_tasks(const std::shared_ptr<WorkerThread>& thread) {
thread->is_running_ = true;
while (running_.load()) {
if (UNLIKELY(thread_reduction_count_ > 0)) {
@@ -181,16 +195,29 @@ void ThreadPool<T>::manageWorkers() {
}
template<typename T>
-void ThreadPool<T>::start() {
- if (nullptr != controller_service_provider_) {
- auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
- thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
- } else {
- thread_manager_ = nullptr;
+std::shared_ptr<controllers::ThreadManagementService> ThreadPool<T>::createThreadManager() const {
+ if (!controller_service_provider_) {
+ return nullptr;
+ }
+ auto service = controller_service_provider_->getControllerService("ThreadPoolManager");
+ if (!service) {
+ logger_->log_info("Could not find a ThreadPoolManager service");
+ return nullptr;
}
+ auto thread_manager_service = std::dynamic_pointer_cast<controllers::ThreadManagementService>(service);
+ if (!thread_manager_service) {
+ logger_->log_error("Found ThreadPoolManager, but it is not a ThreadManagementService");
+ return nullptr;
+ }
+ return thread_manager_service;
+}
+template<typename T>
+void ThreadPool<T>::start() {
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
if (!running_) {
+ thread_manager_ = createThreadManager();
+
running_ = true;
worker_queue_.start();
manager_thread_ = std::thread(&ThreadPool::manageWorkers, this);
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index b5c48dc1d..bc34d239e 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -197,7 +197,7 @@ TestPlan::TestPlan(std::shared_ptr<minifi::core::ContentRepository> content_repo
logger_(logging::LoggerFactory<TestPlan>::getLogger()) {
stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
controller_services_ = std::make_shared<minifi::core::controller::ControllerServiceMap>();
- controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
+ controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, configuration_);
/* Inject the default state storage ahead of ProcessContext to make sure we have a unique state directory */
if (state_dir == nullptr) {
state_dir_ = std::make_unique<TempDirectory>();
diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp b/libminifi/test/unit/SchedulingAgentTests.cpp
index b80c7563c..36a74046e 100644
--- a/libminifi/test/unit/SchedulingAgentTests.cpp
+++ b/libminifi/test/unit/SchedulingAgentTests.cpp
@@ -56,7 +56,7 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
auto test_plan = testController.createPlan();
auto controller_services_ = std::make_shared<minifi::core::controller::ControllerServiceMap>();
auto configuration = std::make_shared<minifi::Configure>();
- auto controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration);
+ auto controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, configuration);
utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool;
auto count_proc = std::make_shared<CountOnTriggersProcessor>("count_proc");
count_proc->incrementActiveTasks();