Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/03/23 14:15:03 UTC

[GitHub] [nifi-minifi-cpp] arpadboda opened a new pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

arpadboda opened a new pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746
 
 
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396716663
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -64,8 +64,10 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
         }
       }
     } else {
-      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-      tasks_available_.wait(lock);
+      // This means that the threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen
+      if (running_.load()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(0));
 
 Review comment:
   I meant to sleep for 1 ms; corrected.
   
   If this somehow occurs in a transient state (during startup or shutdown), the sleep prevents the worker threads from busy-waiting: dequeue fails immediately, so without it the workers would retry in a tight loop.
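
The back-off described in this comment can be sketched as follows. This is only an illustration, not the ThreadPool code from the PR: `poll_with_backoff`, its `try_dequeue` callable, and the returned counter are hypothetical names introduced here.

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical worker loop. `try_dequeue` stands in for a non-blocking
// dequeue that returns false immediately when nothing can be taken.
// Returns the number of failed attempts so the effect can be observed.
inline int poll_with_backoff(const std::function<bool()>& try_dequeue,
                             const std::atomic<bool>& running,
                             std::chrono::milliseconds backoff) {
  int failed_attempts = 0;
  while (running.load()) {
    if (try_dequeue()) {
      continue;  // got work; a real worker would run the task here
    }
    ++failed_attempts;
    // Without this sleep the loop would retry at full speed and pin a core.
    std::this_thread::sleep_for(backoff);
  }
  return failed_attempts;
}
```

With a back-off of 1 ms, each failed dequeue costs at least one millisecond of idle time instead of an immediate retry.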


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400043152
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!");
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
 
 Review comment:
   It's fine and expected that we use it in libminifi. What I'd like is that we only support dependency on public members for libminifi API users and treat `protected`/`private` as "internal". The language doesn't provide tools for that (like package-private in Java), so that's an API documentation issue. We could also use pimpl, but that would mean a lot of boilerplate, an extra level of indirection and no inlining, so I'm against that.
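
For readers unfamiliar with the pimpl alternative mentioned here, a rough sketch of what it would look like follows. `IntQueue` and its `Impl` struct are hypothetical and deliberately non-templated; the sketch only shows where the boilerplate and the extra pointer hop come from.

```cpp
#include <cstddef>
#include <deque>
#include <memory>

// Hypothetical pimpl-style class: users of the header see only the
// public interface and an opaque pointer.
class IntQueue {
 public:
  IntQueue();
  ~IntQueue();
  void enqueue(int value);
  std::size_t size() const;

 private:
  struct Impl;                  // defined out of the users' sight
  std::unique_ptr<Impl> impl_;  // the extra level of indirection
};

// In a real library everything below would live in a .cpp file, which is
// also why calls through the interface cannot be inlined by API users.
struct IntQueue::Impl {
  std::deque<int> items;
};

IntQueue::IntQueue() : impl_(new Impl) {}
IntQueue::~IntQueue() = default;
void IntQueue::enqueue(int value) { impl_->items.push_back(value); }
std::size_t IntQueue::size() const { return impl_->items.size(); }
```

Every member function needs a forwarding definition like the ones above, which is the boilerplate the comment argues against.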


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399644084
 
 

 ##########
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##########
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("dum");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("tss");
+    });
+
+  std::thread consumer([&queue, &results]() {
+     while (results.size() < 3) {
+       std::string s;
+       if (queue.tryDequeue(s)) {
+         results.push_back(s);
+       } else {
+         std::this_thread::sleep_for(std::chrono::milliseconds(1));
+       }
+     }
+    });
+
+  producer.join();
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") {
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+    queue.enqueue("ba");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("dum");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("tss");
+  });
+
+  std::thread consumer([&queue, &results]() {
+    std::string s;
+    while (queue.dequeue(s)) {
+      results.push_back(s);
+    }
+  });
+
+  producer.join();
+
+  queue.stop();
+
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+/* In this testcase the consumer thread puts back all items to the queue to consume again
+ * Even in this case the ones inserted later by the producer  should be consumed */
+TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::set<std::string> results;
+
+  std::thread producer([&queue]() {
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("dum");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("tss");
+    });
+
+  std::thread consumer([&queue, &results]() {
+    while (results.size() < 3) {
+      std::string s;
+      if (queue.tryDequeue(s)) {
+        results.insert(s);
+        queue.enqueue(std::move(s));
+      } else {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+      }
+    }
+  });
+
+  producer.join();
+
+  // Give some time for the consumer to loop over the queue
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+  consumer.join();
 
 Review comment:
   The wait is redundant: `consumer.join()` already blocks until the consumer finishes, which happens as soon as it dequeues "tss" for the first time.
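
A small sketch of why no extra sleep is needed before `join()`. The function name `join_waits_for_completion` and the 20 ms delay are made up for illustration and are not part of the test under review.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Returns true if the consumer had finished by the time join() returned.
inline bool join_waits_for_completion() {
  std::atomic<bool> finished{false};
  std::thread consumer([&finished] {
    // Simulate a consumer that still has some work left to do.
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    finished = true;
  });
  // No sleep here: join() blocks until the thread function returns.
  consumer.join();
  return finished.load();
}
```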


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396518120
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -330,20 +331,18 @@ class ThreadPool {
 // integrated power manager
   std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
   // thread queue for the recently deceased threads.
-  moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
+  ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
 // worker queue of worker objects
-  moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
+  ConditionConcurrentQueue<Worker<T>> worker_queue_;
   std::priority_queue<Worker<T>, std::vector<Worker<T>>, DelayedTaskComparator<T>> delayed_worker_queue_;
-// notification for available work
-  std::condition_variable tasks_available_;
+// mutex to  protect task status and delayed queue   
+  std::mutex worker_queue_mutex_;
 
 Review comment:
   Is this mutex still used?


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396535599
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -303,8 +303,9 @@ class ThreadPool {
    * Drain will notify tasks to stop following notification
    */
   void drain() {
+    worker_queue_.stop();
     while (current_workers_ > 0) {
-      tasks_available_.notify_one();
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   What is the purpose of this sleep?


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396722653
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (running_ && this->queue_.empty()) {
 
 Review comment:
   Unfortunately not. 
   ```wait causes the current thread to block until the condition variable is notified or a spurious wakeup occurs, optionally looping until some predicate is satisfied.```
   I think this means the predicate can only be used to wait *again* for a further notification if it is not fulfilled (after a spurious wakeup, for example), but it cannot avoid waiting in the first place. 
   
   I gave it a try, but removing the `if` significantly changed the behaviour: calls block even if there are tasks in the queue.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r397293761
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -330,20 +331,18 @@ class ThreadPool {
 // integrated power manager
   std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
   // thread queue for the recently deceased threads.
-  moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
+  ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
 // worker queue of worker objects
-  moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
+  ConditionConcurrentQueue<Worker<T>> worker_queue_;
   std::priority_queue<Worker<T>, std::vector<Worker<T>>, DelayedTaskComparator<T>> delayed_worker_queue_;
-// notification for available work
-  std::condition_variable tasks_available_;
+// mutex to  protect task status and delayed queue   
+  std::mutex worker_queue_mutex_;
 
 Review comment:
   Thanks, and never mind: I mixed up the queues.
   
   But there is something that can still be fixed here: the trailing whitespace at the end of line 338.


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396729961
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (running_ && this->queue_.empty()) {
 
 Review comment:
   As stated in https://en.cppreference.com/w/cpp/thread/condition_variable/wait the wait with predicate is equivalent to
   ```
   while (!pred()) {
       wait(lock);
   }
   ```
   
   And it is really implemented that way: https://github.com/gcc-mirror/gcc/blob/b7c9bd36eaacac42631b882dc67a6f0db94de21c/libstdc%2B%2B-v3/include/std/condition_variable#L98
   
   If removing the `if` causes an issue, then there is a real issue in the code. I will review.
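
The equivalence quoted above can be checked with a small standalone sketch. The function name and the local queue are hypothetical; no other thread exists, so nobody ever calls `notify_one`, and the call can only return if the predicate is evaluated before any waiting takes place.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Returns true if an item was taken although no thread ever notified the
// condition variable. If wait() blocked first, this would never return.
inline bool wait_with_true_predicate_does_not_block() {
  std::mutex mtx;
  std::condition_variable cv;
  std::deque<int> queue{42};  // already non-empty before waiting

  std::unique_lock<std::mutex> lck(mtx);
  // Equivalent to: while (queue.empty()) { cv.wait(lck); }
  // The predicate is checked first, so this returns at once.
  cv.wait(lck, [&queue] { return !queue.empty(); });

  int item = queue.front();
  queue.pop_front();
  return item == 42;
}
```

This suggests that an explicit emptiness check in front of a predicate wait should not be needed, provided the predicate covers every condition under which the caller must stop waiting.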


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399642420
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!");
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : private ConcurrentQueue<T> {
 
 Review comment:
   It would be nice to have comments explaining the purpose of each class and the guarantees. This way the user doesn't have to read the code to use the classes.


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396538818
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -303,8 +303,9 @@ class ThreadPool {
    * Drain will notify tasks to stop following notification
    */
   void drain() {
+    worker_queue_.stop();
     while (current_workers_ > 0) {
-      tasks_available_.notify_one();
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   In my understanding, it is to wait for all worker thread functions to return. They are either in a task or waiting for one. If they are waiting for one, worker_queue_.stop() will make tryDequeue return false and they will end. If they are in a task, they will finish executing it, call tryDequeue, which will return false, and they will end.
   
   My initial reaction was "why are we not using the ConcurrentQueue's cv instead of polling?", but I realized that it is more complex than that.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399652688
 
 

 ##########
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##########
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("dum");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("tss");
+    });
+
+  std::thread consumer([&queue, &results]() {
+     while (results.size() < 3) {
+       std::string s;
+       if (queue.tryDequeue(s)) {
+         results.push_back(s);
+       } else {
+         std::this_thread::sleep_for(std::chrono::milliseconds(1));
+       }
+     }
+    });
+
+  producer.join();
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") {
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+    queue.enqueue("ba");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("dum");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("tss");
+  });
+
+  std::thread consumer([&queue, &results]() {
+    std::string s;
+    while (queue.dequeue(s)) {
+      results.push_back(s);
+    }
+  });
+
+  producer.join();
+
+  queue.stop();
+
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+/* In this testcase the consumer thread puts back all items to the queue to consume again
+ * Even in this case the ones inserted later by the producer  should be consumed */
+TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::set<std::string> results;
+
+  std::thread producer([&queue]() {
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("dum");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("tss");
+    });
 
 Review comment:
   There seems to be one extra level of indentation.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399640042
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!");
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
 
 Review comment:
   Can we mark this API private to extension developers? It would be nice to have the freedom to change the underlying container later, e.g. to a lock-free queue that keeps order, without breaking API.


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396538818
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -303,8 +303,9 @@ class ThreadPool {
    * Drain will notify tasks to stop following notification
    */
   void drain() {
+    worker_queue_.stop();
     while (current_workers_ > 0) {
-      tasks_available_.notify_one();
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   In my understanding, it is to wait for all worker thread functions to return. They are either in a task or waiting for one. If they are waiting for one, worker_queue_.stop() will make tryDequeue return false and they will end. If they are in a task, they will finish executing it, call tryDequeue, which will return false, and they will end.
   
   My initial reaction was "why are we not using the ConcurrentQueue's cv instead of polling?", but I realized that it is more complex than that.


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396494930
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (running_ && this->queue_.empty()) {
 
 Review comment:
   This `if` is unnecessary: `cv.wait` with a predicate first checks the predicate and, if it is true, returns immediately without waiting.
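
For reference, the C++ standard specifies the predicate overload `wait(lock, pred)` as equivalent to `while (!pred()) wait(lock);`, so the predicate is evaluated once before any blocking happens. The sketch below illustrates this in isolation. It is not code from the PR: `waitAndPop` and its parameters are hypothetical names, and it assumes that `running` is only modified while the mutex is held.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical helper (not part of the PR): blocks until an item is available
// or `running` is false. Returns true if an item was moved into `out`.
template <typename T>
bool waitAndPop(std::deque<T>& queue, std::mutex& mtx, std::condition_variable& cv,
                const bool& running, T& out) {
  std::unique_lock<std::mutex> lck(mtx);
  // Equivalent to: while (!pred()) { cv.wait(lck); }
  // The predicate is checked before the first wait, so a non-empty queue
  // (or a stopped one) never blocks here.
  cv.wait(lck, [&] { return !running || !queue.empty(); });
  if (queue.empty()) {
    return false;  // woken up because running became false
  }
  out = std::move(queue.front());
  queue.pop_front();
  return true;
}
```

With this shape no separate `if` is needed before the wait, provided the predicate covers every condition under which the caller should stop waiting.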


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396725596
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
 
 Review comment:
   When I read the private inheritance idea, my first thought was "I love it".
   
   However, I realised that private inheritance would make me rewrap or copy/paste all the functions (size(), clear()) that I would like to inherit from the base class without any modification.
   
   If that can somehow be avoided (I found no way, but I am naturally open to suggestions), I absolutely favour private inheritance. Otherwise, to avoid code duplication or needless wrappers, I would stay with the current implementation.
   
   The virtual functions have been removed in the meantime to address that part of your concerns.
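
One possible way to combine private inheritance with unmodified base-class functions is a set of using-declarations in the public section of the derived class. The sketch below is only an illustration under assumptions: `BaseQueue` and `DerivedQueue` are hypothetical, simplified stand-ins for the classes in the PR, reproducing just the members needed to show the technique.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Simplified stand-in for the mutex-protected base queue (assumption).
template <typename T>
class BaseQueue {
 public:
  template <typename... Args>
  void enqueue(Args&&... args) {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.emplace_back(std::forward<Args>(args)...);
  }

  bool empty() const {
    std::lock_guard<std::mutex> guard(mtx_);
    return queue_.empty();
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(mtx_);
    return queue_.size();
  }

  void clear() {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.clear();
  }

 protected:
  std::deque<T> queue_;
  mutable std::mutex mtx_;
};

// Private inheritance hides the whole base interface; the using-declarations
// re-expose selected members as public without writing wrapper functions.
template <typename T>
class DerivedQueue : private BaseQueue<T> {
 public:
  using BaseQueue<T>::enqueue;
  using BaseQueue<T>::empty;
  using BaseQueue<T>::size;
  using BaseQueue<T>::clear;
};
```

Members that are not named in a using-declaration stay inaccessible to users of `DerivedQueue`, and a `DerivedQueue<T>*` no longer converts implicitly to `BaseQueue<T>*` outside the class.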


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396722653
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (running_ && this->queue_.empty()) {
 
 Review comment:
   Unfortunately not. 
   ```wait causes the current thread to block until the condition variable is notified or a spurious wakeup occurs, optionally looping until some predicate is satisfied.```
   I think this means the predicate can only be used to wait *again* for further notification in case it's not fulfilled (e.g. after a spurious wakeup), but cannot avoid waiting.
   
   I gave it a try, but removing the `if` significantly changed the behaviour: calls block even if there are tasks in the queue.


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399540634
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
 
 Review comment:
   Changed to private inheritance


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400061549
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -64,8 +64,10 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
         }
       }
     } else {
-      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-      tasks_available_.wait(lock);
+      // This means that the threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen
+      if (running_.load()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+      }
 
 Review comment:
   Thanks for the explanation. Feel free to close the thread.


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396714686
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -303,8 +303,9 @@ class ThreadPool {
    * Drain will notify tasks to stop following notification
    */
   void drain() {
+    worker_queue_.stop();
     while (current_workers_ > 0) {
-      tasks_available_.notify_one();
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   @bakaid is right. 
   Stopping the queue wakes up all the worker threads actually doing nothing (waiting for work), but there can be worker threads doing some useful work (ontrigger calls, c2 heartbeats, whatever). These should end in a timely manner and stopping the queue guarantees that they don't pick up new tasks, but I found no better option to wait for that. 
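
The shutdown sequence described above can be sketched roughly as follows. This is a simplified illustration rather than the PR's code: `StoppableQueue` and `TinyPool` are hypothetical names, tasks are plain ints, and it is assumed that a stopped queue returns false from `dequeue` even if items remain.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical queue: dequeue() blocks until an item arrives or stop() is called.
class StoppableQueue {
 public:
  void enqueue(int value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(value);
    }
    cv_.notify_one();
  }

  bool dequeue(int& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait(lck, [this] { return !running_ || !queue_.empty(); });
    if (!running_ || queue_.empty()) {
      return false;  // stopped: workers must not pick up new tasks
    }
    out = queue_.front();
    queue_.pop_front();
    return true;
  }

  void stop() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      running_ = false;
    }
    cv_.notify_all();  // wake every idle worker blocked in dequeue()
  }

 private:
  std::deque<int> queue_;
  std::mutex mtx_;
  std::condition_variable cv_;
  bool running_ = true;
};

// Hypothetical pool: drain() stops the queue, then polls the worker counter.
class TinyPool {
 public:
  explicit TinyPool(int worker_count) {
    for (int i = 0; i < worker_count; ++i) {
      ++current_workers_;
      threads_.emplace_back([this] {
        int task = 0;
        while (queue_.dequeue(task)) {
          // A real pool would execute the task here.
        }
        --current_workers_;
      });
    }
  }

  ~TinyPool() {
    drain();
    for (auto& thread : threads_) {
      thread.join();
    }
  }

  void submit(int task) { queue_.enqueue(task); }

  void drain() {
    queue_.stop();
    // Idle workers return at once; busy ones finish their current task first.
    while (current_workers_ > 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }

  int workers() const { return current_workers_.load(); }

 private:
  StoppableQueue queue_;
  std::atomic<int> current_workers_{0};
  std::vector<std::thread> threads_;
};
```

The polling loop only covers workers that are still inside a task when `stop()` is called; idle workers are released by `notify_all()` and leave their loop without further help.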


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396522340
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
 
 Review comment:
   If we are in an unknown state, crashing and being restarted by the service manager is better than throwing and/or logging something and then continuing somewhere, somehow, IMHO.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400039479
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -64,8 +64,10 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
         }
       }
     } else {
-      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-      tasks_available_.wait(lock);
+      // This means that the threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen
+      if (running_.load()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+      }
 
 Review comment:
   There is no logger here.
   It's not a critical error that is worth an exception or a dump, and it might happen during startup/shutdown for a very short time.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038743
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!");
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : private ConcurrentQueue<T> {
 
 Review comment:
   Added comments to explain

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396808517
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (running_ && this->queue_.empty()) {
 
 Review comment:
   I removed it at first and the tests started to fail, so I reverted the change.
   
   Now I added some debug prints there to see what's going on, removed the `if`s, and everything worked.
   It seems to work properly now; I'm not sure what happened with my build.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399639717
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!");
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : private ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
 
 Review comment:
   - This constructor should be `explicit`.
   - There is a redundant semicolon at the end of the line.
   - The initializers are inconsistent (direct-initialization vs direct-list-initialization), but this is not a big deal.
   - I think `start = true` is a better default: following RAII, the first start is part of initialization IMO, since the class is only useful when started. If the thread pool class uses late initialization (start), it should override the default.
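
The constructor shape suggested above could look roughly like the sketch below. `StartableQueue` and `isRunning()` are hypothetical names used only for illustration; the real class in the PR carries a queue, a mutex, and a condition variable as well.

```cpp
#include <type_traits>

// Hypothetical, trimmed-down class; only the constructor shape is of interest.
class StartableQueue {
 public:
  // `explicit` prevents a bare bool from silently converting into a queue,
  // and `start = true` makes a default-constructed queue immediately usable.
  explicit StartableQueue(bool start = true) : running_{start} {}

  void start() { running_ = true; }
  void stop() { running_ = false; }
  bool isRunning() const { return running_; }

 private:
  bool running_;
};

// With `explicit`, there is no implicit conversion from bool to the class.
static_assert(!std::is_convertible<bool, StartableQueue>::value,
              "a bool must not implicitly convert to StartableQueue");
```

A caller that needs late initialization, as the thread pool might, would then construct the queue with `StartableQueue q(false);` and call `q.start()` later.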

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396729961
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (running_ && this->queue_.empty()) {
 
 Review comment:
   As stated in https://en.cppreference.com/w/cpp/thread/condition_variable/wait, the `wait` overload with a predicate is equivalent to
   ```
   while (!pred()) {
       wait(lock);
   }
   ```
   
   And it is really implemented that way: https://github.com/gcc-mirror/gcc/blob/b7c9bd36eaacac42631b882dc67a6f0db94de21c/libstdc%2B%2B-v3/include/std/condition_variable#L98
   
   If removing the `if` causes an issue, then there is a real issue in the code. I will review.
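
To illustrate why the outer `if` should be unnecessary, here is a minimal sketch of a blocking dequeue that relies only on the predicate overload of `wait`. `IntQueue` is a hypothetical class written for this example; the return condition mirrors the `running_ && ...` check visible in a later revision of the PR, but the rest is an assumption.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical minimal queue, only to illustrate the predicate form of wait().
class IntQueue {
 public:
  void enqueue(int value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(value);
    }
    cv_.notify_one();
  }

  void stop() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      running_ = false;
    }
    cv_.notify_all();
  }

  bool dequeueWait(int& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    // No outer `if`: the predicate is evaluated before the first wait,
    // so this returns immediately when an element is already queued
    // or when the queue has been stopped.
    cv_.wait(lck, [this] { return !running_ || !queue_.empty(); });
    if (!running_ || queue_.empty()) {
      return false;
    }
    out = queue_.front();
    queue_.pop_front();
    return true;
  }

 private:
  std::deque<int> queue_;
  std::mutex mtx_;
  std::condition_variable cv_;
  bool running_ = true;
};
```

Because the predicate is checked under the lock before any blocking happens, a notification sent before the call cannot be lost, and spurious wakeups are handled by the loop inside `wait`.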

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400041324
 
 

 ##########
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##########
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
 
 Review comment:
   If there were no sleeps here, the producer would most probably insert everything before the consumer is even started.
   That would also make the last test cases useless, where the goal is to prove that elements inserted later (when some are already looped in the queue) also get to a consumer.
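
The interleaving that the sleeps are meant to encourage can be sketched as below. This is not the test from the PR: `SimpleQueue` and `runProducerConsumer` are hypothetical, and the strings after `"ba"` are made up for the example. Only the 3 ms producer sleep comes from the diff.

```cpp
#include <chrono>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical mutex-guarded queue with a non-blocking tryDequeue.
class SimpleQueue {
 public:
  void enqueue(std::string value) {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.push_back(std::move(value));
  }

  bool tryDequeue(std::string& out) {
    std::lock_guard<std::mutex> guard(mtx_);
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

 private:
  std::deque<std::string> queue_;
  std::mutex mtx_;
};

// Runs one producer and one consumer; returns the items in consumption order.
std::vector<std::string> runProducerConsumer() {
  SimpleQueue queue;
  const std::vector<std::string> words = {"ba", "dum", "tss"};
  const std::size_t expected = words.size();
  std::vector<std::string> results;

  std::thread producer([&queue, words] {
    for (const auto& word : words) {
      queue.enqueue(word);
      // The pause gives the consumer a chance to run between inserts.
      std::this_thread::sleep_for(std::chrono::milliseconds(3));
    }
  });

  std::thread consumer([&queue, &results, expected] {
    std::string item;
    while (results.size() < expected) {
      if (queue.tryDequeue(item)) {
        results.push_back(item);
      } else {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
      }
    }
  });

  producer.join();
  consumer.join();
  return results;
}
```

With a single producer and a single consumer the order is FIFO regardless of timing; the sleeps only make it likely that the consumer sees an empty queue between inserts, which is the case the comment wants to exercise.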

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400037700
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!");
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : private ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
 
 Review comment:
   Fixed them

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396538818
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -303,8 +303,9 @@ class ThreadPool {
    * Drain will notify tasks to stop following notification
    */
   void drain() {
+    worker_queue_.stop();
     while (current_workers_ > 0) {
-      tasks_available_.notify_one();
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   In my understanding, it is to wait for all worker thread functions to return. They are either in a task or waiting for one. If they are waiting for one, `worker_queue_.stop()` will make `tryDequeue` return false and they will end. If they are in a task, they will finish executing it, call `tryDequeue`, which will return false, and they will end.
   
   My initial reaction was "why are we not using the ConcurrentQueue's cv instead of polling?", but I realized that it is more complex than that.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400051711
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -119,10 +133,22 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
     }
   }
   
-  bool dequeue(T& out) {
+  bool dequeueWait(T& out) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped 
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template< class Rep, class Period >
+  bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
 
 Review comment:
   I like that you used a template here, not just a concrete duration type like `milliseconds`, which is more common in the code base.
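
To illustrate the point, here is a small sketch of a timed dequeue that accepts any `std::chrono::duration`. `TimedQueue` is a hypothetical, stripped-down class written for this example (no stop flag, no base class); only the `dequeueWaitFor` signature mirrors the diff above.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical minimal queue; only the dequeueWaitFor signature follows the PR.
template <typename T>
class TimedQueue {
 public:
  void enqueue(T value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(std::move(value));
    }
    cv_.notify_one();
  }

  // Accepts seconds, milliseconds, or even floating-point durations,
  // because Rep and Period are deduced from the argument.
  template <class Rep, class Period>
  bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
    std::unique_lock<std::mutex> lck(mtx_);
    if (!cv_.wait_for(lck, time, [this] { return !queue_.empty(); })) {
      return false;  // timed out with nothing to return
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

 private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<T> queue_;
};
```

With this signature a caller can write `q.dequeueWaitFor(v, std::chrono::seconds(1))` or `q.dequeueWaitFor(v, std::chrono::microseconds(500))` without converting to a fixed unit first.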


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396808517
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (running_ && this->queue_.empty()) {
 
 Review comment:
   I removed it at first and tests started to fail, so I reverted it.
   
   Now I added some debug prints there to see what's going on, removed the if, and everything worked.
   It seems to work properly now; not sure what happened with my build.


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396495351
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (running_ && this->queue_.empty()) {
+      cv_.wait(lck, [this]{ return !running_ || !this->queue_.empty(); });  // Only wake up if there is something to return or stopped 
+    }
+    return running_ && ConcurrentQueue<T>::tryDequeue(lck, out);
+  }
+  
+  void stop() {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+
+  void start() {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    if (!running_) {
+      running_ = true;
+      if (!this->queue_.empty()) {
+	cv_.notify_all();
 
 Review comment:
   Indentation is bad.


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400204455
 
 

 ##########
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##########
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
 
 Review comment:
   Done, added new test cases that deal with a lot of elements without sleeps.


[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda closed pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746
 
 
   


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400037378
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -330,20 +331,18 @@ class ThreadPool {
 // integrated power manager
   std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
   // thread queue for the recently deceased threads.
-  moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
+  ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
 // worker queue of worker objects
-  moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
+  ConditionConcurrentQueue<Worker<T>> worker_queue_;
   std::priority_queue<Worker<T>, std::vector<Worker<T>>, DelayedTaskComparator<T>> delayed_worker_queue_;
-// notification for available work
-  std::condition_variable tasks_available_;
+// mutex to  protect task status and delayed queue   
+  std::mutex worker_queue_mutex_;
 
 Review comment:
   Removed


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400040217
 
 

 ##########
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##########
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("dum");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("tss");
+    });
+
+  std::thread consumer([&queue, &results]() {
+     while (results.size() < 3) {
+       std::string s;
+       if (queue.tryDequeue(s)) {
+         results.push_back(s);
+       } else {
+         std::this_thread::sleep_for(std::chrono::milliseconds(1));
+       }
+     }
+    });
+
+  producer.join();
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") {
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+    queue.enqueue("ba");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("dum");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("tss");
+  });
+
+  std::thread consumer([&queue, &results]() {
+    std::string s;
+    while (queue.dequeue(s)) {
+      results.push_back(s);
+    }
+  });
+
+  producer.join();
+
+  queue.stop();
+
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+/* In this testcase the consumer thread puts back all items to the queue to consume again
+ * Even in this case the ones inserted later by the producer  should be consumed */
+TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::set<std::string> results;
+
+  std::thread producer([&queue]() {
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("dum");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("tss");
+    });
 
 Review comment:
   Fixed


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400203410
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+
+// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  explicit ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeueImpl(lck, out);
+  }
+
+  bool empty() const {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return queue_.emptyImpl(lck);
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  void checkLock(std::unique_lock<std::mutex>& lck) const {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); 
+    }
+  }
+
+  bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
+    checkLock(lck);
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+
+  bool emptyImpl(std::unique_lock<std::mutex>& lck) const {
+    checkLock(lck);
+    return queue_.empty();
+  }
+
+  mutable std::mutex mtx_;
+ private:
+  std::deque<T> queue_;
+};
+
+
+// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data
 
 Review comment:
   Added


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400060410
 
 

 ##########
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##########
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
 
 Review comment:
   I see your point, but we should cover data races as well. Can we have at least a handful of test cases with rapid insertions, i.e. with no sleep, to cover both problems?
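
A test along those lines might look like the sketch below. It is an illustration only: `LockedQueue` and `rapidInsertDemo` are hypothetical names, and the queue is a plain mutex-protected deque standing in for `ConcurrentQueue`. Several producers enqueue as fast as they can, with no sleeps, and the check is that every value arrives exactly once.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Hypothetical mutex-protected queue standing in for ConcurrentQueue<int>.
class LockedQueue {
 public:
  void enqueue(int value) {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.push_back(value);
  }

  bool tryDequeue(int& out) {
    std::lock_guard<std::mutex> guard(mtx_);
    if (queue_.empty()) {
      return false;
    }
    out = queue_.front();
    queue_.pop_front();
    return true;
  }

 private:
  std::mutex mtx_;
  std::deque<int> queue_;
};

// Returns true if every enqueued value was dequeued exactly once.
bool rapidInsertDemo(int producers, int items_per_producer) {
  assert(producers > 0 && items_per_producer > 0);
  LockedQueue queue;
  const std::size_t total =
      static_cast<std::size_t>(producers) * static_cast<std::size_t>(items_per_producer);

  std::vector<std::thread> threads;
  for (int p = 0; p < producers; ++p) {
    threads.emplace_back([&queue, p, items_per_producer] {
      for (int i = 0; i < items_per_producer; ++i) {
        queue.enqueue(p * items_per_producer + i);  // unique value, no sleep
      }
    });
  }

  // The calling thread is the only consumer, so `seen` needs no extra lock.
  std::set<int> seen;
  std::size_t consumed = 0;
  while (consumed < total) {
    int value = 0;
    if (queue.tryDequeue(value)) {
      seen.insert(value);
      ++consumed;
    } else {
      std::this_thread::yield();
    }
  }

  for (auto& t : threads) {
    t.join();
  }
  int leftover = 0;
  return seen.size() == total && !queue.tryDequeue(leftover);
}
```

A passing run does not prove the absence of data races; running such a test under a race detector such as ThreadSanitizer would probably give more confidence than the assertion alone.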


[GitHub] [nifi-minifi-cpp] arpadboda commented on issue #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on issue #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#issuecomment-603238186
 
 
   > Compiled and tested the branch on macOS.
   > Also rebased MINIFICPP-550 to this branch and tried whether it fixes the C2 task starvation problem that made me find this issue - it does.
   > @arpadboda What I would really like to see are at least some minimal positive tests for the new `ConcurrentQueue` and `ConditionConcurrentQueue`. We are replacing a known (bad for our purpose, but still tested) implementation with an untested one. There are some minimal ThreadPool tests and indirectly some tests depend on this, but that does not make finding errors easy, so I would really prefer some unit tests here.
   
   Absolutely agree, already working on some.
   Wanted to make sure it works well on all the platforms first (and CI passes).


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400043883
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -29,11 +29,12 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
+
+// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
 template <typename T>
 class ConcurrentQueue {
  public:    
-  ConcurrentQueue() = default;
-  virtual ~ConcurrentQueue() = default;
+  explicit ConcurrentQueue() = default;
 
 Review comment:
   I don't see the need to make this explicit, although it doesn't hurt much. It prevents code like this:
   `ConcurrentQueue make_queue() { return {}; }`
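
The effect of `explicit` on a default constructor can be checked at compile time. The sketch below is an assumption-laden illustration rather than code from the PR: `ImplicitQueue`, `ExplicitQueue`, `acceptsBracedInit` and `IsImplicitlyDefaultConstructible` are hypothetical names, and the private member is there only so that neither class is an aggregate (as is the case for the real queue, which has non-public members).

```cpp
#include <type_traits>

// Hypothetical class with an implicit default constructor.
class ImplicitQueue {
 public:
  ImplicitQueue() = default;
  int size() const { return size_; }

 private:
  int size_ = 0;  // private member: the class is not an aggregate
};

// Same class, but the default constructor is explicit.
class ExplicitQueue {
 public:
  explicit ExplicitQueue() = default;
  int size() const { return size_; }

 private:
  int size_ = 0;
};

// Declared only; used in an unevaluated context to probe copy-list-initialization.
template <typename T>
void acceptsBracedInit(const T&);

template <typename T, typename = void>
struct IsImplicitlyDefaultConstructible : std::false_type {};

// Selected only if `{}` can copy-list-initialize a T.
template <typename T>
struct IsImplicitlyDefaultConstructible<T, decltype(acceptsBracedInit<T>({}))>
    : std::true_type {};

// Both types can still be default-constructed directly...
static_assert(std::is_default_constructible<ImplicitQueue>::value, "direct init works");
static_assert(std::is_default_constructible<ExplicitQueue>::value, "direct init works");

// ...but only the implicit one accepts `{}` in a copy-initialization context.
static_assert(IsImplicitlyDefaultConstructible<ImplicitQueue>::value, "`return {};` compiles");
static_assert(!IsImplicitlyDefaultConstructible<ExplicitQueue>::value, "`return {};` is rejected");

// Compiles because the default constructor is not explicit.
inline ImplicitQueue makeQueue() { return {}; }
```

With the explicit version, a factory would have to spell out the type, for example `return ExplicitQueue{};`, which in turn needs an accessible move constructor before C++17.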


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396501595
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -64,8 +64,10 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
         }
       }
     } else {
-      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-      tasks_available_.wait(lock);
+      // This means that the threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen
+      if (running_.load()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(0));
 
 Review comment:
   If this shouldn't happen, how will sleeping for 0 milliseconds help?


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396713099
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mtx_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (running_ && this->queue_.empty()) {
+      cv_.wait(lck, [this]{ return !running_ || !this->queue_.empty(); });  // Only wake up if there is something to return or stopped 
+    }
+    return running_ && ConcurrentQueue<T>::tryDequeue(lck, out);
+  }
+  
+  void stop() {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+
+  void start() {
 
 Review comment:
   You can construct it in the started state.
   
   Running means consumers are allowed to wait for notifications (blocking calls). 
   Stopping prevents it and wakes them up to make sure no threads hang when no further data is expected to arrive. 
   
   Added a comment to the bool member to explain the purpose. 
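
   The running/stopped semantics described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `StoppableQueue` and `consumerReleasedByStop` are hypothetical names, and the class is a simplified stand-in that assumes only the behaviour stated in the comment (consumers may block while running; `stop()` wakes them all).

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical, simplified stand-in for the queue under review.
template <typename T>
class StoppableQueue {
 public:
  explicit StoppableQueue(bool start = true) : running_(start) {}

  void enqueue(T value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(std::move(value));
    }
    cv_.notify_one();
  }

  // Blocks while the queue is running and empty; returns false once stopped.
  bool dequeueWait(T& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait(lck, [this] { return !running_ || !queue_.empty(); });
    assert(lck.owns_lock());  // wait() always returns with the lock held
    if (!running_ || queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  void stop() {
    std::lock_guard<std::mutex> guard(mtx_);
    running_ = false;
    cv_.notify_all();  // wake every blocked consumer so that none hangs
  }

 private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<T> queue_;
  bool running_;  // true: consumers are allowed to block waiting for data
};

// Returns true if a consumer waiting on an empty queue was released by stop().
bool consumerReleasedByStop() {
  StoppableQueue<int> queue;  // constructed in the started state
  bool got_item = true;
  std::thread consumer([&] {
    int value = 0;
    got_item = queue.dequeueWait(value);  // nothing is ever enqueued
  });
  queue.stop();
  consumer.join();
  return !got_item;
}
```

   Whether `stop()` runs before or after the consumer starts waiting, the predicate sees `running_ == false` and the call returns without an item.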


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400203526
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -29,11 +29,12 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
+
+// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
 
 Review comment:
   Added


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400039598
 
 

 ##########
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##########
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
 
 Review comment:
   Did the 2nd


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400205341
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mtx_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!");
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
 
 Review comment:
   I think that topic is way more general than this PR.
   
   I would be happy to see a doc about the interfaces we consider public API, but that doesn't exist yet.


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038135
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mtx_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!");
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
 
 Review comment:
   The container is now private; the mutex isn't, because derived classes have to acquire it as well.
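
   A rough sketch of that layout, assuming a private container, a protected mutex, and protected helpers that take the caller's lock. `LockedQueue`, `BatchQueue`, `drain` and `drainDemo` are hypothetical names used only for illustration, and the ownership check uses `assert` here where the diff throws `std::logic_error`.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical base class: the container is private, the mutex is protected.
template <typename T>
class LockedQueue {
 public:
  void enqueue(T value) {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.push_back(std::move(value));
  }

 protected:
  // Derived classes pass in the lock they already hold.
  bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
    assert(lck.owns_lock());  // simplification: the diff throws instead
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  mutable std::mutex mtx_;

 private:
  std::deque<T> queue_;  // reachable from derived classes only via the helpers
};

// Hypothetical derived class: removes up to `max` items under a single lock.
template <typename T>
class BatchQueue : public LockedQueue<T> {
 public:
  std::size_t drain(std::deque<T>& out, std::size_t max) {
    std::unique_lock<std::mutex> lck(this->mtx_);
    std::size_t count = 0;
    T item;
    while (count < max && this->tryDequeueImpl(lck, item)) {
      out.push_back(std::move(item));
      ++count;
    }
    return count;
  }
};

// Enqueues five items and drains at most three of them.
std::size_t drainDemo() {
  BatchQueue<int> queue;
  for (int i = 0; i < 5; ++i) {
    queue.enqueue(i);
  }
  std::deque<int> out;
  return queue.drain(out, 3);
}
```

   The derived class never touches `queue_` directly, yet it can still compose several base operations atomically because it owns the lock for the whole call.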


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399648668
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mtx_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
 
 Review comment:
   Just an interesting note: the compiler will optimize out the extra boolean of `std::unique_lock` (vs. `std::lock_guard`) if it's not used. The point is that the locked version's signature is fine; there's absolutely no reason to optimize it by hand.
   
   Proof: https://godbolt.org/z/ZgIf0f
   I'm only surprised that the compiler emits the same assembly twice instead of labeling the same code twice.
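
   For readers who want to repeat the comparison, a pair of functions along these lines could be pasted into a compiler explorer. This is a hypothetical reduction, not the contents of the linked snippet; `g_mtx`, `g_counter` and both function names are made up, and the generated code depends on the compiler and optimization level.

```cpp
#include <cassert>
#include <mutex>

std::mutex g_mtx;   // hypothetical shared mutex
int g_counter = 0;  // hypothetical shared state

// Variant 1: lock_guard, which stores no ownership flag.
int incrementWithLockGuard() {
  std::lock_guard<std::mutex> guard(g_mtx);
  return ++g_counter;
}

// Variant 2: unique_lock, which carries an ownership flag that is never
// changed here after construction.
int incrementWithUniqueLock() {
  std::unique_lock<std::mutex> lck(g_mtx);
  assert(lck.owns_lock());
  return ++g_counter;
}
```

   Both variants behave identically at the source level; the only difference to look for is in the emitted assembly.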


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400057782
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+
+// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  explicit ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mtx_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeueImpl(lck, out);
+  }
+
+  bool empty() const {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return emptyImpl(lck);
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  void checkLock(std::unique_lock<std::mutex>& lck) const {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); 
+    }
+  }
+
+  bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
+    checkLock(lck);
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+
+  bool emptyImpl(std::unique_lock<std::mutex>& lck) const {
+    checkLock(lck);
+    return queue_.empty();
+  }
+
+  mutable std::mutex mtx_;
+ private:
+  std::deque<T> queue_;
+};
+
+
+// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data
+template <typename T>
+class ConditionConcurrentQueue : private ConcurrentQueue<T> {
+ public:
+  explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue<T>{}, running_{start} {}
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+
+  using ConcurrentQueue<T>::size;
+  using ConcurrentQueue<T>::empty;
+  using ConcurrentQueue<T>::clear;
+
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool dequeueWait(T& out) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped 
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template< class Rep, class Period >
+  bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+  
+  void stop() {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+
+  void start() {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (!running_) {
+      running_ = true;
+      if (!this->emptyImpl(lck)) {
+        cv_.notify_all();
+      }
 
 Review comment:
   Since we are transitioning from stopped to started in a critical section, and it's not possible to have consumers waiting while the queue is stopped, this notification shouldn't be necessary.
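
   The reasoning can be checked with a small single-threaded sketch. It assumes the wait predicate from the diff (`!running_ || !empty`); `StartStopQueue` and `startWithoutNotifyDemo` are hypothetical names, and the class is a simplified stand-in rather than the PR's implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical, simplified queue whose start() does not notify.
template <typename T>
class StartStopQueue {
 public:
  explicit StartStopQueue(bool start) : running_(start) {}

  void enqueue(T value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(std::move(value));
    }
    cv_.notify_one();
  }

  bool dequeueWait(T& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    // While stopped, the predicate is already true, so this never blocks.
    cv_.wait(lck, [this] { return !running_ || !queue_.empty(); });
    if (!running_ || queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  void start() {
    std::lock_guard<std::mutex> guard(mtx_);
    running_ = true;  // no notify: nobody can be waiting while stopped
  }

 private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<T> queue_;
  bool running_;
};

// Returns true if an item enqueued while stopped is delivered after start().
bool startWithoutNotifyDemo() {
  StartStopQueue<int> queue(false);  // constructed in the stopped state
  queue.enqueue(42);
  int value = 0;
  const bool while_stopped = queue.dequeueWait(value);  // returns immediately
  assert(!while_stopped);
  queue.start();
  const bool after_start = queue.dequeueWait(value);
  return after_start && value == 42;
}
```

   Because a stopped queue makes `dequeueWait` return at once, there is no waiter for `start()` to wake; items enqueued meanwhile are picked up by the next call.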


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399643197
 
 

 ##########
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##########
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
 
 Review comment:
   I suggest avoiding all sleeps in the producers, both to speed up the test cases and to have a chance of discovering race conditions that only show up with tighter timings.
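
   One possible shape for a sleep-free producer, sketched outside the Catch test harness. `SimpleQueue` and `produceAndConsume` are hypothetical, and the strings other than "ba" are placeholder values; the consumer polls until it has seen every item instead of relying on timing.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal mutex-protected queue standing in for ConcurrentQueue.
template <typename T>
class SimpleQueue {
 public:
  void enqueue(T value) {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.push_back(std::move(value));
  }

  bool tryDequeue(T& out) {
    std::lock_guard<std::mutex> guard(mtx_);
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

 private:
  std::mutex mtx_;
  std::deque<T> queue_;
};

// Runs one producer and one consumer with no sleeps; returns what was consumed.
std::vector<std::string> produceAndConsume() {
  SimpleQueue<std::string> queue;
  const std::vector<std::string> input{"ba", "dum", "tss"};
  std::vector<std::string> results;

  std::thread producer([&queue, &input] {
    for (const auto& item : input) {
      queue.enqueue(item);  // back-to-back, no sleep in between
    }
  });
  std::thread consumer([&queue, &results, &input] {
    std::string item;
    while (results.size() < input.size()) {
      if (queue.tryDequeue(item)) {
        results.push_back(item);
      } else {
        std::this_thread::yield();  // let the producer make progress
      }
    }
  });

  producer.join();
  consumer.join();
  assert(results.size() == input.size());
  return results;
}
```

   With a single producer and a single consumer the FIFO order is preserved, so the result can be compared against the input directly.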


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400203336
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+
+// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  explicit ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mtx_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeueImpl(lck, out);
+  }
+
+  bool empty() const {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return emptyImpl(lck);
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  void checkLock(std::unique_lock<std::mutex>& lck) const {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); 
+    }
+  }
+
+  bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
+    checkLock(lck);
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+
+  bool emptyImpl(std::unique_lock<std::mutex>& lck) const {
+    checkLock(lck);
+    return queue_.empty();
+  }
+
+  mutable std::mutex mtx_;
+ private:
+  std::deque<T> queue_;
+};
+
+
+// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data
+template <typename T>
+class ConditionConcurrentQueue : private ConcurrentQueue<T> {
+ public:
+  explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue<T>{}, running_{start} {}
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+
+  using ConcurrentQueue<T>::size;
+  using ConcurrentQueue<T>::empty;
+  using ConcurrentQueue<T>::clear;
+
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool dequeueWait(T& out) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped 
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template< class Rep, class Period >
+  bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+  
+  void stop() {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+
+  void start() {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (!running_) {
+      running_ = true;
+      if (!this->emptyImpl(lck)) {
+        cv_.notify_all();
+      }
 
 Review comment:
   Removed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399639532
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
 
 Review comment:
   The virtual destructor is no longer needed with private inheritance.
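   
   A minimal sketch of why that holds, using hypothetical `Base` and `Wrapper` classes rather than the PR's queue types: with private inheritance, code outside the derived class cannot convert a derived pointer to a base pointer, so nothing can `delete` the object through the base, and the base destructor does not have to be virtual.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical stand-ins for the two queue classes; not the PR's code.
class Base {
 public:
  int size() const { return 0; }
  // Deliberately no virtual destructor.
};

class Wrapper : private Base {
 public:
  using Base::size;  // re-export only the members callers should see
};

// Outside of Wrapper, Wrapper* does not convert to Base*, so a Wrapper can
// never be destroyed through a Base pointer.
static_assert(!std::is_convertible<Wrapper*, Base*>::value,
              "a private base is inaccessible to outside code");
static_assert(!std::has_virtual_destructor<Base>::value,
              "Base carries no virtual destructor");

inline void demo() {
  Wrapper w;
  assert(w.size() == 0);  // the re-exported member is still callable
}
```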


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038906
 
 

 ##########
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##########
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("dum");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("tss");
+    });
+
+  std::thread consumer([&queue, &results]() {
+     while (results.size() < 3) {
+       std::string s;
+       if (queue.tryDequeue(s)) {
+         results.push_back(s);
+       } else {
+         std::this_thread::sleep_for(std::chrono::milliseconds(1));
+       }
+     }
+    });
+
+  producer.join();
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") {
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+    queue.enqueue("ba");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("dum");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("tss");
+  });
+
+  std::thread consumer([&queue, &results]() {
+    std::string s;
+    while (queue.dequeue(s)) {
+      results.push_back(s);
+    }
+  });
+
+  producer.join();
+
+  queue.stop();
+
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+/* In this testcase the consumer thread puts back all items to the queue to consume again
+ * Even in this case the ones inserted later by the producer  should be consumed */
+TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::set<std::string> results;
+
+  std::thread producer([&queue]() {
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("dum");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("tss");
+    });
+
+  std::thread consumer([&queue, &results]() {
+    while (results.size() < 3) {
+      std::string s;
+      if (queue.tryDequeue(s)) {
+        results.insert(s);
+        queue.enqueue(std::move(s));
+      } else {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+      }
+    }
+  });
+
+  producer.join();
+
+  // Give some time for the consumer to loop over the queue
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+  consumer.join();
 
 Review comment:
   Good point, removed.


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396729961
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (running_ && this->queue_.empty()) {
 
 Review comment:
   As stated in https://en.cppreference.com/w/cpp/thread/condition_variable/wait, the `wait` overload that takes a predicate is equivalent to
   ```
   while (!pred()) {
       wait(lock);
   }
   ```
   
   And it is really implemented that way: https://github.com/gcc-mirror/gcc/blob/b7c9bd36eaacac42631b882dc67a6f0db94de21c/libstdc%2B%2B-v3/include/std/condition_variable#L98
   
   If removing the `if` causes an issue, then there is a real issue in the code. I will review.
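   
   To illustrate the equivalence, here is a small sketch with a hypothetical `Gate` type (not from the PR). Because the predicate is evaluated before the first block, both variants return immediately when the condition already holds, which is why an extra `if` in front of the wait should be redundant.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical helper type used only for this illustration.
struct Gate {
  std::mutex mtx;
  std::condition_variable cv;
  bool ready = false;

  void open() {
    {
      std::lock_guard<std::mutex> guard(mtx);
      ready = true;
    }
    cv.notify_all();
  }

  // Predicate overload: checks `ready` first and only blocks while it is false.
  void waitWithPredicate() {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, [this] { return ready; });
  }

  // Hand-written loop with the same behavior.
  void waitWithLoop() {
    std::unique_lock<std::mutex> lck(mtx);
    while (!ready) {
      cv.wait(lck);
    }
  }
};

// Both waits return without blocking because the condition is already true.
inline bool demoAlreadySatisfied() {
  Gate gate;
  gate.open();
  gate.waitWithPredicate();
  gate.waitWithLoop();
  assert(gate.ready);
  return true;
}
```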


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400039598
 
 

 ##########
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##########
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
 
 Review comment:
   Went with the second option.
   
   By the way, I don't think using-directives do any harm in test .cpp files.
   In headers that get included in production code, I would definitely avoid them as well.


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396538818
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -303,8 +303,9 @@ class ThreadPool {
    * Drain will notify tasks to stop following notification
    */
   void drain() {
+    worker_queue_.stop();
     while (current_workers_ > 0) {
-      tasks_available_.notify_one();
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   In my understanding, the loop waits for all worker thread functions to return. Each worker is either executing a task or waiting for one. If it is waiting, worker_queue_.stop() makes tryDequeue return false and the worker exits. If it is executing a task, it finishes the task, calls tryDequeue, gets false, and exits.
   
   My initial reaction was "why are we not using the ConcurrentQueue's cv instead of polling", but I realized it is more complex than that.
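   
   A simplified sketch of that shutdown sequence, assuming hypothetical `StoppableQueue` and `MiniPool` types in place of the real queue and ThreadPool. Only the stop-then-poll shape of `drain()` follows the diff; everything else is illustrative.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for the stoppable queue.
class StoppableQueue {
 public:
  void enqueue(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      tasks_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

  // Blocks until a task arrives or stop() is called.
  // Returns false once stopped, even if tasks are still queued.
  bool dequeueWait(std::function<void()>& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait(lck, [this] { return !running_ || !tasks_.empty(); });
    if (!running_) {
      return false;
    }
    out = std::move(tasks_.front());
    tasks_.pop_front();
    return true;
  }

  void stop() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      running_ = false;
    }
    cv_.notify_all();  // wake every idle worker
  }

 private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  bool running_ = true;
};

// Hypothetical miniature pool; only drain() mirrors the reviewed change.
class MiniPool {
 public:
  explicit MiniPool(int workers) {
    for (int i = 0; i < workers; ++i) {
      ++current_workers_;
      threads_.emplace_back([this] {
        std::function<void()> task;
        while (queue_.dequeueWait(task)) {
          task();  // a busy worker finishes its task before re-checking
        }
        --current_workers_;
      });
    }
  }

  ~MiniPool() {
    drain();
    assert(current_workers_ == 0);
    for (auto& t : threads_) {
      t.join();
    }
  }

  void submit(std::function<void()> task) { queue_.enqueue(std::move(task)); }

  // Stop the queue, then poll until every worker function has returned.
  void drain() {
    queue_.stop();
    while (current_workers_ > 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }

  int workers() const { return current_workers_; }

 private:
  StoppableQueue queue_;
  std::atomic<int> current_workers_{0};
  std::vector<std::thread> threads_;
};
```

   The queue's condition variable only wakes idle workers. Workers that are inside a task are not waiting on it, so `drain()` still needs some other way to learn when they finish, and polling the counter is the simplest one.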


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396515674
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
 
 Review comment:
   I think it's not a good design to allow dynamic polymorphism between these containers, despite the "is a" relationship being present. My intuition screams design issue, but I can't fully grasp the root cause. I'll try to make some points below.
   
   Performance: I prefer not to have virtual functions on containers, as most or all users will know their requirements for their container. This design violates the zero overhead principle by imposing virtual calls on users that don't need notification capabilities.
   
   Hierarchy: I think the relationship here is an added [aspect](https://en.wikipedia.org/wiki/Aspect-oriented_programming) rather than a hierarchy.
   http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#c120-use-class-hierarchies-to-represent-concepts-with-inherent-hierarchical-structure-only
   
   Inheritance: 
   - The inheritance here is both interface and implementation inheritance.
   http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#c129-when-designing-a-class-hierarchy-distinguish-between-implementation-inheritance-and-interface-inheritance
   - Leaving the class open for extension makes it possible for subclasses to violate the invariants of the base class, violating encapsulation.
   
   Do we really need runtime polymorphism? If yes, I'd make it possible through concept-based polymorphism (via type erasure) without affecting the implementation.
   
   In either case, I'd make ConcurrentQueue closed and ConditionConcurrentQueue a wrapper around ConcurrentQueue ("implemented in terms of") rather than a public subclass ("is a"). To access the mutex, I recommend this to be a private inheritance with only the mutex of the base class marked as protected, or some other way of leaking the mutex.
   
   I feel like my above arguments are weak, and my proposed design is not very sound. As always, I welcome discussion.
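   
   For concreteness, the wrapper idea could look roughly like the sketch below. `PlainQueue` and `NotifyingQueue` are hypothetical names, and exposing lock-taking helpers as protected (in addition to the mutex) is an assumption made so the wrapper can dequeue while it holds the lock.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Closed base: no virtual members, storage stays private.
template <typename T>
class PlainQueue {
 public:
  void enqueue(T value) {
    std::lock_guard<std::mutex> guard(mtx_);
    items_.push_back(std::move(value));
  }

  bool tryDequeue(T& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    return tryDequeueLocked(lck, out);
  }

 protected:
  // Helpers for wrappers that already hold mtx_.
  bool emptyLocked(const std::unique_lock<std::mutex>& lck) const {
    assert(lck.owns_lock());
    return items_.empty();
  }

  bool tryDequeueLocked(const std::unique_lock<std::mutex>& lck, T& out) {
    assert(lck.owns_lock());
    if (items_.empty()) {
      return false;
    }
    out = std::move(items_.front());
    items_.pop_front();
    return true;
  }

  std::mutex mtx_;

 private:
  std::deque<T> items_;
};

// "Implemented in terms of" PlainQueue: private base, selective re-export.
template <typename T>
class NotifyingQueue : private PlainQueue<T> {
 public:
  using PlainQueue<T>::tryDequeue;

  void enqueue(T value) {
    PlainQueue<T>::enqueue(std::move(value));
    cv_.notify_one();
  }

  // Blocks until an element is available, then removes it.
  bool dequeueWait(T& out) {
    std::unique_lock<std::mutex> lck(this->mtx_);
    cv_.wait(lck, [this, &lck] { return !this->emptyLocked(lck); });
    return this->tryDequeueLocked(lck, out);
  }

 private:
  std::condition_variable cv_;
};
```

   Users who only need `PlainQueue` pay for no virtual calls, and `NotifyingQueue` cannot be passed where a `PlainQueue&` is expected, so the two cannot be mixed up at runtime.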


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396714686
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -303,8 +303,9 @@ class ThreadPool {
    * Drain will notify tasks to stop following notification
    */
   void drain() {
+    worker_queue_.stop();
     while (current_workers_ > 0) {
-      tasks_available_.notify_one();
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   @bakaid is right. 
   Stopping the queue wakes up all worker threads that are idle (waiting for work), but other worker threads may be in the middle of useful work (onTrigger calls, C2 heartbeats, and so on). Those should finish in a timely manner, but I found no better way to wait for them.


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396711286
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
 
 Review comment:
   Agreed, a std::logic_error is thrown here now.


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396497543
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
 
 Review comment:
   This can only happen if we messed up our code, or if someone wrote a derived class and messed up theirs.
   This is not a normal "we do not have elements" condition; it is an assertion failure. I would be happier to see an exception here, to make sure it does not go unnoticed if it occurs.
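   
   A sketch of what that could look like, using a hypothetical `CheckedQueue` (the helper is public here only so the sketch can be exercised directly; in the diff it is protected):

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <stdexcept>
#include <utility>

// Hypothetical queue used only to illustrate the precondition check.
template <typename T>
class CheckedQueue {
 public:
  void enqueue(T value) {
    std::lock_guard<std::mutex> guard(mtx_);
    items_.push_back(std::move(value));
  }

  std::mutex& mutex() { return mtx_; }

  // Precondition: lck owns the queue mutex. A violation is a programming
  // error, so it throws instead of pretending the queue is empty.
  bool tryDequeueLocked(std::unique_lock<std::mutex>& lck, T& out) {
    if (!lck.owns_lock()) {
      throw std::logic_error("caller must hold the queue lock");
    }
    if (items_.empty()) {
      return false;  // the ordinary "no elements" case
    }
    out = std::move(items_.front());
    items_.pop_front();
    return true;
  }

 private:
  std::mutex mtx_;
  std::deque<T> items_;
};

// Returns true if a call made without the lock is rejected.
inline bool rejectsUnlockedCall() {
  CheckedQueue<int> q;
  q.enqueue(1);
  std::unique_lock<std::mutex> lck(q.mutex(), std::defer_lock);  // not locked
  assert(!lck.owns_lock());
  int out = 0;
  try {
    q.tryDequeueLocked(lck, out);
  } catch (const std::logic_error&) {
    return true;
  }
  return false;
}
```

   Returning `false` would make the bug indistinguishable from an empty queue, while the exception surfaces it at the first offending call.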


[GitHub] [nifi-minifi-cpp] arpadboda commented on issue #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on issue #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#issuecomment-605317675
 
 
   Added tests


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396521172
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (running_ && this->queue_.empty()) {
+      cv_.wait(lck, [this]{ return !running_ || !this->queue_.empty(); });  // Only wake up if there is something to return or stopped 
+    }
+    return running_ && ConcurrentQueue<T>::tryDequeue(lck, out);
+  }
+  
+  void stop() {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+
+  void start() {
 
 Review comment:
   What does `start` mean? Why do I have to start my queue after creating it? Same for `isRunning`. If it serves some purpose, we need a comment explaining it.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399644887
 
 

 ##########
 File path: libminifi/src/utils/ThreadPool.cpp
 ##########
 @@ -64,8 +64,10 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
         }
       }
     } else {
-      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-      tasks_available_.wait(lock);
+      // This means that the threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen
+      if (running_.load()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+      }
 
 Review comment:
   If it shouldn't happen but we handle it anyway, then we should log the event and consider throwing or aborting, so that a core dump is generated for debugging.
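
One possible shape for this, as a sketch only: `failInvariant`, `onQueueReturnedNothing` and `demoFailInvariant` are hypothetical names, and `std::cerr` stands in for the project's logger. The branch reports the broken invariant and throws instead of sleeping and retrying. Replacing the `throw` with `std::abort()` would terminate the process instead, which yields a core dump where core dumps are enabled.

```cpp
#include <cassert>
#include <iostream>
#include <stdexcept>
#include <string>

// Hypothetical helper: report a broken invariant loudly, then fail.
[[noreturn]] inline void failInvariant(const std::string& what) {
  std::cerr << "Invariant violated: " << what << std::endl;  // stand-in for real logging
  throw std::logic_error(what);  // alternatively: std::abort();
}

// Simplified stand-in for the "shouldn't happen" branch of the worker loop.
inline void onQueueReturnedNothing(bool poolRunning, bool queueRunning) {
  if (poolRunning && !queueRunning) {
    failInvariant("thread pool is running but its queue is stopped");
  }
  // Otherwise the pool is shutting down, or the queue was simply empty: nothing to report.
}

inline void demoFailInvariant() {
  bool threw = false;
  try {
    onQueueReturnedNothing(true, false);
  } catch (const std::logic_error&) {
    threw = true;
  }
  assert(threw);
  onQueueReturnedNothing(false, false);  // regular shutdown: no exception
}
```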


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400037604
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
 
 Review comment:
   Removed


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396491413
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
 
 Review comment:
   `try_dequeue` in `moodycamel` meant that it does not block if no elements are available; it returns false.
   `tryDequeue` in `ConcurrentQueue` means the same: it does not block if no elements are available; it returns false.
   Yet in `ConditionConcurrentQueue`, `tryDequeue` blocks if no elements are available. This is pretty confusing.
   Also, I don't see much value in being able to access the `ConditionConcurrentQueue` through the `ConcurrentQueue` interface.
   This function should be called `dequeue`. Either the class should not derive from `ConcurrentQueue`, or it is also perfectly okay for it to have a `tryDequeue` as well, which is not overridden.
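
A sketch of the naming split described above, under the assumption of a single standalone class. `BlockingQueue`, `popLocked` and `demoBlockingQueue` are hypothetical names, not the PR's code. `tryDequeue` never blocks, while `dequeue` waits until an element arrives or the queue is stopped:

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical queue with distinct names for the non-blocking and blocking operations.
template <typename T>
class BlockingQueue {
 public:
  void enqueue(T value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(std::move(value));
    }
    cv_.notify_one();
  }

  // Non-blocking: returns false immediately if the queue is empty.
  bool tryDequeue(T& out) {
    std::lock_guard<std::mutex> guard(mtx_);
    return popLocked(out);
  }

  // Blocking: waits for an element; returns false only after stop().
  bool dequeue(T& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait(lck, [this] { return stopped_ || !queue_.empty(); });
    return !stopped_ && popLocked(out);
  }

  // Wakes every waiting consumer; later dequeue() calls return false.
  void stop() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      stopped_ = true;
    }
    cv_.notify_all();
  }

 private:
  // Precondition: the caller holds mtx_.
  bool popLocked(T& out) {
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  std::deque<T> queue_;
  std::mutex mtx_;
  std::condition_variable cv_;
  bool stopped_ = false;
};

inline void demoBlockingQueue() {
  BlockingQueue<int> q;
  int v = 0;
  assert(!q.tryDequeue(v));  // empty: returns at once
  q.enqueue(1);
  assert(q.dequeue(v) && v == 1);  // element present: no waiting needed
  q.stop();
  assert(!q.dequeue(v));  // stopped: returns false instead of blocking
}
```

Calling `dequeue` on an empty, running queue blocks the calling thread, so the demonstration only calls it when an element is present or after `stop()`.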


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399540702
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -303,8 +303,9 @@ class ThreadPool {
    * Drain will notify tasks to stop following notification
    */
   void drain() {
+    worker_queue_.stop();
     while (current_workers_ > 0) {
-      tasks_available_.notify_one();
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   Added


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400048134
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -81,25 +82,38 @@ class ConcurrentQueue {
     : queue_( std::move(other.queue_) ) {}
 
  protected:
-  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+  void checkLock(std::unique_lock<std::mutex>& lck) const {
     if (!lck.owns_lock()) {
-      throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!");
+      throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); 
     }
+  }
+
+  bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
+    checkLock(lck);
     if (queue_.empty()) {
       return false;
     }
     out = std::move(queue_.front());
     queue_.pop_front();
     return true;
   }
 
 Review comment:
   I like that you used protected functions taking `std::unique_lock` to make it an error to use the non-locking functions without a lock.


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038581
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!");
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : private ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+
+  using ConcurrentQueue<T>::size;
+  using ConcurrentQueue<T>::empty;
+  using ConcurrentQueue<T>::clear;
+
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool dequeue(T& out) {
 
 Review comment:
   Added `wait_for`, so it now supports:
   
   - tryDequeue
   - DequeueWait
   - DequeueWaitFor
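
A sketch of what a timed dequeue built on `std::condition_variable::wait_for` could look like. `TimedQueue`, the `dequeueWaitFor` signature and `demoTimedQueue` are assumptions made for illustration and may differ from the PR's actual interface:

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical queue exposing only a timed, blocking dequeue.
template <typename T>
class TimedQueue {
 public:
  void enqueue(T value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(std::move(value));
    }
    cv_.notify_one();
  }

  // Waits up to `timeout` for an element; returns false if none arrived in time.
  template <typename Rep, typename Period>
  bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& timeout) {
    std::unique_lock<std::mutex> lck(mtx_);
    // The predicate overload re-checks the condition after spurious wakeups
    // and returns the predicate's value once the timeout expires.
    if (!cv_.wait_for(lck, timeout, [this] { return !queue_.empty(); })) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

 private:
  std::deque<T> queue_;
  std::mutex mtx_;
  std::condition_variable cv_;
};

inline void demoTimedQueue() {
  TimedQueue<int> q;
  int v = 0;
  assert(!q.dequeueWaitFor(v, std::chrono::milliseconds(10)));  // empty: times out
  q.enqueue(42);
  assert(q.dequeueWaitFor(v, std::chrono::milliseconds(10)));  // element already present
  assert(v == 42);
}
```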


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400045447
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -29,11 +29,12 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
+
+// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
 
 Review comment:
   I would add:
   - Guarantees that the elements are dequeued in the order of insertion


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396710845
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
 
 Review comment:
   Renamed to "dequeue"; tryDequeue is also available as a non-blocking alternative.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r397167251
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
 
 Review comment:
   In the case of any "implemented in terms of" relationship, the new class would need to provide all of the required functionality and delegate to the inner class. This means boilerplate, as you pointed out, regardless of whether it's private inheritance or composition.
   
   One way to reduce boilerplate in the case of private inheritance and unmodified member functions is via using-declarations. This still requires enumerating the member functions that are to be exposed without modification, but using-declarations are shorter than wrapper implementations.
   
   In my opinion, cleaner dependencies (in this case) add more value than avoided boilerplate, but this is subjective.
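
To illustrate the using-declaration approach, here is a small sketch with hypothetical classes. `BasicQueue`, `CountingQueue` and `demoCountingQueue` are invented for this example and are unrelated to the PR's types. The derived class inherits privately, re-exposes the unmodified members by name, and wraps only the function whose behaviour it changes:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>

// Hypothetical base class providing the plain queue operations.
template <typename T>
class BasicQueue {
 public:
  void push(T value) { items_.push_back(std::move(value)); }

  bool tryPop(T& out) {
    if (items_.empty()) {
      return false;
    }
    out = std::move(items_.front());
    items_.pop_front();
    return true;
  }

  bool empty() const { return items_.empty(); }
  std::size_t size() const { return items_.size(); }
  void clear() { items_.clear(); }

 private:
  std::deque<T> items_;
};

// "Implemented in terms of" BasicQueue: private inheritance hides the base interface.
template <typename T>
class CountingQueue : private BasicQueue<T> {
 public:
  // Re-expose unmodified members without writing wrapper functions.
  using BasicQueue<T>::tryPop;
  using BasicQueue<T>::empty;
  using BasicQueue<T>::size;
  using BasicQueue<T>::clear;

  // Only the modified operation needs an explicit wrapper.
  void push(T value) {
    ++pushed_;
    BasicQueue<T>::push(std::move(value));
  }

  std::size_t totalPushed() const { return pushed_; }

 private:
  std::size_t pushed_ = 0;
};

inline void demoCountingQueue() {
  CountingQueue<int> q;
  q.push(1);
  q.push(2);
  assert(q.size() == 2);
  int v = 0;
  assert(q.tryPop(v) && v == 1);
  q.clear();
  assert(q.empty());
  assert(q.totalPushed() == 2);  // the counter is unaffected by clear()
}
```

Because the inheritance is private, code outside the class cannot implicitly convert a `CountingQueue<int>*` to a `BasicQueue<int>*`, so callers cannot bypass the wrapped `push`.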


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400056195
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+
+// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  explicit ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeueImpl(lck, out);
+  }
+
+  bool empty() const {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return queue_.emptyImpl(lck);
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  void checkLock(std::unique_lock<std::mutex>& lck) const {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); 
+    }
+  }
+
+  bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
+    checkLock(lck);
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+
+  bool emptyImpl(std::unique_lock<std::mutex>& lck) const {
+    checkLock(lck);
+    return queue_.empty();
+  }
+
+  mutable std::mutex mtx_;
+ private:
+  std::deque<T> queue_;
+};
+
+
+// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data
 
 Review comment:
   I would like to add:
   - `stop` interrupts all consumers without giving them a chance to consume the elements remaining in the queue.
   - "Started" means that queue elements can be consumed (dequeued).
   - Elements can be enqueued regardless of the running state.
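
   The three points above can be sketched as follows. This is a simplified, hypothetical `StoppableQueue` (the names `StoppableQueue`, `dequeueWait` and `exampleSemantics` are assumptions for the illustration), not the class under review:

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical, simplified queue that only illustrates the described semantics.
template <typename T>
class StoppableQueue {
 public:
  // Enqueueing is allowed whether or not the queue is running.
  void enqueue(T value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(std::move(value));
    }
    cv_.notify_one();
  }

  // Blocks until an element is available or the queue is stopped.
  // Returns false when stopped, even if elements remain in the queue.
  bool dequeueWait(T& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait(lck, [this] { return !running_ || !queue_.empty(); });
    if (!running_) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  // Wakes every waiting consumer; they return without consuming anything.
  void stop() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      running_ = false;
    }
    cv_.notify_all();
  }

  // Allows consumers to dequeue again.
  void start() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      running_ = true;
    }
    cv_.notify_all();
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(mtx_);
    return queue_.size();
  }

 private:
  mutable std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<T> queue_;
  bool running_ = false;  // the sketch starts in the stopped state
};

// Single-threaded walk-through of the three rules.
inline bool exampleSemantics() {
  StoppableQueue<int> q;
  q.enqueue(1);                            // allowed while stopped
  int out = 0;
  bool whileStopped = q.dequeueWait(out);  // not running: nothing is consumed
  q.start();
  bool whileRunning = q.dequeueWait(out);  // running: consumes the element
  q.enqueue(2);
  q.stop();
  bool afterStop = q.dequeueWait(out);     // stopped again: element 2 stays queued
  return !whileStopped && whileRunning && out == 1 && !afterStop && q.size() == 1;
}
```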
   


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396711671
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -330,20 +331,18 @@ class ThreadPool {
 // integrated power manager
   std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
   // thread queue for the recently deceased threads.
-  moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
+  ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
 // worker queue of worker objects
-  moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
+  ConditionConcurrentQueue<Worker<T>> worker_queue_;
   std::priority_queue<Worker<T>, std::vector<Worker<T>>, DelayedTaskComparator<T>> delayed_worker_queue_;
-// notification for available work
-  std::condition_variable tasks_available_;
+// mutex to  protect task status and delayed queue   
+  std::mutex worker_queue_mutex_;
 
 Review comment:
   Yes, it is still used to protect the delayed queue and the status of the current tasks.


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396711012
 
 

 ##########
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##########
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  virtual size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  virtual void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      return false;
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : public ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+  
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool tryDequeue(T& out) override {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    if (running_ && this->queue_.empty()) {
+      cv_.wait(lck, [this]{ return !running_ || !this->queue_.empty(); });  // Only wake up if there is something to return or stopped 
+    }
+    return running_ && ConcurrentQueue<T>::tryDequeue(lck, out);
+  }
+  
+  void stop() {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+
+  void start() {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    if (!running_) {
+      running_ = true;
+      if (!this->queue_.empty()) {
+	cv_.notify_all();
 
 Review comment:
   Thanks, fixed


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399650366
 
 

 ##########
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##########
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
 
 Review comment:
   You can either remove the `utils::` prefix from the references to this namespace's symbols in this file, or change this line to `namespace utils = org::apache::nifi::minifi::utils;`. I have no personal preference, but Abseil (Google) recommends against using-directives altogether.
   
   Abseil: "Tip of the Week #153: Don't use using-directives" https://abseil.io/tips/153
   
   The C++ Core Guidelines allow this usage of using-directives:
   http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rs-using
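
   As a rough illustration of the two forms, assuming a made-up `org::example::utils` namespace with a `greet` function (neither exists in the project):

```cpp
#include <cassert>
#include <string>

// Hypothetical namespace standing in for org::apache::nifi::minifi::utils.
namespace org {
namespace example {
namespace utils {
inline std::string greet() { return "utils"; }
}  // namespace utils
}  // namespace example
}  // namespace org

// Namespace alias: call sites stay qualified, but the qualifier is short.
inline std::string viaAlias() {
  namespace utils = org::example::utils;
  return utils::greet();
}

// Using-directive: every name of the namespace becomes visible unqualified.
inline std::string viaDirective() {
  using namespace org::example::utils;
  return greet();
}
```

   With the alias, a reader can still see where `greet` comes from; with the directive, the call site gives no hint.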


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r397169633
 
 

 ##########
 File path: libminifi/include/utils/ThreadPool.h
 ##########
 @@ -303,8 +303,9 @@ class ThreadPool {
    * Drain will notify tasks to stop following notification
    */
   void drain() {
+    worker_queue_.stop();
     while (current_workers_ > 0) {
-      tasks_available_.notify_one();
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   Could you add a code comment explaining this, including the fact that this is a best-effort solution? It will be helpful to future readers of the code.
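
   Something along these lines might do. The sketch uses a hypothetical, stripped-down `MiniPool` rather than the real `ThreadPool`, and the wording of the comment is only my guess at the intent:

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>
#include <vector>

// Hypothetical, stripped-down pool; only drain() mirrors the hunk above.
class MiniPool {
 public:
  explicit MiniPool(int workers) {
    for (int i = 0; i < workers; ++i) {
      ++current_workers_;
      threads_.emplace_back([this] {
        while (!stopped_) {
          // Pretend to work until asked to stop.
          std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        --current_workers_;
      });
    }
  }

  ~MiniPool() {
    drain();
    for (auto& t : threads_) {
      t.join();
    }
  }

  // Asks the workers to stop, then waits until all of them have exited.
  // Best effort: nothing notifies us when the last worker exits, so the
  // counter is polled with a short sleep instead of spinning on it.
  void drain() {
    stopped_ = true;  // stands in for worker_queue_.stop()
    while (current_workers_ > 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }

  int activeWorkers() const { return current_workers_; }

 private:
  std::atomic<bool> stopped_{false};
  std::atomic<int> current_workers_{0};
  std::vector<std::thread> threads_;
};

// Starts three workers, drains the pool and reports the remaining worker count.
inline int exampleDrain() {
  MiniPool pool(3);
  pool.drain();
  return pool.activeWorkers();
}
```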


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399640583
 
 

 ##########
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##########
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <typename T>
+class ConcurrentQueue {
+ public:    
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.empty();
+  }
+
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
+    : queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!");
+    }
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+  std::deque<T> queue_;
+  mutable std::mutex mtx_;
+};
+
+template <typename T>
+class ConditionConcurrentQueue : private ConcurrentQueue<T> {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue<T>(), running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+
+  using ConcurrentQueue<T>::size;
+  using ConcurrentQueue<T>::empty;
+  using ConcurrentQueue<T>::clear;
+
+
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    if (running_) {
+      cv_.notify_one();
+    }
+  }
+  
+  bool dequeue(T& out) {
 
 Review comment:
   I think a non-blocking `tryDequeue` would be valuable in this class. Also, it would be nice to be able to wait for a specified duration or until a given time point, using `cv_.wait_for()` and `cv_.wait_until()`.
   My naming suggestion (not very creative, but obvious):
   - Indefinite blocking: `dequeue_wait`
   - Blocking for duration: `dequeue_wait_for`
   - Blocking until timestamp: `dequeue_wait_until`
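
   A minimal sketch of what I mean, using the names above. `WaitingQueue`, `popLocked` and `exampleTimedDequeue` are hypothetical, and the start/stop handling is left out to keep it short:

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical sketch of the proposed interface; not the PR's code.
template <typename T>
class WaitingQueue {
 public:
  void enqueue(T value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(std::move(value));
    }
    cv_.notify_one();
  }

  // Non-blocking: returns false immediately if the queue is empty.
  bool tryDequeue(T& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    return popLocked(out);
  }

  // Blocks indefinitely until an element arrives.
  bool dequeue_wait(T& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait(lck, [this] { return !queue_.empty(); });
    return popLocked(out);
  }

  // Blocks for at most `timeout`; returns false if nothing arrived in time.
  template <typename Rep, typename Period>
  bool dequeue_wait_for(T& out, const std::chrono::duration<Rep, Period>& timeout) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait_for(lck, timeout, [this] { return !queue_.empty(); });
    return popLocked(out);
  }

  // Blocks until `deadline` at the latest; returns false if nothing arrived by then.
  template <typename Clock, typename Duration>
  bool dequeue_wait_until(T& out, const std::chrono::time_point<Clock, Duration>& deadline) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait_until(lck, deadline, [this] { return !queue_.empty(); });
    return popLocked(out);
  }

 private:
  // The caller must hold mtx_.
  bool popLocked(T& out) {
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<T> queue_;
};

// Single-threaded walk-through of the non-blocking and timed variants.
inline bool exampleTimedDequeue() {
  WaitingQueue<int> q;
  int out = 0;
  bool emptyTry = q.tryDequeue(out);                                      // empty queue
  bool timedOut = q.dequeue_wait_for(out, std::chrono::milliseconds(5));  // still empty
  q.enqueue(42);
  bool got = q.dequeue_wait_until(
      out, std::chrono::steady_clock::now() + std::chrono::seconds(1));   // element ready
  return !emptyTry && !timedOut && got && out == 42;
}
```

   The predicate overloads of `wait_for` and `wait_until` take care of spurious wakeups, so each variant stays a few lines long.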
