Posted to commits@nifi.apache.org by al...@apache.org on 2017/09/12 17:51:27 UTC

[04/11] nifi-minifi-cpp git commit: MINIFI-338: Convert processor threads to use thread pools

MINIFI-338: Convert processor threads to use thread pools


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/747691d7
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/747691d7
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/747691d7

Branch: refs/heads/master
Commit: 747691d7f3a787d7b38a4f67bac56d26001cf400
Parents: 5fca46f
Author: Marc Parisi <ph...@apache.org>
Authored: Fri Jun 30 10:05:15 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:03 2017 -0400

----------------------------------------------------------------------
 libminifi/include/EventDrivenSchedulingAgent.h |   2 +-
 libminifi/include/SchedulingAgent.h            |   2 +-
 libminifi/include/ThreadedSchedulingAgent.h    |  56 +++++-
 libminifi/include/TimerDrivenSchedulingAgent.h |   2 +-
 libminifi/include/core/Processor.h             |   5 +-
 libminifi/include/utils/ThreadPool.h           | 180 +++++++++++++++++++-
 libminifi/src/EventDrivenSchedulingAgent.cpp   |  13 +-
 libminifi/src/FlowController.cpp               |  15 +-
 libminifi/src/SchedulingAgent.cpp              |   4 +-
 libminifi/src/ThreadedSchedulingAgent.cpp      |  44 ++---
 libminifi/src/TimerDrivenSchedulingAgent.cpp   |  13 +-
 libminifi/test/unit/SocketTests.cpp            |   2 +-
 libminifi/test/unit/ThreadPoolTests.cpp        |   2 +-
 13 files changed, 281 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/EventDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index c838b11..ca9f021 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -46,7 +46,7 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
   virtual ~EventDrivenSchedulingAgent() {
   }
   // Run function for the thread
-  void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
+  uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
 
  private:
   // Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 1ff3fac..130c088 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -84,7 +84,7 @@ class SchedulingAgent {
     running_ = true;
   }
   // stop
-  void stop() {
+  virtual void stop() {
     running_ = false;
     component_lifecycle_thread_pool_.shutdown();
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index b4db4bf..27b8b3a 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -20,6 +20,7 @@
 #ifndef __THREADED_SCHEDULING_AGENT_H__
 #define __THREADED_SCHEDULING_AGENT_H__
 
+#include <chrono>
 #include "properties/Configure.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/Processor.h"
@@ -33,6 +34,47 @@ namespace nifi {
 namespace minifi {
 
 /**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<uint64_t> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : run_monitor_(run_monitor),
+        current_wait_(0) {
+
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other)
+      : AfterExecute(std::move(other)),
+        run_monitor_(std::move(other.run_monitor_)) {
+    current_wait_.store(other.current_wait_.load());
+  }
+  virtual bool isFinished(const uint64_t &result) {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const uint64_t &result) {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return wait time, in milliseconds, added to the current time to determine when this task is next eligible to run.
+   */
+  virtual int64_t wait_time() {
+    return current_wait_.load();
+  }
+ private:
+
+  std::atomic<uint64_t> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+/**
  * An abstract scheduling agent which creates and manages a pool of threads for
  * each processor scheduled.
  */
@@ -48,13 +90,18 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
                           std::shared_ptr<Configure> configuration)
       : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration),
         logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
+
+    utils::ThreadPool<uint64_t> pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true);
+    thread_pool_ = std::move(pool);
+    thread_pool_.start();
+
   }
   // Destructor
   virtual ~ThreadedSchedulingAgent() {
   }
 
   // Run function for the thread
-  virtual void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0;
+  virtual uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0;
 
  public:
   // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
@@ -62,9 +109,12 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   // unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
   virtual void unschedule(std::shared_ptr<core::Processor> processor);
 
+  virtual void stop();
+   protected:
+  utils::ThreadPool<uint64_t> thread_pool_;
+
  protected:
-  // Threads
-  std::map<std::string, std::vector<std::thread *>> _threads;
+
 
  private:
   // Prevent default copy constructor and assignment operation
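
For illustration, a minimal sketch (not part of the commit) of the contract TimerAwareMonitor implements: each run's return value is recorded as the requested wait, and the agent's run flag decides when the task is finished. The wrapping function, the namespace alias, and the 500 ms value are assumptions made for this example.

#include <atomic>
#include <cassert>
#include "ThreadedSchedulingAgent.h"   // declares TimerAwareMonitor

namespace minifi = org::apache::nifi::minifi;  // outer namespaces assumed

void monitor_sketch() {
  std::atomic<bool> agent_running(true);
  // The monitor only observes the flag; the scheduling agent owns it.
  minifi::TimerAwareMonitor monitor(&agent_running);

  // A run that asks to be repeated after 500 ms: not finished while the
  // agent is still running, and the wait is recorded for the thread pool.
  assert(!monitor.isFinished(500));
  assert(monitor.wait_time() == 500);

  // Once the agent stops, the next check reports the task as finished and
  // the worker's promise will be fulfilled.
  agent_running = false;
  assert(monitor.isFinished(0));
}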

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 816bcec..1502c47 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -49,7 +49,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
   /**
    * Run function that accepts the processor, context and session factory.
    */
-  void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
+  uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
 
  private:
   // Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 251ec47..0853c11 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -220,6 +220,9 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   virtual void onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory) {
   }
 
+  // Check all incoming connections for work
+  bool isWorkAvailable();
+
  protected:
 
   // Processor state
@@ -246,8 +249,6 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   // Yield Expiration
   std::atomic<uint64_t> yield_expiration_;
 
-  // Check all incoming connections for work
-  bool isWorkAvailable();
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
   Processor(const Processor &parent);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 77772cd..8ff3975 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -33,6 +33,35 @@ namespace minifi {
 namespace utils {
 
 /**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return wait time, in milliseconds, added to the current time to determine when this task is next eligible to run.
+   */
+  virtual int64_t wait_time() {
+    return 0;
+  }
+};
+
+/**
  * Worker task
  * purpose: Provides a wrapper for the functor
  * and returns a future based on the template argument.
@@ -40,12 +69,29 @@ namespace utils {
 template<typename T>
 class Worker {
  public:
-  explicit Worker(std::function<T()> &task)
-      : task(task) {
+  explicit Worker(std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
+      : task(task),
+        run_determinant_(std::move(run_determinant)),
+        identifier_(identifier),
+        time_slice_(0) {
+    promise = std::make_shared<std::promise<T>>();
+  }
+
+  explicit Worker(std::function<T()> &task, const std::string &identifier)
+      : task(task),
+        run_determinant_(nullptr),
+        identifier_(identifier),
+        time_slice_(0) {
     promise = std::make_shared<std::promise<T>>();
   }
 
-  explicit Worker() {
+  explicit Worker(const std::string identifier = "")
+      : identifier_(identifier),
+        time_slice_(0) {
+  }
+
+  virtual ~Worker() {
+
   }
 
   /**
@@ -53,16 +99,35 @@ class Worker {
    */
   Worker(Worker &&other)
       : task(std::move(other.task)),
-        promise(other.promise) {
+        promise(other.promise),
+        time_slice_(std::move(other.time_slice_)),
+        identifier_(std::move(other.identifier_)),
+        run_determinant_(std::move(other.run_determinant_)) {
   }
 
   /**
-   * Runs the task and takes the output from the funtor
+   * Runs the task and takes the output from the functor
    * setting the result into the promise
+   * @return whether or not to continue running
+   *   false == finished || error
+   *   true == run again
    */
-  void run() {
+  virtual bool run() {
     T result = task();
-    promise->set_value(result);
+    if (run_determinant_ == nullptr || (run_determinant_->isFinished(result) || run_determinant_->isCancelled(result))) {
+      promise->set_value(result);
+      return false;
+    }
+    time_slice_ = increment_time(run_determinant_->wait_time());
+    return true;
+  }
+
+  virtual void setIdentifier(const std::string identifier) {
+    identifier_ = identifier;
+  }
+
+  virtual uint64_t getTimeSlice() {
+    return time_slice_;
   }
 
   Worker<T>(const Worker<T>&) = delete;
@@ -72,8 +137,22 @@ class Worker {
 
   std::shared_ptr<std::promise<T>> getPromise();
 
- private:
+  const std::string &getIdentifier() {
+    return identifier_;
+  }
+ protected:
+
+  inline uint64_t increment_time(const uint64_t &time) {
+    std::chrono::time_point<std::chrono::system_clock> now =
+        std::chrono::system_clock::now();
+    auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
+    return millis + time;
+  }
+
+  std::string identifier_;
+  uint64_t time_slice_;
   std::function<T()> task;
+  std::unique_ptr<AfterExecute<T>> run_determinant_;
   std::shared_ptr<std::promise<T>> promise;
 };
 
@@ -81,6 +160,9 @@ template<typename T>
 Worker<T>& Worker<T>::operator =(Worker<T> && other) {
   task = std::move(other.task);
   promise = other.promise;
+  time_slice_ = std::move(other.time_slice_);
+  identifier_ = std::move(other.identifier_);
+  run_determinant_ = std::move(other.run_determinant_);
   return *this;
 }
 
@@ -125,6 +207,21 @@ class ThreadPool {
    * @return true if future can be created and thread pool is in a running state.
    */
   bool execute(Worker<T> &&task, std::future<T> &future);
+
+  /**
+   * attempts to stop tasks with the provided identifier.
+   * @param identifier for worker tasks. Note that these tasks won't
+   * immediately stop.
+   */
+  void stopTasks(const std::string &identifier);
+
+  /**
+   * Returns true if a task is running.
+   */
+  bool isRunning(const std::string &identifier) {
+    return task_status_[identifier] == true;
+  }
+
   /**
    * Starts the Thread Pool
    */
@@ -199,6 +296,8 @@ class ThreadPool {
   moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
   // notification for available work
   std::condition_variable tasks_available_;
+  // map to identify if a task should keep running
+  std::map<std::string, bool> task_status_;
   // manager mutex
   std::recursive_mutex manager_mutex_;
   // work queue mutex
@@ -218,6 +317,10 @@ class ThreadPool {
 template<typename T>
 bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
 
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
   future = std::move(task.getPromise()->get_future());
   bool enqueued = worker_queue_.enqueue(std::move(task));
   if (running_) {
@@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
 template<typename T>
 void ThreadPool<T>::run_tasks() {
   auto waitperiod = std::chrono::milliseconds(1) * 100;
+  uint64_t wait_decay_ = 0;
   while (running_.load()) {
 
+    // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
+    // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
+    // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
+    // be more likely to run. This is intentional.
+    if (wait_decay_ > 1000) {
+      std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
+    }
     Worker<T> task;
     if (!worker_queue_.try_dequeue(task)) {
+
       std::unique_lock<std::mutex> lock(worker_queue_mutex_);
       tasks_available_.wait_for(lock, waitperiod);
       continue;
     }
-    task.run();
+    else {
+
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      if (!task_status_[task.getIdentifier()]) {
+        continue;
+      }
+    }
+
+    bool wait_to_run = false;
+    if (task.getTimeSlice() > 1) {
+      auto now = std::chrono::system_clock::now().time_since_epoch();
+      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
+      if (task.getTimeSlice() > ms.count()) {
+        wait_to_run = true;
+      }
+    }
+    // if we have to wait we re-queue the worker.
+    if (wait_to_run) {
+      {
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        if (!task_status_[task.getIdentifier()]) {
+          continue;
+        }
+      }
+      worker_queue_.enqueue(std::move(task));
+
+      wait_decay_ += 100;
+      continue;
+    }
+
+    const bool task_renew = task.run();
+    wait_decay_ = 0;
+    if (task_renew) {
+
+      {
+        // even if we have more work to do we will not re-queue a task that has been stopped
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        if (!task_status_[task.getIdentifier()]) {
+          continue;
+        }
+      }
+      worker_queue_.enqueue(std::move(task));
+
+    }
   }
   current_workers_--;
 
@@ -272,12 +427,19 @@ void ThreadPool<T>::start() {
 }
 
 template<typename T>
+void ThreadPool<T>::stopTasks(const std::string &identifier) {
+  std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+  task_status_[identifier] = false;
+}
+
+template<typename T>
 void ThreadPool<T>::shutdown() {
   if (running_.load()) {
     std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
     running_.store(false);
 
     drain();
+    task_status_.clear();
     if (manager_thread_.joinable())
       manager_thread_.join();
     {
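
To show how the pieces introduced here fit together, below is a minimal sketch (not part of the commit) that submits a recurring task through the extended ThreadPool. The RunTwiceMonitor class, the task body, the "example-task" identifier, and the namespace alias are assumptions made for this example.

#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include "utils/ThreadPool.h"

namespace utils = org::apache::nifi::minifi::utils;  // outer namespaces assumed

// A run determinant that lets the task execute twice and then finishes it.
class RunTwiceMonitor : public utils::AfterExecute<int> {
 public:
  RunTwiceMonitor()
      : runs_(0) {
  }
  virtual bool isFinished(const int &result) {
    return ++runs_ >= 2;
  }
  virtual bool isCancelled(const int &result) {
    return false;
  }
  virtual int64_t wait_time() {
    // ask to be re-run roughly 10 ms after the previous attempt
    return 10;
  }
 private:
  int runs_;
};

int main() {
  utils::ThreadPool<int> pool(2);
  pool.start();

  std::function<int()> task = []() { return 42; };
  std::unique_ptr<RunTwiceMonitor> monitor(new RunTwiceMonitor());
  utils::Worker<int> worker(task, "example-task", std::move(monitor));

  std::future<int> result;
  pool.execute(std::move(worker), result);

  // The promise is fulfilled only once the monitor reports the task finished,
  // i.e. after the second execution of the functor.
  std::cout << result.get() << std::endl;  // prints 42

  pool.shutdown();
  return 0;
}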

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/EventDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 8a2a874..db5ca08 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -32,22 +32,27 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-void EventDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
+uint64_t EventDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
   while (this->running_) {
     bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
 
     if (processor->isYield()) {
       // Honor the yield
-      std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
+      return processor->getYieldTime();
     } else if (shouldYield && this->bored_yield_duration_ > 0) {
       // No work to do or need to apply back pressure
-      std::this_thread::sleep_for(std::chrono::milliseconds(this->bored_yield_duration_));
+      return this->bored_yield_duration_;
     }
 
     // Block until work is available
+
     processor->waitForWork(1000);
+
+    if (!processor->isWorkAvailable()) {
+      return 1000;
+    }
   }
-  return;
+  return 0;
 }
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 6358ed0..32fd298 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -183,17 +183,16 @@ void FlowController::stop(bool force) {
   std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
   if (running_) {
     // immediately indicate that we are not running
-    running_ = false;
-
     logger_->log_info("Stop Flow Controller");
-    this->timer_scheduler_->stop();
-    this->event_scheduler_->stop();
-    this->flow_file_repo_->stop();
-    this->provenance_repo_->stop();
-    // Wait for sometime for thread stop
-    std::this_thread::sleep_for(std::chrono::milliseconds(3000));
     if (this->root_)
       this->root_->stopProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get());
+    this->flow_file_repo_->stop();
+    this->provenance_repo_->stop();
+    // stop after we've attempted to stop the processors.
+    this->timer_scheduler_->stop();
+    this->event_scheduler_->stop();
+    running_ = false;
+
   }
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 1060830..e228ba5 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -46,7 +46,7 @@ void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::
     return serviceNode->enable();
   };
   // create a functor that will be submitted to the thread pool.
-  utils::Worker<bool> functor(f_ex);
+  utils::Worker<bool> functor(f_ex, serviceNode->getUUIDStr());
   // move the functor into the thread pool. While a future is returned
   // we aren't terribly concerned with the result.
   std::future<bool> future;
@@ -59,7 +59,7 @@ void SchedulingAgent::disableControllerService(std::shared_ptr<core::controller:
     return serviceNode->disable();
   };
   // create a functor that will be submitted to the thread pool.
-  utils::Worker<bool> functor(f_ex);
+  utils::Worker<bool> functor(f_ex, serviceNode->getUUIDStr());
   // move the functor into the thread pool. While a future is returned
   // we aren't terribly concerned with the result.
   std::future<bool> future;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 7b4ce85..d6b8fae 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -61,8 +61,7 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
     return;
   }
 
-  std::map<std::string, std::vector<std::thread *>>::iterator it = _threads.find(processor->getUUIDStr());
-  if (it != _threads.end()) {
+  if (thread_pool_.isRunning(processor->getUUIDStr())) {
     logger_->log_info("Can not schedule threads for processor %s because there are existing threads running");
     return;
   }
@@ -74,20 +73,33 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
   processor->onSchedule(processContext.get(), sessionFactory.get());
 
   std::vector<std::thread *> threads;
+
+  ThreadedSchedulingAgent *agent = this;
   for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) {
-    ThreadedSchedulingAgent *agent = this;
-    std::thread *thread = new std::thread([agent, processor, processContext, sessionFactory] () {
-      agent->run(processor, processContext.get(), sessionFactory.get());
-    });
-    thread->detach();
-    threads.push_back(thread);
-    logger_->log_info("Scheduled thread %d running for process %s", thread->get_id(), processor->getName().c_str());
+
+    // reference the agent's run function for this processor
+    std::function<uint64_t()> f_ex = [agent, processor, processContext, sessionFactory] () {
+      return agent->run(processor, processContext.get(), sessionFactory.get());
+    };
+    // create a functor that will be submitted to the thread pool.
+    std::unique_ptr<TimerAwareMonitor> monitor = std::unique_ptr<TimerAwareMonitor>(new TimerAwareMonitor(&running_));
+    utils::Worker<uint64_t> functor(f_ex, processor->getUUIDStr(), std::move(monitor));
+    // move the functor into the thread pool. While a future is returned
+    // we aren't terribly concerned with the result.
+    std::future<uint64_t> future;
+    thread_pool_.execute(std::move(functor), future);
+
   }
-  _threads[processor->getUUIDStr().c_str()] = threads;
+  logger_->log_info("Scheduled %d concurrent workers for process %s", processor->getMaxConcurrentTasks(), processor->getName().c_str());
 
   return;
 }
 
+void ThreadedSchedulingAgent::stop() {
+  SchedulingAgent::stop();
+  thread_pool_.shutdown();
+}
+
 void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) {
   std::lock_guard < std::mutex > lock(mutex_);
   logger_->log_info("Shutting down threads for processor %s/%s", processor->getName().c_str(), processor->getUUIDStr().c_str());
@@ -97,18 +109,8 @@ void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> proces
     return;
   }
 
-  std::map<std::string, std::vector<std::thread *>>::iterator it = _threads.find(processor->getUUIDStr());
+  thread_pool_.stopTasks(processor->getUUIDStr());
 
-  if (it == _threads.end()) {
-    logger_->log_info("Cannot unschedule threads for processor %s because there are no existing threads running", processor->getName().c_str());
-    return;
-  }
-  for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread) {
-    std::thread *thread = *itThread;
-    logger_->log_info("Scheduled thread %d deleted for process %s", thread->get_id(), processor->getName().c_str());
-    delete thread;
-  }
-  _threads.erase(processor->getUUIDStr());
   processor->clearActiveTask();
 }
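
As a complement to the scheduling code above, a minimal sketch (not part of the commit) of the new schedule/unschedule lifecycle against the thread pool directly, with a stand-in functor in place of a processor's onTrigger. The "processor-uuid" identifier, the timings, and the namespace alias are assumptions made for this example.

#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include "ThreadedSchedulingAgent.h"   // TimerAwareMonitor
#include "utils/ThreadPool.h"

namespace minifi = org::apache::nifi::minifi;  // outer namespaces assumed

int main() {
  std::atomic<bool> running(true);
  minifi::utils::ThreadPool<uint64_t> pool(2, true);
  pool.start();

  // Stand-in for ThreadedSchedulingAgent::run(): report that the task wants
  // to be re-run 50 ms from now.
  std::function<uint64_t()> trigger = []() { return static_cast<uint64_t>(50); };
  std::unique_ptr<minifi::TimerAwareMonitor> monitor(new minifi::TimerAwareMonitor(&running));
  minifi::utils::Worker<uint64_t> worker(trigger, "processor-uuid", std::move(monitor));

  std::future<uint64_t> future;
  pool.execute(std::move(worker), future);

  std::this_thread::sleep_for(std::chrono::milliseconds(200));

  // "Unschedule": stop re-queuing workers with this identifier. A run that is
  // already in flight is not interrupted; it is dropped instead of being
  // re-queued once it returns. Clearing the run flag also lets the monitor
  // report the task as finished.
  pool.stopTasks("processor-uuid");
  running = false;

  pool.shutdown();
  return 0;
}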
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index b9a41ea..3276470 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -29,19 +29,22 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-void TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
+uint64_t TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
   while (this->running_) {
     bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
     if (processor->isYield()) {
       // Honor the yield
-      std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
+      return processor->getYieldTime();
     } else if (shouldYield && this->bored_yield_duration_ > 0) {
       // No work to do or need to apply back pressure
-      std::this_thread::sleep_for(std::chrono::milliseconds(this->bored_yield_duration_));
+      //std::this_thread::sleep_for(std::chrono::milliseconds(x));
+      return this->bored_yield_duration_;
     }
-    std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
+    return processor->getSchedulingPeriodNano() / 1000000;
+    //std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
   }
-  return;
+  return 0;
+  //return;
 }
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/test/unit/SocketTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp
index a791b3f..0576d5f 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -200,7 +200,7 @@ TEST_CASE("TestTLSContextCreation", "[TestSocket6]") {
   std::vector<std::future<bool>> futures;
   for (int i = 0; i < 20; i++) {
     std::function<bool()> f_ex = createSocket;
-    utils::Worker<bool> functor(f_ex);
+    utils::Worker<bool> functor(f_ex, "id");
     std::future<bool> fut;
     REQUIRE(true == pool.execute(std::move(functor), fut));
     futures.push_back(std::move(fut));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/test/unit/ThreadPoolTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp
index 0bba767..670958a 100644
--- a/libminifi/test/unit/ThreadPoolTests.cpp
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -29,7 +29,7 @@ bool function() {
 TEST_CASE("ThreadPoolTest1", "[TPT1]") {
   utils::ThreadPool<bool> pool(5);
   std::function<bool()> f_ex = function;
-  utils::Worker<bool> functor(f_ex);
+  utils::Worker<bool> functor(f_ex, "id");
   pool.start();
   std::future<bool> fut;
   REQUIRE(true == pool.execute(std::move(functor), fut));