You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/24 19:13:03 UTC
[1/2] mesos git commit: Fix deadlock between SocketManager::send()
and internal::send().
Repository: mesos
Updated Branches:
refs/heads/master d9dd07960 -> 33befcb17
Fix deadlock between SocketManager::send() and internal::send().
This was a deadlock between the 'this' and 'processes' synchronized
blocks that occurred because the .then() on socket futures can be
invoked immediately with the new implementation of io::poll().
Review: https://reviews.apache.org/r/28367
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0a84e441
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0a84e441
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0a84e441
Branch: refs/heads/master
Commit: 0a84e441e51bd81fead2aaad5b7fbd2b2e8ad590
Parents: d9dd079
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Nov 23 15:57:22 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 23 15:57:23 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 67 ++++++++++++++++++++++++--------
1 file changed, 51 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0a84e441/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index cf75b51..7799bd8 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1571,13 +1571,15 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
CHECK(process != NULL);
+ Socket socket;
+ bool connect = false;
+
synchronized (this) {
links[to].insert(process);
// Check if node is remote and there isn't a persistant link.
if (to.node != __node__ && persists.count(to.node) == 0) {
// Okay, no link, let's create a socket.
- Socket socket;
int s = socket;
sockets[s] = socket;
@@ -1592,13 +1594,17 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
// connected.
outgoing[s];
- socket.connect(to.node)
- .onAny(lambda::bind(
- &internal::link_connect,
- lambda::_1,
- new Socket(socket)));
+ connect = true;
}
}
+
+ if (connect) {
+ socket.connect(to.node)
+ .onAny(lambda::bind(
+ &internal::link_connect,
+ lambda::_1,
+ new Socket(socket)));
+ }
}
@@ -1724,17 +1730,21 @@ void SocketManager::send(Encoder* encoder, bool persist)
if (outgoing.count(socket) > 0) {
outgoing[socket].push(encoder);
+ encoder = NULL;
} else {
// Initialize the outgoing queue.
outgoing[socket];
-
- internal::send(encoder, new Socket(socket));
}
} else {
VLOG(1) << "Attempting to send on a no longer valid socket!";
delete encoder;
+ encoder = NULL;
}
}
+
+ if (encoder != NULL) {
+ internal::send(encoder, new Socket(encoder->socket()));
+ }
}
@@ -1802,6 +1812,9 @@ void SocketManager::send(Message* message)
const Node& node = message->to.node;
+ Socket socket;
+ bool connect = false;
+
synchronized (this) {
// Check if there is already a socket.
bool persist = persists.count(node) > 0;
@@ -1809,11 +1822,25 @@ void SocketManager::send(Message* message)
if (persist || temp) {
int s = persist ? persists[node] : temps[node];
CHECK(sockets.count(s) > 0);
- send(new MessageEncoder(sockets[s], message), persist);
+ socket = sockets[s];
+
+ // Update whether or not this socket should get disposed after
+ // there is no more data to send.
+ if (!persist) {
+ dispose.insert(socket);
+ }
+
+ if (outgoing.count(socket) > 0) {
+ outgoing[socket].push(new MessageEncoder(socket, message));
+ return;
+ } else {
+ // Initialize the outgoing queue.
+ outgoing[socket];
+ }
+
} else {
// No peristent or temporary socket to the node currently
// exists, so we create a temporary one.
- Socket socket;
int s = socket;
sockets[s] = socket;
@@ -1825,14 +1852,22 @@ void SocketManager::send(Message* message)
// Initialize the outgoing queue.
outgoing[s];
- socket.connect(node)
- .onAny(lambda::bind(
- &internal::send_connect,
- lambda::_1,
- new Socket(socket),
- message));
+ connect = true;
}
}
+
+ if (connect) {
+ socket.connect(node)
+ .onAny(lambda::bind(
+ &internal::send_connect,
+ lambda::_1,
+ new Socket(socket),
+ message));
+ } else {
+ // If we're not connecting and we haven't added the encoder to
+ // the 'outgoing' queue then schedule it to be sent.
+ internal::send(new MessageEncoder(socket, message), new Socket(socket));
+ }
}
[2/2] mesos git commit: Add thread local to short-circuit
run_in_event_loop().
Posted by be...@apache.org.
Add thread local to short-circuit run_in_event_loop().
This improves performance by short-circuiting run_in_event_loop() to
just call the lambda if it is already in the event loop.
Review: https://reviews.apache.org/r/28184
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/33befcb1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/33befcb1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/33befcb1
Branch: refs/heads/master
Commit: 33befcb17661951b8e22c1870af8691eb0a0ee2d
Parents: 0a84e44
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Nov 23 16:02:19 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Nov 24 10:12:53 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/libev.cpp | 6 ++++++
3rdparty/libprocess/src/libev.hpp | 15 +++++++++++++++
3rdparty/libprocess/src/process.cpp | 4 ++++
3 files changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/33befcb1/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index 6560050..8a557ce 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -11,10 +11,16 @@ namespace process {
// Defines the initial values for all of the declarations made in
// libev.hpp (since these need to live in the static data space).
struct ev_loop* loop = NULL;
+
ev_async async_watcher;
+
std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
+
synchronizable(watchers);
+
std::queue<lambda::function<void(void)>>* functions =
new std::queue<lambda::function<void(void)>>();
+ThreadLocal<bool>* _in_event_loop_ = new ThreadLocal<bool>();
+
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/33befcb1/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
index 04847e3..a43e7d7 100644
--- a/3rdparty/libprocess/src/libev.hpp
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -6,6 +6,7 @@
#include <process/owned.hpp>
#include <stout/lambda.hpp>
+#include <stout/thread.hpp>
#include "synchronized.hpp"
@@ -29,6 +30,15 @@ extern synchronizable(watchers);
// loop (protected by 'watchers' above).
extern std::queue<lambda::function<void(void)>>* functions;
+// Per thread bool pointer. The extra level of indirection from
+// _in_event_loop_ to __in_event_loop__ is used in order to take
+// advantage of the ThreadLocal operators without needing the extra
+// dereference as well as lazily construct the actual bool.
+extern ThreadLocal<bool>* _in_event_loop_;
+
+#define __in_event_loop__ *(*_in_event_loop_ == NULL ? \
+ *_in_event_loop_ = new bool(false) : *_in_event_loop_)
+
// Wrapper around function we want to run in the event loop.
template <typename T>
@@ -49,6 +59,11 @@ void _run_in_event_loop(
template <typename T>
Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
{
+ // If this is already the event loop then just run the function.
+ if (__in_event_loop__) {
+ return f();
+ }
+
Owned<Promise<T>> promise(new Promise<T>());
Future<T> future = promise->future();
http://git-wip-us.apache.org/repos/asf/mesos/blob/33befcb1/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 7799bd8..89e0e0c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -645,8 +645,12 @@ void decode_read(
void* serve(void* arg)
{
+ __in_event_loop__ = true;
+
ev_loop(((struct ev_loop*) arg), 0);
+ __in_event_loop__ = false;
+
return NULL;
}