You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/10/19 21:57:41 UTC

[1/2] mesos git commit: Introduced a ReadWriteLock in libprocess.

Repository: mesos
Updated Branches:
  refs/heads/master 0a8421612 -> 5fd4b46f4


Introduced a ReadWriteLock in libprocess.

The ReadWriteLock is a lock that allows for concurrent reads and
exclusive writes. Starvation of write lock requests is prevented
by queueing read lock requests when there is a write lock request
waiting.

Review: https://reviews.apache.org/r/62911/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4509bb49
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4509bb49
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4509bb49

Branch: refs/heads/master
Commit: 4509bb49ef5e16a84336d72b5876b7d366e9864e
Parents: 0a84216
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu Oct 19 14:24:21 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Oct 19 14:39:30 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/Makefile.am        |   1 +
 3rdparty/libprocess/include/process/rwlock.hpp | 192 ++++++++++++++++++++
 2 files changed, 193 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4509bb49/3rdparty/libprocess/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/Makefile.am b/3rdparty/libprocess/include/Makefile.am
index 94c7a72..d57a610 100644
--- a/3rdparty/libprocess/include/Makefile.am
+++ b/3rdparty/libprocess/include/Makefile.am
@@ -59,6 +59,7 @@ nobase_include_HEADERS =		\
   process/queue.hpp			\
   process/reap.hpp			\
   process/run.hpp			\
+  process/rwlock.hpp			\
   process/sequence.hpp			\
   process/shared.hpp			\
   process/socket.hpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/4509bb49/3rdparty/libprocess/include/process/rwlock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/rwlock.hpp b/3rdparty/libprocess/include/process/rwlock.hpp
new file mode 100644
index 0000000..b9f76f6
--- /dev/null
+++ b/3rdparty/libprocess/include/process/rwlock.hpp
@@ -0,0 +1,192 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __PROCESS_RWMUTEX_HPP__
+#define __PROCESS_RWMUTEX_HPP__
+
+#include <atomic>
+#include <memory>
+#include <queue>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/nothing.hpp>
+#include <stout/synchronized.hpp>
+
+namespace process {
+
+/**
+ * ReadWriteLock allows concurrent reads and exclusive writes.
+ * Locking returns a future that is ready once the lock is held.
+ *
+ * To prevent starvation of write lock requests, reads will
+ * queue when one or more write lock requests are waiting, even
+ * if the read lock is currently acquired.
+ */
+class ReadWriteLock
+{
+public:
+  ReadWriteLock() : data(new Data()) {}
+
+  // Takes the write lock if nothing holds the lock, else queues.
+  // TODO(bmahler): Consider returning a 'Locked' object as the
+  // unlocking mechanism, rather than exposing unlocking functions.
+  Future<Nothing> write_lock()
+  {
+    Future<Nothing> future = Nothing();
+
+    synchronized (data->lock) {
+      if (!data->write_locked && data->read_locked == 0u) {
+        data->write_locked = true;
+      } else {
+        Waiter w{Waiter::WRITE};
+        future = w.promise.future();
+        data->waiters.push(std::move(w));
+      }
+    }
+
+    return future;
+  }
+
+  void write_unlock()
+  {
+    // NOTE: We need to satisfy the waiter(s) futures outside the
+    // critical section because it might trigger callbacks which
+    // try to reacquire a read or write lock.
+    std::queue<Waiter> unblocked;
+
+    synchronized (data->lock) {
+      CHECK(data->write_locked);
+      CHECK_EQ(data->read_locked, 0u);
+
+      data->write_locked = false;
+
+      if (!data->waiters.empty()) {
+        switch (data->waiters.front().type) {
+          case Waiter::READ:
+            // Dequeue the group of readers at the front.
+            while (!data->waiters.empty() &&
+                   data->waiters.front().type == Waiter::READ) {
+              unblocked.push(std::move(data->waiters.front()));
+              data->waiters.pop();
+            }
+
+            data->read_locked = unblocked.size();
+
+            break;
+
+          case Waiter::WRITE:
+            unblocked.push(std::move(data->waiters.front()));
+            data->waiters.pop();
+            data->write_locked = true;
+
+            CHECK_EQ(data->read_locked, 0u);
+
+            break;
+          }
+      }
+    }
+
+    while (!unblocked.empty()) {
+      unblocked.front().promise.set(Nothing());
+      unblocked.pop();
+    }
+  }
+
+  // Takes a read lock unless a writer holds the lock or anything
+  // is already waiting, in which case the request queues.
+  // TODO(bmahler): Consider returning a 'Locked' object (as above).
+  Future<Nothing> read_lock()
+  {
+    Future<Nothing> future = Nothing();
+
+    synchronized (data->lock) {
+      if (!data->write_locked && data->waiters.empty()) {
+        data->read_locked++;
+      } else {
+        Waiter w{Waiter::READ};
+        future = w.promise.future();
+        data->waiters.push(std::move(w));
+      }
+    }
+
+    return future;
+  }
+
+  void read_unlock()
+  {
+    // NOTE: We need to satisfy the waiter future outside the
+    // critical section because it might trigger callbacks which
+    // try to reacquire a read or write lock.
+    Option<Waiter> waiter;
+
+    synchronized (data->lock) {
+      CHECK(!data->write_locked);
+      CHECK_GT(data->read_locked, 0u);
+
+      data->read_locked--;
+
+      if (data->read_locked == 0u && !data->waiters.empty()) {
+        CHECK_EQ(data->waiters.front().type, Waiter::WRITE);
+
+        waiter = std::move(data->waiters.front());
+        data->waiters.pop();
+        data->write_locked = true;
+      }
+    }
+
+    if (waiter.isSome()) {
+      waiter->promise.set(Nothing());
+    }
+  }
+
+private:
+  struct Waiter
+  {
+    enum { READ, WRITE} type;
+    Promise<Nothing> promise;
+  };
+
+  struct Data
+  {
+    Data() : read_locked(0), write_locked(false) {}
+
+    ~Data()
+    {
+      // TODO(zhitao): Fail promises?
+    }
+
+    // The state of the lock can be either:
+    //   (1) Unlocked: an incoming read or write grabs the lock.
+    //
+    //   (2) Read locked (by one or more readers): an incoming write
+    //       will queue in the waiters. An incoming read will proceed
+    //       if no one is waiting, otherwise it will queue.
+    //
+    //   (3) Write locked: incoming reads and writes will queue.
+
+    size_t read_locked;
+    bool write_locked;
+    std::queue<Waiter> waiters;
+
+    // Rather than use a process to serialize access to the
+    // internal data we use a 'std::atomic_flag'.
+    std::atomic_flag lock = ATOMIC_FLAG_INIT;
+  };
+
+  std::shared_ptr<Data> data;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_RWMUTEX_HPP__


[2/2] mesos git commit: Added tests for ReadWriteLock.

Posted by bm...@apache.org.
Added tests for ReadWriteLock.

Review: https://reviews.apache.org/r/62912/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5fd4b46f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5fd4b46f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5fd4b46f

Branch: refs/heads/master
Commit: 5fd4b46f46feaef4ea426a1102b4efe64bc25b4e
Parents: 4509bb4
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu Oct 19 14:41:25 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Oct 19 14:57:28 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                |   1 +
 3rdparty/libprocess/src/tests/CMakeLists.txt   |   1 +
 3rdparty/libprocess/src/tests/rwlock_tests.cpp | 182 ++++++++++++++++++++
 3 files changed, 184 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5fd4b46f/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index c5bfe6d..03a0ca8 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -303,6 +303,7 @@ libprocess_tests_SOURCES =					\
   src/tests/profiler_tests.cpp					\
   src/tests/queue_tests.cpp					\
   src/tests/reap_tests.cpp					\
+  src/tests/rwlock_tests.cpp					\
   src/tests/socket_tests.cpp					\
   src/tests/sequence_tests.cpp					\
   src/tests/shared_tests.cpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/5fd4b46f/3rdparty/libprocess/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
index d9ca23d..b01bf97 100644
--- a/3rdparty/libprocess/src/tests/CMakeLists.txt
+++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
@@ -32,6 +32,7 @@ set(PROCESS_TESTS_SRC
   owned_tests.cpp
   process_tests.cpp
   queue_tests.cpp
+  rwlock_tests.cpp
   sequence_tests.cpp
   shared_tests.cpp
   socket_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/5fd4b46f/3rdparty/libprocess/src/tests/rwlock_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/rwlock_tests.cpp b/3rdparty/libprocess/src/tests/rwlock_tests.cpp
new file mode 100644
index 0000000..d0d682d
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/rwlock_tests.cpp
@@ -0,0 +1,182 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <gtest/gtest.h>
+
+#include <process/future.hpp>
+#include <process/rwlock.hpp>
+
+using process::Future;
+using process::ReadWriteLock;
+
+
+TEST(ReadWriteLockTest, LockWhenUnlocked)
+{
+  ReadWriteLock lock;
+
+  // An unlocked lock should grant either a write or a read lock
+  // immediately, including after earlier holders have released it.
+
+  EXPECT_TRUE(lock.write_lock().isReady());
+  lock.write_unlock();
+
+  EXPECT_TRUE(lock.read_lock().isReady());
+  lock.read_unlock();
+
+  EXPECT_TRUE(lock.write_lock().isReady());
+  lock.write_unlock();
+}
+
+
+TEST(ReadWriteLockTest, BlockReads)
+{
+  ReadWriteLock lock;
+
+  // We should be able to acquire the write lock immediately.
+  EXPECT_TRUE(lock.write_lock().isReady());
+
+  // Subsequent reads should "block", i.e. return a pending future.
+  Future<Nothing> read = lock.read_lock();
+  EXPECT_TRUE(read.isPending());
+
+  // After we release the write lock,
+  // the queued read lock will be acquired.
+  lock.write_unlock();
+
+  EXPECT_TRUE(read.isReady());
+  lock.read_unlock();
+}
+
+
+TEST(ReadWriteLockTest, BlockWrites)
+{
+  ReadWriteLock lock;
+
+  // We should be able to acquire the write lock immediately.
+  EXPECT_TRUE(lock.write_lock().isReady());
+
+  // Subsequent writes should "block", i.e. return a pending future.
+  Future<Nothing> write = lock.write_lock();
+  EXPECT_TRUE(write.isPending());
+
+  // After we release the write lock,
+  // the queued write lock will be acquired.
+  lock.write_unlock();
+
+  EXPECT_TRUE(write.isReady());
+  lock.write_unlock();
+
+  // The write lock should also "block" behind a read lock.
+  EXPECT_TRUE(lock.read_lock().isReady());
+
+  write = lock.write_lock();
+  EXPECT_TRUE(write.isPending());
+
+  // After we release the read lock,
+  // the queued write lock will be acquired.
+  lock.read_unlock();
+
+  EXPECT_TRUE(write.isReady());
+  lock.write_unlock();
+}
+
+
+TEST(ReadWriteLockTest, ConcurrentReads)
+{
+  ReadWriteLock lock;
+
+  // We should be able to acquire the read lock multiple times.
+  EXPECT_TRUE(lock.read_lock().isReady());
+  EXPECT_TRUE(lock.read_lock().isReady());
+  EXPECT_TRUE(lock.read_lock().isReady());
+
+  // A subsequent write should "block" and won't be unblocked
+  // until all three read locks have been released.
+  Future<Nothing> write = lock.write_lock();
+  EXPECT_TRUE(write.isPending());
+
+  lock.read_unlock();
+  EXPECT_TRUE(write.isPending());
+
+  lock.read_unlock();
+  EXPECT_TRUE(write.isPending());
+
+  lock.read_unlock();
+  EXPECT_TRUE(write.isReady());
+
+  lock.write_unlock();
+}
+
+
+TEST(ReadWriteLockTest, NoStarvation)
+{
+  ReadWriteLock lock;
+
+  // We should be able to acquire the read lock immediately.
+  EXPECT_TRUE(lock.read_lock().isReady());
+
+  // Queue a [WRITE1, READ1, READ2, WRITE2, READ3, READ4].
+  // The reads will run in groups, but cannot cross
+  // the write boundaries (i.e. READ3 and READ4 must wait
+  // behind WRITE2).
+
+  Future<Nothing> write1 = lock.write_lock();
+  EXPECT_TRUE(write1.isPending());
+
+  Future<Nothing> read1 = lock.read_lock();
+  EXPECT_TRUE(read1.isPending());
+
+  Future<Nothing> read2 = lock.read_lock();
+  EXPECT_TRUE(read2.isPending());
+
+  Future<Nothing> write2 = lock.write_lock();
+  EXPECT_TRUE(write2.isPending());
+
+  Future<Nothing> read3 = lock.read_lock();
+  EXPECT_TRUE(read3.isPending());
+
+  Future<Nothing> read4 = lock.read_lock();
+  EXPECT_TRUE(read4.isPending());
+
+  // Release the initial read lock.
+  lock.read_unlock();
+
+  EXPECT_TRUE(write1.isReady());
+  EXPECT_TRUE(read1.isPending());
+  EXPECT_TRUE(read2.isPending());
+  EXPECT_TRUE(write2.isPending());
+  EXPECT_TRUE(read3.isPending());
+  EXPECT_TRUE(read4.isPending());
+
+  lock.write_unlock();
+
+  EXPECT_TRUE(read1.isReady());
+  EXPECT_TRUE(read2.isReady());
+  EXPECT_TRUE(write2.isPending());
+  EXPECT_TRUE(read3.isPending());
+  EXPECT_TRUE(read4.isPending());
+
+  lock.read_unlock();
+  lock.read_unlock();
+
+  EXPECT_TRUE(write2.isReady());
+  EXPECT_TRUE(read3.isPending());
+  EXPECT_TRUE(read4.isPending());
+
+  lock.write_unlock();
+
+  EXPECT_TRUE(read3.isReady());
+  EXPECT_TRUE(read4.isReady());
+
+  lock.read_unlock();
+  lock.read_unlock();
+}