You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/05/06 15:04:30 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1202 - Extend
interface and add new tests for MinifiConcurrentQueue
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 9d907ff MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
9d907ff is described below
commit 9d907ff11c86f028eb717621688e80ed618f8590
Author: Adam Hunyadi <hu...@gmail.com>
AuthorDate: Mon Apr 27 17:37:07 2020 +0200
MINIFICPP-1202 - Extend interface and add new tests for MinifiConcurrentQueue
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #773
---
libminifi/include/utils/GeneralUtils.h | 4 +
libminifi/include/utils/MinifiConcurrentQueue.h | 59 +++-
libminifi/include/utils/StringUtils.h | 5 +-
libminifi/include/utils/TryMoveCall.h | 61 ++++
libminifi/test/unit/MinifiConcurrentQueueTests.cpp | 331 +++++++++++++++------
5 files changed, 359 insertions(+), 101 deletions(-)
diff --git a/libminifi/include/utils/GeneralUtils.h b/libminifi/include/utils/GeneralUtils.h
index 42b1e89..e1cef70 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -56,6 +56,10 @@ T exchange(T& obj, U&& new_value) {
obj = std::forward<U>(new_value);
return old_value;
}
+
+template<typename...>
+using void_t = void;
+
#else
using std::exchange;
#endif /* < C++14 */
diff --git a/libminifi/include/utils/MinifiConcurrentQueue.h b/libminifi/include/utils/MinifiConcurrentQueue.h
index d0bdcab..80545fb 100644
--- a/libminifi/include/utils/MinifiConcurrentQueue.h
+++ b/libminifi/include/utils/MinifiConcurrentQueue.h
@@ -21,20 +21,22 @@
#include <deque>
#include <mutex>
#include <condition_variable>
+#include <utility>
#include <stdexcept>
+#include <utils/TryMoveCall.h>
+
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace utils {
-
// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
// Guarantees elements to be dequeued in order of insertion
template <typename T>
class ConcurrentQueue {
- public:
+ public:
explicit ConcurrentQueue() = default;
ConcurrentQueue(const ConcurrentQueue& other) = delete;
@@ -57,9 +59,15 @@ class ConcurrentQueue {
return tryDequeueImpl(lck, out);
}
+ template<typename Functor>
+ bool consume(Functor&& fun) {
+ std::unique_lock<std::mutex> lck(mtx_);
+ return consumeImpl(std::move(lck), std::forward<Functor>(fun));
+ }
+
bool empty() const {
std::unique_lock<std::mutex> lck(mtx_);
- return queue_.emptyImpl(lck);
+ return emptyImpl(lck);
}
size_t size() const {
@@ -85,17 +93,33 @@ class ConcurrentQueue {
protected:
void checkLock(std::unique_lock<std::mutex>& lck) const {
if (!lck.owns_lock()) {
- throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!");
+ throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!");
}
}
+ // Warning: this function copies if T is not nothrow move constructible
bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
checkLock(lck);
if (queue_.empty()) {
return false;
}
- out = std::move(queue_.front());
+ out = std::move_if_noexcept(queue_.front());
+ queue_.pop_front();
+ return true;
+ }
+
+ // Warning: this function copies if T is not nothrow move constructible
+ template<typename Functor>
+ bool consumeImpl(std::unique_lock<std::mutex>&& lock_to_adopt, Functor&& fun) {
+ std::unique_lock<std::mutex> lock(std::move(lock_to_adopt));
+ checkLock(lock);
+ if (queue_.empty()) {
+ return false;
+ }
+ T elem = std::move_if_noexcept(queue_.front());
queue_.pop_front();
+ lock.unlock();
+ TryMoveCall<Functor, T>::call(std::forward<Functor>(fun), elem);
return true;
}
@@ -117,7 +141,7 @@ template <typename T>
class ConditionConcurrentQueue : private ConcurrentQueue<T> {
public:
explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue<T>{}, running_{start} {}
-
+
ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
@@ -127,7 +151,6 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
using ConcurrentQueue<T>::empty;
using ConcurrentQueue<T>::clear;
-
template <typename... Args>
void enqueue(Args&&... args) {
ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
@@ -135,13 +158,20 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
cv_.notify_one();
}
}
-
+
bool dequeueWait(T& out) {
std::unique_lock<std::mutex> lck(this->mtx_);
- cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped
+ cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped
return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
}
+ template<typename Functor>
+ bool consumeWait(Functor&& fun) {
+ std::unique_lock<std::mutex> lck(this->mtx_);
+ cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped
+ return running_ && ConcurrentQueue<T>::consumeImpl(std::move(lck), std::forward<Functor>(fun));
+ }
+
template< class Rep, class Period >
bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
std::unique_lock<std::mutex> lck(this->mtx_);
@@ -149,11 +179,18 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
}
+ template<typename Functor, class Rep, class Period>
+ bool consumeWaitFor(Functor&& fun, const std::chrono::duration<Rep, Period>& time) {
+ std::unique_lock<std::mutex> lck(this->mtx_);
+ cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Wake up with timeout or in case there is something to do
+ return running_ && ConcurrentQueue<T>::consumeImpl(std::move(lck), std::forward<Functor>(fun));
+ }
+
bool tryDequeue(T& out) {
std::unique_lock<std::mutex> lck(this->mtx_);
return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
}
-
+
void stop() {
std::lock_guard<std::mutex> guard(this->mtx_);
running_ = false;
@@ -164,7 +201,7 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
std::unique_lock<std::mutex> lck(this->mtx_);
running_ = true;
}
-
+
bool isRunning() const {
std::lock_guard<std::mutex> guard(this->mtx_);
return running_; // In case it's not running no notifications are generated, dequeueing fails instead of blocking to avoid hanging threads
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 03267b1..e6e718a 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -29,6 +29,7 @@
#include <map>
#include <type_traits>
#include "utils/FailurePolicy.h"
+#include "utils/GeneralUtils.h"
enum TimeUnit {
DAY,
@@ -187,10 +188,8 @@ class StringUtils {
}
#ifndef _MSC_VER
- // partial detection idiom impl
- template<typename...>
- using void_t = void;
+ // partial detection idiom impl, from cppreference.com
struct nonesuch{};
template<typename Default, typename Void, template<class...> class Op, typename... Args>
diff --git a/libminifi/include/utils/TryMoveCall.h b/libminifi/include/utils/TryMoveCall.h
new file mode 100644
index 0000000..3970321
--- /dev/null
+++ b/libminifi/include/utils/TryMoveCall.h
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <utility>
+
+#include <utils/GeneralUtils.h> // NOLINT
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+// TryMoveCall calls an
+// - unary function of a lvalue reference-type argument by passing a ref
+// - unary function of any other argument type by moving into it
+template<typename /* FunType */, typename T, typename = void>
+struct TryMoveCall {
+ template<typename Fun>
+ static auto call(Fun&& fun, T& elem) -> decltype(std::forward<Fun>(fun)(elem)) { return std::forward<Fun>(fun)(elem); }
+};
+
+// 1.) std::declval looks similar to this: template<typename T> T&& declval();.
+// Not defined, therefore it's only usable in unevaluated context.
+// No requirements regarding T, therefore makes it possible to create hypothetical objects
+// without requiring e.g. a default constructor and a destructor, like the T{} expression does.
+// 2.) std::declval<FunType>() resolves to an object of type FunType. If FunType is an lvalue reference,
+// then this will also result in an lvalue reference due to reference collapsing.
+// 3.) std::declval<FunType>()(std::declval<T>()) resolves to an object of the result type of
+// a call on a function object of type FunType with an rvalue argument of type T.
+// It is ill-formed if the function object expect an lvalue reference.
+// - Example: FunType is a pointer to a bool(int) and T is int. This expression will result in a bool object.
+// - Example: FunType is a function object modeling bool(int&) and T is int. This expression will be ill-formed because it's illegal to bind an int rvalue to an int&.
+// 4.) void_t<decltype(*3*)> checks for the well-formedness of 3., then discards it.
+// If 3. is ill-formed, then this specialization is ignored through SFINAE.
+// If well-formed, then it's considered more specialized than the other and takes precedence.
+template<typename FunType, typename T>
+struct TryMoveCall<FunType, T, void_t<decltype(std::declval<FunType>()(std::declval<T>()))>> {
+ template<typename Fun>
+ static auto call(Fun&& fun, T& elem) -> decltype(std::forward<Fun>(fun)(std::move(elem))) { return std::forward<Fun>(fun)(std::move(elem)); }
+};
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/test/unit/MinifiConcurrentQueueTests.cpp b/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
index c28885c..6855de2 100644
--- a/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
+++ b/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
@@ -29,132 +29,288 @@
namespace utils = org::apache::nifi::minifi::utils;
-TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
- utils::ConcurrentQueue<std::string> queue;
- std::vector<std::string> results;
+namespace {
+
+namespace MinifiConcurrentQueueTestProducersConsumers {
- std::thread producer([&queue]() {
+ // Producers
+
+ template <typename Queue>
+ std::thread getSimpleProducerThread(Queue& queue) {
+ return std::thread([&queue] {
queue.enqueue("ba");
std::this_thread::sleep_for(std::chrono::milliseconds(3));
queue.enqueue("dum");
std::this_thread::sleep_for(std::chrono::milliseconds(3));
queue.enqueue("tss");
});
+ }
- std::thread consumer([&queue, &results]() {
- while (results.size() < 3) {
- std::string s;
- if (queue.tryDequeue(s)) {
- results.push_back(s);
- } else {
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- }
+ std::thread getBlockedProducerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::mutex& mutex) {
+ return std::thread([&queue, &mutex] {
+ std::unique_lock<std::mutex> lock(mutex);
+ queue.enqueue("ba");
+ std::this_thread::sleep_for(std::chrono::milliseconds(3));
+ queue.enqueue("dum");
+ std::this_thread::sleep_for(std::chrono::milliseconds(3));
+ queue.enqueue("tss");
});
+ }
- producer.join();
- consumer.join();
+ // Consumers
+
+ std::thread getSimpleTryDequeConsumerThread(utils::ConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+ return std::thread([&queue, &results] {
+ while (results.size() < 3) {
+ std::string s;
+ if (queue.tryDequeue(s)) {
+ results.push_back(s);
+ } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ }
+ });
+ }
- REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
-}
+ std::thread getSimpleConsumeConsumerThread(utils::ConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+ return std::thread([&queue, &results] {
+ while (results.size() < 3) {
+ std::string s;
+ if (!queue.consume([&results] (const std::string& s) { results.push_back(s); })) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ }
+ });
+ }
+ std::thread getDequeueWaitConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+ return std::thread([&queue, &results] {
+ std::string s;
+ while (queue.dequeueWait(s)) {
+ results.push_back(s);
+ }
+ });
+ }
-TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") {
- utils::ConditionConcurrentQueue<std::string> queue(true);
- std::vector<std::string> results;
+ std::thread getConsumeWaitConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+ return std::thread([&queue, &results] {
+ while (results.size() < 3) {
+ std::string s;
+ if (!queue.consumeWait([&results] (const std::string& s) { results.push_back(s); })) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ }
+ });
+ }
- std::thread producer([&queue]() {
- queue.enqueue("ba");
- std::this_thread::sleep_for(std::chrono::milliseconds(3));
- queue.enqueue("dum");
- std::this_thread::sleep_for(std::chrono::milliseconds(3));
- queue.enqueue("tss");
- });
+ std::thread getSpinningReaddingDequeueConsumerThread(utils::ConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+ return std::thread([&queue, &results] {
+ while (results.size() < 3) {
+ std::string s;
+ if (queue.tryDequeue(s)) {
+ // Unique elements only
+ if (!std::count(results.begin(), results.end(), s)) {
+ results.push_back(s);
+ }
+ queue.enqueue(std::move(s));
+ } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ }
+ });
+ }
- std::thread consumer([&queue, &results]() {
- std::string s;
- while (queue.dequeueWait(s)) {
- results.push_back(s);
- }
- });
+ std::thread getReaddingDequeueConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+ return std::thread([&queue, &results] {
+ std::string s;
+ while (queue.dequeueWait(s)) {
+ if (!std::count(results.begin(), results.end(), s)) {
+ results.push_back(s);
+ }
+ // The consumer is busy enqueing so noone is waiting for this ;(
+ queue.enqueue(std::move(s));
+ }
+ });
+ }
- producer.join();
+ std::thread getDequeueWaitForConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+ return std::thread([&queue, &results] {
+ const std::size_t max_read_attempts = 6;
+ for (std::size_t attempt_num = 0; results.size() < 3 && attempt_num < max_read_attempts; ++attempt_num) {
+ std::string s;
+ if (queue.dequeueWaitFor(s, std::chrono::milliseconds(3))) {
+ results.push_back(s);
+ }
+ }
+ });
+ }
+
+ std::thread getConsumeWaitForConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
+ return std::thread([&queue, &results]() {
+ const std::size_t max_read_attempts = 6;
+ std::size_t attempt_num = 0;
+ while (results.size() < 3 && attempt_num < max_read_attempts) {
+ ++attempt_num;
+ std::string s;
+ queue.consumeWaitFor([&results] (const std::string& s) { results.push_back(s); }, std::chrono::milliseconds(3));
+ }
+ });
+ }
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
+} // namespace MinifiConcurrentQueueTestProducersConsumers
- queue.stop();
+TEST_CASE("TestConcurrentQueue", "[TestConcurrentQueue]") {
+ using namespace MinifiConcurrentQueueTestProducersConsumers;
- consumer.join();
+ utils::ConcurrentQueue<std::string> queue;
+ std::vector<std::string> results;
- REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
-}
+ SECTION("empty queue") {
+ SECTION("default initialized queue is empty") {
+ REQUIRE(queue.empty());
+ }
+ SECTION("trying to update based on empty queue preserves original data") {
+ std::string s { "Unchanged" };
-/* In this testcase the consumer thread puts back all items to the queue to consume again
- * Even in this case the ones inserted later by the producer should be consumed */
-TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
- utils::ConcurrentQueue<std::string> queue;
- std::set<std::string> results;
-
- std::thread producer([&queue]() {
- queue.enqueue("ba");
- std::this_thread::sleep_for(std::chrono::milliseconds(3));
- queue.enqueue("dum");
- std::this_thread::sleep_for(std::chrono::milliseconds(3));
- queue.enqueue("tss");
- });
+ SECTION("tryDequeue on empty queue returns false") {
+ REQUIRE(!queue.tryDequeue(s));
+ }
- std::thread consumer([&queue, &results]() {
- while (results.size() < 3) {
- std::string s;
- if (queue.tryDequeue(s)) {
- results.insert(s);
- queue.enqueue(std::move(s));
- } else {
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ SECTION("consume on empty queue returns false") {
+ bool ret = queue.consume([&s] (const std::string& elem) { s = elem; });
+ REQUIRE(!ret);
}
+ REQUIRE(s == "Unchanged");
}
- });
+ }
- producer.join();
- consumer.join();
+ SECTION("non-empty queue") {
+ SECTION("the queue is first-in-first-out") {
+ for (std::size_t i = 0; i < 20; ++i) {
+ queue.enqueue(std::to_string(i));
+ }
+ SECTION("tryDequeue preserves order") {
+ for (std::size_t i = 0; i < 20; ++i) {
+ std::string s;
+ queue.tryDequeue(s);
+ REQUIRE(s == std::to_string(i));
+ }
+ REQUIRE(queue.empty());
+ }
+ SECTION("consume preserves order") {
+ for (std::size_t i = 0; i < 20; ++i) {
+ std::string s;
+ queue.consume([&s] (const std::string& elem) { s = elem; });
+ REQUIRE(s == std::to_string(i));
+ }
+ REQUIRE(queue.empty());
+ }
+ SECTION("insertion does not reorder") {
+ for (std::size_t i = 0; i < 20; ++i) {
+ std::string s;
+ queue.tryDequeue(s);
+ queue.enqueue("0");
+ queue.enqueue("9");
+ REQUIRE(s == std::to_string(i));
+ }
+ REQUIRE(40 == queue.size());
+ }
+ }
+ }
+}
+
+TEST_CASE("TestConcurrentQueue::testProducerConsumer", "[TestConcurrentQueueProducerConsumer]") {
+ using namespace MinifiConcurrentQueueTestProducersConsumers;
+ utils::ConcurrentQueue<std::string> queue;
+ std::vector<std::string> results;
+
+ SECTION("producers and consumers work synchronized") {
+ std::thread producer;
+ std::thread consumer;
+ SECTION("using tryDequeue") {
+ producer = getSimpleProducerThread(queue);
+ consumer = getSimpleTryDequeConsumerThread(queue, results);
+ }
+ SECTION("using consume") {
+ producer = getSimpleProducerThread(queue);
+ consumer = getSimpleConsumeConsumerThread(queue, results);
+ }
+ /* In this testcase the consumer thread puts back all items to the queue to consume again
+ * Even in this case the ones inserted later by the producer should be consumed */
+ SECTION("with readd") {
+ producer = getSimpleProducerThread(queue);
+ consumer = getSpinningReaddingDequeueConsumerThread(queue, results);
+ }
+ producer.join();
+ consumer.join();
+ }
REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
}
-/* The same test as above, but covering the ConditionConcurrentQueue */
-TEST_CASE("TestConditionConqurrentQueue::testQueueWithReAdd", "[TestConditionQueueWithReAdd]") {
+TEST_CASE("TestConditionConcurrentQueue::testProducerConsumer", "[TestConditionConcurrentQueueProducerConsumer]") {
+ using namespace MinifiConcurrentQueueTestProducersConsumers;
utils::ConditionConcurrentQueue<std::string> queue(true);
- std::set<std::string> results;
-
- std::thread producer([&queue]() {
- queue.enqueue("ba");
- std::this_thread::sleep_for(std::chrono::milliseconds(3));
- queue.enqueue("dum");
- std::this_thread::sleep_for(std::chrono::milliseconds(3));
- queue.enqueue("tss");
- });
+ std::vector<std::string> results;
- std::thread consumer([&queue, &results]() {
- std::string s;
- while (queue.dequeueWait(s)) {
- results.insert(s);
- queue.enqueue(std::move(s));
+ SECTION("consumers fetching data from producers is synchronized and fifo") {
+ std::thread producer { getSimpleProducerThread(queue) };
+ std::thread consumer;
+ SECTION("using dequeueWait") {
+ consumer = getDequeueWaitConsumerThread(queue, results);
}
- });
-
- producer.join();
+ SECTION("using consumeWait") {
+ consumer = getConsumeWaitConsumerThread(queue, results);
+ }
+ SECTION("using dequeueWaitFor") {
+ consumer = getDequeueWaitForConsumerThread(queue, results);
+ }
+ SECTION("using consumeWaitFor") {
+ consumer = getConsumeWaitForConsumerThread(queue, results);
+ }
+ producer.join();
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ queue.stop();
+ consumer.join();
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+ }
- queue.stop();
+ /* The same test as above, but covering the ConditionConcurrentQueue */
+ SECTION("with readd") {
+ std::thread consumer { getReaddingDequeueConsumerThread(queue, results) };
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ std::thread producer { getSimpleProducerThread(queue) };
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ producer.join();
+ queue.stop();
+ consumer.join();
+
+ REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+ }
- consumer.join();
+ // Blocked producers
+ SECTION("consumer times out when using waitFor and time is up") {
+ std::mutex mutex;
+ std::unique_lock<std::mutex> lock(mutex);
+ std::thread producer{ getBlockedProducerThread(queue, mutex) };
+ std::thread consumer;
+ SECTION("using dequeueWaitFor") {
+ consumer = getDequeueWaitForConsumerThread(queue, results);
+ }
+ SECTION("using consumeWaitFor") {
+ consumer = getConsumeWaitForConsumerThread(queue, results);
+ }
+ consumer.join();
+ lock.unlock();
+ producer.join();
- REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+ REQUIRE(results.empty());
+ }
}
-TEST_CASE("TestConruccentQueues::highLoad", "[TestConcurrentQueuesHighLoad]") {
+TEST_CASE("TestConcurrentQueues::highLoad", "[TestConcurrentQueuesHighLoad]") {
std::random_device dev;
std::mt19937 rng(dev());
std::uniform_int_distribution<std::mt19937::result_type> dist(1, std::numeric_limits<int>::max());
@@ -201,3 +357,4 @@ TEST_CASE("TestConruccentQueues::highLoad", "[TestConcurrentQueuesHighLoad]") {
REQUIRE(source == target);
}
+} // anon namespace