You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2020/12/29 03:36:33 UTC

[rocketmq-client-cpp] 16/29: refactor: concurrent_queue

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit a5aa61ea98f20153131f6410c16fe4d933e7a093
Author: James Yin <yw...@hotmail.com>
AuthorDate: Mon Sep 21 15:29:02 2020 +0800

    refactor: concurrent_queue
---
 src/concurrent/concurrent_queue.hpp | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/src/concurrent/concurrent_queue.hpp b/src/concurrent/concurrent_queue.hpp
index d826f30..7fed592 100644
--- a/src/concurrent/concurrent_queue.hpp
+++ b/src/concurrent/concurrent_queue.hpp
@@ -45,7 +45,7 @@ class concurrent_queue_node {
   explicit concurrent_queue_node(E v) : value_(v) {}
 
   value_type* value_;
-  type* volatile next_;
+  std::atomic<type*> next_;
 
   friend concurrent_queue<value_type>;
 };
@@ -69,17 +69,20 @@ class concurrent_queue {
   }
 
   concurrent_queue(bool clear_when_destruct = true)
-      : sentinel((node_type*)new char[sizeof(node_type)]), _clear_when_destruct(clear_when_destruct) {
-    sentinel->next_ = sentinel;
+      : sentinel((node_type*)new char[sizeof(node_type)]), size_(0), _clear_when_destruct(clear_when_destruct) {
+    sentinel->next_.store(sentinel);
     head_ = tail_ = sentinel;
   }
 
   bool empty() { return sentinel == tail_.load(); }
 
+  size_t size() { return size_.load(); }
+
   template <class E>
   void push_back(E&& v) {
     auto* node = new node_type(std::forward<E>(v));
     push_back_impl(node);
+    size_.fetch_add(1);
   }
 
   std::unique_ptr<value_type> pop_front() {
@@ -87,6 +90,7 @@ class concurrent_queue {
     if (node == sentinel) {
       return std::unique_ptr<value_type>();
     } else {
+      size_.fetch_sub(1);
       auto val = node->value_;
       delete node;
       return std::unique_ptr<value_type>(val);
@@ -95,13 +99,13 @@ class concurrent_queue {
 
  private:
   void push_back_impl(node_type* node) noexcept {
-    node->next_ = sentinel;
+    node->next_.store(sentinel);
     auto tail = tail_.exchange(node);
     if (tail == sentinel) {
       head_.store(node);
     } else {
       // guarantee: tail is not released
-      tail->next_ = node;
+      tail->next_.store(node);
     }
   }
 
@@ -114,7 +118,7 @@ class concurrent_queue {
       }
       if (head != nullptr) {
         if (head_.compare_exchange_weak(head, nullptr)) {
-          auto next = head->next_;
+          auto next = head->next_.load();
           if (next == sentinel) {
             auto t = head;
             // only one element
@@ -126,10 +130,10 @@ class concurrent_queue {
             size_t j = 0;
             do {
               // push-pop conflict, spin
-              if (0 == j++ % 10) {
+              if (0 == ++j % 10) {
                 std::this_thread::yield();
               }
-              next = head->next_;
+              next = head->next_.load();
             } while (next == sentinel);
           }
           head_.store(next);
@@ -147,6 +151,7 @@ class concurrent_queue {
 
   std::atomic<node_type *> head_, tail_;
   node_type* const sentinel;
+  std::atomic<size_t> size_;
   bool _clear_when_destruct;
 };