You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/10/19 21:57:41 UTC
[1/2] mesos git commit: Introduced a ReadWriteLock in libprocess.
Repository: mesos
Updated Branches:
refs/heads/master 0a8421612 -> 5fd4b46f4
Introduced a ReadWriteLock in libprocess.
The ReadWriteLock is a lock that allows for concurrent reads and
exclusive writes. Starvation of write lock requests is preventing
by queueing read lock requests when there is a write lock request
waiting.
Review: https://reviews.apache.org/r/62911/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4509bb49
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4509bb49
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4509bb49
Branch: refs/heads/master
Commit: 4509bb49ef5e16a84336d72b5876b7d366e9864e
Parents: 0a84216
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu Oct 19 14:24:21 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Oct 19 14:39:30 2017 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/Makefile.am | 1 +
3rdparty/libprocess/include/process/rwlock.hpp | 192 ++++++++++++++++++++
2 files changed, 193 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4509bb49/3rdparty/libprocess/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/Makefile.am b/3rdparty/libprocess/include/Makefile.am
index 94c7a72..d57a610 100644
--- a/3rdparty/libprocess/include/Makefile.am
+++ b/3rdparty/libprocess/include/Makefile.am
@@ -59,6 +59,7 @@ nobase_include_HEADERS = \
process/queue.hpp \
process/reap.hpp \
process/run.hpp \
+ process/rwlock.hpp \
process/sequence.hpp \
process/shared.hpp \
process/socket.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/4509bb49/3rdparty/libprocess/include/process/rwlock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/rwlock.hpp b/3rdparty/libprocess/include/process/rwlock.hpp
new file mode 100644
index 0000000..b9f76f6
--- /dev/null
+++ b/3rdparty/libprocess/include/process/rwlock.hpp
@@ -0,0 +1,192 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __PROCESS_RWMUTEX_HPP__
+#define __PROCESS_RWMUTEX_HPP__
+
+#include <atomic>
+#include <memory>
+#include <queue>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/nothing.hpp>
+#include <stout/synchronized.hpp>
+
+namespace process {
+
+/**
+ * ReadWriteLock is a lock that allows concurrent reads and
+ * exclusive writes.
+ *
+ * To prevent starvation of write lock requests, reads will
+ * queue when one or more write lock requests is waiting, even
+ * if the read lock is currently acquired.
+ */
+class ReadWriteLock
+{
+public:
+ ReadWriteLock() : data(new Data()) {}
+
+ // TODO(bmahler): Consider returning a 'Locked' object in the
+ // future as the mechanism for unlocking, rather than exposing
+ // unlocking functions for all to call.
+ Future<Nothing> write_lock()
+ {
+ Future<Nothing> future = Nothing();
+
+ synchronized (data->lock) {
+ if (!data->write_locked && data->read_locked == 0u) {
+ data->write_locked = true;
+ } else {
+ Waiter w{Waiter::WRITE};
+ future = w.promise.future();
+ data->waiters.push(std::move(w));
+ }
+ }
+
+ return future;
+ }
+
+ void write_unlock()
+ {
+ // NOTE: We need to satisfy the waiter(s) futures outside the
+ // critical section because it might trigger callbacks which
+ // try to reacquire a read or write lock.
+ std::queue<Waiter> unblocked;
+
+ synchronized (data->lock) {
+ CHECK(data->write_locked);
+ CHECK_EQ(data->read_locked, 0u);
+
+ data->write_locked = false;
+
+ if (!data->waiters.empty()) {
+ switch (data->waiters.front().type) {
+ case Waiter::READ:
+ // Dequeue the group of readers at the front.
+ while (!data->waiters.empty() &&
+ data->waiters.front().type == Waiter::READ) {
+ unblocked.push(std::move(data->waiters.front()));
+ data->waiters.pop();
+ }
+
+ data->read_locked = unblocked.size();
+
+ break;
+
+ case Waiter::WRITE:
+ unblocked.push(std::move(data->waiters.front()));
+ data->waiters.pop();
+ data->write_locked = true;
+
+ CHECK_EQ(data->read_locked, 0u);
+
+ break;
+ }
+ }
+ }
+
+ while (!unblocked.empty()) {
+ unblocked.front().promise.set(Nothing());
+ unblocked.pop();
+ }
+ }
+
+ // TODO(bmahler): Consider returning a 'Locked' object in the
+ // future as the mechanism for unlocking, rather than exposing
+ // unlocking functions for all to call.
+ Future<Nothing> read_lock()
+ {
+ Future<Nothing> future = Nothing();
+
+ synchronized (data->lock) {
+ if (!data->write_locked && data->waiters.empty()) {
+ data->read_locked++;
+ } else {
+ Waiter w{Waiter::READ};
+ future = w.promise.future();
+ data->waiters.push(std::move(w));
+ }
+ }
+
+ return future;
+ }
+
+ void read_unlock()
+ {
+ // NOTE: We need to satisfy the waiter future outside the
+ // critical section because it might trigger callbacks which
+ // try to reacquire a read or write lock.
+ Option<Waiter> waiter;
+
+ synchronized (data->lock) {
+ CHECK(!data->write_locked);
+ CHECK_GT(data->read_locked, 0u);
+
+ data->read_locked--;
+
+ if (data->read_locked == 0u && !data->waiters.empty()) {
+ CHECK_EQ(data->waiters.front().type, Waiter::WRITE);
+
+ waiter = std::move(data->waiters.front());
+ data->waiters.pop();
+ data->write_locked = true;
+ }
+ }
+
+ if (waiter.isSome()) {
+ waiter->promise.set(Nothing());
+ }
+ }
+
+private:
+ struct Waiter
+ {
+ enum { READ, WRITE} type;
+ Promise<Nothing> promise;
+ };
+
+ struct Data
+ {
+ Data() : read_locked(0), write_locked(false) {}
+
+ ~Data()
+ {
+ // TODO(zhitao): Fail promises?
+ }
+
+ // The state of the lock can be either:
+ // (1) Unlocked: an incoming read or write grabs the lock.
+ //
+ // (2) Read locked (by one or more readers): an incoming write
+ // will queue in the waiters. An incoming read will proceed
+ // if no one is waiting, otherwise it will queue.
+ //
+ // (3) Write locked: incoming reads and writes will queue.
+
+ size_t read_locked;
+ bool write_locked;
+ std::queue<Waiter> waiters;
+
+ // Rather than use a process to serialize access to the
+ // internal data we use a 'std::atomic_flag'.
+ std::atomic_flag lock = ATOMIC_FLAG_INIT;
+ };
+
+ std::shared_ptr<Data> data;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_RWMUTEX_HPP__
[2/2] mesos git commit: Added tests for ReadWriteLock.
Posted by bm...@apache.org.
Added tests for ReadWriteLock.
Review: https://reviews.apache.org/r/62912/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5fd4b46f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5fd4b46f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5fd4b46f
Branch: refs/heads/master
Commit: 5fd4b46f46feaef4ea426a1102b4efe64bc25b4e
Parents: 4509bb4
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu Oct 19 14:41:25 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Oct 19 14:57:28 2017 -0700
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/src/tests/CMakeLists.txt | 1 +
3rdparty/libprocess/src/tests/rwlock_tests.cpp | 182 ++++++++++++++++++++
3 files changed, 184 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5fd4b46f/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index c5bfe6d..03a0ca8 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -303,6 +303,7 @@ libprocess_tests_SOURCES = \
src/tests/profiler_tests.cpp \
src/tests/queue_tests.cpp \
src/tests/reap_tests.cpp \
+ src/tests/rwlock_tests.cpp \
src/tests/socket_tests.cpp \
src/tests/sequence_tests.cpp \
src/tests/shared_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/5fd4b46f/3rdparty/libprocess/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
index d9ca23d..b01bf97 100644
--- a/3rdparty/libprocess/src/tests/CMakeLists.txt
+++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
@@ -32,6 +32,7 @@ set(PROCESS_TESTS_SRC
owned_tests.cpp
process_tests.cpp
queue_tests.cpp
+ rwlock_tests.cpp
sequence_tests.cpp
shared_tests.cpp
socket_tests.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/5fd4b46f/3rdparty/libprocess/src/tests/rwlock_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/rwlock_tests.cpp b/3rdparty/libprocess/src/tests/rwlock_tests.cpp
new file mode 100644
index 0000000..d0d682d
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/rwlock_tests.cpp
@@ -0,0 +1,182 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <gtest/gtest.h>
+
+#include <process/future.hpp>
+#include <process/rwlock.hpp>
+
+using process::Future;
+using process::ReadWriteLock;
+
+
+TEST(ReadWriteLockTest, LockWhenUnlocked)
+{
+ ReadWriteLock lock;
+
+ // We should be able to lock for writing or reading when the
+ // lock is unlocked.
+
+ EXPECT_TRUE(lock.write_lock().isReady());
+ lock.write_unlock();
+
+ EXPECT_TRUE(lock.read_lock().isReady());
+ lock.read_unlock();
+
+ EXPECT_TRUE(lock.write_lock().isReady());
+ lock.write_unlock();
+}
+
+
+TEST(ReadWriteLockTest, BlockReads)
+{
+ ReadWriteLock lock;
+
+ // We should be able to acquire the write lock immediately.
+ EXPECT_TRUE(lock.write_lock().isReady());
+
+ // Subsequent reads should "block".
+ Future<Nothing> read = lock.read_lock();
+ EXPECT_TRUE(read.isPending());
+
+ // After we release the write lock,
+ // the read lock will be acquired.
+ lock.write_unlock();
+
+ EXPECT_TRUE(read.isReady());
+ lock.read_unlock();
+}
+
+
+TEST(ReadWriteLockTest, BlockWrites)
+{
+ ReadWriteLock lock;
+
+ // We should be able to acquire the write lock immediately.
+ EXPECT_TRUE(lock.write_lock().isReady());
+
+ // Subsequent writes should "block".
+ Future<Nothing> write = lock.write_lock();
+ EXPECT_TRUE(write.isPending());
+
+ // After we release the write lock,
+ // the next write lock will be acquired.
+ lock.write_unlock();
+
+ EXPECT_TRUE(write.isReady());
+ lock.write_unlock();
+
+ // The write lock should also "block" behind a read lock.
+ EXPECT_TRUE(lock.read_lock().isReady());
+
+ write = lock.write_lock();
+ EXPECT_TRUE(write.isPending());
+
+ // After we release the read lock,
+ // the next write lock will be acquired.
+ lock.read_unlock();
+
+ EXPECT_TRUE(write.isReady());
+ lock.write_unlock();
+}
+
+
+TEST(ReadWriteLockTest, ConcurrentReads)
+{
+ ReadWriteLock lock;
+
+ // We should be able to acquire read lock multiple times.
+ EXPECT_TRUE(lock.read_lock().isReady());
+ EXPECT_TRUE(lock.read_lock().isReady());
+ EXPECT_TRUE(lock.read_lock().isReady());
+
+ // Subsequent writes should "block" and won't be unblocked
+ // until all reads complete.
+ Future<Nothing> write = lock.write_lock();
+ EXPECT_TRUE(write.isPending());
+
+ lock.read_unlock();
+ EXPECT_TRUE(write.isPending());
+
+ lock.read_unlock();
+ EXPECT_TRUE(write.isPending());
+
+ lock.read_unlock();
+ EXPECT_TRUE(write.isReady());
+
+ lock.write_unlock();
+}
+
+
+TEST(ReadWriteLockTest, NoStarvation)
+{
+ ReadWriteLock lock;
+
+ // We should be able to acquire read lock immediately.
+ EXPECT_TRUE(lock.read_lock().isReady());
+
+ // Queue a [WRITE1, READ1, READ2, WRITE2, READ3, READ4].
+ // The reads will run in groups, but cannot cross
+ // the write boundaries (i.e. READ3 and READ4 must wait
+ // behind WRITE2).
+
+ Future<Nothing> write1 = lock.write_lock();
+ EXPECT_TRUE(write1.isPending());
+
+ Future<Nothing> read1 = lock.read_lock();
+ EXPECT_TRUE(read1.isPending());
+
+ Future<Nothing> read2 = lock.read_lock();
+ EXPECT_TRUE(read2.isPending());
+
+ Future<Nothing> write2 = lock.write_lock();
+ EXPECT_TRUE(write2.isPending());
+
+ Future<Nothing> read3 = lock.read_lock();
+ EXPECT_TRUE(read3.isPending());
+
+ Future<Nothing> read4 = lock.read_lock();
+ EXPECT_TRUE(read4.isPending());
+
+ // Release the intial read lock.
+ lock.read_unlock();
+
+ EXPECT_TRUE(write1.isReady());
+ EXPECT_TRUE(read1.isPending());
+ EXPECT_TRUE(read2.isPending());
+ EXPECT_TRUE(write2.isPending());
+ EXPECT_TRUE(read3.isPending());
+ EXPECT_TRUE(read4.isPending());
+
+ lock.write_unlock();
+
+ EXPECT_TRUE(read1.isReady());
+ EXPECT_TRUE(read2.isReady());
+ EXPECT_TRUE(write2.isPending());
+ EXPECT_TRUE(read3.isPending());
+ EXPECT_TRUE(read4.isPending());
+
+ lock.read_unlock();
+ lock.read_unlock();
+
+ EXPECT_TRUE(write2.isReady());
+ EXPECT_TRUE(read3.isPending());
+ EXPECT_TRUE(read4.isPending());
+
+ lock.write_unlock();
+
+ EXPECT_TRUE(read3.isReady());
+ EXPECT_TRUE(read4.isReady());
+
+ lock.read_unlock();
+ lock.read_unlock();
+}