You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2020/12/29 03:36:33 UTC
[rocketmq-client-cpp] 16/29: refactor: concurrent_queue
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit a5aa61ea98f20153131f6410c16fe4d933e7a093
Author: James Yin <yw...@hotmail.com>
AuthorDate: Mon Sep 21 15:29:02 2020 +0800
refactor: concurrent_queue
---
src/concurrent/concurrent_queue.hpp | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git a/src/concurrent/concurrent_queue.hpp b/src/concurrent/concurrent_queue.hpp
index d826f30..7fed592 100644
--- a/src/concurrent/concurrent_queue.hpp
+++ b/src/concurrent/concurrent_queue.hpp
@@ -45,7 +45,7 @@ class concurrent_queue_node {
explicit concurrent_queue_node(E v) : value_(v) {}
value_type* value_;
- type* volatile next_;
+ std::atomic<type*> next_;
friend concurrent_queue<value_type>;
};
@@ -69,17 +69,20 @@ class concurrent_queue {
}
concurrent_queue(bool clear_when_destruct = true)
- : sentinel((node_type*)new char[sizeof(node_type)]), _clear_when_destruct(clear_when_destruct) {
- sentinel->next_ = sentinel;
+ : sentinel((node_type*)new char[sizeof(node_type)]), size_(0), _clear_when_destruct(clear_when_destruct) {
+ sentinel->next_.store(sentinel);
head_ = tail_ = sentinel;
}
bool empty() { return sentinel == tail_.load(); }
+ size_t size() { return size_.load(); }
+
template <class E>
void push_back(E&& v) {
auto* node = new node_type(std::forward<E>(v));
push_back_impl(node);
+ size_.fetch_add(1);
}
std::unique_ptr<value_type> pop_front() {
@@ -87,6 +90,7 @@ class concurrent_queue {
if (node == sentinel) {
return std::unique_ptr<value_type>();
} else {
+ size_.fetch_sub(1);
auto val = node->value_;
delete node;
return std::unique_ptr<value_type>(val);
@@ -95,13 +99,13 @@ class concurrent_queue {
private:
void push_back_impl(node_type* node) noexcept {
- node->next_ = sentinel;
+ node->next_.store(sentinel);
auto tail = tail_.exchange(node);
if (tail == sentinel) {
head_.store(node);
} else {
// guarantee: tail is not released
- tail->next_ = node;
+ tail->next_.store(node);
}
}
@@ -114,7 +118,7 @@ class concurrent_queue {
}
if (head != nullptr) {
if (head_.compare_exchange_weak(head, nullptr)) {
- auto next = head->next_;
+ auto next = head->next_.load();
if (next == sentinel) {
auto t = head;
// only one element
@@ -126,10 +130,10 @@ class concurrent_queue {
size_t j = 0;
do {
// push-pop conflict, spin
- if (0 == j++ % 10) {
+ if (0 == ++j % 10) {
std::this_thread::yield();
}
- next = head->next_;
+ next = head->next_.load();
} while (next == sentinel);
}
head_.store(next);
@@ -147,6 +151,7 @@ class concurrent_queue {
std::atomic<node_type *> head_, tail_;
node_type* const sentinel;
+ std::atomic<size_t> size_;
bool _clear_when_destruct;
};