You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2016/07/11 21:55:07 UTC
[2/2] mesos git commit: Added "relink" semantics to ProcessBase::link.
Added "relink" semantics to ProcessBase::link.
The `RemoteConnection:RECONNECT` option for `ProcessBase::link` will
force the `SocketManager` to create a new socket if a persistent link
already exists.
Review: https://reviews.apache.org/r/49177/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1415fc9f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1415fc9f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1415fc9f
Branch: refs/heads/0.28.x
Commit: 1415fc9f6b4c3e6220b410ab95ba695dbbace69a
Parents: 9bcdc69
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Wed Jun 29 18:20:44 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Jul 11 14:02:12 2016 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/process.hpp | 31 ++++++-
3rdparty/libprocess/src/process.cpp | 94 ++++++++++++++------
2 files changed, 95 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/1415fc9f/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index c9ef4e8..1fe94c4 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -139,6 +139,33 @@ protected:
size_t length = 0);
/**
+ * Describes the behavior of the `link` call when the target `pid`
+ * points to a remote process. This enum has no effect if the target
+ * `pid` points to a local process.
+ */
+ enum class RemoteConnection
+ {
+ /**
+ * If a persistent socket to the target `pid` does not exist,
+ * a new link is created. If a persistent socket already exists,
+ * `link` will subscribe this process to the existing link.
+ *
+ * This is the default behavior.
+ */
+ REUSE,
+
+ /**
+ * If a persistent socket to the target `pid` does not exist,
+ * a new link is created. If a persistent socket already exists,
+ * `link` create a new socket connection with the target `pid`
+ * and *atomically* swap the existing link with the new link.
+ *
+ * Existing linkers will remain linked, albeit via the new socket.
+ */
+ RECONNECT,
+ };
+
+ /**
* Links with the specified `UPID`.
*
* Linking with a process from within the same "operating system
@@ -147,7 +174,9 @@ protected:
* result in receiving lost callbacks due to the nature of a distributed
* environment.
*/
- UPID link(const UPID& pid);
+ UPID link(
+ const UPID& pid,
+ const RemoteConnection remote = RemoteConnection::REUSE);
/**
* Any function which takes a "from" `UPID` and a message body as
http://git-wip-us.apache.org/repos/asf/mesos/blob/1415fc9f/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index bad574a..38e2568 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -299,6 +299,7 @@ public:
void link(ProcessBase* process,
const UPID& to,
+ const ProcessBase::RemoteConnection remote,
const Socket::Kind& kind = Socket::DEFAULT_KIND());
PID<HttpProxy> proxy(const Socket& socket);
@@ -412,7 +413,12 @@ public:
UPID spawn(ProcessBase* process, bool manage);
void resume(ProcessBase* process);
void cleanup(ProcessBase* process);
- void link(ProcessBase* process, const UPID& to);
+
+ void link(
+ ProcessBase* process,
+ const UPID& to,
+ const ProcessBase::RemoteConnection remote);
+
void terminate(const UPID& pid, bool inject, ProcessBase* sender = NULL);
bool wait(const UPID& pid);
@@ -1418,6 +1424,7 @@ void SocketManager::link_connect(
void SocketManager::link(
ProcessBase* process,
const UPID& to,
+ const ProcessBase::RemoteConnection remote,
const Socket::Kind& kind)
{
// TODO(benh): The semantics we want to support for link are such
@@ -1434,33 +1441,59 @@ void SocketManager::link(
bool connect = false;
synchronized (mutex) {
- // Check if the socket address is remote and there isn't a persistant link.
- if (to.address != __address__ && persists.count(to.address) == 0) {
- // Okay, no link, let's create a socket.
- // The kind of socket we create is passed in as an argument.
- // This allows us to support downgrading the connection type
- // from SSL to POLL if enabled.
- Try<Socket> create = Socket::create(kind);
- if (create.isError()) {
- VLOG(1) << "Failed to link, create socket: " << create.error();
- return;
- }
- socket = create.get();
- int s = socket.get().get();
+ // Check if the socket address is remote.
+ if (to.address != __address__) {
+ // Check if there isn't already a persistent link.
+ if (persists.count(to.address) == 0) {
+ // Okay, no link, let's create a socket.
+ // The kind of socket we create is passed in as an argument.
+ // This allows us to support downgrading the connection type
+ // from SSL to POLL if enabled.
+ Try<Socket> create = Socket::create(kind);
+ if (create.isError()) {
+ VLOG(1) << "Failed to link, create socket: " << create.error();
+ return;
+ }
+ socket = create.get();
+ int s = socket.get().get();
+
+ sockets[s] = new Socket(socket.get());
+ addresses[s] = to.address;
+
+ persists[to.address] = s;
+
+ // Initialize 'outgoing' to prevent a race with
+ // SocketManager::send() while the socket is not yet connected.
+ // Initializing the 'outgoing' queue prevents
+ // SocketManager::send() from trying to write before it's
+ // connected.
+ outgoing[s];
+
+ connect = true;
+ } else if (remote == ProcessBase::RemoteConnection::RECONNECT) {
+ // There is a persistent link already and the linker wants to
+ // create a new socket anyway.
+ Try<Socket> create = Socket::create(kind);
+ if (create.isError()) {
+ VLOG(1) << "Failed to link, create socket: " << create.error();
+ return;
+ }
- sockets[s] = new Socket(socket.get());
- addresses[s] = to.address;
+ socket = create.get();
- persists[to.address] = s;
+ // Grab a copy of the existing socket that we're swapping.
+ // This should go out of scope and be closed after the swap
+ // is complete.
+ Socket existing(*sockets.at(persists.at(to.address)));
- // Initialize 'outgoing' to prevent a race with
- // SocketManager::send() while the socket is not yet connected.
- // Initializing the 'outgoing' queue prevents
- // SocketManager::send() from trying to write before it's
- // connected.
- outgoing[s];
+ // Update all the data structures that are mapped to the old
+ // socket. They will now point to the new socket we are about
+ // to try to connect. The old socket should no longer have any
+ // references after the swap and should be closed.
+ swap_implementing_socket(existing, new Socket(socket.get()));
- connect = true;
+ connect = true;
+ }
}
links.linkers[to].insert(process);
@@ -2698,17 +2731,20 @@ void ProcessManager::cleanup(ProcessBase* process)
}
-void ProcessManager::link(ProcessBase* process, const UPID& to)
+void ProcessManager::link(
+ ProcessBase* process,
+ const UPID& to,
+ const ProcessBase::RemoteConnection remote)
{
// Check if the pid is local.
if (to.address != __address__) {
- socket_manager->link(process, to);
+ socket_manager->link(process, to, remote);
} else {
// Since the pid is local we want to get a reference to it's
// underlying process so that while we are invoking the link
// manager we don't miss sending a possible ExitedEvent.
if (ProcessReference _ = use(to)) {
- socket_manager->link(process, to);
+ socket_manager->link(process, to, remote);
} else {
// Since the pid isn't valid it's process must have already died
// (or hasn't been spawned yet) so send a process exit message.
@@ -3283,13 +3319,13 @@ void ProcessBase::visit(const TerminateEvent& event)
}
-UPID ProcessBase::link(const UPID& to)
+UPID ProcessBase::link(const UPID& to, const RemoteConnection remote)
{
if (!to) {
return to;
}
- process_manager->link(this, to);
+ process_manager->link(this, to, remote);
return to;
}