You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/04 09:30:30 UTC

[9/9] mesos git commit: Refactor http to use synchronized.

Refactor http to use synchronized.

Review: https://reviews.apache.org/r/32364


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/94c0f68d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/94c0f68d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/94c0f68d

Branch: refs/heads/master
Commit: 94c0f68dc8a32b6cd768c18d6d8d65d40833296f
Parents: 8c2d8d4
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:26:28 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:26:29 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp | 11 +++++++----
 3rdparty/libprocess/src/http.cpp             | 22 ++++++----------------
 2 files changed, 13 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/94c0f68d/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index bba62b3..51a00f5 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -3,6 +3,7 @@
 
 #include <stdint.h>
 
+#include <atomic>
 #include <iosfwd>
 #include <memory>
 #include <queue>
@@ -191,12 +192,14 @@ public:
 private:
   struct Data
   {
-    Data() : lock(0), readEnd(Reader::OPEN), writeEnd(Writer::OPEN) {}
+    Data()
+      : lock(ATOMIC_FLAG_INIT),
+        readEnd(Reader::OPEN),
+        writeEnd(Writer::OPEN) {}
 
     // Rather than use a process to serialize access to the pipe's
-    // internal data we use a low-level "lock" which we acquire and
-    // release using atomic builtins.
-    int lock;
+    // internal data we use a 'std::atomic_flag'.
+    std::atomic_flag lock;
 
     Reader::State readEnd;
     Writer::State writeEnd;

http://git-wip-us.apache.org/repos/asf/mesos/blob/94c0f68d/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 1d318b9..0898335 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -18,7 +18,6 @@
 
 #include <process/future.hpp>
 #include <process/http.hpp>
-#include <process/internal.hpp>
 #include <process/owned.hpp>
 #include <process/socket.hpp>
 
@@ -30,6 +29,7 @@
 #include <stout/numify.hpp>
 #include <stout/option.hpp>
 #include <stout/strings.hpp>
+#include <stout/synchronized.hpp>
 #include <stout/try.hpp>
 
 #include "decoder.hpp"
@@ -179,8 +179,7 @@ Future<string> Pipe::Reader::read()
 {
   Future<string> future;
 
-  process::internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->readEnd == Reader::CLOSED) {
       future = Failure("closed");
     } else if (!data->writes.empty()) {
@@ -196,7 +195,6 @@ Future<string> Pipe::Reader::read()
       future = data->reads.back()->future();
     }
   }
-  process::internal::release(&data->lock);
 
   return future;
 }
@@ -208,8 +206,7 @@ bool Pipe::Reader::close()
   bool notify = false;
   queue<Owned<Promise<string>>> reads;
 
-  process::internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->readEnd == Reader::OPEN) {
       // Throw away outstanding data.
       while (!data->writes.empty()) {
@@ -226,7 +223,6 @@ bool Pipe::Reader::close()
       notify = data->writeEnd == Writer::OPEN;
     }
   }
-  process::internal::release(&data->lock);
 
   // NOTE: We transition the promises outside the critical section
   // to avoid triggering callbacks that try to reacquire the lock.
@@ -250,8 +246,7 @@ bool Pipe::Writer::write(const string& s)
   bool written = false;
   Owned<Promise<string>> read;
 
-  process::internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     // Ignore writes if either end of the pipe is closed or failed!
     if (data->writeEnd == Writer::OPEN && data->readEnd == Reader::OPEN) {
       // Don't bother surfacing empty writes to the readers.
@@ -266,7 +261,6 @@ bool Pipe::Writer::write(const string& s)
       written = true;
     }
   }
-  process::internal::release(&data->lock);
 
   // NOTE: We set the promise outside the critical section to avoid
   // triggering callbacks that try to reacquire the lock.
@@ -283,8 +277,7 @@ bool Pipe::Writer::close()
   bool closed = false;
   queue<Owned<Promise<string>>> reads;
 
-  process::internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->writeEnd == Writer::OPEN) {
       // Extract all the pending reads so we can complete them.
       std::swap(data->reads, reads);
@@ -293,7 +286,6 @@ bool Pipe::Writer::close()
       closed = true;
     }
   }
-  process::internal::release(&data->lock);
 
   // NOTE: We set the promises outside the critical section to avoid
   // triggering callbacks that try to reacquire the lock.
@@ -311,8 +303,7 @@ bool Pipe::Writer::fail(const string& message)
   bool failed = false;
   queue<Owned<Promise<string>>> reads;
 
-  process::internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->writeEnd == Writer::OPEN) {
       // Extract all the pending reads so we can fail them.
       std::swap(data->reads, reads);
@@ -322,7 +313,6 @@ bool Pipe::Writer::fail(const string& message)
       failed = true;
     }
   }
-  process::internal::release(&data->lock);
 
   // NOTE: We set the promises outside the critical section to avoid
   // triggering callbacks that try to reacquire the lock.