You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ww...@apache.org on 2023/10/13 02:21:58 UTC
[brpc] branch master updated: Support pthread mode for ExecutionQueue (#2333)
This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new a5572936 Support pthread mode for ExecutionQueue (#2333)
a5572936 is described below
commit a55729366cd17e10a153de956a1b546c73f1f2c3
Author: Bright Chen <ch...@foxmail.com>
AuthorDate: Fri Oct 13 10:21:49 2023 +0800
Support pthread mode for ExecutionQueue (#2333)
---
docs/cn/execution_queue.md | 16 ++
src/bthread/execution_queue.cpp | 71 +++++++--
src/bthread/execution_queue.h | 15 +-
src/bthread/execution_queue_inl.h | 19 ++-
test/bthread_execution_queue_unittest.cpp | 236 +++++++++++++++++++++++++-----
5 files changed, 299 insertions(+), 58 deletions(-)
diff --git a/docs/cn/execution_queue.md b/docs/cn/execution_queue.md
index 5fb9d412..8640a4af 100644
--- a/docs/cn/execution_queue.md
+++ b/docs/cn/execution_queue.md
@@ -78,6 +78,22 @@ class TaskIterator;
### 启动一个ExecutionQueue:
```
+struct ExecutionQueueOptions {
+ ExecutionQueueOptions();
+
+ // Execute in resident pthread instead of bthread. default: false.
+ bool use_pthread;
+
+ // Attribute of the bthread which execute runs on. default: BTHREAD_ATTR_NORMAL
+ // Bthread will be used when executor == NULL and use_pthread == false.
+ bthread_attr_t bthread_attr;
+
+ // Executor that tasks run on. default: NULL
+ // Note that TaskOptions.in_place_if_possible = false will not work, if implementation of
+ // Executor is in-place(synchronous).
+ Executor * executor;
+};
+
// Start a ExecutionQueue. If |options| is NULL, the queue will be created with
// default options.
// Returns 0 on success, errno otherwise
diff --git a/src/bthread/execution_queue.cpp b/src/bthread/execution_queue.cpp
index f7fd9eae..557669ee 100644
--- a/src/bthread/execution_queue.cpp
+++ b/src/bthread/execution_queue.cpp
@@ -24,6 +24,7 @@
#include "butil/memory/singleton_on_pthread_once.h"
#include "butil/object_pool.h" // butil::get_object
#include "butil/resource_pool.h" // butil::get_resource
+#include "butil/threading/platform_thread.h"
namespace bthread {
@@ -105,16 +106,34 @@ void ExecutionQueueBase::start_execute(TaskNode* node) {
}
if (nullptr == _options.executor) {
- bthread_t tid;
- // We start the execution thread in background instead of foreground as
- // we can't determine whether the code after execute() is urgent (like
- // unlock a pthread_mutex_t) in which case implicit context switch may
- // cause undefined behavior (e.g. deadlock)
- if (bthread_start_background(&tid, &_options.bthread_attr,
- _execute_tasks, node) != 0) {
- PLOG(FATAL) << "Fail to start bthread";
- _execute_tasks(node);
+ if (_options.use_pthread) {
+ if (_pthread_started) {
+ BAIDU_SCOPED_LOCK(_mutex);
+ _current_head = node;
+ _cond.Signal();
+ } else {
+ // Start the execution pthread in background once.
+ if (pthread_create(&_pid, NULL,
+ _execute_tasks_pthread,
+ node) != 0) {
+ PLOG(FATAL) << "Fail to create pthread";
+ _execute_tasks(node);
+ }
+ _pthread_started = true;
+ }
+ } else {
+ bthread_t tid;
+ // We start the execution bthread in background instead of foreground as
+ // we can't determine whether the code after execute() is urgent (like
+ // unlock a pthread_mutex_t) in which case implicit context switch may
+ // cause undefined behavior (e.g. deadlock)
+ if (bthread_start_background(&tid, &_options.bthread_attr,
+ _execute_tasks, node) != 0) {
+ PLOG(FATAL) << "Fail to start bthread";
+ _execute_tasks(node);
+ }
}
+
} else {
if (_options.executor->submit(_execute_tasks, node) != 0) {
PLOG(FATAL) << "Fail to submit task";
@@ -176,10 +195,10 @@ void* ExecutionQueueBase::_execute_tasks(void* arg) {
CHECK(m->_stopped);
// Add _join_butex by 2 to make it equal to the next version of the
// ExecutionQueue from the same slot so that join with old id would
- // return immediatly.
+ // return immediately.
//
- // 1: release fence to make join sees the newst changes when it sees
- // the newst _join_butex
+ // 1: release fence to make join see the newest changes when it sees
+ // the newest _join_butex
m->_join_butex->fetch_add(2, butil::memory_order_release/*1*/);
butex_wake_all(m->_join_butex);
vars->execq_count << -1;
@@ -189,6 +208,28 @@ void* ExecutionQueueBase::_execute_tasks(void* arg) {
return NULL;
}
+void* ExecutionQueueBase::_execute_tasks_pthread(void* arg) {
+ butil::PlatformThread::SetName("ExecutionQueue");
+ auto head = (TaskNode*)arg;
+ auto m = (ExecutionQueueBase*)head->q;
+ m->_current_head = head;
+ while (true) {
+ BAIDU_SCOPED_LOCK(m->_mutex);
+ while (!m->_current_head) {
+ m->_cond.Wait();
+ }
+ _execute_tasks(m->_current_head);
+ m->_current_head = NULL;
+
+ int expected = _version_of_id(m->_this_id);
+ if (expected != m->_join_butex->load(butil::memory_order_relaxed)) {
+ // The execution queue has been stopped and its stop task has been executed; quit.
+ break;
+ }
+ }
+ return NULL;
+}
+
void ExecutionQueueBase::return_task_node(TaskNode* node) {
node->clear_before_return(_clear_func);
butil::return_object<TaskNode>(node);
@@ -227,6 +268,10 @@ int ExecutionQueueBase::join(uint64_t id) {
return errno;
}
}
+ // Join pthread if it's started.
+ if (m->_options.use_pthread && m->_pthread_started) {
+ pthread_join(m->_pid, NULL);
+ }
return 0;
}
@@ -365,6 +410,8 @@ int ExecutionQueueBase::create(uint64_t* id, const ExecutionQueueOptions* option
_version_of_vref(m->_versioned_ref.fetch_add(
1, butil::memory_order_release)), slot);
*id = m->_this_id;
+ m->_pthread_started = false;
+ m->_current_head = NULL;
get_execq_vars()->execq_count << 1;
return 0;
}
diff --git a/src/bthread/execution_queue.h b/src/bthread/execution_queue.h
index 7fc46be5..5ceef89f 100644
--- a/src/bthread/execution_queue.h
+++ b/src/bthread/execution_queue.h
@@ -88,11 +88,11 @@ private:
// }
template <typename T>
class TaskIterator : public TaskIteratorBase {
- TaskIterator();
public:
typedef T* pointer;
typedef T& reference;
+ TaskIterator() = delete;
reference operator*() const;
pointer operator->() const { return &(operator*()); }
TaskIterator& operator++();
@@ -133,7 +133,7 @@ const static TaskOptions TASK_OPTIONS_INPLACE = TaskOptions(false, true);
class Executor {
public:
- virtual ~Executor() {}
+ virtual ~Executor() = default;
// Return 0 on success.
virtual int submit(void * (*fn)(void*), void* args) = 0;
@@ -141,11 +141,15 @@ public:
struct ExecutionQueueOptions {
ExecutionQueueOptions();
- // Attribute of the bthread which execute runs on
- // default: BTHREAD_ATTR_NORMAL
+
+ // Execute in resident pthread instead of bthread. default: false.
+ bool use_pthread;
+
+ // Attribute of the bthread which execute runs on. default: BTHREAD_ATTR_NORMAL
+ // Bthread will be used when executor == NULL and use_pthread == false.
bthread_attr_t bthread_attr;
- // Executor that tasks run on. bthread will be used when executor = NULL.
+ // Executor that tasks run on. default: NULL
// Note that TaskOptions.in_place_if_possible = false will not work, if implementation of
// Executor is in-place(synchronous).
Executor * executor;
@@ -198,7 +202,6 @@ int execution_queue_execute(ExecutionQueueId<T> id,
const TaskOptions* options,
TaskHandle* handle);
-
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
T&& task);
diff --git a/src/bthread/execution_queue_inl.h b/src/bthread/execution_queue_inl.h
index b932cbc9..64725051 100644
--- a/src/bthread/execution_queue_inl.h
+++ b/src/bthread/execution_queue_inl.h
@@ -29,6 +29,7 @@
#include "butil/time.h" // butil::cpuwide_time_ns
#include "bvar/bvar.h" // bvar::Adder
#include "bthread/butex.h" // butex_construct
+#include "butil/synchronization/condition_variable.h"
namespace bthread {
@@ -168,7 +169,9 @@ public:
: _head(NULL)
, _versioned_ref(0) // join() depends on even version
, _high_priority_tasks(0)
- {
+ , _pthread_started(false)
+ , _cond(&_mutex)
+ , _current_head(NULL) {
_join_butex = butex_create_checked<butil::atomic<int> >();
_join_butex->store(0, butil::memory_order_relaxed);
}
@@ -203,6 +206,7 @@ private:
void _on_recycle();
int _execute(TaskNode* head, bool high_priority, int* niterated);
static void* _execute_tasks(void* arg);
+ static void* _execute_tasks_pthread(void* arg);
static inline uint32_t _version_of_id(uint64_t id) WARN_UNUSED_RESULT {
return (uint32_t)(id >> 32);
@@ -234,6 +238,13 @@ private:
clear_task_mem _clear_func;
ExecutionQueueOptions _options;
butil::atomic<int>* _join_butex;
+
+ // For pthread mode.
+ pthread_t _pid;
+ bool _pthread_started;
+ butil::Mutex _mutex;
+ butil::ConditionVariable _cond;
+ TaskNode* _current_head; // Current task head of each execution.
};
template <typename T>
@@ -330,12 +341,14 @@ public:
};
inline ExecutionQueueOptions::ExecutionQueueOptions()
- : bthread_attr(BTHREAD_ATTR_NORMAL), executor(NULL)
+ : use_pthread(false)
+ , bthread_attr(BTHREAD_ATTR_NORMAL)
+ , executor(NULL)
{}
template <typename T>
inline int execution_queue_start(
- ExecutionQueueId<T>* id,
+ ExecutionQueueId<T>* id,
const ExecutionQueueOptions* options,
int (*execute)(void* meta, TaskIterator<T>&),
void* meta) {
diff --git a/test/bthread_execution_queue_unittest.cpp b/test/bthread_execution_queue_unittest.cpp
index 627d27c8..6ecad01b 100644
--- a/test/bthread_execution_queue_unittest.cpp
+++ b/test/bthread_execution_queue_unittest.cpp
@@ -55,14 +55,20 @@ int add(void* meta, bthread::TaskIterator<LongIntTask> &iter) {
return 0;
}
-TEST_F(ExecutionQueueTest, single_thread) {
+void test_single_thread(bool use_pthread) {
int64_t result = 0;
int64_t expected_result = 0;
stopped = false;
bthread::ExecutionQueueId<LongIntTask> queue_id;
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
- add, &result));
+ add, &result));
for (int i = 0; i < 100; ++i) {
expected_result += i;
ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i));
@@ -75,6 +81,12 @@ TEST_F(ExecutionQueueTest, single_thread) {
ASSERT_TRUE(stopped);
}
+TEST_F(ExecutionQueueTest, single_thread) {
+ for (int i = 0; i < 2; ++i) {
+ test_single_thread(i);
+ }
+}
+
class RValue {
public:
RValue() : _value(0) {}
@@ -105,14 +117,20 @@ int add(void* meta, bthread::TaskIterator<RValue> &iter) {
return 0;
}
-TEST_F(ExecutionQueueTest, rvalue) {
+void test_rvalue(bool use_pthread) {
int64_t result = 0;
int64_t expected_result = 0;
stopped = false;
bthread::ExecutionQueueId<RValue> queue_id;
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
- add, &result));
+ add, &result));
for (int i = 0; i < 100; ++i) {
expected_result += i;
RValue v(i);
@@ -126,6 +144,12 @@ TEST_F(ExecutionQueueTest, rvalue) {
ASSERT_TRUE(stopped);
}
+TEST_F(ExecutionQueueTest, rvalue) {
+ for (int i = 0; i < 2; ++i) {
+ test_rvalue(i);
+ }
+}
+
struct PushArg {
bthread::ExecutionQueueId<LongIntTask> id {0};
butil::atomic<int64_t> total_num {0};
@@ -182,10 +206,16 @@ void* push_thread_which_addresses_execq(void *arg) {
return NULL;
}
-TEST_F(ExecutionQueueTest, performance) {
+void test_performance(bool use_pthread) {
pthread_t threads[8];
bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
int64_t result = 0;
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
add, &result));
@@ -207,7 +237,7 @@ TEST_F(ExecutionQueueTest, performance) {
ProfilerStop();
ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
ASSERT_EQ(pa.expected_value.load(), result);
- LOG(INFO) << "With addressed execq, each execution_queue_execute takes "
+ LOG(INFO) << "With addressed execq, each execution_queue_execute takes "
<< pa.total_time.load() / pa.total_num.load()
<< " total_num=" << pa.total_num
<< " ns with " << ARRAY_SIZE(threads) << " threads";
@@ -233,13 +263,19 @@ TEST_F(ExecutionQueueTest, performance) {
ProfilerStop();
ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
ASSERT_EQ(pa.expected_value.load(), result);
- LOG(INFO) << "With id explicitly, execution_queue_execute takes "
+ LOG(INFO) << "With id explicitly, execution_queue_execute takes "
<< pa.total_time.load() / pa.total_num.load()
<< " total_num=" << pa.total_num
<< " ns with " << ARRAY_SIZE(threads) << " threads";
#endif // BENCHMARK_BOTH
}
+TEST_F(ExecutionQueueTest, performance) {
+ for (int i = 0; i < 2; ++i) {
+ test_performance(i);
+ }
+}
+
volatile bool g_suspending = false;
volatile bool g_should_be_urgent = false;
int urgent_times = 0;
@@ -277,11 +313,17 @@ int add_with_suspend(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
return 0;
}
-TEST_F(ExecutionQueueTest, execute_urgent) {
+void test_execute_urgent(bool use_pthread) {
g_should_be_urgent = false;
pthread_t threads[10];
bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
int64_t result = 0;
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
add_with_suspend, &result));
@@ -304,7 +346,7 @@ TEST_F(ExecutionQueueTest, execute_urgent) {
usleep(100);
}
ASSERT_EQ(0, bthread::execution_queue_execute(
- queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
+ queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
g_suspending = false;
usleep(100);
}
@@ -319,11 +361,23 @@ TEST_F(ExecutionQueueTest, execute_urgent) {
ASSERT_EQ(pa.expected_value.load(), result);
}
-TEST_F(ExecutionQueueTest, urgent_task_is_the_last_task) {
+TEST_F(ExecutionQueueTest, execute_urgent) {
+ for (int i = 0; i < 2; ++i) {
+ test_execute_urgent(i);
+ }
+}
+
+void test_urgent_task_is_the_last_task(bool use_pthread) {
g_should_be_urgent = false;
g_suspending = false;
bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
int64_t result = 0;
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
add_with_suspend, &result));
@@ -334,11 +388,12 @@ TEST_F(ExecutionQueueTest, urgent_task_is_the_last_task) {
}
LOG(INFO) << "Going to push";
int64_t expected = 0;
- for (int i = 1; i < 100; ++i) {
- expected += i;
- ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i));
+ for (int j = 1; j < 100; ++j) {
+ expected += j;
+ ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, j));
}
- ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
+ ASSERT_EQ(0, bthread::execution_queue_execute(
+ queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
usleep(100);
g_suspending = false;
butil::atomic_thread_fence(butil::memory_order_acq_rel);
@@ -348,6 +403,11 @@ TEST_F(ExecutionQueueTest, urgent_task_is_the_last_task) {
ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
ASSERT_EQ(expected, result);
}
+TEST_F(ExecutionQueueTest, urgent_task_is_the_last_task) {
+ for (int i = 0; i < 2; ++i) {
+ test_urgent_task_is_the_last_task(i);
+ }
+}
long next_task[1024];
butil::atomic<int> num_threads(0);
@@ -376,11 +436,17 @@ int check_order(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
return 0;
}
-TEST_F(ExecutionQueueTest, multi_threaded_order) {
+void test_multi_threaded_order(bool use_pthread) {
memset(next_task, 0, sizeof(next_task));
long disorder_times = 0;
bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
check_order, &disorder_times));
pthread_t threads[12];
@@ -395,6 +461,12 @@ TEST_F(ExecutionQueueTest, multi_threaded_order) {
ASSERT_EQ(0, disorder_times);
}
+TEST_F(ExecutionQueueTest, multi_threaded_order) {
+ for (int i = 0; i < 2; ++i) {
+ test_multi_threaded_order(i);
+ }
+}
+
int check_running_thread(void* arg, bthread::TaskIterator<LongIntTask>& iter) {
if (iter.is_queue_stopped()) {
return 0;
@@ -404,19 +476,31 @@ int check_running_thread(void* arg, bthread::TaskIterator<LongIntTask>& iter) {
return 0;
}
-TEST_F(ExecutionQueueTest, in_place_task) {
+void test_in_place_task(bool use_pthread) {
pthread_t thread_id = pthread_self();
bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
- check_running_thread,
+ check_running_thread,
(void*)thread_id));
ASSERT_EQ(0, bthread::execution_queue_execute(
- queue_id, 0, &bthread::TASK_OPTIONS_INPLACE));
+ queue_id, 0, &bthread::TASK_OPTIONS_INPLACE));
ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
}
+TEST_F(ExecutionQueueTest, in_place_task) {
+ for (int i = 0; i < 2; ++i) {
+ test_in_place_task(i);
+ }
+}
+
struct InPlaceTask {
bool first_task;
pthread_t thread_id;
@@ -427,8 +511,8 @@ void *run_first_tasks(void* arg) {
InPlaceTask task;
task.first_task = true;
task.thread_id = pthread_self();
- EXPECT_EQ(0, bthread::execution_queue_execute(queue_id, task,
- &bthread::TASK_OPTIONS_INPLACE));
+ EXPECT_EQ(0, bthread::execution_queue_execute(
+ queue_id, task, &bthread::TASK_OPTIONS_INPLACE));
return NULL;
}
@@ -455,12 +539,18 @@ int stuck_and_check_running_thread(void* arg, bthread::TaskIterator<InPlaceTask>
return 0;
}
-TEST_F(ExecutionQueueTest, should_start_new_thread_on_more_tasks) {
+void test_should_start_new_thread_on_more_tasks(bool use_pthread) {
bthread::ExecutionQueueId<InPlaceTask> queue_id = { 0 };
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
butil::atomic<int> futex(0);
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
- stuck_and_check_running_thread,
+ stuck_and_check_running_thread,
(void*)&futex));
pthread_t thread;
ASSERT_EQ(0, pthread_create(&thread, NULL, run_first_tasks, (void*)queue_id.value));
@@ -471,8 +561,8 @@ TEST_F(ExecutionQueueTest, should_start_new_thread_on_more_tasks) {
InPlaceTask task;
task.first_task = false;
task.thread_id = pthread_self();
- ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, task,
- &bthread::TASK_OPTIONS_INPLACE));
+ ASSERT_EQ(0, bthread::execution_queue_execute(
+ queue_id, task, &bthread::TASK_OPTIONS_INPLACE));
}
futex.store(2);
bthread::futex_wake_private(&futex, 1);
@@ -480,22 +570,34 @@ TEST_F(ExecutionQueueTest, should_start_new_thread_on_more_tasks) {
ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
}
+TEST_F(ExecutionQueueTest, should_start_new_thread_on_more_tasks) {
+ for (int i = 0; i < 2; ++i) {
+ test_should_start_new_thread_on_more_tasks(i);
+ }
+}
+
void* inplace_push_thread(void* arg) {
bthread::ExecutionQueueId<LongIntTask> id = { (uint64_t)arg };
int thread_id = num_threads.fetch_add(1, butil::memory_order_relaxed);
LOG(INFO) << "Start thread" << thread_id;
for (int i = 0; i < 100000; ++i) {
bthread::execution_queue_execute(id, ((long)thread_id << 32) | i,
- &bthread::TASK_OPTIONS_INPLACE);
+ &bthread::TASK_OPTIONS_INPLACE);
}
return NULL;
}
-TEST_F(ExecutionQueueTest, inplace_and_order) {
+void test_inplace_and_order(bool use_pthread) {
memset(next_task, 0, sizeof(next_task));
long disorder_times = 0;
bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
check_order, &disorder_times));
pthread_t threads[12];
@@ -510,6 +612,12 @@ TEST_F(ExecutionQueueTest, inplace_and_order) {
ASSERT_EQ(0, disorder_times);
}
+TEST_F(ExecutionQueueTest, inplace_and_order) {
+ for (int i = 0; i < 2; ++i) {
+ test_inplace_and_order(i);
+ }
+}
+
TEST_F(ExecutionQueueTest, size_of_task_node) {
LOG(INFO) << "sizeof(TaskNode)=" << sizeof(bthread::TaskNode);
}
@@ -535,9 +643,15 @@ int add_with_suspend2(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
return 0;
}
-TEST_F(ExecutionQueueTest, cancel) {
+void test_cancel(bool use_pthread) {
bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
int64_t result = 0;
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
add_with_suspend2, &result));
@@ -559,6 +673,12 @@ TEST_F(ExecutionQueueTest, cancel) {
ASSERT_EQ(0, result);
}
+TEST_F(ExecutionQueueTest, cancel) {
+ for (int i = 0; i < 2; ++i) {
+ test_cancel(i);
+ }
+}
+
struct CancelSelf {
butil::atomic<bthread::TaskHandle*> handle;
};
@@ -576,9 +696,15 @@ int cancel_self(void* /*meta*/, bthread::TaskIterator<CancelSelf*>& iter) {
return 0;
}
-TEST_F(ExecutionQueueTest, cancel_self) {
+void test_cancel_self(bool use_pthread) {
bthread::ExecutionQueueId<CancelSelf*> queue_id = { 0 }; // to suppress warnings
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
cancel_self, NULL));
CancelSelf task;
@@ -590,6 +716,12 @@ TEST_F(ExecutionQueueTest, cancel_self) {
ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
}
+TEST_F(ExecutionQueueTest, cancel_self) {
+ for (int i = 0; i < 2; ++i) {
+ test_cancel_self(i);
+ }
+}
+
struct AddTask {
int value;
bool cancel_task;
@@ -628,7 +760,7 @@ int add_with_cancel(void* meta, bthread::TaskIterator<AddTask>& iter) {
return 0;
}
-TEST_F(ExecutionQueueTest, random_cancel) {
+void test_random_cancel(bool use_pthread) {
bthread::ExecutionQueueId<AddTask> queue_id = { 0 };
AddMeta m;
m.sum = 0;
@@ -660,8 +792,8 @@ TEST_F(ExecutionQueueTest, random_cancel) {
t.cancel_task = true;
t.cancel_value = i;
t.handle = h;
- ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, t,
- &bthread::TASK_OPTIONS_URGENT));
+ ASSERT_EQ(0, bthread::execution_queue_execute(
+ queue_id, t, &bthread::TASK_OPTIONS_URGENT));
} else {
// do nothing;
}
@@ -673,6 +805,12 @@ TEST_F(ExecutionQueueTest, random_cancel) {
LOG(INFO) << "sum=" << m.sum << " race_times=" << m.race_times
<< " succ_times=" << m.succ_times
<< " fail_times=" << m.fail_times;
+}
+
+TEST_F(ExecutionQueueTest, random_cancel) {
+ for (int i = 0; i < 2; ++i) {
+ test_random_cancel(i);
+ }
}
@@ -685,13 +823,19 @@ int add2(void* meta, bthread::TaskIterator<LongIntTask> &iter) {
return 0;
}
-TEST_F(ExecutionQueueTest, not_do_iterate_at_all) {
+void test_not_do_iterate_at_all(bool use_pthread) {
int64_t result = 0;
int64_t expected_result = 0;
bthread::ExecutionQueueId<LongIntTask> queue_id;
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
- add2, &result));
+ add2, &result));
for (int i = 0; i < 100; ++i) {
expected_result += i;
ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i));
@@ -702,6 +846,12 @@ TEST_F(ExecutionQueueTest, not_do_iterate_at_all) {
ASSERT_EQ(expected_result, result);
}
+TEST_F(ExecutionQueueTest, not_do_iterate_at_all) {
+ for (int i = 0; i < 2; ++i) {
+ test_not_do_iterate_at_all(i);
+ }
+}
+
int add_with_suspend3(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
int64_t* result = (int64_t*)meta;
if (iter.is_queue_stopped()) {
@@ -723,10 +873,16 @@ int add_with_suspend3(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
return 0;
}
-TEST_F(ExecutionQueueTest, cancel_unexecuted_high_priority_task) {
+void test_cancel_unexecuted_high_priority_task(bool use_pthread) {
g_should_be_urgent = false;
bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
bthread::ExecutionQueueOptions options;
+ options.use_pthread = use_pthread;
+ if (options.use_pthread) {
+ LOG(INFO) << "================ pthread ================";
+ } else {
+ LOG(INFO) << "================ bthread ================";
+ }
int64_t result = 0;
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
add_with_suspend3, &result));
@@ -740,11 +896,11 @@ TEST_F(ExecutionQueueTest, cancel_unexecuted_high_priority_task) {
// expecting that both operations are successful.
bthread::TaskHandle h;
ASSERT_EQ(0, bthread::execution_queue_execute(
- queue_id, -100, &bthread::TASK_OPTIONS_URGENT, &h));
+ queue_id, -100, &bthread::TASK_OPTIONS_URGENT, &h));
ASSERT_EQ(0, bthread::execution_queue_cancel(h));
-
+
// Resume executor
- g_suspending = false;
+ g_suspending = false;
// Push a normal task
ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, 12345));
@@ -755,4 +911,10 @@ TEST_F(ExecutionQueueTest, cancel_unexecuted_high_priority_task) {
ASSERT_EQ(12345, result);
}
+
+TEST_F(ExecutionQueueTest, cancel_unexecuted_high_priority_task) {
+ for (int i = 0; i < 2; ++i) {
+ test_cancel_unexecuted_high_priority_task(i);
+ }
+}
} // namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org