You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ww...@apache.org on 2023/10/13 02:21:58 UTC

[brpc] branch master updated: Support pthread mode for ExecutionQueue (#2333)

This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new a5572936 Support pthread mode for ExecutionQueue (#2333)
a5572936 is described below

commit a55729366cd17e10a153de956a1b546c73f1f2c3
Author: Bright Chen <ch...@foxmail.com>
AuthorDate: Fri Oct 13 10:21:49 2023 +0800

    Support pthread mode for ExecutionQueue (#2333)
---
 docs/cn/execution_queue.md                |  16 ++
 src/bthread/execution_queue.cpp           |  71 +++++++--
 src/bthread/execution_queue.h             |  15 +-
 src/bthread/execution_queue_inl.h         |  19 ++-
 test/bthread_execution_queue_unittest.cpp | 236 +++++++++++++++++++++++++-----
 5 files changed, 299 insertions(+), 58 deletions(-)

diff --git a/docs/cn/execution_queue.md b/docs/cn/execution_queue.md
index 5fb9d412..8640a4af 100644
--- a/docs/cn/execution_queue.md
+++ b/docs/cn/execution_queue.md
@@ -78,6 +78,22 @@ class TaskIterator;
 ### 启动一个ExecutionQueue:
 
 ```
+struct ExecutionQueueOptions {
+    ExecutionQueueOptions();
+
+    // Execute in resident pthread instead of bthread. default: false.
+    bool use_pthread;
+
+    // Attribute of the bthread which execute runs on. default: BTHREAD_ATTR_NORMAL
+    // Bthread will be used when executor = NULL and use_pthread == false.
+    bthread_attr_t bthread_attr;
+
+    // Executor that tasks run on. default: NULL
+    // Note that TaskOptions.in_place_if_possible = false will not work, if implementation of
+    // Executor is in-place(synchronous).
+    Executor * executor;
+};
+
 // Start a ExecutionQueue. If |options| is NULL, the queue will be created with
 // default options.
 // Returns 0 on success, errno otherwise
diff --git a/src/bthread/execution_queue.cpp b/src/bthread/execution_queue.cpp
index f7fd9eae..557669ee 100644
--- a/src/bthread/execution_queue.cpp
+++ b/src/bthread/execution_queue.cpp
@@ -24,6 +24,7 @@
 #include "butil/memory/singleton_on_pthread_once.h"
 #include "butil/object_pool.h"           // butil::get_object
 #include "butil/resource_pool.h"         // butil::get_resource
+#include "butil/threading/platform_thread.h"
 
 namespace bthread {
 
@@ -105,16 +106,34 @@ void ExecutionQueueBase::start_execute(TaskNode* node) {
     }
 
     if (nullptr == _options.executor) {
-        bthread_t tid;
-        // We start the execution thread in background instead of foreground as
-        // we can't determine whether the code after execute() is urgent (like
-        // unlock a pthread_mutex_t) in which case implicit context switch may
-        // cause undefined behavior (e.g. deadlock)
-        if (bthread_start_background(&tid, &_options.bthread_attr,
-                                     _execute_tasks, node) != 0) {
-            PLOG(FATAL) << "Fail to start bthread";
-            _execute_tasks(node);
+        if (_options.use_pthread) {
+            if (_pthread_started) {
+                BAIDU_SCOPED_LOCK(_mutex);
+                _current_head = node;
+                _cond.Signal();
+            } else {
+                // Start the execution pthread in background once.
+                if (pthread_create(&_pid, NULL,
+                                   _execute_tasks_pthread,
+                                   node) != 0) {
+                    PLOG(FATAL) << "Fail to create pthread";
+                    _execute_tasks(node);
+                }
+                _pthread_started = true;
+            }
+        } else {
+            bthread_t tid;
+            // We start the execution bthread in background instead of foreground as
+            // we can't determine whether the code after execute() is urgent (like
+            // unlock a pthread_mutex_t) in which case implicit context switch may
+            // cause undefined behavior (e.g. deadlock)
+            if (bthread_start_background(&tid, &_options.bthread_attr,
+                _execute_tasks, node) != 0) {
+                PLOG(FATAL) << "Fail to start bthread";
+                _execute_tasks(node);
+            }
         }
+
     } else {
         if (_options.executor->submit(_execute_tasks, node) != 0) {
             PLOG(FATAL) << "Fail to submit task";
@@ -176,10 +195,10 @@ void* ExecutionQueueBase::_execute_tasks(void* arg) {
         CHECK(m->_stopped);
         // Add _join_butex by 2 to make it equal to the next version of the
         // ExecutionQueue from the same slot so that join with old id would
-        // return immediatly.
+        // return immediately.
         // 
-        // 1: release fence to make join sees the newst changes when it sees
-        //    the newst _join_butex
+        // 1: release fence to make join see the newest changes when it sees
+        //    the newest _join_butex
         m->_join_butex->fetch_add(2, butil::memory_order_release/*1*/);
         butex_wake_all(m->_join_butex);
         vars->execq_count << -1;
@@ -189,6 +208,28 @@ void* ExecutionQueueBase::_execute_tasks(void* arg) {
     return NULL;
 }
 
+void* ExecutionQueueBase::_execute_tasks_pthread(void* arg) {
+    butil::PlatformThread::SetName("ExecutionQueue");
+    auto head = (TaskNode*)arg;
+    auto m = (ExecutionQueueBase*)head->q;
+    m->_current_head = head;
+    while (true) {
+        BAIDU_SCOPED_LOCK(m->_mutex);
+        while (!m->_current_head) {
+            m->_cond.Wait();
+        }
+        _execute_tasks(m->_current_head);
+        m->_current_head = NULL;
+
+        int expected = _version_of_id(m->_this_id);
+        if (expected != m->_join_butex->load(butil::memory_order_relaxed)) {
+            // The execution queue has been stopped and the stopped task has been executed; quit.
+            break;
+        }
+    }
+    return NULL;
+}
+
 void ExecutionQueueBase::return_task_node(TaskNode* node) {
     node->clear_before_return(_clear_func);
     butil::return_object<TaskNode>(node);
@@ -227,6 +268,10 @@ int ExecutionQueueBase::join(uint64_t id) {
             return errno;
         }
     }
+    // Join pthread if it's started.
+    if (m->_options.use_pthread && m->_pthread_started) {
+        pthread_join(m->_pid, NULL);
+    }
     return 0;
 }
 
@@ -365,6 +410,8 @@ int ExecutionQueueBase::create(uint64_t* id, const ExecutionQueueOptions* option
                 _version_of_vref(m->_versioned_ref.fetch_add(
                                     1, butil::memory_order_release)), slot);
         *id = m->_this_id;
+        m->_pthread_started = false;
+        m->_current_head = NULL;
         get_execq_vars()->execq_count << 1;
         return 0;
     }
diff --git a/src/bthread/execution_queue.h b/src/bthread/execution_queue.h
index 7fc46be5..5ceef89f 100644
--- a/src/bthread/execution_queue.h
+++ b/src/bthread/execution_queue.h
@@ -88,11 +88,11 @@ private:
 // }
 template <typename T>
 class TaskIterator : public TaskIteratorBase {
-    TaskIterator();
 public:
     typedef T*          pointer;
     typedef T&          reference;
 
+    TaskIterator() = delete;
     reference operator*() const;
     pointer operator->() const { return &(operator*()); }
     TaskIterator& operator++();
@@ -133,7 +133,7 @@ const static TaskOptions TASK_OPTIONS_INPLACE = TaskOptions(false, true);
 
 class Executor {
 public:
-    virtual ~Executor() {}
+    virtual ~Executor() = default;
 
     // Return 0 on success.
     virtual int submit(void * (*fn)(void*), void* args) = 0;
@@ -141,11 +141,15 @@ public:
 
 struct ExecutionQueueOptions {
     ExecutionQueueOptions();
-    // Attribute of the bthread which execute runs on
-    // default: BTHREAD_ATTR_NORMAL
+
+    // Execute in resident pthread instead of bthread. default: false.
+    bool use_pthread;
+
+    // Attribute of the bthread which execute runs on. default: BTHREAD_ATTR_NORMAL
+    // Bthread will be used when executor = NULL and use_pthread == false.
     bthread_attr_t bthread_attr;
 
-    // Executor that tasks run on. bthread will be used when executor = NULL.
+    // Executor that tasks run on. default: NULL
     // Note that TaskOptions.in_place_if_possible = false will not work, if implementation of
     // Executor is in-place(synchronous).
     Executor * executor;
@@ -198,7 +202,6 @@ int execution_queue_execute(ExecutionQueueId<T> id,
                             const TaskOptions* options,
                             TaskHandle* handle);
 
-
 template <typename T>
 int execution_queue_execute(ExecutionQueueId<T> id,
                             T&& task);
diff --git a/src/bthread/execution_queue_inl.h b/src/bthread/execution_queue_inl.h
index b932cbc9..64725051 100644
--- a/src/bthread/execution_queue_inl.h
+++ b/src/bthread/execution_queue_inl.h
@@ -29,6 +29,7 @@
 #include "butil/time.h"                  // butil::cpuwide_time_ns
 #include "bvar/bvar.h"                  // bvar::Adder
 #include "bthread/butex.h"              // butex_construct
+#include "butil/synchronization/condition_variable.h"
 
 namespace bthread {
 
@@ -168,7 +169,9 @@ public:
         : _head(NULL)
         , _versioned_ref(0)  // join() depends on even version
         , _high_priority_tasks(0)
-    {
+        , _pthread_started(false)
+        , _cond(&_mutex)
+        , _current_head(NULL) {
         _join_butex = butex_create_checked<butil::atomic<int> >();
         _join_butex->store(0, butil::memory_order_relaxed);
     }
@@ -203,6 +206,7 @@ private:
     void _on_recycle();
     int _execute(TaskNode* head, bool high_priority, int* niterated);
     static void* _execute_tasks(void* arg);
+    static void* _execute_tasks_pthread(void* arg);
 
     static inline uint32_t _version_of_id(uint64_t id) WARN_UNUSED_RESULT {
         return (uint32_t)(id >> 32);
@@ -234,6 +238,13 @@ private:
     clear_task_mem _clear_func;
     ExecutionQueueOptions _options;
     butil::atomic<int>* _join_butex;
+
+    // For pthread mode.
+    pthread_t _pid;
+    bool _pthread_started;
+    butil::Mutex _mutex;
+    butil::ConditionVariable _cond;
+    TaskNode* _current_head; // Current task head of each execution.
 };
 
 template <typename T>
@@ -330,12 +341,14 @@ public:
 };
 
 inline ExecutionQueueOptions::ExecutionQueueOptions()
-    : bthread_attr(BTHREAD_ATTR_NORMAL), executor(NULL)
+    : use_pthread(false)
+    , bthread_attr(BTHREAD_ATTR_NORMAL)
+    , executor(NULL)
 {}
 
 template <typename T>
 inline int execution_queue_start(
-        ExecutionQueueId<T>* id, 
+        ExecutionQueueId<T>* id,
         const ExecutionQueueOptions* options,
         int (*execute)(void* meta, TaskIterator<T>&),
         void* meta) {
diff --git a/test/bthread_execution_queue_unittest.cpp b/test/bthread_execution_queue_unittest.cpp
index 627d27c8..6ecad01b 100644
--- a/test/bthread_execution_queue_unittest.cpp
+++ b/test/bthread_execution_queue_unittest.cpp
@@ -55,14 +55,20 @@ int add(void* meta, bthread::TaskIterator<LongIntTask> &iter) {
     return 0;
 }
 
-TEST_F(ExecutionQueueTest, single_thread) {
+void test_single_thread(bool use_pthread) {
     int64_t result = 0;
     int64_t expected_result = 0;
     stopped = false;
     bthread::ExecutionQueueId<LongIntTask> queue_id;
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
-                                                    add, &result));
+                                                add, &result));
     for (int i = 0; i < 100; ++i) {
         expected_result += i;
         ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i));
@@ -75,6 +81,12 @@ TEST_F(ExecutionQueueTest, single_thread) {
     ASSERT_TRUE(stopped);
 }
 
+TEST_F(ExecutionQueueTest, single_thread) {
+    for (int i = 0; i < 2; ++i) {
+        test_single_thread(i);
+    }
+}
+
 class RValue {
 public:
     RValue() : _value(0) {}
@@ -105,14 +117,20 @@ int add(void* meta, bthread::TaskIterator<RValue> &iter) {
     return 0;
 }
 
-TEST_F(ExecutionQueueTest, rvalue) {
+void test_rvalue(bool use_pthread) {
     int64_t result = 0;
     int64_t expected_result = 0;
     stopped = false;
     bthread::ExecutionQueueId<RValue> queue_id;
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
-                                                    add, &result));
+                                                add, &result));
     for (int i = 0; i < 100; ++i) {
         expected_result += i;
         RValue v(i);
@@ -126,6 +144,12 @@ TEST_F(ExecutionQueueTest, rvalue) {
     ASSERT_TRUE(stopped);
 }
 
+TEST_F(ExecutionQueueTest, rvalue) {
+    for (int i = 0; i < 2; ++i) {
+        test_rvalue(i);
+    }
+}
+
 struct PushArg {
     bthread::ExecutionQueueId<LongIntTask> id {0};
     butil::atomic<int64_t> total_num {0};
@@ -182,10 +206,16 @@ void* push_thread_which_addresses_execq(void *arg) {
     return NULL;
 }
 
-TEST_F(ExecutionQueueTest, performance) {
+void test_performance(bool use_pthread) {
     pthread_t threads[8];
     bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     int64_t result = 0;
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                 add, &result));
@@ -207,7 +237,7 @@ TEST_F(ExecutionQueueTest, performance) {
     ProfilerStop();
     ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
     ASSERT_EQ(pa.expected_value.load(), result);
-    LOG(INFO) << "With addressed execq, each execution_queue_execute takes " 
+    LOG(INFO) << "With addressed execq, each execution_queue_execute takes "
               << pa.total_time.load() / pa.total_num.load()
               << " total_num=" << pa.total_num
               << " ns with " << ARRAY_SIZE(threads) << " threads";
@@ -233,13 +263,19 @@ TEST_F(ExecutionQueueTest, performance) {
     ProfilerStop();
     ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
     ASSERT_EQ(pa.expected_value.load(), result);
-    LOG(INFO) << "With id explicitly, execution_queue_execute takes " 
+    LOG(INFO) << "With id explicitly, execution_queue_execute takes "
               << pa.total_time.load() / pa.total_num.load()
               << " total_num=" << pa.total_num
               << " ns with " << ARRAY_SIZE(threads) << " threads";
 #endif  // BENCHMARK_BOTH
 }
 
+TEST_F(ExecutionQueueTest, performance) {
+    for (int i = 0; i < 2; ++i) {
+        test_performance(i);
+    }
+}
+
 volatile bool g_suspending = false;
 volatile bool g_should_be_urgent = false;
 int urgent_times = 0;
@@ -277,11 +313,17 @@ int add_with_suspend(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
     return 0;
 }
 
-TEST_F(ExecutionQueueTest, execute_urgent) {
+void test_execute_urgent(bool use_pthread) {
     g_should_be_urgent = false;
     pthread_t threads[10];
     bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     int64_t result = 0;
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                 add_with_suspend, &result));
@@ -304,7 +346,7 @@ TEST_F(ExecutionQueueTest, execute_urgent) {
             usleep(100);
         }
         ASSERT_EQ(0, bthread::execution_queue_execute(
-                      queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
+            queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
         g_suspending = false;
         usleep(100);
     }
@@ -319,11 +361,23 @@ TEST_F(ExecutionQueueTest, execute_urgent) {
     ASSERT_EQ(pa.expected_value.load(), result);
 }
 
-TEST_F(ExecutionQueueTest, urgent_task_is_the_last_task) {
+TEST_F(ExecutionQueueTest, execute_urgent) {
+    for (int i = 0; i < 2; ++i) {
+        test_execute_urgent(i);
+    }
+}
+
+void test_urgent_task_is_the_last_task(bool use_pthread) {
     g_should_be_urgent = false;
     g_suspending = false;
     bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     int64_t result = 0;
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                 add_with_suspend, &result));
@@ -334,11 +388,12 @@ TEST_F(ExecutionQueueTest, urgent_task_is_the_last_task) {
     }
     LOG(INFO) << "Going to push";
     int64_t expected = 0;
-    for (int i = 1; i < 100; ++i) {
-        expected += i;
-        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i));
+    for (int j = 1; j < 100; ++j) {
+        expected += j;
+        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, j));
     }
-    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
+    ASSERT_EQ(0, bthread::execution_queue_execute(
+        queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
     usleep(100);
     g_suspending = false;
     butil::atomic_thread_fence(butil::memory_order_acq_rel);
@@ -348,6 +403,11 @@ TEST_F(ExecutionQueueTest, urgent_task_is_the_last_task) {
     ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
     ASSERT_EQ(expected, result);
 }
+TEST_F(ExecutionQueueTest, urgent_task_is_the_last_task) {
+    for (int i = 0; i < 2; ++i) {
+        test_urgent_task_is_the_last_task(i);
+    }
+}
 
 long next_task[1024];
 butil::atomic<int> num_threads(0);
@@ -376,11 +436,17 @@ int check_order(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
     return 0;
 }
 
-TEST_F(ExecutionQueueTest, multi_threaded_order) {
+void test_multi_threaded_order(bool use_pthread) {
     memset(next_task, 0, sizeof(next_task));
     long disorder_times = 0;
     bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                 check_order, &disorder_times));
     pthread_t threads[12];
@@ -395,6 +461,12 @@ TEST_F(ExecutionQueueTest, multi_threaded_order) {
     ASSERT_EQ(0, disorder_times);
 }
 
+TEST_F(ExecutionQueueTest, multi_threaded_order) {
+    for (int i = 0; i < 2; ++i) {
+        test_multi_threaded_order(i);
+    }
+}
+
 int check_running_thread(void* arg, bthread::TaskIterator<LongIntTask>& iter) {
     if (iter.is_queue_stopped()) {
         return 0;
@@ -404,19 +476,31 @@ int check_running_thread(void* arg, bthread::TaskIterator<LongIntTask>& iter) {
     return 0;
 }
 
-TEST_F(ExecutionQueueTest, in_place_task) {
+void test_in_place_task(bool use_pthread) {
     pthread_t thread_id = pthread_self();
     bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
-                                                check_running_thread, 
+                                                check_running_thread,
                                                 (void*)thread_id));
     ASSERT_EQ(0, bthread::execution_queue_execute(
-                queue_id, 0, &bthread::TASK_OPTIONS_INPLACE));
+        queue_id, 0, &bthread::TASK_OPTIONS_INPLACE));
     ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
     ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
 }
 
+TEST_F(ExecutionQueueTest, in_place_task) {
+    for (int i = 0; i < 2; ++i) {
+        test_in_place_task(i);
+    }
+}
+
 struct InPlaceTask {
     bool first_task;
     pthread_t thread_id;
@@ -427,8 +511,8 @@ void *run_first_tasks(void* arg) {
     InPlaceTask task;
     task.first_task = true;
     task.thread_id = pthread_self();
-    EXPECT_EQ(0, bthread::execution_queue_execute(queue_id, task, 
-                                                  &bthread::TASK_OPTIONS_INPLACE));
+    EXPECT_EQ(0, bthread::execution_queue_execute(
+        queue_id, task, &bthread::TASK_OPTIONS_INPLACE));
     return NULL;
 }
 
@@ -455,12 +539,18 @@ int stuck_and_check_running_thread(void* arg, bthread::TaskIterator<InPlaceTask>
     return 0;
 }
 
-TEST_F(ExecutionQueueTest, should_start_new_thread_on_more_tasks) {
+void test_should_start_new_thread_on_more_tasks(bool use_pthread) {
     bthread::ExecutionQueueId<InPlaceTask> queue_id = { 0 };
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     butil::atomic<int> futex(0);
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
-                                                stuck_and_check_running_thread, 
+                                                stuck_and_check_running_thread,
                                                 (void*)&futex));
     pthread_t thread;
     ASSERT_EQ(0, pthread_create(&thread, NULL, run_first_tasks, (void*)queue_id.value));
@@ -471,8 +561,8 @@ TEST_F(ExecutionQueueTest, should_start_new_thread_on_more_tasks) {
         InPlaceTask task;
         task.first_task = false;
         task.thread_id = pthread_self();
-        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, task, 
-                                                      &bthread::TASK_OPTIONS_INPLACE));
+        ASSERT_EQ(0, bthread::execution_queue_execute(
+            queue_id, task, &bthread::TASK_OPTIONS_INPLACE));
     }
     futex.store(2);
     bthread::futex_wake_private(&futex, 1);
@@ -480,22 +570,34 @@ TEST_F(ExecutionQueueTest, should_start_new_thread_on_more_tasks) {
     ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
 }
 
+TEST_F(ExecutionQueueTest, should_start_new_thread_on_more_tasks) {
+    for (int i = 0; i < 2; ++i) {
+        test_should_start_new_thread_on_more_tasks(i);
+    }
+}
+
 void* inplace_push_thread(void* arg) {
     bthread::ExecutionQueueId<LongIntTask> id = { (uint64_t)arg };
     int thread_id = num_threads.fetch_add(1, butil::memory_order_relaxed);
     LOG(INFO) << "Start thread" << thread_id;
     for (int i = 0; i < 100000; ++i) {
         bthread::execution_queue_execute(id, ((long)thread_id << 32) | i,
-                            &bthread::TASK_OPTIONS_INPLACE);
+                                         &bthread::TASK_OPTIONS_INPLACE);
     }
     return NULL;
 }
 
-TEST_F(ExecutionQueueTest, inplace_and_order) {
+void test_inplace_and_order(bool use_pthread) {
     memset(next_task, 0, sizeof(next_task));
     long disorder_times = 0;
     bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                 check_order, &disorder_times));
     pthread_t threads[12];
@@ -510,6 +612,12 @@ TEST_F(ExecutionQueueTest, inplace_and_order) {
     ASSERT_EQ(0, disorder_times);
 }
 
+TEST_F(ExecutionQueueTest, inplace_and_order) {
+    for (int i = 0; i < 2; ++i) {
+        test_inplace_and_order(i);
+    }
+}
+
 TEST_F(ExecutionQueueTest, size_of_task_node) {
     LOG(INFO) << "sizeof(TaskNode)=" << sizeof(bthread::TaskNode);
 }
@@ -535,9 +643,15 @@ int add_with_suspend2(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
     return 0;
 }
 
-TEST_F(ExecutionQueueTest, cancel) {
+void test_cancel(bool use_pthread) {
     bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     int64_t result = 0;
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                 add_with_suspend2, &result));
@@ -559,6 +673,12 @@ TEST_F(ExecutionQueueTest, cancel) {
     ASSERT_EQ(0, result);
 }
 
+TEST_F(ExecutionQueueTest, cancel) {
+    for (int i = 0; i < 2; ++i) {
+        test_cancel(i);
+    }
+}
+
 struct CancelSelf {
     butil::atomic<bthread::TaskHandle*> handle;
 };
@@ -576,9 +696,15 @@ int cancel_self(void* /*meta*/, bthread::TaskIterator<CancelSelf*>& iter) {
     return 0;
 }
 
-TEST_F(ExecutionQueueTest, cancel_self) {
+void test_cancel_self(bool use_pthread) {
     bthread::ExecutionQueueId<CancelSelf*> queue_id = { 0 }; // to suppress warnings
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                 cancel_self, NULL));
     CancelSelf task;
@@ -590,6 +716,12 @@ TEST_F(ExecutionQueueTest, cancel_self) {
     ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
 }
 
+TEST_F(ExecutionQueueTest, cancel_self) {
+    for (int i = 0; i < 2; ++i) {
+        test_cancel_self(i);
+    }
+}
+
 struct AddTask {
     int value;
     bool cancel_task;
@@ -628,7 +760,7 @@ int add_with_cancel(void* meta, bthread::TaskIterator<AddTask>& iter) {
     return 0;
 }
 
-TEST_F(ExecutionQueueTest, random_cancel) {
+void test_random_cancel(bool use_pthread) {
     bthread::ExecutionQueueId<AddTask> queue_id = { 0 };
     AddMeta m;
     m.sum = 0;
@@ -660,8 +792,8 @@ TEST_F(ExecutionQueueTest, random_cancel) {
             t.cancel_task = true;
             t.cancel_value = i;
             t.handle = h;
-            ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, t, 
-                                    &bthread::TASK_OPTIONS_URGENT));
+            ASSERT_EQ(0, bthread::execution_queue_execute(
+                queue_id, t, &bthread::TASK_OPTIONS_URGENT));
         } else {
             // do nothing;
         }
@@ -673,6 +805,12 @@ TEST_F(ExecutionQueueTest, random_cancel) {
     LOG(INFO) << "sum=" << m.sum << " race_times=" << m.race_times
               << " succ_times=" << m.succ_times
               << " fail_times=" << m.fail_times;
+}
+
+TEST_F(ExecutionQueueTest, random_cancel) {
+    for (int i = 0; i < 2; ++i) {
+        test_random_cancel(i);
+    }
 
 }
 
@@ -685,13 +823,19 @@ int add2(void* meta, bthread::TaskIterator<LongIntTask> &iter) {
     return 0;
 }
 
-TEST_F(ExecutionQueueTest, not_do_iterate_at_all) {
+void test_not_do_iterate_at_all(bool use_pthread) {
     int64_t result = 0;
     int64_t expected_result = 0;
     bthread::ExecutionQueueId<LongIntTask> queue_id;
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
-                                                    add2, &result));
+                                                add2, &result));
     for (int i = 0; i < 100; ++i) {
         expected_result += i;
         ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i));
@@ -702,6 +846,12 @@ TEST_F(ExecutionQueueTest, not_do_iterate_at_all) {
     ASSERT_EQ(expected_result, result);
 }
 
+TEST_F(ExecutionQueueTest, not_do_iterate_at_all) {
+    for (int i = 0; i < 2; ++i) {
+        test_not_do_iterate_at_all(i);
+    }
+}
+
 int add_with_suspend3(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
     int64_t* result = (int64_t*)meta;
     if (iter.is_queue_stopped()) {
@@ -723,10 +873,16 @@ int add_with_suspend3(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
     return 0;
 }
 
-TEST_F(ExecutionQueueTest, cancel_unexecuted_high_priority_task) {
+void test_cancel_unexecuted_high_priority_task(bool use_pthread) {
     g_should_be_urgent = false;
     bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
     bthread::ExecutionQueueOptions options;
+    options.use_pthread = use_pthread;
+    if (options.use_pthread) {
+        LOG(INFO) << "================ pthread ================";
+    } else {
+        LOG(INFO) << "================ bthread ================";
+    }
     int64_t result = 0;
     ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                 add_with_suspend3, &result));
@@ -740,11 +896,11 @@ TEST_F(ExecutionQueueTest, cancel_unexecuted_high_priority_task) {
     // expecting that both operations are successful.
     bthread::TaskHandle h;
     ASSERT_EQ(0, bthread::execution_queue_execute(
-                        queue_id, -100, &bthread::TASK_OPTIONS_URGENT, &h));
+        queue_id, -100, &bthread::TASK_OPTIONS_URGENT, &h));
     ASSERT_EQ(0, bthread::execution_queue_cancel(h));
-    
+
     // Resume executor
-    g_suspending = false;  
+    g_suspending = false;
 
     // Push a normal task
     ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, 12345));
@@ -755,4 +911,10 @@ TEST_F(ExecutionQueueTest, cancel_unexecuted_high_priority_task) {
 
     ASSERT_EQ(12345, result);
 }
+
+TEST_F(ExecutionQueueTest, cancel_unexecuted_high_priority_task) {
+    for (int i = 0; i < 2; ++i) {
+        test_cancel_unexecuted_high_priority_task(i);
+    }
+}
 } // namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org