You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2016/07/11 21:55:06 UTC

[1/2] mesos git commit: Changed the replicated log's network abstraction to use "relink".

Repository: mesos
Updated Branches:
  refs/heads/0.28.x 9bcdc6976 -> cf7afe7c8


Changed the replicated log's network abstraction to use "relink".

Review: https://reviews.apache.org/r/49346/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cf7afe7c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cf7afe7c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cf7afe7c

Branch: refs/heads/0.28.x
Commit: cf7afe7c8f786b8dbf65da43f5579be756f63c0c
Parents: 1415fc9
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Wed Jun 29 18:20:46 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Jul 11 14:02:12 2016 -0700

----------------------------------------------------------------------
 src/log/network.hpp | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cf7afe7c/src/log/network.hpp
----------------------------------------------------------------------
diff --git a/src/log/network.hpp b/src/log/network.hpp
index 6d40027..e389324 100644
--- a/src/log/network.hpp
+++ b/src/log/network.hpp
@@ -156,7 +156,24 @@ public:
 
   void add(const process::UPID& pid)
   {
-    link(pid); // Try and keep a socket open (more efficient).
+    // Link in order to keep a socket open (more efficient).
+    //
+    // We force a reconnect to avoid sending on a "stale" socket. In
+    // general when linking to a remote process, the underlying TCP
+    // connection may become "stale". RFC 793 refers to this as a
+    // "half-open" connection: the RST is not sent upon the death
+    // of the peer and a RST will only be received once further
+    // data is sent on the socket.
+    //
+    // "Half-open" (aka "stale") connections are typically addressed
+    // via keep-alives (see RFC 1122 4.2.3.6) to periodically probe
+    // the connection. In this case, we can rely on the (re-)addition
+    // of the network member to create a new connection.
+    //
+    // See MESOS-5576 for a scenario where reconnecting helps avoid
+    // dropped messages.
+    link(pid, RemoteConnection::RECONNECT);
+
     pids.insert(pid);
 
     // Update any pending watches.


[2/2] mesos git commit: Added "relink" semantics to ProcessBase::link.

Posted by jo...@apache.org.
Added "relink" semantics to ProcessBase::link.

The `RemoteConnection::RECONNECT` option for `ProcessBase::link` will
force the `SocketManager` to create a new socket if a persistent link
already exists.

Review: https://reviews.apache.org/r/49177/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1415fc9f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1415fc9f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1415fc9f

Branch: refs/heads/0.28.x
Commit: 1415fc9f6b4c3e6220b410ab95ba695dbbace69a
Parents: 9bcdc69
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Wed Jun 29 18:20:44 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Jul 11 14:02:12 2016 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/process.hpp | 31 ++++++-
 3rdparty/libprocess/src/process.cpp             | 94 ++++++++++++++------
 2 files changed, 95 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1415fc9f/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index c9ef4e8..1fe94c4 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -139,6 +139,33 @@ protected:
       size_t length = 0);
 
   /**
+   * Describes the behavior of the `link` call when the target `pid`
+   * points to a remote process. This enum has no effect if the target
+   * `pid` points to a local process.
+   */
+  enum class RemoteConnection
+  {
+    /**
+     * If a persistent socket to the target `pid` does not exist,
+     * a new link is created. If a persistent socket already exists,
+     * `link` will subscribe this process to the existing link.
+     *
+     * This is the default behavior.
+     */
+    REUSE,
+
+    /**
+     * If a persistent socket to the target `pid` does not exist,
+     * a new link is created. If a persistent socket already exists,
+     * `link` will create a new socket connection with the target `pid`
+     * and *atomically* swap the existing link with the new link.
+     *
+     * Existing linkers will remain linked, albeit via the new socket.
+     */
+    RECONNECT,
+  };
+
+  /**
    * Links with the specified `UPID`.
    *
    * Linking with a process from within the same "operating system
@@ -147,7 +174,9 @@ protected:
    * result in receiving lost callbacks due to the nature of a distributed
    * environment.
    */
-  UPID link(const UPID& pid);
+  UPID link(
+      const UPID& pid,
+      const RemoteConnection remote = RemoteConnection::REUSE);
 
   /**
    * Any function which takes a "from" `UPID` and a message body as

http://git-wip-us.apache.org/repos/asf/mesos/blob/1415fc9f/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index bad574a..38e2568 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -299,6 +299,7 @@ public:
 
   void link(ProcessBase* process,
             const UPID& to,
+            const ProcessBase::RemoteConnection remote,
             const Socket::Kind& kind = Socket::DEFAULT_KIND());
 
   PID<HttpProxy> proxy(const Socket& socket);
@@ -412,7 +413,12 @@ public:
   UPID spawn(ProcessBase* process, bool manage);
   void resume(ProcessBase* process);
   void cleanup(ProcessBase* process);
-  void link(ProcessBase* process, const UPID& to);
+
+  void link(
+      ProcessBase* process,
+      const UPID& to,
+      const ProcessBase::RemoteConnection remote);
+
   void terminate(const UPID& pid, bool inject, ProcessBase* sender = NULL);
   bool wait(const UPID& pid);
 
@@ -1418,6 +1424,7 @@ void SocketManager::link_connect(
 void SocketManager::link(
     ProcessBase* process,
     const UPID& to,
+    const ProcessBase::RemoteConnection remote,
     const Socket::Kind& kind)
 {
   // TODO(benh): The semantics we want to support for link are such
@@ -1434,33 +1441,59 @@ void SocketManager::link(
   bool connect = false;
 
   synchronized (mutex) {
-    // Check if the socket address is remote and there isn't a persistant link.
-    if (to.address != __address__  && persists.count(to.address) == 0) {
-      // Okay, no link, let's create a socket.
-      // The kind of socket we create is passed in as an argument.
-      // This allows us to support downgrading the connection type
-      // from SSL to POLL if enabled.
-      Try<Socket> create = Socket::create(kind);
-      if (create.isError()) {
-        VLOG(1) << "Failed to link, create socket: " << create.error();
-        return;
-      }
-      socket = create.get();
-      int s = socket.get().get();
+    // Check if the socket address is remote.
+    if (to.address != __address__) {
+      // Check if there isn't already a persistent link.
+      if (persists.count(to.address) == 0) {
+        // Okay, no link, let's create a socket.
+        // The kind of socket we create is passed in as an argument.
+        // This allows us to support downgrading the connection type
+        // from SSL to POLL if enabled.
+        Try<Socket> create = Socket::create(kind);
+        if (create.isError()) {
+          VLOG(1) << "Failed to link, create socket: " << create.error();
+          return;
+        }
+        socket = create.get();
+        int s = socket.get().get();
+
+        sockets[s] = new Socket(socket.get());
+        addresses[s] = to.address;
+
+        persists[to.address] = s;
+
+        // Initialize 'outgoing' to prevent a race with
+        // SocketManager::send() while the socket is not yet connected.
+        // Initializing the 'outgoing' queue prevents
+        // SocketManager::send() from trying to write before it's
+        // connected.
+        outgoing[s];
+
+        connect = true;
+      } else if (remote == ProcessBase::RemoteConnection::RECONNECT) {
+        // There is a persistent link already and the linker wants to
+        // create a new socket anyway.
+        Try<Socket> create = Socket::create(kind);
+        if (create.isError()) {
+          VLOG(1) << "Failed to link, create socket: " << create.error();
+          return;
+        }
 
-      sockets[s] = new Socket(socket.get());
-      addresses[s] = to.address;
+        socket = create.get();
 
-      persists[to.address] = s;
+        // Grab a copy of the existing socket that we're swapping.
+        // This should go out of scope and be closed after the swap
+        // is complete.
+        Socket existing(*sockets.at(persists.at(to.address)));
 
-      // Initialize 'outgoing' to prevent a race with
-      // SocketManager::send() while the socket is not yet connected.
-      // Initializing the 'outgoing' queue prevents
-      // SocketManager::send() from trying to write before it's
-      // connected.
-      outgoing[s];
+        // Update all the data structures that are mapped to the old
+        // socket. They will now point to the new socket we are about
+        // to try to connect. The old socket should no longer have any
+        // references after the swap and should be closed.
+        swap_implementing_socket(existing, new Socket(socket.get()));
 
-      connect = true;
+        connect = true;
+      }
     }
 
     links.linkers[to].insert(process);
@@ -2698,17 +2731,20 @@ void ProcessManager::cleanup(ProcessBase* process)
 }
 
 
-void ProcessManager::link(ProcessBase* process, const UPID& to)
+void ProcessManager::link(
+    ProcessBase* process,
+    const UPID& to,
+    const ProcessBase::RemoteConnection remote)
 {
   // Check if the pid is local.
   if (to.address != __address__) {
-    socket_manager->link(process, to);
+    socket_manager->link(process, to, remote);
   } else {
     // Since the pid is local we want to get a reference to it's
     // underlying process so that while we are invoking the link
     // manager we don't miss sending a possible ExitedEvent.
     if (ProcessReference _ = use(to)) {
-      socket_manager->link(process, to);
+      socket_manager->link(process, to, remote);
     } else {
       // Since the pid isn't valid it's process must have already died
       // (or hasn't been spawned yet) so send a process exit message.
@@ -3283,13 +3319,13 @@ void ProcessBase::visit(const TerminateEvent& event)
 }
 
 
-UPID ProcessBase::link(const UPID& to)
+UPID ProcessBase::link(const UPID& to, const RemoteConnection remote)
 {
   if (!to) {
     return to;
   }
 
-  process_manager->link(this, to);
+  process_manager->link(this, to, remote);
 
   return to;
 }