You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/04 09:30:30 UTC
[9/9] mesos git commit: Refactor http to use synchronized.
Refactor http to use synchronized.
Review: https://reviews.apache.org/r/32364
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/94c0f68d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/94c0f68d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/94c0f68d
Branch: refs/heads/master
Commit: 94c0f68dc8a32b6cd768c18d6d8d65d40833296f
Parents: 8c2d8d4
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:26:28 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:26:29 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 11 +++++++----
3rdparty/libprocess/src/http.cpp | 22 ++++++----------------
2 files changed, 13 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/94c0f68d/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index bba62b3..51a00f5 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -3,6 +3,7 @@
#include <stdint.h>
+#include <atomic>
#include <iosfwd>
#include <memory>
#include <queue>
@@ -191,12 +192,14 @@ public:
private:
struct Data
{
- Data() : lock(0), readEnd(Reader::OPEN), writeEnd(Writer::OPEN) {}
+ Data()
+ : lock(ATOMIC_FLAG_INIT),
+ readEnd(Reader::OPEN),
+ writeEnd(Writer::OPEN) {}
// Rather than use a process to serialize access to the pipe's
- // internal data we use a low-level "lock" which we acquire and
- // release using atomic builtins.
- int lock;
+ // internal data we use a 'std::atomic_flag'.
+ std::atomic_flag lock;
Reader::State readEnd;
Writer::State writeEnd;
http://git-wip-us.apache.org/repos/asf/mesos/blob/94c0f68d/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 1d318b9..0898335 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -18,7 +18,6 @@
#include <process/future.hpp>
#include <process/http.hpp>
-#include <process/internal.hpp>
#include <process/owned.hpp>
#include <process/socket.hpp>
@@ -30,6 +29,7 @@
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/strings.hpp>
+#include <stout/synchronized.hpp>
#include <stout/try.hpp>
#include "decoder.hpp"
@@ -179,8 +179,7 @@ Future<string> Pipe::Reader::read()
{
Future<string> future;
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->readEnd == Reader::CLOSED) {
future = Failure("closed");
} else if (!data->writes.empty()) {
@@ -196,7 +195,6 @@ Future<string> Pipe::Reader::read()
future = data->reads.back()->future();
}
}
- process::internal::release(&data->lock);
return future;
}
@@ -208,8 +206,7 @@ bool Pipe::Reader::close()
bool notify = false;
queue<Owned<Promise<string>>> reads;
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->readEnd == Reader::OPEN) {
// Throw away outstanding data.
while (!data->writes.empty()) {
@@ -226,7 +223,6 @@ bool Pipe::Reader::close()
notify = data->writeEnd == Writer::OPEN;
}
}
- process::internal::release(&data->lock);
// NOTE: We transition the promises outside the critical section
// to avoid triggering callbacks that try to reacquire the lock.
@@ -250,8 +246,7 @@ bool Pipe::Writer::write(const string& s)
bool written = false;
Owned<Promise<string>> read;
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
// Ignore writes if either end of the pipe is closed or failed!
if (data->writeEnd == Writer::OPEN && data->readEnd == Reader::OPEN) {
// Don't bother surfacing empty writes to the readers.
@@ -266,7 +261,6 @@ bool Pipe::Writer::write(const string& s)
written = true;
}
}
- process::internal::release(&data->lock);
// NOTE: We set the promise outside the critical section to avoid
// triggering callbacks that try to reacquire the lock.
@@ -283,8 +277,7 @@ bool Pipe::Writer::close()
bool closed = false;
queue<Owned<Promise<string>>> reads;
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->writeEnd == Writer::OPEN) {
// Extract all the pending reads so we can complete them.
std::swap(data->reads, reads);
@@ -293,7 +286,6 @@ bool Pipe::Writer::close()
closed = true;
}
}
- process::internal::release(&data->lock);
// NOTE: We set the promises outside the critical section to avoid
// triggering callbacks that try to reacquire the lock.
@@ -311,8 +303,7 @@ bool Pipe::Writer::fail(const string& message)
bool failed = false;
queue<Owned<Promise<string>>> reads;
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->writeEnd == Writer::OPEN) {
// Extract all the pending reads so we can fail them.
std::swap(data->reads, reads);
@@ -322,7 +313,6 @@ bool Pipe::Writer::fail(const string& message)
failed = true;
}
}
- process::internal::release(&data->lock);
// NOTE: We set the promises outside the critical section to avoid
// triggering callbacks that try to reacquire the lock.