You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by GitBox <gi...@apache.org> on 2021/05/10 05:22:36 UTC

[GitHub] [incubator-brpc] JimChengLin edited a comment on pull request #1031: add support for rwlock

JimChengLin edited a comment on pull request #1031:
URL: https://github.com/apache/incubator-brpc/pull/1031#issuecomment-836190106


   我的实现是这样的,由于 bthread 的 butex 没有按 tag 唤醒,有可能产生读写者同时唤醒(惊群)
   
   ```c++
   #pragma once
   
   #include <bthread/bthread.h>
   #include <bthread/butex.h>
   #include <butil/atomicops.h>
   
   #include <cassert>
   
   namespace bcache {
   
   class SingleWriterBthreadRWLock {
    public:
       SingleWriterBthreadRWLock() : flag_(bthread::butex_create_checked<unsigned int>()) { *flag_ = 0; }
   
       ~SingleWriterBthreadRWLock() { bthread::butex_destroy(flag_); }
   
       void lock_shared() { CHECK_EQ(0, ReadLock()); }
   
       void unlock_shared() { ReadUnlock(); }
   
       void lock() { CHECK_EQ(0, WriteLock()); }
   
       void unlock() { WriteUnlock(); }
   
    private:
       int ReadLock() {
           auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
           unsigned int state;
       wait_stage:
           while ((state = f.load(std::memory_order_relaxed)) & kLockMask) {
               unsigned int after = state | kWaitMask;
               while (!(state & kWaitMask) && !f.compare_exchange_weak(state, after, std::memory_order_relaxed,
                                                                       std::memory_order_relaxed)) {
                   if (!(state & kLockMask)) {
                       goto lock_stage;
                   }
                   after = state | kWaitMask;
               }
               if (bthread::butex_wait(flag_, after, nullptr) < 0 && errno != EWOULDBLOCK && errno != EINTR) {
                   return errno;
               }
           }
       lock_stage:
           if (f.fetch_add(1, std::memory_order_acquire) & kLockMask) {
               state = f.fetch_sub(1, std::memory_order_relaxed);
               if ((state & (kWaitMask | kReaderCountMask)) == kWaitMask) {
                   state = f.fetch_and(~kWaitMask, std::memory_order_relaxed);
                   if (state & kWaitMask) {
                       bthread::butex_wake_all(flag_);
                   }
               }
               if (state & kLockMask) {
                   goto wait_stage;
               } else {
                   goto lock_stage;
               }
           }
           return 0;
       }
   
       void ReadUnlock() {
           auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
           unsigned int state = f.fetch_sub(1, std::memory_order_release);
           if ((state & (kWaitMask | kReaderCountMask)) == kWaitMask) {
               assert(state & kLockMask);
               state = f.fetch_and(~kWaitMask, std::memory_order_relaxed);
               if (state & kWaitMask) {
                   bthread::butex_wake_all(flag_);
               }
           }
       }
   
       int WriteLock() {
           auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
           unsigned int state;
           if ((state = f.fetch_or(kLockMask, std::memory_order_acquire))) {
               assert(!(state & (kLockMask | kWaitMask)));
               state |= kLockMask;
               do {
                   assert(state & kLockMask);
                   unsigned int after = state | kWaitMask;
                   while (!(state & kWaitMask) &&
                          !f.compare_exchange_weak(state, after, std::memory_order_relaxed,
                                                   std::memory_order_acquire)) {
                       assert(state & kLockMask);
                       if (!(state & kReaderCountMask)) {
                           return 0;
                       }
                       after = state | kWaitMask;
                   }
                   if (bthread::butex_wait(flag_, after, nullptr) < 0 && errno != EWOULDBLOCK &&
                       errno != EINTR) {
                       WriteUnlock();
                       return errno;
                   }
               } while ((state = f.load(std::memory_order_acquire)) & kReaderCountMask);
           }
           return 0;
       }
   
       void WriteUnlock() {
           auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
           unsigned int state = f.fetch_and(~(kLockMask | kWaitMask), std::memory_order_release);
           assert(state & kLockMask);
           if (state & kWaitMask) {
               bthread::butex_wake_all(flag_);
           }
       }
   
    private:
       static constexpr unsigned int kLockMask = 1U << 31U;
       static constexpr unsigned int kWaitMask = 1U << 30U;
       static constexpr unsigned int kReaderCountMask = 1U << 30U - 1U;
   
       unsigned int* const flag_;
   };
   
   class GeneralBthreadRWLock {
    public:
       void lock_shared() { internal_rwlock_.lock_shared(); }
   
       void unlock_shared() { internal_rwlock_.unlock_shared(); }
   
       void lock() {
           writer_mtx_.lock();
           internal_rwlock_.lock();
       }
   
       void unlock() {
           internal_rwlock_.unlock();
           writer_mtx_.unlock();
       }
   
    private:
       bthread::Mutex writer_mtx_;
       SingleWriterBthreadRWLock internal_rwlock_;
   };
   
   }  // namespace bcache
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org