You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2018/01/18 16:54:16 UTC
nifi-minifi-cpp git commit: MINIFICPP-374: Commit Linux power
management service that enables the threadpools to reduce the number of
threads and throttle active tasks using Linux power constructs.
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 7f252d2a9 -> 254877fa5
MINIFICPP-374: Commit Linux power management service that enables the threadpools to reduce the number of threads
and throttle active tasks using Linux power constructs.
MINIFICPP-374: Add readme entry
This closes #242.
Signed-off-by: Bin Qiu <be...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/254877fa
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/254877fa
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/254877fa
Branch: refs/heads/master
Commit: 254877fa51bb983b94c07e68badc6d952b47bdb7
Parents: 7f252d2
Author: Marc Parisi <ph...@apache.org>
Authored: Sat Jan 13 12:19:11 2018 -0500
Committer: Bin Qiu <be...@gmail.com>
Committed: Thu Jan 18 08:53:24 2018 -0800
----------------------------------------------------------------------
README.md | 15 ++
libminifi/include/FlowController.h | 6 +-
libminifi/include/SchedulingAgent.h | 77 ++++++-
libminifi/include/ThreadedSchedulingAgent.h | 46 -----
.../controllers/LinuxPowerManagementService.h | 147 ++++++++++++++
.../controllers/ThreadManagementService.h | 128 ++++++++++++
.../core/controller/ControllerServiceProvider.h | 4 +-
.../StandardControllerServiceProvider.h | 19 +-
.../include/sitetosite/SiteToSiteFactory.h | 3 +-
libminifi/include/utils/ThreadPool.h | 175 +++++++++++++---
libminifi/src/FlowController.cpp | 27 ++-
libminifi/src/SchedulingAgent.cpp | 35 ++--
libminifi/src/capi/api.cpp | 1 -
.../controllers/LinuxPowerManagementService.cpp | 202 +++++++++++++++++++
libminifi/src/utils/ByteArrayCallback.cpp | 2 -
main/CMakeLists.txt | 8 +-
16 files changed, 773 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 1682017..2c6fd41 100644
--- a/README.md
+++ b/README.md
@@ -609,6 +609,21 @@ Additionally, a unique hexadecimal uid.minifi.device.segment should be assigned
class: ControllerServiceClass
Properties:
+### Linux Power Manager Controller Service
+ The Linux power manager controller service can be configured to monitor the battery level and status (discharging or charging) via the following configuration.
+ Simply provide the capacity path and status path, along with your thresholds for the trigger and the low battery alarm, and you can monitor your battery and throttle
+ the thread pools within MiNiFi C++. Note that the name of the service must be ThreadPoolManager.
+
+ Controller Services:
+ - name: ThreadPoolManager
+ id: 2438e3c8-015a-1000-79ca-83af40ec1888
+ class: LinuxPowerManagerService
+ Properties:
+ Battery Capacity Path: /path/to/battery/capacity
+ Battery Status Path: /path/to/battery/status
+ Trigger Threshold: 90
+ Low Battery Threshold: 50
+ Wait Period: 500 ms
### Running
After completing a [build](#building), the application can be run by issuing the following from :
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 6957c4e..2087f81 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -217,7 +217,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
* Enables the controller service services
* @param serviceNode service node which will be disabled, along with linked services.
*/
- virtual std::future<bool> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::future<uint64_t> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Enables controller services
@@ -229,13 +229,15 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
* Disables controller services
* @param serviceNode service node which will be disabled, along with linked services.
*/
- virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Gets all controller services.
*/
virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> getAllControllerServices();
+ virtual std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier);
+
/**
* Gets controller service node specified by <code>id</code>
* @param id service identifier
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 71aceb8..682f6ec 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -44,6 +44,67 @@ namespace apache {
namespace nifi {
namespace minifi {
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<uint64_t> {
+ public:
+ TimerAwareMonitor(std::atomic<bool> *run_monitor)
+ : current_wait_(0),
+ run_monitor_(run_monitor) {
+ }
+ explicit TimerAwareMonitor(TimerAwareMonitor &&other)
+ : AfterExecute(std::move(other)),
+ run_monitor_(std::move(other.run_monitor_)) {
+ current_wait_.store(other.current_wait_.load());
+ }
+ virtual bool isFinished(const uint64_t &result) {
+ current_wait_.store(result);
+ if (*run_monitor_) {
+ return false;
+ }
+ return true;
+ }
+ virtual bool isCancelled(const uint64_t &result) {
+ if (*run_monitor_) {
+ return false;
+ }
+ return true;
+ }
+ /**
+ * Time to wait before re-running this task if necessary
+ * @return milliseconds since epoch after which we are eligible to re-run this task.
+ */
+ virtual int64_t wait_time() {
+ return current_wait_.load();
+ }
+ protected:
+
+ std::atomic<uint64_t> current_wait_;
+ std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public TimerAwareMonitor {
+ public:
+ SingleRunMonitor(std::atomic<bool> *run_monitor)
+ : TimerAwareMonitor(run_monitor) {
+ }
+ explicit SingleRunMonitor(TimerAwareMonitor &&other)
+ : TimerAwareMonitor(std::move(other)){
+ }
+ virtual bool isFinished(const uint64_t &result) {
+ if (result == 0) {
+ return true;
+ } else {
+ current_wait_.store(result);
+ if (*run_monitor_) {
+ return false;
+ }
+ return true;
+ }
+ }
+};
+
// SchedulingAgent Class
class SchedulingAgent {
public:
@@ -62,9 +123,9 @@ class SchedulingAgent {
running_ = false;
repo_ = repo;
flow_repo_ = flow_repo;
- utils::ThreadPool<bool> pool = utils::ThreadPool<bool>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true);
- component_lifecycle_thread_pool_ = std::move(pool);
- component_lifecycle_thread_pool_.start();
+ auto pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true, controller_service_provider);
+ thread_pool_ = std::move(pool);
+ thread_pool_.start();
}
// Destructor
virtual ~SchedulingAgent() {
@@ -79,17 +140,17 @@ class SchedulingAgent {
// start
void start() {
running_ = true;
- component_lifecycle_thread_pool_.start();
+ thread_pool_.start();
}
// stop
virtual void stop() {
running_ = false;
- component_lifecycle_thread_pool_.shutdown();
+ thread_pool_.shutdown();
}
public:
- virtual std::future<bool> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
- virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::future<uint64_t> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
// schedule, overwritten by different DrivenSchedulingAgent
virtual void schedule(std::shared_ptr<core::Processor> processor) = 0;
// unschedule, overwritten by different DrivenSchedulingAgent
@@ -115,7 +176,7 @@ class SchedulingAgent {
std::shared_ptr<core::ContentRepository> content_repo_;
// thread pool for components.
- utils::ThreadPool<bool> component_lifecycle_thread_pool_;
+ utils::ThreadPool<uint64_t> thread_pool_;
// controller service provider reference
std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index b01d740..ba0998a 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -33,45 +33,6 @@ namespace apache {
namespace nifi {
namespace minifi {
-/**
- * Uses the wait time for a given worker to determine if it is eligible to run
- */
-class TimerAwareMonitor : public utils::AfterExecute<uint64_t> {
- public:
- TimerAwareMonitor(std::atomic<bool> *run_monitor)
- : current_wait_(0),
- run_monitor_(run_monitor) {
- }
- explicit TimerAwareMonitor(TimerAwareMonitor &&other)
- : AfterExecute(std::move(other)),
- run_monitor_(std::move(other.run_monitor_)) {
- current_wait_.store(other.current_wait_.load());
- }
- virtual bool isFinished(const uint64_t &result) {
- current_wait_.store(result);
- if (*run_monitor_) {
- return false;
- }
- return true;
- }
- virtual bool isCancelled(const uint64_t &result) {
- if (*run_monitor_) {
- return false;
- }
- return true;
- }
- /**
- * Time to wait before re-running this task if necessary
- * @return milliseconds since epoch after which we are eligible to re-run this task.
- */
- virtual int64_t wait_time() {
- return current_wait_.load();
- }
- private:
-
- std::atomic<uint64_t> current_wait_;
- std::atomic<bool> *run_monitor_;
-};
/**
* An abstract scheduling agent which creates and manages a pool of threads for
@@ -87,11 +48,6 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
: SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration),
logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
-
- utils::ThreadPool<uint64_t> pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true);
- thread_pool_ = std::move(pool);
- thread_pool_.start();
-
}
// Destructor
virtual ~ThreadedSchedulingAgent() {
@@ -108,8 +64,6 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
virtual void unschedule(std::shared_ptr<core::Processor> processor);
virtual void stop();
- protected:
- utils::ThreadPool<uint64_t> thread_pool_;
protected:
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/controllers/LinuxPowerManagementService.h
----------------------------------------------------------------------
diff --git a/libminifi/include/controllers/LinuxPowerManagementService.h b/libminifi/include/controllers/LinuxPowerManagementService.h
new file mode 100644
index 0000000..2b8bf0d
--- /dev/null
+++ b/libminifi/include/controllers/LinuxPowerManagementService.h
@@ -0,0 +1,147 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_LINUXPOWERMANAGEMENTSERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_LINUXPOWERMANAGEMENTSERVICE_H_
+
+#include <iostream>
+#include <memory>
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "io/validation.h"
+#include "core/controller/ControllerService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "ThreadManagementService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Purpose: Linux power management service uses a path for the battery level
+ * and the status ( charging/discharging )
+ */
+class LinuxPowerManagerService : public ThreadManagementService {
+ public:
+ explicit LinuxPowerManagerService(const std::string &name, const std::string &id)
+ : ThreadManagementService(name, id),
+ enabled_(false),
+ battery_level_(0),
+ wait_period_(0),
+ last_time_(0),
+ trigger_(0),
+ low_battery_trigger_(0),
+ logger_(logging::LoggerFactory<LinuxPowerManagerService>::getLogger()) {
+ }
+
+ explicit LinuxPowerManagerService(const std::string &name, uuid_t uuid = 0)
+ : ThreadManagementService(name, uuid),
+ enabled_(false),
+ battery_level_(0),
+ wait_period_(0),
+ last_time_(0),
+ trigger_(0),
+ low_battery_trigger_(0),
+ logger_(logging::LoggerFactory<LinuxPowerManagerService>::getLogger()) {
+ }
+
+ explicit LinuxPowerManagerService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+ : LinuxPowerManagerService(name, nullptr) {
+ setConfiguration(configuration);
+ initialize();
+ }
+
+ static core::Property BatteryCapacityPath;
+ static core::Property BatteryStatusPath;
+ static core::Property BatteryStatusDischargeKeyword;
+ static core::Property TriggerThreshold;
+ static core::Property LowBatteryThreshold;
+ static core::Property WaitPeriod;
+
+ /**
+ * Helps to determine if the number of tasks will increase the pools above their threshold.
+ * @param new_tasks tasks to be added.
+ * @return true if above max, false otherwise.
+ */
+ virtual bool isAboveMax(const int new_tasks);
+
+ /**
+ * Returns the max number of threads allowed by all pools
+ * @return max threads.
+ */
+ virtual uint16_t getMaxThreads();
+
+ /**
+ * Function based on cooperative multitasking that will tell a caller whether or not the number of threads should be reduced.
+ * @return true if threading impacts QOS.
+ */
+ virtual bool shouldReduce();
+
+ /**
+ * Function to indicate to this controller service that we've reduced threads in a threadpool
+ */
+ virtual void reduce();
+
+ /**
+ * Function to help callers identify if they can increase threads.
+ * @return true if QOS won't be breached.
+ */
+ virtual bool canIncrease();
+
+ void initialize();
+
+ void yield();
+
+ bool isRunning();
+
+ bool isWorkAvailable();
+
+ virtual void onEnable();
+
+ protected:
+
+ std::vector<std::pair<std::string, std::string>> paths_;
+
+ bool enabled_;
+
+ std::atomic<int> battery_level_;
+
+ std::atomic<uint64_t> wait_period_;
+
+ std::atomic<uint64_t> last_time_;
+
+ int trigger_;
+
+ int low_battery_trigger_;
+
+ std::string status_keyword_;
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(LinuxPowerManagerService);
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_LINUXPOWERMANAGEMENTSERVICE_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/controllers/ThreadManagementService.h
----------------------------------------------------------------------
diff --git a/libminifi/include/controllers/ThreadManagementService.h b/libminifi/include/controllers/ThreadManagementService.h
new file mode 100644
index 0000000..b297834
--- /dev/null
+++ b/libminifi/include/controllers/ThreadManagementService.h
@@ -0,0 +1,128 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_THREADMANAGEMENTSERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_THREADMANAGEMENTSERVICE_H_
+#include <iostream>
+#include <memory>
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "io/validation.h"
+#include "core/controller/ControllerService.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Purpose: Thread management service provides a contextual awareness across
+ * thread pools that enables us to deliver QOS to an agent.
+ */
+class ThreadManagementService : public core::controller::ControllerService {
+ public:
+ explicit ThreadManagementService(const std::string &name, const std::string &id)
+ : ControllerService(name, id),
+ logger_(logging::LoggerFactory<ThreadManagementService>::getLogger()) {
+ }
+
+ explicit ThreadManagementService(const std::string &name, uuid_t uuid = 0)
+ : ControllerService(name, uuid),
+ logger_(logging::LoggerFactory<ThreadManagementService>::getLogger()) {
+ }
+
+ explicit ThreadManagementService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+ : ControllerService(name, nullptr),
+ logger_(logging::LoggerFactory<ThreadManagementService>::getLogger()) {
+
+ }
+
+ /**
+ * Helps to determine if the number of tasks will increase the pools above their threshold.
+ * @param new_tasks tasks to be added.
+ * @return true if above max, false otherwise.
+ */
+ virtual bool isAboveMax(const int new_tasks) = 0;
+
+ /**
+ * Returns the max number of threads allowed by all pools
+ * @return max threads.
+ */
+ virtual uint16_t getMaxThreads() = 0;
+
+ /**
+ * Function based on cooperative multitasking that will tell a caller whether or not the number of threads should be reduced.
+ * @return true if threading impacts QOS.
+ */
+ virtual bool shouldReduce() = 0;
+
+ /**
+ * Function to indicate to this controller service that we've reduced threads in a threadpool
+ */
+ virtual void reduce() = 0;
+
+ /**
+ * Registration function to tabulate total threads.
+ * @param threads threads from a thread pool.
+ */
+ virtual void registerThreadCount(const int threads) {
+ thread_count_ += threads;
+ }
+
+ /**
+ * Function to help callers identify if they can increase threads.
+ * @return true if QOS won't be breached.
+ */
+ virtual bool canIncrease() = 0;
+
+ virtual void initialize() {
+ ControllerService::initialize();
+ }
+
+ void yield() {
+
+ }
+
+ bool isRunning() {
+ return getState() == core::controller::ControllerServiceState::ENABLED;
+ }
+
+ bool isWorkAvailable() {
+ return false;
+ }
+
+ virtual void onEnable() {
+
+ }
+
+ protected:
+
+ std::atomic<int> thread_count_;
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_THREADMANAGEMENTSERVICE_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/core/controller/ControllerServiceProvider.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index bf02080..0499d35 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -96,7 +96,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
* Enables the provided controller service
* @param serviceNode controller service node.
*/
- virtual std::future<bool> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
+ virtual std::future<uint64_t> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
/**
* Enables the provided controller service nodes
@@ -108,7 +108,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
* Disables the provided controller service node
* @param serviceNode controller service node.
*/
- virtual std::future<bool> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
+ virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
/**
* Gets a list of all controller services.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/core/controller/StandardControllerServiceProvider.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index ff75488..a6ca684 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -79,8 +79,7 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
agent_ = agent;
}
- std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id,
- bool firstTimeAdded) {
+ std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id, bool firstTimeAdded) {
std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id);
@@ -97,11 +96,15 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
}
- std::future<bool> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
+ std::future<uint64_t> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
if (serviceNode->canEnable()) {
return agent_->enableControllerService(serviceNode);
} else {
- std::future<bool> no_run = std::async(std::launch::async, []() {return false;});
+
+ std::future<uint64_t> no_run = std::async(std::launch::async, []() {
+ uint64_t ret = 0;
+ return ret;
+ });
return no_run;
}
}
@@ -125,11 +128,14 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
}
}
- std::future<bool> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
+ std::future<uint64_t> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) {
return agent_->disableControllerService(serviceNode);
} else {
- std::future<bool> no_run = std::async(std::launch::async, []() {return false;});
+ std::future<uint64_t> no_run = std::async(std::launch::async, []() {
+ uint64_t ret = 0;
+ return ret;
+ });
return no_run;
}
}
@@ -193,7 +199,6 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
std::shared_ptr<Configure> configuration_;
-
private:
std::shared_ptr<logging::Logger> logger_;
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/sitetosite/SiteToSiteFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h
index 648347d..35e12b9 100644
--- a/libminifi/include/sitetosite/SiteToSiteFactory.h
+++ b/libminifi/include/sitetosite/SiteToSiteFactory.h
@@ -72,8 +72,7 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf
auto ptr = std::unique_ptr<SiteToSiteClient>(static_cast<SiteToSiteClient*>(http_protocol));
auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort()));
char idStr[37];
- uuid_unparse_lower(uuid, idStr);
- std::cout << "sending " << idStr << std::endl;
+ uuid_unparse_lower(uuid, idStr);
ptr->setPortId(uuid);
ptr->setPeer(std::move(peer));
return ptr;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 4f80829..f04e319 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -26,7 +26,13 @@
#include <queue>
#include <future>
#include <thread>
+#include <functional>
+
+#include "capi/expect.h"
+#include "controllers/ThreadManagementService.h"
#include "concurrentqueue.h"
+#include "core/controller/ControllerService.h"
+#include "core/controller/ControllerServiceProvider.h"
namespace org {
namespace apache {
namespace nifi {
@@ -181,6 +187,21 @@ std::shared_ptr<std::promise<T>> Worker<T>::getPromise() {
return promise;
}
+class WorkerThread {
+ public:
+ explicit WorkerThread(std::thread thread)
+ : is_running_(false),
+ thread_(std::move(thread)) {
+
+ }
+ WorkerThread()
+ : is_running_(false) {
+
+ }
+ std::atomic<bool> is_running_;
+ std::thread thread_;
+};
+
/**
* Thread pool
* Purpose: Provides a thread pool with basic functionality similar to
@@ -191,20 +212,29 @@ template<typename T>
class ThreadPool {
public:
- ThreadPool(int max_worker_threads = 2, bool daemon_threads = false)
+ ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, const std::shared_ptr<core::controller::ControllerServiceProvider> &controller_service_provider = nullptr)
: daemon_threads_(daemon_threads),
+ thread_reduction_count_(0),
max_worker_threads_(max_worker_threads),
- running_(false) {
+ adjust_threads_(false),
+ running_(false),
+ controller_service_provider_(controller_service_provider) {
current_workers_ = 0;
+ thread_manager_ = nullptr;
}
ThreadPool(const ThreadPool<T> &&other)
: daemon_threads_(std::move(other.daemon_threads_)),
+ thread_reduction_count_(0),
max_worker_threads_(std::move(other.max_worker_threads_)),
- running_(false) {
+ adjust_threads_(false),
+ running_(false),
+ controller_service_provider_(std::move(other.controller_service_provider_)),
+ thread_manager_(std::move(other.thread_manager_)) {
current_workers_ = 0;
}
- virtual ~ThreadPool() {
+
+ ~ThreadPool() {
shutdown();
}
@@ -270,10 +300,16 @@ class ThreadPool {
max_worker_threads_ = std::move(other.max_worker_threads_);
daemon_threads_ = std::move(other.daemon_threads_);
current_workers_ = 0;
+ thread_reduction_count_ = 0;
thread_queue_ = std::move(other.thread_queue_);
worker_queue_ = std::move(other.worker_queue_);
+ controller_service_provider_ = std::move(other.controller_service_provider_);
+ thread_manager_ = std::move(other.thread_manager_);
+
+ adjust_threads_ = false;
+
if (!running_) {
start();
}
@@ -282,6 +318,12 @@ class ThreadPool {
protected:
+ std::thread createThread(std::function<void()> &&functor) {
+ return std::thread([ functor ]() mutable {
+ functor();
+ });
+ }
+
/**
* Drain will notify tasks to stop following notification
*/
@@ -292,37 +334,51 @@ class ThreadPool {
}
// determines if threads are detached
bool daemon_threads_;
- // max worker threads
+ std::atomic<int> thread_reduction_count_;
+// max worker threads
int max_worker_threads_;
- // current worker tasks.
+// current worker tasks.
std::atomic<int> current_workers_;
- // thread queue
- std::vector<std::thread> thread_queue_;
- // manager thread
+// thread queue
+ std::vector<std::shared_ptr<WorkerThread>> thread_queue_;
+// manager thread
std::thread manager_thread_;
- // atomic running boolean
+// conditional that's used to adjust the threads
+ std::atomic<bool> adjust_threads_;
+// atomic running boolean
std::atomic<bool> running_;
- // worker queue of worker objects
+// controller service provider
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_;
+// integrated power manager
+ std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
+ // thread queue for the recently deceased threads.
+ moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
+// worker queue of worker objects
moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
std::priority_queue<Worker<T>, std::vector<Worker<T>>, WorkerComparator<T>> worker_priority_queue_;
- // notification for available work
+// notification for available work
std::condition_variable tasks_available_;
- // map to identify if a task should be
+// map to identify if a task should be
std::map<std::string, bool> task_status_;
- // manager mutex
+// manager mutex
std::recursive_mutex manager_mutex_;
- // work queue mutex
+// work queue mutex
std::mutex worker_queue_mutex_;
/**
* Call for the manager to start worker threads
*/
- void startWorkers();
+ void manageWorkers();
+
+ /**
+ * Function to adjust the workers up and down.
+ */
+ void adjustWorkers(int count);
/**
* Runs worker tasks
*/
- void run_tasks();
+ void run_tasks(std::shared_ptr<WorkerThread> thread);
};
template<typename T>
@@ -341,32 +397,79 @@ bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
}
template<typename T>
-void ThreadPool<T>::startWorkers() {
+void ThreadPool<T>::manageWorkers() {
for (int i = 0; i < max_worker_threads_; i++) {
- thread_queue_.push_back(std::move(std::thread(&ThreadPool::run_tasks, this)));
+ auto worker_thread = std::make_shared<WorkerThread>();
+ worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+ thread_queue_.push_back(worker_thread);
current_workers_++;
}
if (daemon_threads_) {
for (auto &thread : thread_queue_) {
- thread.detach();
+ thread->thread_.detach();
}
}
- for (auto &thread : thread_queue_) {
- if (thread.joinable())
- thread.join();
+
+// likely don't have a thread manager
+ if (LIKELY(nullptr != thread_manager_)) {
+ while (running_) {
+ auto waitperiod = std::chrono::milliseconds(1) * 500;
+ {
+ if (thread_manager_->isAboveMax(current_workers_)) {
+ auto max = thread_manager_->getMaxConcurrentTasks();
+ auto differential = current_workers_ - max;
+ thread_reduction_count_ += differential;
+ } else if (thread_manager_->shouldReduce()) {
+ if (current_workers_ > 1)
+ thread_reduction_count_++;
+ thread_manager_->reduce();
+ } else if (thread_manager_->canIncrease() && max_worker_threads_ - current_workers_ > 0) { // increase slowly
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ auto worker_thread = std::make_shared<WorkerThread>();
+ worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+ thread_queue_.push_back(worker_thread);
+ current_workers_++;
+ }
+ }
+ {
+ std::shared_ptr<WorkerThread> thread_ref;
+ while (deceased_thread_queue_.try_dequeue(thread_ref)) {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (thread_ref->thread_.joinable())
+ thread_ref->thread_.join();
+ thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
+ }
+ }
+ std::this_thread::sleep_for(waitperiod);
+ }
+ } else {
+ for (auto &thread : thread_queue_) {
+ if (thread->thread_.joinable())
+ thread->thread_.join();
+ }
}
}
template<typename T>
-void ThreadPool<T>::run_tasks() {
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
auto waitperiod = std::chrono::milliseconds(1) * 100;
uint64_t wait_decay_ = 0;
+ uint64_t yield_backoff = 10; // start at 10 ms
while (running_.load()) {
+ if (UNLIKELY(thread_reduction_count_ > 0)) {
+ if (--thread_reduction_count_ >= 0) {
+ deceased_thread_queue_.enqueue(thread);
+ thread->is_running_ = false;
+ break;
+ } else {
+ thread_reduction_count_++;
+ }
+ }
// if we exceed 500ms of wait due to not being able to run any tasks and there are tasks available, meaning
// they are eligible to run per the fact that the thread pool isn't shut down and the tasks are in a runnable state
// BUT they've been continually timesliced, we will lower the wait decay to 100ms and continue incrementing from
- // there. This ensures we don't have arbitrarily long sleep cycles.
+ // there. This ensures we don't have arbitrarily long sleep cycles.
if (wait_decay_ > 500000000L) {
wait_decay_ = 100000000L;
}
@@ -378,6 +481,17 @@ void ThreadPool<T>::run_tasks() {
if (wait_decay_ > 2000) {
std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
}
+
+ if (current_workers_ < max_worker_threads_) {
+ // we are in a reduced state due to thread management,
+ // so let's institute a backoff of up to 500ms
+ if (yield_backoff < 500) {
+ yield_backoff += 10;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(yield_backoff));
+ } else {
+ yield_backoff = 10;
+ }
Worker<T> task;
if (!worker_queue_.try_dequeue(task)) {
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
@@ -407,7 +521,7 @@ void ThreadPool<T>::run_tasks() {
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
// if our differential is < 10% of the wait time we will not put the task into a wait state
// since requeuing will break the time slice contract.
- if ((double)task.getTimeSlice() > ms && ((double)(task.getTimeSlice() - ms)) > (wt * .10)) {
+ if ((double) task.getTimeSlice() > ms && ((double) (task.getTimeSlice() - ms)) > (wt * .10)) {
wait_to_run = true;
}
}
@@ -442,14 +556,19 @@ void ThreadPool<T>::run_tasks() {
}
}
current_workers_--;
-
}
template<typename T>
void ThreadPool<T>::start() {
+ if (nullptr != controller_service_provider_) {
+ auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
+ thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
+ } else {
+ thread_manager_ = nullptr;
+ }
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
if (!running_) {
running_ = true;
- manager_thread_ = std::move(std::thread(&ThreadPool::startWorkers, this));
+ manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this));
if (worker_queue_.size_approx() > 0) {
tasks_available_.notify_all();
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 03452b6..4d2e1b7 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -252,8 +252,16 @@ void FlowController::load() {
stop(true);
}
if (!initialized_) {
+ logger_->log_info("Load Flow Controller from file %s", configuration_filename_.c_str());
+
+ this->root_ = std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot(configuration_filename_));
+
+ logger_->log_info("Loaded root processor Group");
+
logger_->log_info("Initializing timers");
+ controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
+
if (nullptr == timer_scheduler_) {
timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(
std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
@@ -264,13 +272,6 @@ void FlowController::load() {
std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
configuration_);
}
- logger_->log_info("Load Flow Controller from file %s", configuration_filename_.c_str());
-
- this->root_ = std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot(configuration_filename_));
-
- logger_->log_info("Loaded root processor Group");
-
- controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent(
@@ -496,7 +497,7 @@ void FlowController::removeControllerService(const std::shared_ptr<core::control
* Enables the controller service services
* @param serviceNode service node which will be disabled, along with linked services.
*/
-std::future<bool> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<uint64_t> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
return controller_service_provider_->enableControllerService(serviceNode);
}
@@ -511,7 +512,7 @@ void FlowController::enableControllerServices(std::vector<std::shared_ptr<core::
* Disables controller services
* @param serviceNode service node which will be disabled, along with linked services.
*/
-std::future<bool> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<uint64_t> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
return controller_service_provider_->disableControllerService(serviceNode);
}
@@ -523,6 +524,14 @@ std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowContro
}
/**
+ * Gets the controller service for <code>identifier</code>
+ * @param identifier service identifier
+ * @return shared pointer to the controller service implementation or nullptr if it does not exist.
+ */
+std::shared_ptr<core::controller::ControllerService> FlowController::getControllerService(const std::string &identifier) {
+ return controller_service_provider_->getControllerService(identifier);
+}
+/**
* Gets controller service node specified by <code>id</code>
* @param id service identifier
* @return shared pointer to the controller service node or nullptr if it does not exist.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 2451a08..ca30316 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -39,34 +39,41 @@ bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) {
return false;
}
-std::future<bool> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<uint64_t> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
logger_->log_info("Enabling CSN in SchedulingAgent %s", serviceNode->getName());
// reference the enable function from serviceNode
- std::function< bool()> f_ex = [serviceNode] {
- return serviceNode->enable();
+ std::function< uint64_t()> f_ex = [serviceNode] {
+ serviceNode->enable();
+ return 0;
};
- // create a functor that will be submitted to the thread pool.
- utils::Worker<bool> functor(f_ex, serviceNode->getUUIDStr());
+
+ // only need to run this once.
+ std::unique_ptr<SingleRunMonitor> monitor = std::unique_ptr<SingleRunMonitor>(new SingleRunMonitor(&running_));
+ utils::Worker<uint64_t> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
// move the functor into the thread pool. While a future is returned
// we aren't terribly concerned with the result.
- std::future<bool> future;
- component_lifecycle_thread_pool_.execute(std::move(functor), future);
+ std::future<uint64_t> future;
+ thread_pool_.execute(std::move(functor), future);
future.wait();
return future;
}
-std::future<bool> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<uint64_t> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
logger_->log_info("Disabling CSN in SchedulingAgent %s", serviceNode->getName());
// reference the disable function from serviceNode
- std::function< bool()> f_ex = [serviceNode] {
- return serviceNode->disable();
+ std::function< uint64_t()> f_ex = [serviceNode] {
+ serviceNode->disable();
+ return 0;
};
- // create a functor that will be submitted to the thread pool.
- utils::Worker<bool> functor(f_ex, serviceNode->getUUIDStr());
+
+ // only need to run this once.
+ std::unique_ptr<SingleRunMonitor> monitor = std::unique_ptr<SingleRunMonitor>(new SingleRunMonitor(&running_));
+ utils::Worker<uint64_t> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
+
// move the functor into the thread pool. While a future is returned
// we aren't terribly concerned with the result.
- std::future<bool> future;
- component_lifecycle_thread_pool_.execute(std::move(functor), future);
+ std::future<uint64_t> future;
+ thread_pool_.execute(std::move(functor), future);
future.wait();
return future;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
index 0fffcb8..5e8f3d8 100644
--- a/libminifi/src/capi/api.cpp
+++ b/libminifi/src/capi/api.cpp
@@ -231,7 +231,6 @@ flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) {
claim->increaseFlowFileRecordOwnedCount();
auto path = claim->getContentFullPath();
auto ffr = create_flowfile(path.c_str());
- std::cout << "dang created " << path << " " << ff->getSize() << std::endl;
return ffr;
} else {
return nullptr;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/src/controllers/LinuxPowerManagementService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/LinuxPowerManagementService.cpp b/libminifi/src/controllers/LinuxPowerManagementService.cpp
new file mode 100644
index 0000000..836c9d3
--- /dev/null
+++ b/libminifi/src/controllers/LinuxPowerManagementService.cpp
@@ -0,0 +1,202 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "controllers/LinuxPowerManagementService.h"
+#include <exception>
+#include <limits>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+#include "utils/StringUtils.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+// Definitions of the static property members of LinuxPowerManagerService. Each is built from two strings;
+// the first is presumably the property name (it is what getName() is used to look up elsewhere in this
+// file) and the second a human-readable description -- confirm against core::Property.
+core::Property LinuxPowerManagerService::BatteryCapacityPath("Battery Capacity Path", "Path to the battery level");
+core::Property LinuxPowerManagerService::BatteryStatusPath("Battery Status Path", "Path to the battery status ( Discharging/Battery )");
+core::Property LinuxPowerManagerService::BatteryStatusDischargeKeyword("Battery Status Discharge", "Keyword to identify if battery is discharging");
+core::Property LinuxPowerManagerService::TriggerThreshold("Trigger Threshold", "Battery threshold before which we consider a slow reduction");
+core::Property LinuxPowerManagerService::LowBatteryThreshold("Low Battery Threshold", "Battery threshold before which we will aggressively reduce");
+core::Property LinuxPowerManagerService::WaitPeriod("Wait Period", "Decay between checking threshold and determining if a reduction is needed");
+
+/**
+ * This implementation ignores its argument and always answers false.
+ * @param new_tasks task count supplied by the caller (unused here)
+ * @return always false
+ */
+bool LinuxPowerManagerService::isAboveMax(int new_tasks) {
+ return false;
+}
+
+/**
+ * @return the largest value representable by uint16_t, i.e. this service sets no tighter bound of its own.
+ */
+uint16_t LinuxPowerManagerService::getMaxThreads() {
+ return std::numeric_limits<uint16_t>::max();
+}
+
+/**
+ * Tells the caller whether it may add capacity again.
+ *
+ * The first line of every configured battery status file is compared with the discharge keyword using
+ * utils::StringUtils::equalsIgnoreCase; the method answers true as soon as one bank does not match.
+ * NOTE: a status file that cannot be opened yields an empty line and therefore counts as "not discharging".
+ *
+ * @return true if at least one bank does not report the discharge keyword; false if all of them do or if
+ * no bank is configured.
+ */
+bool LinuxPowerManagerService::canIncrease() {
+  // bind by reference: copying both path strings on every poll is needless work
+  for (const auto &path_pair : paths_) {
+    // the stream is closed by its destructor at the end of each iteration
+    std::ifstream status_file(path_pair.second);
+    std::string status_str;
+    std::getline(status_file, status_str);
+
+    if (!utils::StringUtils::equalsIgnoreCase(status_keyword_, status_str)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+/**
+ * Stamps last_time_ with the current system-clock time, expressed as a count of milliseconds since the
+ * clock's epoch.
+ */
+void LinuxPowerManagerService::reduce() {
+  const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
+  last_time_ = since_epoch / std::chrono::milliseconds(1);
+}
+
+/**
+ * Decides whether the caller should shed load because the batteries are draining.
+ *
+ * Every configured bank is polled for its capacity and status. The capacities of the banks that could be
+ * read are averaged into battery_level_. A reduction is suggested when
+ *   - the average is below trigger_ and is either lower than the previous average or below
+ *     low_battery_trigger_,
+ *   - every bank that was read reports the discharge keyword, and
+ *   - either last_time_ is still 0 or more than wait_period_ milliseconds have passed since last_time_.
+ *
+ * A bank whose capacity file does not start with an integer is skipped, and when no bank could be read
+ * (or none is configured) the method answers false; this avoids both an exception escaping from the
+ * number parsing and a division by zero when computing the average.
+ *
+ * @return true if a reduction is suggested, false otherwise (including when the service is not enabled).
+ */
+bool LinuxPowerManagerService::shouldReduce() {
+  if (!enabled_) {
+    logger_->log_trace("LPM not enabled");
+    return false;
+  }
+
+  const auto prev_level = battery_level_.load();
+
+  bool all_discharging = true;
+  int battery_sum = 0;
+  int banks_read = 0;
+  for (const auto &path_pair : paths_) {
+    // formatted extraction sets failbit instead of throwing when the file is missing, empty or not numeric
+    int bank_level = 0;
+    std::ifstream capacity_file(path_pair.first);
+    if (!(capacity_file >> bank_level)) {
+      logger_->log_trace("Could not read battery capacity, skipping bank");
+      continue;
+    }
+    battery_sum += bank_level;
+    banks_read++;
+
+    std::ifstream status_file(path_pair.second);
+    std::string status_str;
+    std::getline(status_file, status_str);
+
+    if (!utils::StringUtils::equalsIgnoreCase(status_keyword_, status_str)) {
+      all_discharging = false;
+    }
+  }
+
+  // nothing to average: leave battery_level_ untouched and do not suggest a reduction
+  if (banks_read == 0) {
+    logger_->log_trace("No battery bank could be read");
+    return false;
+  }
+
+  // average
+  battery_level_ = battery_sum / banks_read;
+
+  bool overConsume = false;
+
+  // only reduce if we're still going down OR we've triggered the low battery threshold
+  if (battery_level_ < trigger_ && (battery_level_ < prev_level || battery_level_ < low_battery_trigger_)) {
+    if (all_discharging) {
+      const auto curr_time = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
+      if (last_time_ == 0) {
+        // no time stamp yet: suggest a reduction right away and start the wait period
+        overConsume = true;
+        last_time_ = curr_time;
+      } else if (curr_time - last_time_ > wait_period_) {
+        overConsume = true;
+        logger_->log_trace("All banks are discharging, suggesting reduction");
+      } else {
+        logging::LOG_DEBUG(logger_) << "dischaging but can't reduce due to time " << curr_time << " " << last_time_ << " " << wait_period_;
+      }
+    }
+  } else {
+    logger_->log_trace("%d level is not below trigger of %d", battery_level_.load(), trigger_);
+  }
+
+  return overConsume;
+}
+
+/**
+ * Runs the base-class initialization and then registers the properties this service supports.
+ */
+void LinuxPowerManagerService::initialize() {
+  ThreadManagementService::initialize();
+  std::set<core::Property> supportedProperties;
+  supportedProperties.insert(BatteryCapacityPath);
+  supportedProperties.insert(BatteryStatusPath);
+  // onEnable() looks this property up, so it has to be registered like the others
+  supportedProperties.insert(BatteryStatusDischargeKeyword);
+  supportedProperties.insert(TriggerThreshold);
+  supportedProperties.insert(LowBatteryThreshold);
+  supportedProperties.insert(WaitPeriod);
+  setSupportedProperties(supportedProperties);
+}
+
+/**
+ * No-op: this implementation has an empty body.
+ */
+void LinuxPowerManagerService::yield() {
+}
+
+/**
+ * @return true exactly when getState() reports ControllerServiceState::ENABLED.
+ */
+bool LinuxPowerManagerService::isRunning() {
+  const auto current_state = getState();
+  return core::controller::ControllerServiceState::ENABLED == current_state;
+}
+
+/**
+ * @return always false in this implementation.
+ */
+bool LinuxPowerManagerService::isWorkAvailable() {
+ return false;
+}
+
+/**
+ * Reads the service configuration and, if it is usable, switches battery monitoring on.
+ *
+ * The trigger threshold and the wait period must be present. The low battery threshold falls back to 0
+ * and the discharge keyword to "Discharging" when they are not configured. The capacity and status
+ * path lists are paired entry by entry and must therefore have the same length.
+ *
+ * Monitoring is switched off (enabled_ = false) instead of on when a threshold is not a valid integer or
+ * when no capacity/status pair results from the configuration. The list of pairs is rebuilt from scratch
+ * on every call so that enabling the service twice does not register each bank twice.
+ */
+void LinuxPowerManagerService::onEnable() {
+  if (nullptr == configuration_) {
+    logger_->log_trace("Cannot enable Linux Power Manager");
+    return;
+  }
+  std::string trigger, wait;
+  status_keyword_ = "Discharging";
+
+  if (!getProperty(TriggerThreshold.getName(), trigger) || !getProperty(WaitPeriod.getName(), wait)) {
+    logger_->log_trace("Could not enable ");
+    return;
+  }
+
+  core::TimeUnit unit;
+  int64_t wait_time;
+  if (core::Property::StringToTime(wait, wait_time, unit) && core::Property::ConvertTimeUnitToMS(wait_time, unit, wait_time)) {
+    wait_period_ = wait_time;
+  }
+
+  getProperty(BatteryStatusDischargeKeyword.getName(), status_keyword_);
+
+  // std::stoi throws on malformed or out-of-range input; parse into locals first so that a bad value
+  // neither escapes as an exception nor leaves the thresholds half updated
+  std::string low_trigger;
+  const bool has_low_trigger = getProperty(LowBatteryThreshold.getName(), low_trigger);
+  int trigger_level = 0;
+  int low_trigger_level = 0;
+  try {
+    trigger_level = std::stoi(trigger);
+    if (has_low_trigger) {
+      low_trigger_level = std::stoi(low_trigger);
+    }
+  } catch (const std::exception &e) {
+    logger_->log_error("Invalid battery threshold: %s", e.what());
+    enabled_ = false;
+    return;
+  }
+  trigger_ = trigger_level;
+  low_battery_trigger_ = low_trigger_level;
+
+  core::Property capacityPaths;
+  core::Property statusPaths;
+  getProperty(BatteryCapacityPath.getName(), capacityPaths);
+  getProperty(BatteryStatusPath.getName(), statusPaths);
+  const auto &capacities = capacityPaths.getValues();
+  const auto &statuses = statusPaths.getValues();
+
+  // start from a clean slate so that re-enabling does not duplicate the banks
+  paths_.clear();
+  if (capacities.size() == statuses.size()) {
+    for (size_t i = 0; i < capacities.size(); i++) {
+      paths_.push_back(std::make_pair(capacities.at(i), statuses.at(i)));
+    }
+  } else {
+    logger_->log_error("BatteryCapacityPath and BatteryStatusPath mis-configuration");
+  }
+
+  // without at least one bank there is nothing to monitor
+  if (paths_.empty()) {
+    logger_->log_error("No battery capacity/status pair configured, Linux Power Manager not enabled");
+    enabled_ = false;
+    return;
+  }
+
+  enabled_ = true;
+  logger_->log_trace("Enabled enable ");
+}
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/libminifi/src/utils/ByteArrayCallback.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index d2fe2e5..19d815f 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -68,8 +68,6 @@ void ByteOutputCallback::write(char *data, size_t size) {
std::unique_lock<std::recursive_mutex> lock(vector_lock_);
spinner_.wait(lock, [&] {
return read_started_ || !is_alive_;});
-
- std::cout << "unlock" << std::endl;
if (!is_alive_)
return;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/254877fa/main/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index 3a51f14..174853e 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -52,7 +52,13 @@ find_package(OpenSSL REQUIRED)
include_directories(${OPENSSL_INCLUDE_DIR})
# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, jsoncpp and rocksdb
-target_link_libraries(minifiexe core-minifi)
+#target_link_libraries(minifiexe core-minifi)
+
+if (APPLE)
+ target_link_libraries (minifiexe -Wl,-all_load core-minifi)
+else ()
+ target_link_libraries (minifiexe -Wl,--whole-archive core-minifi -Wl,--no-whole-archive)
+endif ()
if (APPLE)
target_link_libraries (minifiexe -Wl,-all_load minifi)