You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/24 19:13:03 UTC

[1/2] mesos git commit: Fix deadlock between SocketManager::send() and internal::send().

Repository: mesos
Updated Branches:
  refs/heads/master d9dd07960 -> 33befcb17


Fix deadlock between SocketManager::send() and internal::send().

This fixes a deadlock between the 'this' and 'processes' synchronized
blocks. It occurred because the .then() on socket futures can be
invoked immediately with the new implementation of io::poll().

Review: https://reviews.apache.org/r/28367


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0a84e441
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0a84e441
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0a84e441

Branch: refs/heads/master
Commit: 0a84e441e51bd81fead2aaad5b7fbd2b2e8ad590
Parents: d9dd079
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Nov 23 15:57:22 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 23 15:57:23 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 67 ++++++++++++++++++++++++--------
 1 file changed, 51 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0a84e441/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index cf75b51..7799bd8 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1571,13 +1571,15 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
 
   CHECK(process != NULL);
 
+  Socket socket;
+  bool connect = false;
+
   synchronized (this) {
     links[to].insert(process);
 
     // Check if node is remote and there isn't a persistant link.
     if (to.node != __node__  && persists.count(to.node) == 0) {
       // Okay, no link, let's create a socket.
-      Socket socket;
       int s = socket;
 
       sockets[s] = socket;
@@ -1592,13 +1594,17 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
       // connected.
       outgoing[s];
 
-      socket.connect(to.node)
-        .onAny(lambda::bind(
-            &internal::link_connect,
-            lambda::_1,
-            new Socket(socket)));
+      connect = true;
     }
   }
+
+  if (connect) {
+    socket.connect(to.node)
+      .onAny(lambda::bind(
+          &internal::link_connect,
+          lambda::_1,
+          new Socket(socket)));
+  }
 }
 
 
@@ -1724,17 +1730,21 @@ void SocketManager::send(Encoder* encoder, bool persist)
 
       if (outgoing.count(socket) > 0) {
         outgoing[socket].push(encoder);
+        encoder = NULL;
       } else {
         // Initialize the outgoing queue.
         outgoing[socket];
-
-        internal::send(encoder, new Socket(socket));
       }
     } else {
       VLOG(1) << "Attempting to send on a no longer valid socket!";
       delete encoder;
+      encoder = NULL;
     }
   }
+
+  if (encoder != NULL) {
+    internal::send(encoder, new Socket(encoder->socket()));
+  }
 }
 
 
@@ -1802,6 +1812,9 @@ void SocketManager::send(Message* message)
 
   const Node& node = message->to.node;
 
+  Socket socket;
+  bool connect = false;
+
   synchronized (this) {
     // Check if there is already a socket.
     bool persist = persists.count(node) > 0;
@@ -1809,11 +1822,25 @@ void SocketManager::send(Message* message)
     if (persist || temp) {
       int s = persist ? persists[node] : temps[node];
       CHECK(sockets.count(s) > 0);
-      send(new MessageEncoder(sockets[s], message), persist);
+      socket = sockets[s];
+
+      // Update whether or not this socket should get disposed after
+      // there is no more data to send.
+      if (!persist) {
+        dispose.insert(socket);
+      }
+
+      if (outgoing.count(socket) > 0) {
+        outgoing[socket].push(new MessageEncoder(socket, message));
+        return;
+      } else {
+        // Initialize the outgoing queue.
+        outgoing[socket];
+      }
+
     } else {
       // No peristent or temporary socket to the node currently
       // exists, so we create a temporary one.
-      Socket socket;
       int s = socket;
 
       sockets[s] = socket;
@@ -1825,14 +1852,22 @@ void SocketManager::send(Message* message)
       // Initialize the outgoing queue.
       outgoing[s];
 
-      socket.connect(node)
-        .onAny(lambda::bind(
-            &internal::send_connect,
-            lambda::_1,
-            new Socket(socket),
-            message));
+      connect = true;
     }
   }
+
+  if (connect) {
+    socket.connect(node)
+      .onAny(lambda::bind(
+          &internal::send_connect,
+          lambda::_1,
+          new Socket(socket),
+          message));
+  } else {
+    // If we're not connecting and we haven't added the encoder to
+    // the 'outgoing' queue then schedule it to be sent.
+    internal::send(new MessageEncoder(socket, message), new Socket(socket));
+  }
 }
 
 


[2/2] mesos git commit: Add thread local to short-circuit run_in_event_loop().

Posted by be...@apache.org.
Add thread local to short-circuit run_in_event_loop().

This improves performance by short-circuiting run_in_event_loop() so
that it calls the function directly when the calling thread is already
running the event loop.

Review: https://reviews.apache.org/r/28184


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/33befcb1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/33befcb1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/33befcb1

Branch: refs/heads/master
Commit: 33befcb17661951b8e22c1870af8691eb0a0ee2d
Parents: 0a84e44
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Nov 23 16:02:19 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Nov 24 10:12:53 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/libev.cpp   |  6 ++++++
 3rdparty/libprocess/src/libev.hpp   | 15 +++++++++++++++
 3rdparty/libprocess/src/process.cpp |  4 ++++
 3 files changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/33befcb1/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index 6560050..8a557ce 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -11,10 +11,16 @@ namespace process {
 // Defines the initial values for all of the declarations made in
 // libev.hpp (since these need to live in the static data space).
 struct ev_loop* loop = NULL;
+
 ev_async async_watcher;
+
 std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
+
 synchronizable(watchers);
+
 std::queue<lambda::function<void(void)>>* functions =
   new std::queue<lambda::function<void(void)>>();
 
+ThreadLocal<bool>* _in_event_loop_ = new ThreadLocal<bool>();
+
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/33befcb1/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
index 04847e3..a43e7d7 100644
--- a/3rdparty/libprocess/src/libev.hpp
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -6,6 +6,7 @@
 #include <process/owned.hpp>
 
 #include <stout/lambda.hpp>
+#include <stout/thread.hpp>
 
 #include "synchronized.hpp"
 
@@ -29,6 +30,15 @@ extern synchronizable(watchers);
 // loop (protected by 'watchers' above).
 extern std::queue<lambda::function<void(void)>>* functions;
 
+// Per thread bool pointer. The extra level of indirection from
+// _in_event_loop_ to __in_event_loop__ is used in order to take
+// advantage of the ThreadLocal operators without needing the extra
+// dereference as well as lazily construct the actual bool.
+extern ThreadLocal<bool>* _in_event_loop_;
+
+#define __in_event_loop__ *(*_in_event_loop_ == NULL ?               \
+  *_in_event_loop_ = new bool(false) : *_in_event_loop_)
+
 
 // Wrapper around function we want to run in the event loop.
 template <typename T>
@@ -49,6 +59,11 @@ void _run_in_event_loop(
 template <typename T>
 Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
 {
+  // If this is already the event loop then just run the function.
+  if (__in_event_loop__) {
+    return f();
+  }
+
   Owned<Promise<T>> promise(new Promise<T>());
 
   Future<T> future = promise->future();

http://git-wip-us.apache.org/repos/asf/mesos/blob/33befcb1/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 7799bd8..89e0e0c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -645,8 +645,12 @@ void decode_read(
 
 void* serve(void* arg)
 {
+  __in_event_loop__ = true;
+
   ev_loop(((struct ev_loop*) arg), 0);
 
+  __in_event_loop__ = false;
+
   return NULL;
 }