You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/05/06 15:04:30 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d907ff  MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
9d907ff is described below

commit 9d907ff11c86f028eb717621688e80ed618f8590
Author: Adam Hunyadi <hu...@gmail.com>
AuthorDate: Mon Apr 27 17:37:07 2020 +0200

    MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #773
---
 libminifi/include/utils/GeneralUtils.h             |   4 +
 libminifi/include/utils/MinifiConcurrentQueue.h    |  59 +++-
 libminifi/include/utils/StringUtils.h              |   5 +-
 libminifi/include/utils/TryMoveCall.h              |  61 ++++
 libminifi/test/unit/MinifiConcurrentQueueTests.cpp | 331 +++++++++++++++------
 5 files changed, 359 insertions(+), 101 deletions(-)

diff --git a/libminifi/include/utils/GeneralUtils.h b/libminifi/include/utils/GeneralUtils.h
index 42b1e89..e1cef70 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -56,6 +56,10 @@ T exchange(T& obj, U&& new_value) {
   obj = std::forward<U>(new_value);
   return old_value;
 }
+
+template<typename...>
+using void_t = void;
+
 #else
 using std::exchange;
 #endif /* < C++14 */
diff --git a/libminifi/include/utils/MinifiConcurrentQueue.h b/libminifi/include/utils/MinifiConcurrentQueue.h
index d0bdcab..80545fb 100644
--- a/libminifi/include/utils/MinifiConcurrentQueue.h
+++ b/libminifi/include/utils/MinifiConcurrentQueue.h
@@ -21,20 +21,22 @@
 #include <deque>
 #include <mutex>
 #include <condition_variable>
+#include <utility>
 #include <stdexcept>
 
+#include <utils/TryMoveCall.h>
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace utils {
 
-
 // Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
 // Guarantees elements to be dequeued in order of insertion
 template <typename T>
 class ConcurrentQueue {
- public:    
+ public:
   explicit ConcurrentQueue() = default;
 
   ConcurrentQueue(const ConcurrentQueue& other) = delete;
@@ -57,9 +59,15 @@ class ConcurrentQueue {
     return tryDequeueImpl(lck, out);
   }
 
+  template<typename Functor>
+  bool consume(Functor&& fun) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return consumeImpl(std::move(lck), std::forward<Functor>(fun));
+  }
+
   bool empty() const {
     std::unique_lock<std::mutex> lck(mtx_);
-    return queue_.emptyImpl(lck);
+    return emptyImpl(lck);
   }
 
   size_t size() const {
@@ -85,17 +93,33 @@ class ConcurrentQueue {
  protected:
   void checkLock(std::unique_lock<std::mutex>& lck) const {
     if (!lck.owns_lock()) {
-      throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); 
+      throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!");
     }
   }
 
+  // Warning: this function copies instead of moving if T is copy constructible but not nothrow move constructible
   bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
     checkLock(lck);
     if (queue_.empty()) {
       return false;
     }
-    out = std::move(queue_.front());
+    out = std::move_if_noexcept(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+
+  // Warning: this function copies instead of moving if T is copy constructible but not nothrow move constructible
+  template<typename Functor>
+  bool consumeImpl(std::unique_lock<std::mutex>&& lock_to_adopt, Functor&& fun) {
+    std::unique_lock<std::mutex> lock(std::move(lock_to_adopt));
+    checkLock(lock);
+    if (queue_.empty()) {
+      return false;
+    }
+    T elem = std::move_if_noexcept(queue_.front());
     queue_.pop_front();
+    lock.unlock();
+    TryMoveCall<Functor, T>::call(std::forward<Functor>(fun), elem);
     return true;
   }
 
@@ -117,7 +141,7 @@ template <typename T>
 class ConditionConcurrentQueue : private ConcurrentQueue<T> {
  public:
   explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue<T>{}, running_{start} {}
-  
+
   ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
   ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
   ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
@@ -127,7 +151,6 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
   using ConcurrentQueue<T>::empty;
   using ConcurrentQueue<T>::clear;
 
-
   template <typename... Args>
   void enqueue(Args&&... args) {
     ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
@@ -135,13 +158,20 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
       cv_.notify_one();
     }
   }
-  
+
   bool dequeueWait(T& out) {
     std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped 
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
     return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
   }
 
+  template<typename Functor>
+  bool consumeWait(Functor&& fun) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
+    return running_ && ConcurrentQueue<T>::consumeImpl(std::move(lck), std::forward<Functor>(fun));
+  }
+
   template< class Rep, class Period >
   bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
     std::unique_lock<std::mutex> lck(this->mtx_);
@@ -149,11 +179,18 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
     return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
   }
 
+  template<typename Functor, class Rep, class Period>
+  bool consumeWaitFor(Functor&& fun, const std::chrono::duration<Rep, Period>& time) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
+    return running_ && ConcurrentQueue<T>::consumeImpl(std::move(lck), std::forward<Functor>(fun));
+  }
+
   bool tryDequeue(T& out) {
     std::unique_lock<std::mutex> lck(this->mtx_);
     return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
   }
-  
+
   void stop() {
     std::lock_guard<std::mutex> guard(this->mtx_);
     running_ = false;
@@ -164,7 +201,7 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
     std::unique_lock<std::mutex> lck(this->mtx_);
     running_ = true;
   }
-  
+
   bool isRunning() const {
     std::lock_guard<std::mutex> guard(this->mtx_);
     return running_;  // In case it's not running no notifications are generated, dequeueing fails instead of blocking to avoid hanging threads
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 03267b1..e6e718a 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -29,6 +29,7 @@
 #include <map>
 #include <type_traits>
 #include "utils/FailurePolicy.h"
+#include "utils/GeneralUtils.h"
 
 enum TimeUnit {
   DAY,
@@ -187,10 +188,8 @@ class StringUtils {
   }
 
   #ifndef _MSC_VER
-  // partial detection idiom impl
-  template<typename...>
-  using void_t = void;
 
+  // partial detection idiom impl, from cppreference.com
   struct nonesuch{};
 
   template<typename Default, typename Void, template<class...> class Op, typename... Args>
diff --git a/libminifi/include/utils/TryMoveCall.h b/libminifi/include/utils/TryMoveCall.h
new file mode 100644
index 0000000..3970321
--- /dev/null
+++ b/libminifi/include/utils/TryMoveCall.h
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <utility>
+
+#include <utils/GeneralUtils.h> // NOLINT
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+// TryMoveCall calls a
+//  - unary function whose parameter is an lvalue reference that cannot bind to an rvalue (e.g. T&) by passing a ref
+//  - unary function of any other parameter type by moving the element into the call
+template<typename /* FunType */, typename T, typename = void>
+struct TryMoveCall {
+    template<typename Fun>
+    static auto call(Fun&& fun, T& elem) -> decltype(std::forward<Fun>(fun)(elem)) { return std::forward<Fun>(fun)(elem); }
+};
+
+// 1.) std::declval looks similar to this: template<typename T> T&& declval();.
+//     Not defined, therefore it's only usable in unevaluated context.
+//     No requirements regarding T, therefore makes it possible to create hypothetical objects
+//     without requiring e.g. a default constructor and a destructor, like the T{} expression does.
+// 2.) std::declval<FunType>() resolves to an object of type FunType. If FunType is an lvalue reference,
+//     then this will also result in an lvalue reference due to reference collapsing.
+// 3.) std::declval<FunType>()(std::declval<T>()) resolves to an object of the result type of
+//     a call on a function object of type FunType with an rvalue argument of type T.
+//     It is ill-formed if the function object expects a non-const lvalue reference.
+//         - Example: FunType is a pointer to a bool(int) and T is int. This expression will result in a bool object.
+//         - Example: FunType is a function object modeling bool(int&) and T is int. This expression will be ill-formed because it's illegal to bind an int rvalue to an int&.
+// 4.) void_t<decltype(*3*)> checks for the well-formedness of 3., then discards it.
+//     If 3. is ill-formed, then this specialization is ignored through SFINAE.
+//     If well-formed, then it's considered more specialized than the other and takes precedence.
+template<typename FunType, typename T>
+struct TryMoveCall<FunType, T, void_t<decltype(std::declval<FunType>()(std::declval<T>()))>> {
+    template<typename Fun>
+    static auto call(Fun&& fun, T& elem) -> decltype(std::forward<Fun>(fun)(std::move(elem))) { return std::forward<Fun>(fun)(std::move(elem)); }
+};
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/test/unit/MinifiConcurrentQueueTests.cpp b/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
index c28885c..6855de2 100644
--- a/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
+++ b/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
@@ -29,132 +29,288 @@
 
 namespace utils = org::apache::nifi::minifi::utils;
 
-TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
-  utils::ConcurrentQueue<std::string> queue;
-  std::vector<std::string> results;
+namespace {
+
+namespace MinifiConcurrentQueueTestProducersConsumers {
 
-  std::thread producer([&queue]() {
+  // Producers
+
+  template <typename Queue>
+  std::thread getSimpleProducerThread(Queue& queue) {
+    return std::thread([&queue] {
       queue.enqueue("ba");
       std::this_thread::sleep_for(std::chrono::milliseconds(3));
       queue.enqueue("dum");
       std::this_thread::sleep_for(std::chrono::milliseconds(3));
       queue.enqueue("tss");
     });
+  }
 
-  std::thread consumer([&queue, &results]() {
-     while (results.size() < 3) {
-       std::string s;
-       if (queue.tryDequeue(s)) {
-         results.push_back(s);
-       } else {
-         std::this_thread::sleep_for(std::chrono::milliseconds(1));
-       }
-     }
+  std::thread getBlockedProducerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::mutex& mutex) {
+    return std::thread([&queue, &mutex] {
+      std::unique_lock<std::mutex> lock(mutex);
+      queue.enqueue("ba");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("dum");
+      std::this_thread::sleep_for(std::chrono::milliseconds(3));
+      queue.enqueue("tss");
     });
+  }
 
-  producer.join();
-  consumer.join();
+  // Consumers
+
+  std::thread getSimpleTryDequeConsumerThread(utils::ConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+    return std::thread([&queue, &results] {
+      while (results.size() < 3) {
+        std::string s;
+        if (queue.tryDequeue(s)) {
+          results.push_back(s);
+        } else {
+          std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+      }
+    });
+  }
 
-  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
-}
+  std::thread getSimpleConsumeConsumerThread(utils::ConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+    return std::thread([&queue, &results] {
+      while (results.size() < 3) {
+        std::string s;
+        if (!queue.consume([&results] (const std::string& s) { results.push_back(s); })) {
+          std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+      }
+    });
+  }
 
+  std::thread getDequeueWaitConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+    return std::thread([&queue, &results] {
+      std::string s;
+      while (queue.dequeueWait(s)) {
+        results.push_back(s);
+      }
+    });
+  }
 
-TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") {
-  utils::ConditionConcurrentQueue<std::string> queue(true);
-  std::vector<std::string> results;
+  std::thread getConsumeWaitConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+    return std::thread([&queue, &results] {
+      while (results.size() < 3) {
+        std::string s;
+        if (!queue.consumeWait([&results] (const std::string& s) { results.push_back(s); })) {
+          std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+      }
+    });
+  }
 
-  std::thread producer([&queue]() {
-    queue.enqueue("ba");
-    std::this_thread::sleep_for(std::chrono::milliseconds(3));
-    queue.enqueue("dum");
-    std::this_thread::sleep_for(std::chrono::milliseconds(3));
-    queue.enqueue("tss");
-  });
+  std::thread getSpinningReaddingDequeueConsumerThread(utils::ConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+    return std::thread([&queue, &results] {
+      while (results.size() < 3) {
+        std::string s;
+        if (queue.tryDequeue(s)) {
+          // Unique elements only
+          if (!std::count(results.begin(), results.end(), s)) {
+            results.push_back(s);
+          }
+          queue.enqueue(std::move(s));
+        } else {
+          std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+      }
+    });
+  }
 
-  std::thread consumer([&queue, &results]() {
-    std::string s;
-    while (queue.dequeueWait(s)) {
-      results.push_back(s);
-    }
-  });
+  std::thread getReaddingDequeueConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+    return std::thread([&queue, &results] {
+      std::string s;
+      while (queue.dequeueWait(s)) {
+        if (!std::count(results.begin(), results.end(), s)) {
+          results.push_back(s);
+        }
+        // The consumer is busy enqueuing, so no one is waiting for this ;(
+        queue.enqueue(std::move(s));
+      }
+    });
+  }
 
-  producer.join();
+  std::thread getDequeueWaitForConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+    return std::thread([&queue, &results] {
+      const std::size_t max_read_attempts = 6;
+      for (std::size_t attempt_num = 0; results.size() < 3 && attempt_num < max_read_attempts; ++attempt_num) {
+        std::string s;
+        if (queue.dequeueWaitFor(s, std::chrono::milliseconds(3))) {
+          results.push_back(s);
+        }
+      }
+    });
+  }
+
+  std::thread getConsumeWaitForConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+    return std::thread([&queue, &results]() {
+      const std::size_t max_read_attempts = 6;
+      std::size_t attempt_num = 0;
+      while (results.size() < 3 && attempt_num < max_read_attempts) {
+        ++attempt_num;
+        std::string s;
+        queue.consumeWaitFor([&results] (const std::string& s) { results.push_back(s); }, std::chrono::milliseconds(3));
+      }
+    });
+  }
 
-  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+}  // namespace MinifiConcurrentQueueTestProducersConsumers
 
-  queue.stop();
+TEST_CASE("TestConcurrentQueue", "[TestConcurrentQueue]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
 
-  consumer.join();
+  utils::ConcurrentQueue<std::string> queue;
+  std::vector<std::string> results;
 
-  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
-}
+  SECTION("empty queue") {
+    SECTION("default initialized queue is empty") {
+      REQUIRE(queue.empty());
+    }
 
+    SECTION("trying to update based on empty queue preserves original data") {
+      std::string s { "Unchanged" };
 
-/* In this testcase the consumer thread puts back all items to the queue to consume again
- * Even in this case the ones inserted later by the producer  should be consumed */
-TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
-  utils::ConcurrentQueue<std::string> queue;
-  std::set<std::string> results;
-
-  std::thread producer([&queue]() {
-    queue.enqueue("ba");
-    std::this_thread::sleep_for(std::chrono::milliseconds(3));
-    queue.enqueue("dum");
-    std::this_thread::sleep_for(std::chrono::milliseconds(3));
-    queue.enqueue("tss");
-  });
+      SECTION("tryDequeue on empty queue returns false") {
+        REQUIRE(!queue.tryDequeue(s));
+      }
 
-  std::thread consumer([&queue, &results]() {
-    while (results.size() < 3) {
-      std::string s;
-      if (queue.tryDequeue(s)) {
-        results.insert(s);
-        queue.enqueue(std::move(s));
-      } else {
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+      SECTION("consume on empty queue returns false") {
+        bool ret = queue.consume([&s] (const std::string& elem) { s = elem; });
+        REQUIRE(!ret);
       }
+      REQUIRE(s == "Unchanged");
     }
-  });
+  }
 
-  producer.join();
-  consumer.join();
+  SECTION("non-empty queue") {
+    SECTION("the queue is first-in-first-out") {
+      for (std::size_t i = 0; i < 20; ++i) {
+        queue.enqueue(std::to_string(i));
+      }
+      SECTION("tryDequeue preserves order") {
+        for (std::size_t i = 0; i < 20; ++i) {
+          std::string s;
+          queue.tryDequeue(s);
+          REQUIRE(s == std::to_string(i));
+        }
+        REQUIRE(queue.empty());
+      }
+      SECTION("consume preserves order") {
+        for (std::size_t i = 0; i < 20; ++i) {
+          std::string s;
+          queue.consume([&s] (const std::string& elem) { s = elem; });
+          REQUIRE(s == std::to_string(i));
+        }
+        REQUIRE(queue.empty());
+      }
+      SECTION("insertion does not reorder") {
+        for (std::size_t i = 0; i < 20; ++i) {
+          std::string s;
+          queue.tryDequeue(s);
+          queue.enqueue("0");
+          queue.enqueue("9");
+          REQUIRE(s == std::to_string(i));
+        }
+        REQUIRE(40 == queue.size());
+      }
+    }
+  }
+}
+
+TEST_CASE("TestConcurrentQueue::testProducerConsumer", "[TestConcurrentQueueProducerConsumer]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
+  utils::ConcurrentQueue<std::string> queue;
+  std::vector<std::string> results;
+
+  SECTION("producers and consumers work synchronized") {
+    std::thread producer;
+    std::thread consumer;
+    SECTION("using tryDequeue") {
+        producer = getSimpleProducerThread(queue);
+        consumer = getSimpleTryDequeConsumerThread(queue, results);
+      }
+    SECTION("using consume") {
+        producer = getSimpleProducerThread(queue);
+        consumer = getSimpleConsumeConsumerThread(queue, results);
+    }
+    /* In this testcase the consumer thread puts back all items to the queue to consume again
+    * Even in this case the ones inserted later by the producer should be consumed */
+    SECTION("with readd") {
+      producer = getSimpleProducerThread(queue);
+      consumer = getSpinningReaddingDequeueConsumerThread(queue, results);
+    }
+    producer.join();
+    consumer.join();
+  }
 
   REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
 }
 
-/* The same test as above, but covering the ConditionConcurrentQueue */
-TEST_CASE("TestConditionConqurrentQueue::testQueueWithReAdd", "[TestConditionQueueWithReAdd]") {
+TEST_CASE("TestConditionConcurrentQueue::testProducerConsumer", "[TestConditionConcurrentQueueProducerConsumer]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
   utils::ConditionConcurrentQueue<std::string> queue(true);
-  std::set<std::string> results;
-
-  std::thread producer([&queue]()  {
-    queue.enqueue("ba");
-    std::this_thread::sleep_for(std::chrono::milliseconds(3));
-    queue.enqueue("dum");
-    std::this_thread::sleep_for(std::chrono::milliseconds(3));
-    queue.enqueue("tss");
-  });
+  std::vector<std::string> results;
 
-  std::thread consumer([&queue, &results]() {
-    std::string s;
-    while (queue.dequeueWait(s)) {
-      results.insert(s);
-      queue.enqueue(std::move(s));
+  SECTION("consumers fetching data from producers is synchronized and fifo") {
+    std::thread producer { getSimpleProducerThread(queue) };
+    std::thread consumer;
+    SECTION("using dequeueWait") {
+      consumer = getDequeueWaitConsumerThread(queue, results);
     }
-  });
-
-  producer.join();
+    SECTION("using consumeWait") {
+      consumer = getConsumeWaitConsumerThread(queue, results);
+    }
+    SECTION("using dequeueWaitFor") {
+      consumer = getDequeueWaitForConsumerThread(queue, results);
+    }
+    SECTION("using consumeWaitFor") {
+      consumer = getConsumeWaitForConsumerThread(queue, results);
+    }
+    producer.join();
+    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    queue.stop();
+    consumer.join();
 
-  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+  }
 
-  queue.stop();
+  /* The same readd test as in TestConcurrentQueue::testProducerConsumer, but covering the ConditionConcurrentQueue */
+  SECTION("with readd") {
+    std::thread consumer { getReaddingDequeueConsumerThread(queue, results) };
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    std::thread producer { getSimpleProducerThread(queue) };
+    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    producer.join();
+    queue.stop();
+    consumer.join();
+
+    REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+  }
 
-  consumer.join();
+  // Blocked producers
+  SECTION("consumer times out when using waitFor and time is up") {
+    std::mutex mutex;
+    std::unique_lock<std::mutex> lock(mutex);
+    std::thread producer{ getBlockedProducerThread(queue, mutex) };
+    std::thread consumer;
+    SECTION("using dequeueWaitFor") {
+      consumer = getDequeueWaitForConsumerThread(queue, results);
+    }
+    SECTION("using consumeWaitFor") {
+      consumer = getConsumeWaitForConsumerThread(queue, results);
+    }
+    consumer.join();
+    lock.unlock();
+    producer.join();
 
-  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+    REQUIRE(results.empty());
+  }
 }
 
-TEST_CASE("TestConruccentQueues::highLoad", "[TestConcurrentQueuesHighLoad]") {
+TEST_CASE("TestConcurrentQueues::highLoad", "[TestConcurrentQueuesHighLoad]") {
   std::random_device dev;
   std::mt19937 rng(dev());
   std::uniform_int_distribution<std::mt19937::result_type> dist(1, std::numeric_limits<int>::max());
@@ -201,3 +357,4 @@ TEST_CASE("TestConruccentQueues::highLoad", "[TestConcurrentQueuesHighLoad]") {
 
   REQUIRE(source == target);
 }
+}  // anon namespace