You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/02/12 15:39:45 UTC

[GitHub] [nifi-minifi-cpp] arpadboda opened a new pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

arpadboda opened a new pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735
 
 
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r380684332
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      if (task_status_[task.getIdentifier()] && task.run()) {
+        if(task.getTimeSlice() <= std::chrono::steady_clock::now()) {
+          // it can be rescheduled again as soon as there is a worker available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // Task will be put to the delayed queue as next exec time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() || task.getTimeSlice() < delayed_worker_queue_.top().getTimeSlice();
+
+        delayed_worker_queue_.push(std::move(task));
+        if(need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait(lock);
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {
+  while(running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while(!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getTimeSlice() < std::chrono::steady_clock::now()) {
+      // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted.
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if(delayed_worker_queue_.empty()) {
+      delayed_task_available_.wait(lock);
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - delayed_worker_queue_.top().getTimeSlice());
+      delayed_task_available_.wait_for(lock, wait_time);
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+
+// likely don't have a thread manager
+  if (LIKELY(nullptr != thread_manager_)) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+      }
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        std::shared_ptr<WorkerThread> thread_ref;
+        while (deceased_thread_queue_.try_dequeue(thread_ref)) {
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          if (thread_ref->thread_.joinable())
+            thread_ref->thread_.join();
+          thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
+        }
+      }
+      std::this_thread::sleep_for(waitperiod);
+    }
+  } else {
+    for (auto &thread : thread_queue_) {
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::start() {
+  if (nullptr != controller_service_provider_) {
+    auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
+    thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
+  } else {
+    thread_manager_ = nullptr;
+  }
+
+  std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+  if (!running_) {
+    running_ = true;
+    manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this));
+    if (worker_queue_.size_approx() > 0) {
+      tasks_available_.notify_all();
+    }
+
+    std::lock_guard<std::mutex> quee_lock(worker_queue_mutex_);
+    delayed_scheduler_thread_ = std::thread(&ThreadPool<T>::manage_delayed_queue, this);
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::stopTasks(const std::string &identifier) {
+  std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+  task_status_[identifier] = false;
+}
+
+template<typename T>
+void ThreadPool<T>::shutdown() {
+  if (running_.load()) {
+    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+    running_.store(false);
+
+    drain();
+
+    task_status_.clear();
+    if (manager_thread_.joinable()) {
+      manager_thread_.join();
+    }
+
+    delayed_task_available_.notify_all();
+    if(delayed_scheduler_thread_.joinable()) {
+      delayed_scheduler_thread_.join();
+    }
+
+    for(const auto &thread : thread_queue_){
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+
+    thread_queue_.clear();
+    current_workers_ = 0;
+    while (worker_queue_.size_approx() > 0) {
+      Worker<T> task;
+      worker_queue_.try_dequeue(task);
+
+    }
+  }
+}
+
+template class utils::ThreadPool<utils::ComplexResult>;
 
 Review comment:
   We can erase the result type of the tasks after the we have served the consumers with their appropriately typed `std::future`. We shouldn't need to care about result type during execution.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r384993163
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -153,24 +127,22 @@ class Worker {
   }
  protected:
 
-  inline uint64_t increment_time(const uint64_t &time) {
-    std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
-    auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
-    return millis + time;
+  inline std::chrono::time_point<std::chrono::steady_clock> increment_time(const std::chrono::milliseconds &time) {
+    return std::chrono::steady_clock::now() + time;
   }
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r381276745
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      if (task_status_[task.getIdentifier()] && task.run()) {
+        if(task.getTimeSlice() <= std::chrono::steady_clock::now()) {
+          // it can be rescheduled again as soon as there is a worker available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // Task will be put to the delayed queue as next exec time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() || task.getTimeSlice() < delayed_worker_queue_.top().getTimeSlice();
+
+        delayed_worker_queue_.push(std::move(task));
+        if(need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait(lock);
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {
+  while(running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while(!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getTimeSlice() < std::chrono::steady_clock::now()) {
+      // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted.
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if(delayed_worker_queue_.empty()) {
+      delayed_task_available_.wait(lock);
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - delayed_worker_queue_.top().getTimeSlice());
+      delayed_task_available_.wait_for(lock, wait_time);
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+
+// likely don't have a thread manager
+  if (LIKELY(nullptr != thread_manager_)) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+      }
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        std::shared_ptr<WorkerThread> thread_ref;
+        while (deceased_thread_queue_.try_dequeue(thread_ref)) {
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          if (thread_ref->thread_.joinable())
+            thread_ref->thread_.join();
+          thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
+        }
+      }
+      std::this_thread::sleep_for(waitperiod);
+    }
+  } else {
+    for (auto &thread : thread_queue_) {
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::start() {
+  if (nullptr != controller_service_provider_) {
+    auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
+    thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
+  } else {
+    thread_manager_ = nullptr;
+  }
+
+  std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+  if (!running_) {
+    running_ = true;
+    manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this));
+    if (worker_queue_.size_approx() > 0) {
+      tasks_available_.notify_all();
+    }
+
+    std::lock_guard<std::mutex> quee_lock(worker_queue_mutex_);
+    delayed_scheduler_thread_ = std::thread(&ThreadPool<T>::manage_delayed_queue, this);
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::stopTasks(const std::string &identifier) {
+  std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+  task_status_[identifier] = false;
+}
+
+template<typename T>
+void ThreadPool<T>::shutdown() {
+  if (running_.load()) {
+    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+    running_.store(false);
+
+    drain();
+
+    task_status_.clear();
+    if (manager_thread_.joinable()) {
+      manager_thread_.join();
+    }
+
+    delayed_task_available_.notify_all();
+    if(delayed_scheduler_thread_.joinable()) {
+      delayed_scheduler_thread_.join();
+    }
+
+    for(const auto &thread : thread_queue_){
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+
+    thread_queue_.clear();
+    current_workers_ = 0;
+    while (worker_queue_.size_approx() > 0) {
+      Worker<T> task;
+      worker_queue_.try_dequeue(task);
+
+    }
+  }
+}
+
+template class utils::ThreadPool<utils::ComplexResult>;
 
 Review comment:
   In that case I think one type (something like what's currently called `ComplexResult`) can be used for all scheduling types and the template parameter is no longer needed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383975267
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+#if defined(WIN32)
+#include <future>  // This is required to work around a VS2017 bug, see the details below
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexTaskResult {
+  ComplexTaskResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
 
 Review comment:
   I strongly agree, but because of the already mentioned Win issue here this has to support copy assignment. :(

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r384993754
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -384,9 +358,11 @@ class ThreadPool {
   moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
 // worker queue of worker objects
   moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
-  std::priority_queue<Worker<T>, std::vector<Worker<T>>, WorkerComparator<T>> worker_priority_queue_;
+  std::priority_queue<Worker<T>, std::vector<Worker<T>>, DelayedTaskComparator<T>> delayed_worker_queue_;
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r381917308
 
 

 ##########
 File path: libminifi/src/core/Processor.cpp
 ##########
 @@ -268,6 +268,7 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
 
 bool Processor::isWorkAvailable() {
   // We have work if any incoming connection has work
+  std::lock_guard<std::mutex> lock(mutex_);
 
 Review comment:
   It is pretty unclear to me what protects what in `Processor`.
   The only member access in this function is for `_incomingConnections`, which is a member of `Connectable`.
   Are we protecting a member of a parent class with a mutex in a child class?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383891546
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+#if defined(WIN32)
+#include <future>  // This is required to work around a VS2017 bug, see the details below
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexTaskResult {
+  ComplexTaskResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
 
 Review comment:
   If we don't need the possibility to reassign these, I'd mark them `const`. Otherwise we may consider making them only privately mutable by making them private and providing public getters only.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r381367209
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other) = default;
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+  explicit SingleRunMonitor(SingleRunMonitor &&other) = default;
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexResult {
 
 Review comment:
   I think `ComplexResult` is way too generic a name to be sitting in the `utils` namespace. When I first saw it I thought this will be some generic template contraption, not just a combination of milliseconds and a bool.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383873618
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -133,11 +107,11 @@ class Worker {
     identifier_ = identifier;
   }
 
-  virtual uint64_t getTimeSlice() {
+  virtual std::chrono::time_point<std::chrono::steady_clock> getTimeSlice() const {
     return time_slice_;
   }
 
-  virtual uint64_t getWaitTime() {
+  virtual std::chrono::milliseconds getWaitTime() {
 
 Review comment:
   Can we add `const` here as well? I appreciate every improvement in const-correctness. :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r384993532
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+#if defined(WIN32)
+#include <future>  // This is required to work around a VS2017 bug, see the details below
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexTaskResult {
+  ComplexTaskResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+
+  static ComplexTaskResult Done() {
+    return ComplexTaskResult(true, std::chrono::milliseconds(0));
+  }
+
+  static ComplexTaskResult Retry(std::chrono::milliseconds interval) {
+    return ComplexTaskResult(false, interval);
+  }
+
+#if defined(WIN32)
+ // https://developercommunity.visualstudio.com/content/problem/60897/c-shared-state-futuresstate-default-constructs-the.html
+ // Because of this bug we need to have this object default constructible, which makes no sense otherwise. Hack.
+ private:
+  ComplexTaskResult() : wait_time_(std::chrono::milliseconds(0)), finished_(true) {}
+  friend class std::_Associated_state<ComplexTaskResult>;
+#endif
+};
+
+class ComplexMonitor : public utils::AfterExecute<ComplexTaskResult> {
+ public:
+  ComplexMonitor()
+  : current_wait_(std::chrono::milliseconds(0)) {
+  }
+
+  virtual bool isFinished(const ComplexTaskResult &result) override {
+    if (result.finished_) {
+      return true;
+    }
+    current_wait_.store(result.wait_time_);
+    return false;
+  }
+  virtual bool isCancelled(const ComplexTaskResult &result) override {
+    return false;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+
+ private:
+  std::atomic<std::chrono::milliseconds> current_wait_;
+};
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_MONITORS_H
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r381985684
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -170,7 +142,7 @@ template<typename T>
 class WorkerComparator {
  public:
   bool operator()(Worker<T> &a, Worker<T> &b) {
-    return a.getTimeSlice() < b.getTimeSlice();
+    return a.getTimeSlice() > b.getTimeSlice();
 
 Review comment:
   I don't mind breaking API to a certain extent to fix a critical bug as this is, but this breaks API in a dangerous way: it will compile, but do the opposite of what it did so far.
   So far this has been the comparator for the `worker_priority_queue_`, now it is used for the `delayed_worker_queue_`.
   I would prefer if this functor was renamed `DelayedWorkerComparator`, so that it would break API in a safe way and would be more straightforward.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382065632
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      if (task_status_[task.getIdentifier()] && task.run()) {
+        if(task.getTimeSlice() <= std::chrono::steady_clock::now()) {
+          // it can be rescheduled again as soon as there is a worker available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // Task will be put to the delayed queue as next exec time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() || task.getTimeSlice() < delayed_worker_queue_.top().getTimeSlice();
+
+        delayed_worker_queue_.push(std::move(task));
+        if(need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait(lock);
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {
+  while(running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while(!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getTimeSlice() < std::chrono::steady_clock::now()) {
+      // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted.
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if(delayed_worker_queue_.empty()) {
+      delayed_task_available_.wait(lock);
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - delayed_worker_queue_.top().getTimeSlice());
+      delayed_task_available_.wait_for(lock, wait_time);
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+
+// likely don't have a thread manager
+  if (LIKELY(nullptr != thread_manager_)) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+      }
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
 
 Review comment:
   Ah, sorry, I missed the lines. In this case you are right, merged the blocks to grab the mutex only once. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382102294
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other) = default;
 
 Review comment:
   Removed these, thanks. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382102089
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other) = default;
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+  explicit SingleRunMonitor(SingleRunMonitor &&other) = default;
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexResult {
 
 Review comment:
   Renamed. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382062929
 
 

 ##########
 File path: libminifi/src/core/Processor.cpp
 ##########
 @@ -268,6 +268,7 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
 
 bool Processor::isWorkAvailable() {
   // We have work if any incoming connection has work
+  std::lock_guard<std::mutex> lock(mutex_);
 
 Review comment:
   +1 to that. Definitely not in scope of this PR, but I strongly support. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r381928370
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other) = default;
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+  explicit SingleRunMonitor(SingleRunMonitor &&other) = default;
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexResult {
+  ComplexResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+};
+
+class ComplexMonitor : public utils::AfterExecute<ComplexResult> {
+ public:
+  ComplexMonitor(std::atomic<bool> *run_monitor)
+  : current_wait_(std::chrono::milliseconds(0)),
+    run_monitor_(run_monitor) {
+  }
+  explicit ComplexMonitor(ComplexMonitor &&other) = default;
+
+  virtual bool isFinished(const ComplexResult &result) override {
+    if (result.finished_) {
+      return true;
+    }
+    if (*run_monitor_) {
+      current_wait_.store(result.wait_time_);
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const ComplexResult &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+
+ private:
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+static ComplexResult Done() {
 
 Review comment:
   This is way too generic a name as well in the raw utils namespace for something so specific. It should at least be `utils::ComplexResult::Done()`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r378870246
 
 

 ##########
 File path: libminifi/src/EventDrivenSchedulingAgent.cpp
 ##########
 @@ -44,16 +44,8 @@ uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor>
       // No work to do or need to apply back pressure
       return this->bored_yield_duration_;
     }
-
-    // Block until work is available
-
-    processor->waitForWork(1000);
-
-    if (!processor->isWorkAvailable()) {
-      return 1000;
-    }
   }
-  return 0;
+  return 10;  // Let's check back for work in 10ms or when a thread is available to execute
 
 Review comment:
   Done, processors gets at most 500ms of continuous slice before waiting for 10ms. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] phrocker commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
phrocker commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382102534
 
 

 ##########
 File path: libminifi/src/core/Processor.cpp
 ##########
 @@ -268,6 +268,7 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
 
 bool Processor::isWorkAvailable() {
   // We have work if any incoming connection has work
+  std::lock_guard<std::mutex> lock(mutex_);
 
 Review comment:
   One of my hopes was that we could make some asks to Folly to use their thread pool(s)...I believe they've already filled those gaps and I think they have some useful containers. It would be a massive undertaking but Folly has some nice utilities that are well tested. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385330893
 
 

 ##########
 File path: libminifi/include/SchedulingAgent.h
 ##########
 @@ -202,7 +134,7 @@ class SchedulingAgent {
 
   std::shared_ptr<core::ContentRepository> content_repo_;
   // thread pool for components.
-  utils::ThreadPool<uint64_t> thread_pool_;
+  utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
 
 Review comment:
   Normally a raw pointer is only dangerous if the lifetime or ownership is unclear or the (clear) contract is violated somewhere.
   
   Ownership:
   Since we use smart pointers all over the codebase to annotate ownership, all owning pointers have a non-raw pointer type. This makes the currently recommended practice of using raw pointers as non-owning pointers safe. We can make the contract even more clear by introducing an annotation for observer pointers, similarly to `gsl::owner`. See also https://en.cppreference.com/w/cpp/experimental/observer_ptr
   
   Lifetime:
   `ThreadPool` is owned by `FlowController` which lives longer than `SchedulingAgent`s. Check.
   
   Contract/ `delete` non-owning pointer:
   At the moment we don't have more than code reviews to check for these. In 2020, a `delete` in new code raises many eyebrows and attracts extra attention, so I think we're safe here. 
   
   Contract/Null:
   This is the most dangerous part is using a raw ptr for that member. Without gsl, we don't have a safe way to annotate the pointer to be non-null. The most we can do is add a comment next to the declaration. We're safe against null references, since the vast majority of C++ programmers would reject `*nullptr` instantly during code review, but `nullptr` is widely used and has valid use cases, so we have no protection against null pointers in the proposed case.
   Related talk: https://www.infoq.com/presentations/Null-References-The-Billion-Dollar-Mistake-Tony-Hoare/
   
   Because of the `nullptr` issue, I'm fine with keeping `thread_pool_` as a reference. The referenced point from the C++ Core Guidelines is just a note, not a properly explained guideline, so I'm even fine with ignoring it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r378946607
 
 

 ##########
 File path: libminifi/src/EventDrivenSchedulingAgent.cpp
 ##########
 @@ -34,26 +31,21 @@ namespace minifi {
 
 uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  while (this->running_) {
-    bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
-
-    if (processor->isYield()) {
-      // Honor the yield
-      return processor->getYieldTime();
-    } else if (shouldYield && this->bored_yield_duration_ > 0) {
-      // No work to do or need to apply back pressure
-      return this->bored_yield_duration_;
-    }
-
-    // Block until work is available
-
-    processor->waitForWork(1000);
-
-    if (!processor->isWorkAvailable()) {
-      return 1000;
+  if (this->running_ && processor->isRunning()) {
+    auto start_time = std::chrono::steady_clock::now();
+    // trigger processor until it has work to do, but no more than half a sec
+    while (std::chrono::steady_clock::now() - start_time < std::chrono::milliseconds(500)) {
+      bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+      if (processor->isYield()) {
+        // Honor the yield
+        return processor->getYieldTime();
+      } else if (shouldYield) {
+        // No work to do or need to apply back pressure
+        return (this->bored_yield_duration_ > 0) ? this->bored_yield_duration_ : 10;  // No work left to do, stand by
+      }
     }
   }
-  return 0;
+  return 10;  // Let's check back for work in 10ms or when a thread is available to execute
 
 Review comment:
   Could you explain the reason behind the changes? It's not obvious from either the code or the PR title why this is necessary.
   
   The meaning of the return value is not obvious. Judging from the last comment, it's a duration in milliseconds. I know the lack of comment is not your fault, but I'd appreciate if you could comment its meaning somewhere near the function.
   
   There are too many hardcoded durations (10ms, 500ms). Could you explain why these were chosen?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r386572702
 
 

 ##########
 File path: libminifi/src/FlowController.cpp
 ##########
 @@ -313,21 +310,23 @@ void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool
 
     controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
 
+    auto base_shared_ptr = std::dynamic_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this());
 
 Review comment:
   :(

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r384993058
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -133,11 +107,11 @@ class Worker {
     identifier_ = identifier;
   }
 
-  virtual uint64_t getTimeSlice() {
+  virtual std::chrono::time_point<std::chrono::steady_clock> getTimeSlice() const {
 
 Review comment:
   Renamed them to reflect meaning/usage.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382657773
 
 

 ##########
 File path: libminifi/src/EventDrivenSchedulingAgent.cpp
 ##########
 @@ -44,16 +44,8 @@ uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor>
       // No work to do or need to apply back pressure
       return this->bored_yield_duration_;
     }
-
-    // Block until work is available
-
-    processor->waitForWork(1000);
-
-    if (!processor->isWorkAvailable()) {
-      return 1000;
-    }
   }
-  return 0;
+  return 10;  // Let's check back for work in 10ms or when a thread is available to execute
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385180490
 
 

 ##########
 File path: libminifi/include/SchedulingAgent.h
 ##########
 @@ -202,7 +138,7 @@ class SchedulingAgent {
 
   std::shared_ptr<core::ContentRepository> content_repo_;
   // thread pool for components.
-  utils::ThreadPool<uint64_t> thread_pool_;
+  std::shared_ptr<utils::ThreadPool<utils::ComplexTaskResult>> thread_pool_;
 
 Review comment:
   gsl is a good idea, happy to see that as a follow-up, although I think this PR is already big enough. Modified code to have clear ownership and lifetime. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385283533
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -231,7 +198,10 @@ class ThreadPool {
     thread_manager_ = nullptr;
   }
 
-  ThreadPool(const ThreadPool<T> &&other)
+  ThreadPool(const ThreadPool<T> &other) = delete;
+  ThreadPool<T> operator=(const ThreadPool<T> &other) = delete;
+
+  ThreadPool(ThreadPool<T> &&other)
       : daemon_threads_(std::move(other.daemon_threads_)),
         thread_reduction_count_(0),
         max_worker_threads_(std::move(other.max_worker_threads_)),
 
 Review comment:
   `FlowController` hands out references to its `ThreadPool`. Since `ThreadPool` is movable, it's possible that the handed out references point to a moved-from object if something/someone (probably not in the current state of the code) moves the object.
   Can we disable move? If move is needed, we may consider double indirection and updating the pointers on move. Thread safety requires more thinking in the latter case.
   
   other: Thanks for making the parameter rvalue ref non-const

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r380974973
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      if (task_status_[task.getIdentifier()] && task.run()) {
+        if(task.getTimeSlice() <= std::chrono::steady_clock::now()) {
+          // it can be rescheduled again as soon as there is a worker available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // Task will be put to the delayed queue as next exec time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() || task.getTimeSlice() < delayed_worker_queue_.top().getTimeSlice();
+
+        delayed_worker_queue_.push(std::move(task));
+        if(need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait(lock);
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {
+  while(running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while(!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getTimeSlice() < std::chrono::steady_clock::now()) {
+      // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted.
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if(delayed_worker_queue_.empty()) {
+      delayed_task_available_.wait(lock);
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - delayed_worker_queue_.top().getTimeSlice());
+      delayed_task_available_.wait_for(lock, wait_time);
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+
+// likely don't have a thread manager
+  if (LIKELY(nullptr != thread_manager_)) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+      }
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
 
 Review comment:
   Fair point. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r380684332
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      if (task_status_[task.getIdentifier()] && task.run()) {
+        if(task.getTimeSlice() <= std::chrono::steady_clock::now()) {
+          // it can be rescheduled again as soon as there is a worker available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // Task will be put to the delayed queue as next exec time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() || task.getTimeSlice() < delayed_worker_queue_.top().getTimeSlice();
+
+        delayed_worker_queue_.push(std::move(task));
+        if(need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait(lock);
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {
+  while(running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while(!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getTimeSlice() < std::chrono::steady_clock::now()) {
+      // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted.
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if(delayed_worker_queue_.empty()) {
+      delayed_task_available_.wait(lock);
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - delayed_worker_queue_.top().getTimeSlice());
+      delayed_task_available_.wait_for(lock, wait_time);
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+
+// likely don't have a thread manager
+  if (LIKELY(nullptr != thread_manager_)) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+      }
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        std::shared_ptr<WorkerThread> thread_ref;
+        while (deceased_thread_queue_.try_dequeue(thread_ref)) {
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          if (thread_ref->thread_.joinable())
+            thread_ref->thread_.join();
+          thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
+        }
+      }
+      std::this_thread::sleep_for(waitperiod);
+    }
+  } else {
+    for (auto &thread : thread_queue_) {
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::start() {
+  if (nullptr != controller_service_provider_) {
+    auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
+    thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
+  } else {
+    thread_manager_ = nullptr;
+  }
+
+  std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+  if (!running_) {
+    running_ = true;
+    manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this));
+    if (worker_queue_.size_approx() > 0) {
+      tasks_available_.notify_all();
+    }
+
+    std::lock_guard<std::mutex> quee_lock(worker_queue_mutex_);
+    delayed_scheduler_thread_ = std::thread(&ThreadPool<T>::manage_delayed_queue, this);
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::stopTasks(const std::string &identifier) {
+  std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+  task_status_[identifier] = false;
+}
+
+template<typename T>
+void ThreadPool<T>::shutdown() {
+  if (running_.load()) {
+    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+    running_.store(false);
+
+    drain();
+
+    task_status_.clear();
+    if (manager_thread_.joinable()) {
+      manager_thread_.join();
+    }
+
+    delayed_task_available_.notify_all();
+    if(delayed_scheduler_thread_.joinable()) {
+      delayed_scheduler_thread_.join();
+    }
+
+    for(const auto &thread : thread_queue_){
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+
+    thread_queue_.clear();
+    current_workers_ = 0;
+    while (worker_queue_.size_approx() > 0) {
+      Worker<T> task;
+      worker_queue_.try_dequeue(task);
+
+    }
+  }
+}
+
+template class utils::ThreadPool<utils::ComplexResult>;
 
 Review comment:
   We can erase the result type of the tasks after the we have served the consumers with their appropriately typed `std::future`. We shouldn't need to care about result type during execution.
   
   note: I did not yet understand the current architecture well enough to understand all the implications and the extent of the changes required.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383973772
 
 

 ##########
 File path: libminifi/include/EventDrivenSchedulingAgent.h
 ##########
 @@ -39,21 +41,28 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
    * Create a new event driven scheduling agent.
    */
   EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
-                             std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
-      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) {
-  }
-  // Destructor
-  virtual ~EventDrivenSchedulingAgent() {
+                             std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
+                             std::shared_ptr<utils::ThreadPool<utils::ComplexTaskResult>> thread_pool)
+      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
+    int slice = configuration->getInt(Configure::nifi_flow_engine_event_driven_time_slice, DEFAULT_TIME_SLICE_MS);
+    if (slice < 10 || 1000 < slice) {
+      throw Exception(FLOW_EXCEPTION, std::string(Configure::nifi_flow_engine_event_driven_time_slice) + " is out of reasonable range!");
 
 Review comment:
   0 doesn't seem to be valid for me as the processor could only process 1 FF before getting back to the end of the queue, that looks pretty bad for me. 
   On the other end, lasting more than 1s seems like starvation for me, I don't think that should happen. In case that happens, I would consider that as an error. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385330893
 
 

 ##########
 File path: libminifi/include/SchedulingAgent.h
 ##########
 @@ -202,7 +134,7 @@ class SchedulingAgent {
 
   std::shared_ptr<core::ContentRepository> content_repo_;
   // thread pool for components.
-  utils::ThreadPool<uint64_t> thread_pool_;
+  utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
 
 Review comment:
   Normally a raw pointer is only dangerous if the lifetime or ownership is unclear or the (clear) contract is violated somewhere.
   
   **Ownership:**
   Since we use smart pointers all over the codebase to annotate ownership, all owning pointers have a non-raw pointer type. This makes the currently recommended practice of using raw pointers as non-owning pointers safe. We can make the contract even more clear by introducing an annotation for observer pointers, similarly to `gsl::owner`. See also https://en.cppreference.com/w/cpp/experimental/observer_ptr
   
   **Lifetime:**
   `ThreadPool` is owned by `FlowController` which lives longer than `SchedulingAgent`s. Check.
   
   **Contract/ `delete` non-owning pointer:**
   At the moment we don't have more than code reviews to check for these. In 2020, a `delete` in new code raises many eyebrows and attracts extra attention, so I think we're safe here. 
   
   **Contract/Null:**
   This is the most dangerous part is using a raw ptr for that member. Without gsl, we don't have a safe way to annotate the pointer to be non-null. The most we can do is add a comment next to the declaration. We're safe against null references, since the vast majority of C++ programmers would reject `*nullptr` instantly during code review, but `nullptr` is widely used and has valid use cases, so we have no protection against null pointers in the proposed case.
   Related talk: https://www.infoq.com/presentations/Null-References-The-Billion-Dollar-Mistake-Tony-Hoare/
   
   Because of the `nullptr` issue, I'm fine with keeping `thread_pool_` as a reference. The referenced point from the C++ Core Guidelines is just a note, not a properly explained guideline, so I'm even fine with ignoring it.
   
   edit: formatting, because of the length

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382072414
 
 

 ##########
 File path: libminifi/src/core/Processor.cpp
 ##########
 @@ -268,6 +268,7 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
 
 bool Processor::isWorkAvailable() {
   // We have work if any incoming connection has work
+  std::lock_guard<std::mutex> lock(mutex_);
 
 Review comment:
   Agreed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383869397
 
 

 ##########
 File path: libminifi/include/SchedulingAgent.h
 ##########
 @@ -202,7 +138,7 @@ class SchedulingAgent {
 
   std::shared_ptr<core::ContentRepository> content_repo_;
   // thread pool for components.
-  utils::ThreadPool<uint64_t> thread_pool_;
+  std::shared_ptr<utils::ThreadPool<utils::ComplexTaskResult>> thread_pool_;
 
 Review comment:
   Since `ThreadPool` outlives `SchedulingAgent`, I'd go for an observer ptr, possibly realized as a raw pointer with not null semantics. Maybe `gsl::not_null<ThreadPool*>` if we're open to using the guideline support library.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382701048
 
 

 ##########
 File path: libminifi/include/EventDrivenSchedulingAgent.h
 ##########
 @@ -39,21 +41,28 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
    * Create a new event driven scheduling agent.
    */
   EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
-                             std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
-      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) {
-  }
-  // Destructor
-  virtual ~EventDrivenSchedulingAgent() {
+                             std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
+                             std::shared_ptr<utils::ThreadPool<utils::ComplexTaskResult>> thread_pool)
+      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
+    int slice = configuration->getInt(Configure::nifi_flow_engine_event_driven_time_slice, DEFAULT_TIME_SLICE_MS);
+    if (slice < 10 || 1000 < slice) {
+      throw Exception(FLOW_EXCEPTION, std::string(Configure::nifi_flow_engine_event_driven_time_slice) + " is out of reasonable range!");
 
 Review comment:
   Wouldn't a warning be enough? I view as semi-valid use cases if someone wants really fine-grained round-robin scheduling in high load scenarios (slice == 0) or to flush an expensive processor before moving on (slice > 1000).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383890323
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+#if defined(WIN32)
+#include <future>  // This is required to work around a VS2017 bug, see the details below
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexTaskResult {
+  ComplexTaskResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+
+  static ComplexTaskResult Done() {
+    return ComplexTaskResult(true, std::chrono::milliseconds(0));
+  }
+
+  static ComplexTaskResult Retry(std::chrono::milliseconds interval) {
+    return ComplexTaskResult(false, interval);
+  }
 
 Review comment:
   Can we consider a few more names for these functions?
   I think `Done` should become something like `OnNextEvent`, since if I understood the logic correctly, it will make the task run on the next event.
   `Retry` should be `RetryIn`, `WaitFor`, `After` or something that makes the creation of an object say what's going to happen.
   I'd consider creating an alias for `Retry(0ms)`, some naming ideas: `EndOfQueue`, `Immediately`, `AfterQueuedTasks`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383966207
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -384,9 +358,11 @@ class ThreadPool {
   moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
 // worker queue of worker objects
   moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
-  std::priority_queue<Worker<T>, std::vector<Worker<T>>, WorkerComparator<T>> worker_priority_queue_;
+  std::priority_queue<Worker<T>, std::vector<Worker<T>>, DelayedTaskComparator<T>> delayed_worker_queue_;
 
 Review comment:
   I'd consider marking `Worker<T>`'s move ctor and move assignment operator as `noexcept`. I have bad feelings about relying on the vector exception for move-only not-nothrow-movable types, especially since all exception guarantees are waived.
   The only non-static data member in Worker that could in theory throw on move is `std::string` and it will not throw in practice with the default allocator.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383872858
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -133,11 +107,11 @@ class Worker {
     identifier_ = identifier;
   }
 
-  virtual uint64_t getTimeSlice() {
+  virtual std::chrono::time_point<std::chrono::steady_clock> getTimeSlice() const {
 
 Review comment:
   Could you add a comment describing the new meaning of `time_slice_`/`getTimeSlice()`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383975267
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+#if defined(WIN32)
+#include <future>  // This is required to work around a VS2017 bug, see the details below
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexTaskResult {
+  ComplexTaskResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
 
 Review comment:
   I strongly agree, but because of the already mentioned Win issue here this has to support copy assignment, so the members cannot be const. :(

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382071515
 
 

 ##########
 File path: libminifi/src/CronDrivenSchedulingAgent.cpp
 ##########
 @@ -32,7 +32,7 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-uint64_t CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+utils::ComplexResult CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                         const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_ && processor->isRunning()) {
     std::chrono::system_clock::time_point leap_nanos;
 
 Review comment:
   You're right, my bad.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385280579
 
 

 ##########
 File path: libminifi/include/SchedulingAgent.h
 ##########
 @@ -202,7 +134,7 @@ class SchedulingAgent {
 
   std::shared_ptr<core::ContentRepository> content_repo_;
   // thread pool for components.
-  utils::ThreadPool<uint64_t> thread_pool_;
+  utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
 
 Review comment:
   Well, I think the raw ptr is even worse as you can delete that and while reading the code you can't assume it not being null. On the other side I hardly see value added by that. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385276946
 
 

 ##########
 File path: libminifi/include/SchedulingAgent.h
 ##########
 @@ -202,7 +134,7 @@ class SchedulingAgent {
 
   std::shared_ptr<core::ContentRepository> content_repo_;
   // thread pool for components.
-  utils::ThreadPool<uint64_t> thread_pool_;
+  utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
 
 Review comment:
   I know that I'm at fault for suggesting this, but I was dump and didn't know that "a reference member is almost always wrong". Suggestion: change to raw pointer (i.e. observer semantics, similarly to a reference) with not_null annotation (`gsl::not_null<T*>` or a comment and prayer).
   
   See last sentence of "Exceptions": http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#define-copy-move-and-destroy-consistently

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382710596
 
 

 ##########
 File path: libminifi/include/FlowController.h
 ##########
 @@ -380,6 +356,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
 
   std::shared_ptr<core::ContentRepository> content_repo_;
 
+  // Thread pool for schedulers
+  std::shared_ptr<utils::ThreadPool<utils::ComplexTaskResult>> thread_pool_;
 
 Review comment:
   We could make the lifetime and ownership of `thread_pool_` clearer by making `FlowController` the owner and all the `SchedulingAgent`s only have a non-owning reference (or non-null pointer). I think we can even manage without extra heap allocation by making it a data member (non-ptr) and `stop()`/`operator=`/`start()`ing it `onSchedule()`, thus making pointers stable and `shared_ptr` unnecessary over raw observer ptrs.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383875859
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -153,24 +127,22 @@ class Worker {
   }
  protected:
 
-  inline uint64_t increment_time(const uint64_t &time) {
-    std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
-    auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
-    return millis + time;
+  inline std::chrono::time_point<std::chrono::steady_clock> increment_time(const std::chrono::milliseconds &time) {
+    return std::chrono::steady_clock::now() + time;
   }
 
 Review comment:
   Do we really need this as a separate function? I think `std::chrono::steady_clock::now() + time` is fine written inline as well, not much longer than the function call syntax.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385278922
 
 

 ##########
 File path: libminifi/include/ThreadedSchedulingAgent.h
 ##########
 @@ -73,6 +73,8 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
   ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
   std::shared_ptr<logging::Logger> logger_;
+
+  std::set<std::string> processors_running_;
 
 Review comment:
   No offense, that's a good reason to use a different container. All I'm asking for is a code comment explaining the above for future readers.
   e.g.: `// std::set for easy erase by value`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r380975656
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      if (task_status_[task.getIdentifier()] && task.run()) {
+        if(task.getTimeSlice() <= std::chrono::steady_clock::now()) {
+          // it can be rescheduled again as soon as there is a worker available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // Task will be put to the delayed queue as next exec time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() || task.getTimeSlice() < delayed_worker_queue_.top().getTimeSlice();
+
+        delayed_worker_queue_.push(std::move(task));
+        if(need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait(lock);
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {
+  while(running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while(!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getTimeSlice() < std::chrono::steady_clock::now()) {
+      // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted.
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if(delayed_worker_queue_.empty()) {
+      delayed_task_available_.wait(lock);
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - delayed_worker_queue_.top().getTimeSlice());
+      delayed_task_available_.wait_for(lock, wait_time);
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+
+// likely don't have a thread manager
+  if (LIKELY(nullptr != thread_manager_)) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+      }
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        std::shared_ptr<WorkerThread> thread_ref;
+        while (deceased_thread_queue_.try_dequeue(thread_ref)) {
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          if (thread_ref->thread_.joinable())
+            thread_ref->thread_.join();
+          thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
+        }
+      }
+      std::this_thread::sleep_for(waitperiod);
+    }
+  } else {
+    for (auto &thread : thread_queue_) {
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::start() {
+  if (nullptr != controller_service_provider_) {
+    auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
+    thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
+  } else {
+    thread_manager_ = nullptr;
+  }
+
+  std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+  if (!running_) {
+    running_ = true;
+    manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this));
+    if (worker_queue_.size_approx() > 0) {
+      tasks_available_.notify_all();
+    }
+
+    std::lock_guard<std::mutex> quee_lock(worker_queue_mutex_);
+    delayed_scheduler_thread_ = std::thread(&ThreadPool<T>::manage_delayed_queue, this);
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::stopTasks(const std::string &identifier) {
+  std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+  task_status_[identifier] = false;
+}
+
+template<typename T>
+void ThreadPool<T>::shutdown() {
+  if (running_.load()) {
+    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+    running_.store(false);
+
+    drain();
+
+    task_status_.clear();
+    if (manager_thread_.joinable()) {
+      manager_thread_.join();
+    }
+
+    delayed_task_available_.notify_all();
+    if(delayed_scheduler_thread_.joinable()) {
+      delayed_scheduler_thread_.join();
+    }
+
+    for(const auto &thread : thread_queue_){
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+
+    thread_queue_.clear();
+    current_workers_ = 0;
+    while (worker_queue_.size_approx() > 0) {
+      Worker<T> task;
+      worker_queue_.try_dequeue(task);
+
+    }
+  }
+}
+
+template class utils::ThreadPool<utils::ComplexResult>;
 
 Review comment:
   The template type of threadpool is not the result of the "computation" done by a task, but a type that can be used to determine whether the current task is to be executed again or not, and in case it is, when it should happen. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382014816
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other) = default;
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+  explicit SingleRunMonitor(SingleRunMonitor &&other) = default;
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexResult {
+  ComplexResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+};
+
+class ComplexMonitor : public utils::AfterExecute<ComplexResult> {
+ public:
+  ComplexMonitor(std::atomic<bool> *run_monitor)
+  : current_wait_(std::chrono::milliseconds(0)),
+    run_monitor_(run_monitor) {
+  }
+  explicit ComplexMonitor(ComplexMonitor &&other) = default;
+
+  virtual bool isFinished(const ComplexResult &result) override {
+    if (result.finished_) {
+      return true;
+    }
+    if (*run_monitor_) {
+      current_wait_.store(result.wait_time_);
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const ComplexResult &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+
+ private:
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+static ComplexResult Done() {
+  return ComplexResult(true, std::chrono::milliseconds(0));
+}
+
+static ComplexResult Retry(std::chrono::milliseconds interval) {
+  return ComplexResult(false, interval);
+}
 
 Review comment:
   Why do these have static linkage in a header file? My compiler spams me with warnings for every translation unit that (potentially transitively) includes this header and doesn't use this function.
   
   If it was a static member function of `ComplexResult`, instead of a free function with static linkage, that would make the warnings go away.
   
   ```
   /home/szaszm/nifi-minifi-cpp/libminifi/include/utils/Monitors.h:162:22: warning: ‘org::apache::nifi::minifi::utils::ComplexResult org::apache::nifi::minifi::utils::Retry(std::chrono::milliseconds)’ defined but not used [-Wunused-function]                                                   
    static ComplexResult Retry(std::chrono::milliseconds interval) {                      
                         ^~~~~
   /home/szaszm/nifi-minifi-cpp/libminifi/include/utils/Monitors.h:158:22: warning: ‘org::apache::nifi::minifi::utils::ComplexResult org::apache::nifi::minifi::utils::Done()’ defined but not used [-Wunused-function]    
    static ComplexResult Done() {                            
                         ^~~~
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385354912
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -231,7 +198,10 @@ class ThreadPool {
     thread_manager_ = nullptr;
   }
 
-  ThreadPool(const ThreadPool<T> &&other)
+  ThreadPool(const ThreadPool<T> &other) = delete;
+  ThreadPool<T> operator=(const ThreadPool<T> &other) = delete;
+
+  ThreadPool(ThreadPool<T> &&other)
       : daemon_threads_(std::move(other.daemon_threads_)),
         thread_reduction_count_(0),
         max_worker_threads_(std::move(other.max_worker_threads_)),
 
 Review comment:
   Done, deleted move. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382024074
 
 

 ##########
 File path: libminifi/include/core/controller/StandardControllerServiceProvider.h
 ##########
 @@ -103,14 +103,13 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
 
   }
 
-  std::future<uint64_t> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
+  std::future<utils::ComplexResult> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
     if (serviceNode->canEnable()) {
       return agent_->enableControllerService(serviceNode);
     } else {
 
-      std::future<uint64_t> no_run = std::async(std::launch::async, []() {
-        uint64_t ret = 0;
-        return ret;
+      std::future<utils::ComplexResult> no_run = std::async(std::launch::async, []() {
+        return utils::Done();
       });
       return no_run;
 
 Review comment:
   Wasn't introduced by you but I don't think that we need to execute the return of a small and simple struct on a separate thread, or even allow that. I'd change `std::launch::async` to `std::launch::deferred`.
   
   My suggestion:
   ```
   return std::async(std::launch::deferred, &utils::Done);
   ```
   
   The same applies to `disableControllerService`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385276946
 
 

 ##########
 File path: libminifi/include/SchedulingAgent.h
 ##########
 @@ -202,7 +134,7 @@ class SchedulingAgent {
 
   std::shared_ptr<core::ContentRepository> content_repo_;
   // thread pool for components.
-  utils::ThreadPool<uint64_t> thread_pool_;
+  utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
 
 Review comment:
   I know that I'm at fault for suggesting this, but I was dumb and didn't know that "a reference member is almost always wrong". Suggestion: change to raw pointer (i.e. observer semantics, similarly to a reference) with not_null annotation (`gsl::not_null<T*>` or a comment and prayer).
   
   See last sentence of "Exceptions": http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#define-copy-move-and-destroy-consistently

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385307792
 
 

 ##########
 File path: libminifi/include/ThreadedSchedulingAgent.h
 ##########
 @@ -73,6 +73,8 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
   ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
   std::shared_ptr<logging::Logger> logger_;
+
+  std::set<std::string> processors_running_;
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385079077
 
 

 ##########
 File path: libminifi/include/ThreadedSchedulingAgent.h
 ##########
 @@ -73,6 +73,8 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
   ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
   std::shared_ptr<logging::Logger> logger_;
+
+  std::set<std::string> processors_running_;
 
 Review comment:
   For sake of simplicity: the most straight forward container in case you have to search by value.
   Performance doesn't matter here as this code piece is only executed during flow change (startup, shutdown, update). 
   Because of the above I find this point of the guideline a bit overprotective. In case it's a core of a low latency software, yes, justify your container selection (and hopefully measure it, too!), but in our case, it's not that kind of software and definitely not a part of the code that has any effect on throughput. In this case I would suggest choosing the container that results in the most straight forward code. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r381406703
 
 

 ##########
 File path: libminifi/src/CronDrivenSchedulingAgent.cpp
 ##########
 @@ -32,7 +32,7 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-uint64_t CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+utils::ComplexResult CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                         const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_ && processor->isRunning()) {
     std::chrono::system_clock::time_point leap_nanos;
 
 Review comment:
   A find and replace of `system_clock` to `steady_clock` would make the code more stable. I see that all changed code is correct, but it would be nice to do at least the easy fixes on the old broken code.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383886921
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+#if defined(WIN32)
+#include <future>  // This is required to work around a VS2017 bug, see the details below
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexTaskResult {
 
 Review comment:
   I'd rename this to something along the lines of `TaskNextSchedule`, `TaskNextRun` or similar.
   Why not the current name:
   - it's complex in the sense that it's not one/simple, but we model many complex entities yet we rarely name them as such
   - It's not the result of a task but rather a desire of a given task when to be given control next, so the result part is misleading IMO
   
   I like the design, just not the name.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r386309539
 
 

 ##########
 File path: libminifi/src/FlowController.cpp
 ##########
 @@ -313,21 +310,23 @@ void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool
 
     controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
 
+    auto base_shared_ptr = std::dynamic_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this());
 
 Review comment:
   Ha, I thought so, but not in this case.
   This class has multiple bases and the one needed here is not the one that inherits from ```std::enable_shared_from_this```, so the cast is required. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382010334
 
 

 ##########
 File path: libminifi/src/core/Processor.cpp
 ##########
 @@ -268,6 +268,7 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
 
 bool Processor::isWorkAvailable() {
   // We have work if any incoming connection has work
+  std::lock_guard<std::mutex> lock(mutex_);
 
 Review comment:
   Accessing incoming connections
   Not sure it's needed, but won't hurt, it's done the same way in "flowFilesQueued()" and "flowFileOutGoingFull()" above in this file, also the functions adding/removing connections lock this mutex as well. 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383861195
 
 

 ##########
 File path: libminifi/include/CronDrivenSchedulingAgent.h
 ##########
 @@ -41,16 +41,18 @@ class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent {
    * Create a new event driven scheduling agent.
    */
   CronDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
-                            std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
-      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) {
+                            std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
+                            std::shared_ptr<utils::ThreadPool<utils::ComplexTaskResult>> thread_pool)
+      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
   }
   // Destructor
   virtual ~CronDrivenSchedulingAgent() {
   }
   // Run function for the thread
-  uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
+  utils::ComplexTaskResult run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+      const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
-  virtual void stop() {
+  virtual void stop() override {
 
 Review comment:
   Virtual functions should specify exactly one of virtual, override, or final.
   http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rh-override

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r380974973
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      if (task_status_[task.getIdentifier()] && task.run()) {
+        if(task.getTimeSlice() <= std::chrono::steady_clock::now()) {
+          // it can be rescheduled again as soon as there is a worker available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // Task will be put to the delayed queue as next exec time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() || task.getTimeSlice() < delayed_worker_queue_.top().getTimeSlice();
+
+        delayed_worker_queue_.push(std::move(task));
+        if(need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait(lock);
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {
+  while(running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while(!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getTimeSlice() < std::chrono::steady_clock::now()) {
+      // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted.
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if(delayed_worker_queue_.empty()) {
+      delayed_task_available_.wait(lock);
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - delayed_worker_queue_.top().getTimeSlice());
+      delayed_task_available_.wait_for(lock, wait_time);
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+
+// likely don't have a thread manager
+  if (LIKELY(nullptr != thread_manager_)) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+      }
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
 
 Review comment:
   Fair point. 
   
   Although the intent is to release to mutex while the (possibly) long lasting execution (the task itself) is done. After that's done, we might grab it again in case it should be put to the delayed queue. Which is the scenario with the highest possibility, although I hope that most of the processor logics are complex enough to make it worth releasing and acquiring a lock again. :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r381370363
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other) = default;
 
 Review comment:
   This and all the other equivalent classes give the following warning on macOS, perfectly validly, because `std::atomic` is not copyable or movable.
   
   ```
   /Users/danielbakai/nifi-minifi-cpp3/libminifi/include/utils/Monitors.h:66:12: warning: explicitly defaulted move
         constructor is implicitly deleted [-Wdefaulted-function-deleted]
     explicit TimerAwareMonitor(TimerAwareMonitor &&other) = default;
              ^
   /Users/danielbakai/nifi-minifi-cpp3/libminifi/include/utils/Monitors.h:89:42: note: move constructor of 'TimerAwareMonitor'
         is implicitly deleted because field 'current_wait_' has a deleted move constructor
     std::atomic<std::chrono::milliseconds> current_wait_;
                                            ^
   /Library/Developer/CommandLineTools/usr/bin/../include/c++/v1/atomic:1085:7: note: copy constructor of
         'atomic<std::__1::chrono::duration<long long, std::__1::ratio<1, 1000> > >' is implicitly deleted because base class
         '__atomic_base<std::__1::chrono::duration<long long, std::__1::ratio<1, 1000> > >' has a deleted copy constructor
       : public __atomic_base<_Tp>
         ^
   /Library/Developer/CommandLineTools/usr/bin/../include/c++/v1/atomic:984:5: note: '__atomic_base' has been explicitly
         marked deleted here
       __atomic_base(const __atomic_base&) = delete;
       ^
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382064259
 
 

 ##########
 File path: libminifi/src/CronDrivenSchedulingAgent.cpp
 ##########
 @@ -32,7 +32,7 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-uint64_t CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+utils::ComplexResult CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                         const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_ && processor->isRunning()) {
     std::chrono::system_clock::time_point leap_nanos;
 
 Review comment:
   I mostly agree, but this is a *cron* based scheduler, which is the very rare case that makes usages of system_clock valid. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383893631
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+#if defined(WIN32)
+#include <future>  // This is required to work around a VS2017 bug, see the details below
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexTaskResult {
+  ComplexTaskResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+
+  static ComplexTaskResult Done() {
+    return ComplexTaskResult(true, std::chrono::milliseconds(0));
+  }
+
+  static ComplexTaskResult Retry(std::chrono::milliseconds interval) {
+    return ComplexTaskResult(false, interval);
+  }
+
+#if defined(WIN32)
+ // https://developercommunity.visualstudio.com/content/problem/60897/c-shared-state-futuresstate-default-constructs-the.html
+ // Because of this bug we need to have this object default constructible, which makes no sense otherwise. Hack.
+ private:
+  ComplexTaskResult() : wait_time_(std::chrono::milliseconds(0)), finished_(true) {}
+  friend class std::_Associated_state<ComplexTaskResult>;
+#endif
+};
+
+class ComplexMonitor : public utils::AfterExecute<ComplexTaskResult> {
+ public:
+  ComplexMonitor()
+  : current_wait_(std::chrono::milliseconds(0)) {
+  }
+
+  virtual bool isFinished(const ComplexTaskResult &result) override {
+    if (result.finished_) {
+      return true;
+    }
+    current_wait_.store(result.wait_time_);
+    return false;
+  }
+  virtual bool isCancelled(const ComplexTaskResult &result) override {
+    return false;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+
+ private:
+  std::atomic<std::chrono::milliseconds> current_wait_;
 
 Review comment:
   I have a preference for default member initializers where there is a good default initialization for a given member and it's an error to not call an initializer on them.
   
   http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#c48-prefer-in-class-initializers-to-member-initializers-in-constructors-for-constant-initializers

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r384993979
 
 

 ##########
 File path: libminifi/include/CronDrivenSchedulingAgent.h
 ##########
 @@ -41,16 +41,18 @@ class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent {
    * Create a new event driven scheduling agent.
    */
   CronDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
-                            std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
-      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) {
+                            std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
+                            std::shared_ptr<utils::ThreadPool<utils::ComplexTaskResult>> thread_pool)
+      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
   }
   // Destructor
   virtual ~CronDrivenSchedulingAgent() {
   }
   // Run function for the thread
-  uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
+  utils::ComplexTaskResult run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+      const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
-  virtual void stop() {
+  virtual void stop() override {
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382102803
 
 

 ##########
 File path: libminifi/src/EventDrivenSchedulingAgent.cpp
 ##########
 @@ -32,28 +29,24 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+utils::ComplexResult EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  while (this->running_) {
-    bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
-
-    if (processor->isYield()) {
-      // Honor the yield
-      return processor->getYieldTime();
-    } else if (shouldYield && this->bored_yield_duration_ > 0) {
-      // No work to do or need to apply back pressure
-      return this->bored_yield_duration_;
-    }
-
-    // Block until work is available
-
-    processor->waitForWork(1000);
-
-    if (!processor->isWorkAvailable()) {
-      return 1000;
+  if (this->running_ && processor->isRunning()) {
+    auto start_time = std::chrono::steady_clock::now();
+    // trigger processor until it has work to do, but no more than half a sec
+    while (std::chrono::steady_clock::now() - start_time < std::chrono::milliseconds(500)) {
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r380645372
 
 

 ##########
 File path: libminifi/src/EventDrivenSchedulingAgent.cpp
 ##########
 @@ -44,16 +44,8 @@ uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor>
       // No work to do or need to apply back pressure
       return this->bored_yield_duration_;
     }
-
-    // Block until work is available
-
-    processor->waitForWork(1000);
-
-    if (!processor->isWorkAvailable()) {
-      return 1000;
-    }
   }
-  return 0;
+  return 10;  // Let's check back for work in 10ms or when a thread is available to execute
 
 Review comment:
   Yep, will add to iter 2 when unifying thread pool. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r381283207
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      if (task_status_[task.getIdentifier()] && task.run()) {
+        if(task.getTimeSlice() <= std::chrono::steady_clock::now()) {
+          // it can be rescheduled again as soon as there is a worker available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // Task will be put to the delayed queue as next exec time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() || task.getTimeSlice() < delayed_worker_queue_.top().getTimeSlice();
+
+        delayed_worker_queue_.push(std::move(task));
+        if(need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait(lock);
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {
+  while(running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while(!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getTimeSlice() < std::chrono::steady_clock::now()) {
+      // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted.
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if(delayed_worker_queue_.empty()) {
+      delayed_task_available_.wait(lock);
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - delayed_worker_queue_.top().getTimeSlice());
+      delayed_task_available_.wait_for(lock, wait_time);
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+
+// likely don't have a thread manager
+  if (LIKELY(nullptr != thread_manager_)) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+      }
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        std::shared_ptr<WorkerThread> thread_ref;
+        while (deceased_thread_queue_.try_dequeue(thread_ref)) {
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          if (thread_ref->thread_.joinable())
+            thread_ref->thread_.join();
+          thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
+        }
+      }
+      std::this_thread::sleep_for(waitperiod);
+    }
+  } else {
+    for (auto &thread : thread_queue_) {
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::start() {
+  if (nullptr != controller_service_provider_) {
+    auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
+    thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
+  } else {
+    thread_manager_ = nullptr;
+  }
+
+  std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+  if (!running_) {
+    running_ = true;
+    manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this));
+    if (worker_queue_.size_approx() > 0) {
+      tasks_available_.notify_all();
+    }
+
+    std::lock_guard<std::mutex> quee_lock(worker_queue_mutex_);
+    delayed_scheduler_thread_ = std::thread(&ThreadPool<T>::manage_delayed_queue, this);
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::stopTasks(const std::string &identifier) {
+  std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+  task_status_[identifier] = false;
+}
+
+template<typename T>
+void ThreadPool<T>::shutdown() {
+  if (running_.load()) {
+    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+    running_.store(false);
+
+    drain();
+
+    task_status_.clear();
+    if (manager_thread_.joinable()) {
+      manager_thread_.join();
+    }
+
+    delayed_task_available_.notify_all();
+    if(delayed_scheduler_thread_.joinable()) {
+      delayed_scheduler_thread_.join();
+    }
+
+    for(const auto &thread : thread_queue_){
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+
+    thread_queue_.clear();
+    current_workers_ = 0;
+    while (worker_queue_.size_approx() > 0) {
+      Worker<T> task;
+      worker_queue_.try_dequeue(task);
+
+    }
+  }
+}
+
+template class utils::ThreadPool<utils::ComplexResult>;
 
 Review comment:
   Completely agree, that's my long term plan. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382102891
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -170,7 +142,7 @@ template<typename T>
 class WorkerComparator {
  public:
   bool operator()(Worker<T> &a, Worker<T> &b) {
-    return a.getTimeSlice() < b.getTimeSlice();
+    return a.getTimeSlice() > b.getTimeSlice();
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] phrocker commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
phrocker commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r379849295
 
 

 ##########
 File path: libminifi/src/EventDrivenSchedulingAgent.cpp
 ##########
 @@ -44,16 +44,8 @@ uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor>
       // No work to do or need to apply back pressure
       return this->bored_yield_duration_;
     }
-
-    // Block until work is available
-
-    processor->waitForWork(1000);
-
-    if (!processor->isWorkAvailable()) {
-      return 1000;
-    }
   }
-  return 0;
+  return 10;  // Let's check back for work in 10ms or when a thread is available to execute
 
 Review comment:
   Could that number be configurable? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385180341
 
 

 ##########
 File path: libminifi/include/SchedulingAgent.h
 ##########
 @@ -202,7 +138,7 @@ class SchedulingAgent {
 
   std::shared_ptr<core::ContentRepository> content_repo_;
   // thread pool for components.
-  utils::ThreadPool<uint64_t> thread_pool_;
+  std::shared_ptr<utils::ThreadPool<utils::ComplexTaskResult>> thread_pool_;
 
 Review comment:
   gsl is a good idea, happy to see that as a follow-up, although I think this PR is already big enough. Modified code to have clear ownership and lifetime. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r384993110
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -133,11 +107,11 @@ class Worker {
     identifier_ = identifier;
   }
 
-  virtual uint64_t getTimeSlice() {
+  virtual std::chrono::time_point<std::chrono::steady_clock> getTimeSlice() const {
     return time_slice_;
   }
 
-  virtual uint64_t getWaitTime() {
+  virtual std::chrono::milliseconds getWaitTime() {
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383903437
 
 

 ##########
 File path: libminifi/include/ThreadedSchedulingAgent.h
 ##########
 @@ -73,6 +73,8 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
   ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
   std::shared_ptr<logging::Logger> logger_;
+
+  std::set<std::string> processors_running_;
 
 Review comment:
   Why `std::set` instead of `std::vector`? Could you add explanation as a code comment?
   
   http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#slcon2-prefer-using-stl-vector-by-default-unless-you-have-a-reason-to-use-a-different-container

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on issue #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on issue #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#issuecomment-596562157
 
 
   Created Jira for the unstable http tests:
   https://issues.apache.org/jira/browse/MINIFICPP-1172

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r383892746
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+#if defined(WIN32)
+#include <future>  // This is required to work around a VS2017 bug, see the details below
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexTaskResult {
+  ComplexTaskResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+
+  static ComplexTaskResult Done() {
+    return ComplexTaskResult(true, std::chrono::milliseconds(0));
+  }
+
+  static ComplexTaskResult Retry(std::chrono::milliseconds interval) {
+    return ComplexTaskResult(false, interval);
+  }
+
+#if defined(WIN32)
+ // https://developercommunity.visualstudio.com/content/problem/60897/c-shared-state-futuresstate-default-constructs-the.html
+ // Because of this bug we need to have this object default constructible, which makes no sense otherwise. Hack.
+ private:
+  ComplexTaskResult() : wait_time_(std::chrono::milliseconds(0)), finished_(true) {}
+  friend class std::_Associated_state<ComplexTaskResult>;
+#endif
+};
+
+class ComplexMonitor : public utils::AfterExecute<ComplexTaskResult> {
+ public:
+  ComplexMonitor()
+  : current_wait_(std::chrono::milliseconds(0)) {
+  }
+
+  virtual bool isFinished(const ComplexTaskResult &result) override {
+    if (result.finished_) {
+      return true;
+    }
+    current_wait_.store(result.wait_time_);
+    return false;
+  }
+  virtual bool isCancelled(const ComplexTaskResult &result) override {
+    return false;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+
+ private:
+  std::atomic<std::chrono::milliseconds> current_wait_;
+};
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_MONITORS_H
 
 Review comment:
   No LF at the end of the file

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382103331
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other) = default;
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+  explicit SingleRunMonitor(SingleRunMonitor &&other) = default;
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexResult {
+  ComplexResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+};
+
+class ComplexMonitor : public utils::AfterExecute<ComplexResult> {
+ public:
+  ComplexMonitor(std::atomic<bool> *run_monitor)
+  : current_wait_(std::chrono::milliseconds(0)),
+    run_monitor_(run_monitor) {
+  }
+  explicit ComplexMonitor(ComplexMonitor &&other) = default;
+
+  virtual bool isFinished(const ComplexResult &result) override {
+    if (result.finished_) {
+      return true;
+    }
+    if (*run_monitor_) {
+      current_wait_.store(result.wait_time_);
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const ComplexResult &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+
+ private:
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+static ComplexResult Done() {
+  return ComplexResult(true, std::chrono::milliseconds(0));
+}
+
+static ComplexResult Retry(std::chrono::milliseconds interval) {
+  return ComplexResult(false, interval);
+}
 
 Review comment:
   Moved them to the class as static members, no more warnings. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382153078
 
 

 ##########
 File path: libminifi/src/core/Processor.cpp
 ##########
 @@ -268,6 +268,7 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
 
 bool Processor::isWorkAvailable() {
   // We have work if any incoming connection has work
+  std::lock_guard<std::mutex> lock(mutex_);
 
 Review comment:
   > My guess would be that we need to use `Connectable::work_available_mutex_` instead of `Processor::mutex_`.
   > 
   > I find it a very bad design decision to expose threading and signaling primitives in `Connectable`. `isWorkAvailable()` should be a source object (of "work") that can be checked for readiness and waited for. That would break an important API, though, so just leaving it here.
   
   In theory you are most probably right, but if you look at that file, all the other functions that touch incoming and outgoing connections lock this mutex. 
   I'm fine with a follow-up ticket to clean this up, but this PR wasn't meant to do that. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r380647644
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      if (task_status_[task.getIdentifier()] && task.run()) {
+        if(task.getTimeSlice() <= std::chrono::steady_clock::now()) {
+          // it can be rescheduled again as soon as there is a worker available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // Task will be put to the delayed queue as next exec time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() || task.getTimeSlice() < delayed_worker_queue_.top().getTimeSlice();
+
+        delayed_worker_queue_.push(std::move(task));
+        if(need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait(lock);
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {
+  while(running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while(!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getTimeSlice() < std::chrono::steady_clock::now()) {
+      // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted.
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if(delayed_worker_queue_.empty()) {
+      delayed_task_available_.wait(lock);
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - delayed_worker_queue_.top().getTimeSlice());
+      delayed_task_available_.wait_for(lock, wait_time);
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+
+// likely don't have a thread manager
+  if (LIKELY(nullptr != thread_manager_)) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+      }
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        std::shared_ptr<WorkerThread> thread_ref;
+        while (deceased_thread_queue_.try_dequeue(thread_ref)) {
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          if (thread_ref->thread_.joinable())
+            thread_ref->thread_.join();
+          thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
+        }
+      }
+      std::this_thread::sleep_for(waitperiod);
+    }
+  } else {
+    for (auto &thread : thread_queue_) {
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::start() {
+  if (nullptr != controller_service_provider_) {
+    auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
+    thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
+  } else {
+    thread_manager_ = nullptr;
+  }
+
+  std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+  if (!running_) {
+    running_ = true;
+    manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this));
+    if (worker_queue_.size_approx() > 0) {
+      tasks_available_.notify_all();
+    }
+
+    std::lock_guard<std::mutex> quee_lock(worker_queue_mutex_);
+    delayed_scheduler_thread_ = std::thread(&ThreadPool<T>::manage_delayed_queue, this);
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::stopTasks(const std::string &identifier) {
+  std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+  task_status_[identifier] = false;
+}
+
+template<typename T>
+void ThreadPool<T>::shutdown() {
+  if (running_.load()) {
+    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+    running_.store(false);
+
+    drain();
+
+    task_status_.clear();
+    if (manager_thread_.joinable()) {
+      manager_thread_.join();
+    }
+
+    delayed_task_available_.notify_all();
+    if(delayed_scheduler_thread_.joinable()) {
+      delayed_scheduler_thread_.join();
+    }
+
+    for(const auto &thread : thread_queue_){
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+
+    thread_queue_.clear();
+    current_workers_ = 0;
+    while (worker_queue_.size_approx() > 0) {
+      Worker<T> task;
+      worker_queue_.try_dequeue(task);
+
+    }
+  }
+}
+
+template class utils::ThreadPool<utils::ComplexResult>;
 
 Review comment:
   This is ugly but meant to be temporary. 
   
   My long term goal would be to remove template type from ThreadPool. 
   ComplexResult can be extended in case further decision factors pop up in the future. 
   By removing the template parameter we could have one unified threadpool in MiNiFi that handles all kinds of tasks. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda closed pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r381970926
 
 

 ##########
 File path: libminifi/src/EventDrivenSchedulingAgent.cpp
 ##########
 @@ -32,28 +29,24 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+utils::ComplexResult EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  while (this->running_) {
-    bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
-
-    if (processor->isYield()) {
-      // Honor the yield
-      return processor->getYieldTime();
-    } else if (shouldYield && this->bored_yield_duration_ > 0) {
-      // No work to do or need to apply back pressure
-      return this->bored_yield_duration_;
-    }
-
-    // Block until work is available
-
-    processor->waitForWork(1000);
-
-    if (!processor->isWorkAvailable()) {
-      return 1000;
+  if (this->running_ && processor->isRunning()) {
+    auto start_time = std::chrono::steady_clock::now();
+    // trigger processor until it has work to do, but no more than half a sec
+    while (std::chrono::steady_clock::now() - start_time < std::chrono::milliseconds(500)) {
 
 Review comment:
   I think we might have to check `processor->isRunning()` in this loop as well.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382102603
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other) = default;
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+  explicit SingleRunMonitor(SingleRunMonitor &&other) = default;
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexResult {
+  ComplexResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+};
+
+class ComplexMonitor : public utils::AfterExecute<ComplexResult> {
+ public:
+  ComplexMonitor(std::atomic<bool> *run_monitor)
+  : current_wait_(std::chrono::milliseconds(0)),
+    run_monitor_(run_monitor) {
+  }
+  explicit ComplexMonitor(ComplexMonitor &&other) = default;
+
+  virtual bool isFinished(const ComplexResult &result) override {
+    if (result.finished_) {
+      return true;
+    }
+    if (*run_monitor_) {
+      current_wait_.store(result.wait_time_);
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const ComplexResult &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+
+ private:
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+static ComplexResult Done() {
 
 Review comment:
   Moved them to the class as static member functions, so they belong to that namespace now. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r384993435
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+#if defined(WIN32)
+#include <future>  // This is required to work around a VS2017 bug, see the details below
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexTaskResult {
+  ComplexTaskResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+
+  static ComplexTaskResult Done() {
+    return ComplexTaskResult(true, std::chrono::milliseconds(0));
+  }
+
+  static ComplexTaskResult Retry(std::chrono::milliseconds interval) {
+    return ComplexTaskResult(false, interval);
+  }
 
 Review comment:
   Done means done, it's not getting executed again.
   Renamed Retry to RetryIn and added RetryImmediately. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r381414232
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      if (task_status_[task.getIdentifier()] && task.run()) {
+        if(task.getTimeSlice() <= std::chrono::steady_clock::now()) {
+          // it can be rescheduled again as soon as there is a worker available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // Task will be put to the delayed queue as next exec time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() || task.getTimeSlice() < delayed_worker_queue_.top().getTimeSlice();
+
+        delayed_worker_queue_.push(std::move(task));
+        if(need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait(lock);
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {
+  while(running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while(!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getTimeSlice() < std::chrono::steady_clock::now()) {
+      // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted.
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if(delayed_worker_queue_.empty()) {
+      delayed_task_available_.wait(lock);
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - delayed_worker_queue_.top().getTimeSlice());
+      delayed_task_available_.wait_for(lock, wait_time);
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+
+// likely don't have a thread manager
+  if (LIKELY(nullptr != thread_manager_)) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+      }
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
 
 Review comment:
   I see no waiting for the task completion between the protected blocks. (or critical sections as the windows folks like to call them)
   Shouldn't there be a cv.wait()?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382102699
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other) = default;
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+  explicit SingleRunMonitor(SingleRunMonitor &&other) = default;
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexResult {
+  ComplexResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+};
+
+class ComplexMonitor : public utils::AfterExecute<ComplexResult> {
+ public:
+  ComplexMonitor(std::atomic<bool> *run_monitor)
+  : current_wait_(std::chrono::milliseconds(0)),
+    run_monitor_(run_monitor) {
+  }
+  explicit ComplexMonitor(ComplexMonitor &&other) = default;
+
+  virtual bool isFinished(const ComplexResult &result) override {
+    if (result.finished_) {
+      return true;
+    }
+    if (*run_monitor_) {
+      current_wait_.store(result.wait_time_);
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const ComplexResult &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+
+ private:
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+static ComplexResult Done() {
+  return ComplexResult(true, std::chrono::milliseconds(0));
+}
+
+static ComplexResult Retry(std::chrono::milliseconds interval) {
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r380656871
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      if (task_status_[task.getIdentifier()] && task.run()) {
+        if(task.getTimeSlice() <= std::chrono::steady_clock::now()) {
+          // it can be rescheduled again as soon as there is a worker available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // Task will be put to the delayed queue as next exec time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() || task.getTimeSlice() < delayed_worker_queue_.top().getTimeSlice();
+
+        delayed_worker_queue_.push(std::move(task));
+        if(need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait(lock);
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {
+  while(running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while(!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getTimeSlice() < std::chrono::steady_clock::now()) {
+      // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted.
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if(delayed_worker_queue_.empty()) {
+      delayed_task_available_.wait(lock);
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - delayed_worker_queue_.top().getTimeSlice());
+      delayed_task_available_.wait_for(lock, wait_time);
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+
+// likely don't have a thread manager
+  if (LIKELY(nullptr != thread_manager_)) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+      }
+      {
+        std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
 
 Review comment:
   Why do we release and grab the same mutex again? It's much less overhead to just keep the mutex locked, except if there's waiting involved.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r384993592
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+#if defined(WIN32)
+#include <future>  // This is required to work around a VS2017 bug, see the details below
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexTaskResult {
+  ComplexTaskResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+
+  static ComplexTaskResult Done() {
+    return ComplexTaskResult(true, std::chrono::milliseconds(0));
+  }
+
+  static ComplexTaskResult Retry(std::chrono::milliseconds interval) {
+    return ComplexTaskResult(false, interval);
+  }
+
+#if defined(WIN32)
+ // https://developercommunity.visualstudio.com/content/problem/60897/c-shared-state-futuresstate-default-constructs-the.html
+ // Because of this bug we need to have this object default constructible, which makes no sense otherwise. Hack.
+ private:
+  ComplexTaskResult() : wait_time_(std::chrono::milliseconds(0)), finished_(true) {}
+  friend class std::_Associated_state<ComplexTaskResult>;
+#endif
+};
+
+class ComplexMonitor : public utils::AfterExecute<ComplexTaskResult> {
+ public:
+  ComplexMonitor()
+  : current_wait_(std::chrono::milliseconds(0)) {
+  }
+
+  virtual bool isFinished(const ComplexTaskResult &result) override {
+    if (result.finished_) {
+      return true;
+    }
+    current_wait_.store(result.wait_time_);
+    return false;
+  }
+  virtual bool isCancelled(const ComplexTaskResult &result) override {
+    return false;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+
+ private:
+  std::atomic<std::chrono::milliseconds> current_wait_;
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385284429
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -268,8 +238,16 @@ class ThreadPool {
   /**
    * Returns true if a task is running.
    */
-  bool isRunning(const std::string &identifier) {
-    return task_status_[identifier] == true;
+  bool isTaskRunning(const std::string &identifier) const {
+    try {
+      return task_status_.at(identifier) == true;
+    } catch (const std::out_of_range &e) {
+      return false;
+    }
+  }
 
 Review comment:
   I love the `const`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r378858323
 
 

 ##########
 File path: libminifi/src/EventDrivenSchedulingAgent.cpp
 ##########
 @@ -44,16 +44,8 @@ uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor>
       // No work to do or need to apply back pressure
       return this->bored_yield_duration_;
     }
-
-    // Block until work is available
-
-    processor->waitForWork(1000);
-
-    if (!processor->isWorkAvailable()) {
-      return 1000;
-    }
   }
-  return 0;
+  return 10;  // Let's check back for work in 10ms or when a thread is available to execute
 
 Review comment:
   This limits the event driven processors to 100 triggers/sec, and in case they don't/can't use batching to 100 FF/s, even when we know they would have work to do. This would broke many flows out in the wild. We should find a way to solve the starvation issue without introducing a bottleneck.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385290235
 
 

 ##########
 File path: libminifi/src/FlowController.cpp
 ##########
 @@ -313,21 +310,23 @@ void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool
 
     controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
 
+    auto base_shared_ptr = std::dynamic_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this());
 
 Review comment:
   No conversion is necessary here since conversion to base class ptr is implicit. If we want to make it explicit, then `static_pointer_cast` is sufficient.
   
   Suggestion: inline `shared_from_this()` in place of every usage of `base_shared_ptr`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385278922
 
 

 ##########
 File path: libminifi/include/ThreadedSchedulingAgent.h
 ##########
 @@ -73,6 +73,8 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
   ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
   std::shared_ptr<logging::Logger> logger_;
+
+  std::set<std::string> processors_running_;
 
 Review comment:
   No offense, that's a good reason to use a different container. All I'm asking for is a code comment explaining the above for future readers.
   e.g.: `// std::set for easy lookup/erase`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r385180919
 
 

 ##########
 File path: libminifi/include/FlowController.h
 ##########
 @@ -380,6 +356,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
 
   std::shared_ptr<core::ContentRepository> content_repo_;
 
+  // Thread pool for schedulers
+  std::shared_ptr<utils::ThreadPool<utils::ComplexTaskResult>> thread_pool_;
 
 Review comment:
   Done, it's now a member of FlowController and the schedulers only get a reference to that. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382080471
 
 

 ##########
 File path: libminifi/src/core/Processor.cpp
 ##########
 @@ -268,6 +268,7 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
 
 bool Processor::isWorkAvailable() {
   // We have work if any incoming connection has work
+  std::lock_guard<std::mutex> lock(mutex_);
 
 Review comment:
   My guess would be that we need to use `Connectable::work_available_mutex_` instead of `Processor::mutex_`.
   
   I find it a very bad design decision to expose threading and signaling primitives in `Connectable`. `isWorkAvailable()` should be a source object (of "work") that can be checked for readiness and waited for. That would break an important API, though, so just leaving it here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382014703
 
 

 ##########
 File path: libminifi/src/core/Processor.cpp
 ##########
 @@ -268,6 +268,7 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
 
 bool Processor::isWorkAvailable() {
   // We have work if any incoming connection has work
+  std::lock_guard<std::mutex> lock(mutex_);
 
 Review comment:
   We should definitely think about introducing generic thread-safe proxy containers. (Whereby you can only access the encapsulated object through a lock handle.)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382010334
 
 

 ##########
 File path: libminifi/src/core/Processor.cpp
 ##########
 @@ -268,6 +268,7 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
 
 bool Processor::isWorkAvailable() {
   // We have work if any incoming connection has work
+  std::lock_guard<std::mutex> lock(mutex_);
 
 Review comment:
   Accessing incoming connections
   Not sure it's needed, but won't hurt, it's done the same way in "flowFilesQueued()" and "flowFileOutGoingFull()" above in this file, also the functions responsible for adding/removing connections lock this mutex as well. 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r381949345
 
 

 ##########
 File path: libminifi/include/utils/Monitors.h
 ##########
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  explicit TimerAwareMonitor(TimerAwareMonitor &&other) = default;
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+  explicit SingleRunMonitor(SingleRunMonitor &&other) = default;
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct ComplexResult {
+  ComplexResult(bool result, std::chrono::milliseconds wait_time)
+    : finished_(result), wait_time_(wait_time){}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+};
+
+class ComplexMonitor : public utils::AfterExecute<ComplexResult> {
+ public:
+  ComplexMonitor(std::atomic<bool> *run_monitor)
+  : current_wait_(std::chrono::milliseconds(0)),
+    run_monitor_(run_monitor) {
+  }
+  explicit ComplexMonitor(ComplexMonitor &&other) = default;
+
+  virtual bool isFinished(const ComplexResult &result) override {
+    if (result.finished_) {
+      return true;
+    }
+    if (*run_monitor_) {
+      current_wait_.store(result.wait_time_);
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const ComplexResult &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return milliseconds since epoch after which we are eligible to re-run this task.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+
+ private:
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+static ComplexResult Done() {
+  return ComplexResult(true, std::chrono::milliseconds(0));
+}
+
+static ComplexResult Retry(std::chrono::milliseconds interval) {
 
 Review comment:
   The same as for `Done`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #735: WIP: MINIFICPP-1158 - Event driven processors can starve each other
URL: https://github.com/apache/nifi-minifi-cpp/pull/735#discussion_r382657941
 
 

 ##########
 File path: libminifi/include/core/controller/StandardControllerServiceProvider.h
 ##########
 @@ -103,14 +103,13 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
 
   }
 
-  std::future<uint64_t> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
+  std::future<utils::ComplexResult> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
     if (serviceNode->canEnable()) {
       return agent_->enableControllerService(serviceNode);
     } else {
 
-      std::future<uint64_t> no_run = std::async(std::launch::async, []() {
-        uint64_t ret = 0;
-        return ret;
+      std::future<utils::ComplexResult> no_run = std::async(std::launch::async, []() {
+        return utils::Done();
       });
       return no_run;
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services