You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:39:22 UTC

[25/30] mesos git commit: Add connect() to the Socket interface.

Add connect() to the Socket interface.

Review: https://reviews.apache.org/r/27958


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4d616f8d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4d616f8d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4d616f8d

Branch: refs/heads/master
Commit: 4d616f8d7194be9b251aa5a3ddc51783c4d4411a
Parents: 702b382
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:45:49 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp |  41 ++++-
 3rdparty/libprocess/src/process.cpp            | 193 +++++++++++---------
 2 files changed, 138 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4d616f8d/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 9f4302e..fdad91f 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -3,8 +3,12 @@
 
 #include <assert.h>
 
+#include <memory>
+
+#include <process/future.hpp>
+#include <process/node.hpp>
+
 #include <stout/abort.hpp>
-#include <stout/memory.hpp>
 #include <stout/nothing.hpp>
 #include <stout/os.hpp>
 #include <stout/try.hpp>
@@ -14,7 +18,8 @@ namespace process {
 
 // Returns a socket fd for the specified options. Note that on OS X,
 // the returned socket will have the SO_NOSIGPIPE option set.
-inline Try<int> socket(int family, int type, int protocol) {
+inline Try<int> socket(int family, int type, int protocol)
+{
   int s;
   if ((s = ::socket(family, type, protocol)) == -1) {
     return ErrnoError();
@@ -41,7 +46,12 @@ inline Try<int> socket(int family, int type, int protocol) {
 class Socket
 {
 public:
-  class Impl
+  // Each socket is a reference counted, shared by default, concurrent
+  // object. However, since we want to support multiple
+  // implementations we use the Pimpl pattern (often called the
+  // compilation firewall) rather than forcing each Socket
+  // implementation to do this itself.
+  class Impl : public std::enable_shared_from_this<Impl>
   {
   public:
     Impl() : s(-1) {}
@@ -53,8 +63,8 @@ public:
       if (s >= 0) {
         Try<Nothing> close = os::close(s);
         if (close.isError()) {
-          ABORT(
-            "Failed to close socket " + stringify(s) + ": " + close.error());
+          ABORT("Failed to close socket " +
+                stringify(s) + ": " + close.error());
         }
       }
     }
@@ -64,6 +74,8 @@ public:
       return s >= 0 ? s : create().get();
     }
 
+    Future<Socket> connect(const Node& node);
+
   private:
     const Impl& create() const
     {
@@ -78,6 +90,11 @@ public:
     }
 
     // Mutable so that the socket can be lazily created.
+    //
+    // TODO(benh): Create a factory for sockets and don't lazily
+    // create but instead return a Try<Socket> from the factory in
+    // order to eliminate the need for a mutable member or the call to
+    // ABORT above.
     mutable int s;
   };
 
@@ -95,8 +112,20 @@ public:
     return impl->get();
   }
 
+  int get() const
+  {
+    return impl->get();
+  }
+
+  Future<Socket> connect(const Node& node)
+  {
+    return impl->connect(node);
+  }
+
 private:
-  memory::shared_ptr<Impl> impl;
+  explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
+
+  std::shared_ptr<Impl> impl;
 };
 
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4d616f8d/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 48e5486..6916cbb 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1484,6 +1484,48 @@ void HttpProxy::stream(const Future<short>& poll, const Request& request)
 }
 
 
+namespace internal {
+// Continuation that Socket::Impl::connect() chains after io::poll(..., io::WRITE).
+Future<Socket> connect(const Socket& socket)
+{
+  // Read SO_ERROR to learn whether the pending connect succeeded.
+  int opt;
+  socklen_t optlen = sizeof(opt);
+  int s = socket.get();
+
+  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+    // getsockopt() failed or reported an error; 'opt' is not in the message.
+    VLOG(1) << "Socket error while connecting";
+    return Failure("Socket error while connecting");
+  }
+  // No pending error: complete the future with the same socket.
+  return socket;
+}
+
+} // namespace internal {
+
+
+Future<Socket> Socket::Impl::connect(const Node& node)
+{ // Connects to 'node'; on success the future holds a Socket sharing this Impl.
+  sockaddr_in addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = PF_INET;
+  addr.sin_port = htons(node.port);
+  addr.sin_addr.s_addr = node.ip;
+  // NOTE(review): node.ip is used as-is (no htonl); presumably network order.
+  if (::connect(get(), (sockaddr*) &addr, sizeof(addr)) < 0) {
+    if (errno != EINPROGRESS) {
+      return Failure(ErrnoError("Failed to connect socket"));
+    }
+    // EINPROGRESS: poll for writability, then internal::connect checks it.
+    return io::poll(get(), io::WRITE)
+      .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+  }
+  // ::connect() succeeded immediately, so no polling is needed.
+  return Socket(shared_from_this());
+}
+
+
 SocketManager::SocketManager()
 {
   synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1503,6 +1545,25 @@ Socket SocketManager::accepted(int s)
 }
 
 
+namespace internal {
+// onAny callback for the connect() started in SocketManager::link().
+void link_connect(const Future<Socket>& socket)
+{
+  if (socket.isDiscarded() || socket.isFailed()) {
+    if (socket.isFailed()) {
+      VLOG(1) << "Failed to link, connect: " << socket.failure();
+    }
+    socket_manager->close(socket.get()); // NOTE(review): get() on a failed/discarded future; confirm this is safe.
+    return;
+  }
+  // Connected: poll for reads; ignore_data gets a heap-allocated Socket copy.
+  io::poll(socket.get(), io::READ)
+    .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+}
+
+} // namespace internal {
+
+
 void SocketManager::link(ProcessBase* process, const UPID& to)
 {
   // TODO(benh): The semantics we want to support for link are such
@@ -1516,58 +1577,22 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
   CHECK(process != NULL);
 
   synchronized (this) {
+    links[to].insert(process);
+
     // Check if node is remote and there isn't a persistent link.
     if (to.node != __node__  && persists.count(to.node) == 0) {
       // Okay, no link, let's create a socket.
-      Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
-      if (socket.isError()) {
-        LOG(FATAL) << "Failed to link, socket: " << socket.error();
-      }
-
-      int s = socket.get();
-
-      Try<Nothing> nonblock = os::nonblock(s);
-      if (nonblock.isError()) {
-        LOG(FATAL) << "Failed to link, nonblock: " << nonblock.error();
-      }
-
-      Try<Nothing> cloexec = os::cloexec(s);
-      if (cloexec.isError()) {
-        LOG(FATAL) << "Failed to link, cloexec: " << cloexec.error();
-      }
+      Socket socket;
+      int s = socket;
 
-      sockets[s] = Socket(s);
+      sockets[s] = socket;
       nodes[s] = to.node;
 
       persists[to.node] = s;
 
-      // Try and connect to the node using this socket in order to
-      // start reading data. Note that we don't expect to receive
-      // anything other than HTTP '202 Accepted' responses which we
-      // anyway ignore.  We do, however, want to react when it gets
-      // closed so we can generate appropriate lost events (since this
-      // is a 'link').
-      sockaddr_in addr;
-      memset(&addr, 0, sizeof(addr));
-      addr.sin_family = PF_INET;
-      addr.sin_port = htons(to.node.port);
-      addr.sin_addr.s_addr = to.node.ip;
-
-      if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
-        if (errno != EINPROGRESS) {
-          PLOG(FATAL) << "Failed to link, connect";
-        }
-
-        // Wait for socket to be connected.
-        io::poll(s, io::WRITE)
-          .onAny(lambda::bind(&receiving_connect, new Socket(sockets[s]), s));
-      } else {
-        io::poll(s, io::READ)
-          .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
-      }
+      socket.connect(to.node)
+        .onAny(lambda::bind(&internal::link_connect, lambda::_1));
     }
-
-    links[to].insert(process);
   }
 }
 
@@ -1653,11 +1678,40 @@ void SocketManager::send(
 }
 
 
+namespace internal {
+// onAny callback for the connect() started in SocketManager::send(Message*).
+void send_connect(const Future<Socket>& socket, Message* message)
+{
+  if (socket.isDiscarded() || socket.isFailed()) {
+    if (socket.isFailed()) {
+      VLOG(1) << "Failed to send, connect: " << socket.failure();
+    }
+    socket_manager->close(socket.get()); // NOTE(review): get() on a failed/discarded future; confirm this is safe.
+    delete message; // The message is dropped here; nothing retries it.
+    return;
+  }
+  // Connected: wrap the message in an encoder bound to this socket.
+  Encoder* encoder = new MessageEncoder(socket.get(), message);
+
+  // Read and ignore data from this socket. Note that we don't
+  // expect to receive anything other than HTTP '202 Accepted'
+  // responses which we just ignore.
+  io::poll(socket.get(), io::READ)
+    .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+
+  // Start polling in order to send data.
+  io::poll(socket.get(), io::WRITE)
+    .onAny(lambda::bind(&send_data, encoder));
+}
+
+} // namespace internal {
+
+
 void SocketManager::send(Message* message)
 {
   CHECK(message != NULL);
 
-  Node node(message->to.node);
+  const Node& node = message->to.node;
 
   synchronized (this) {
     // Check if there is already a socket.
@@ -1670,24 +1724,10 @@ void SocketManager::send(Message* message)
     } else {
       // No persistent or temporary socket to the node currently
       // exists, so we create a temporary one.
-      Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
-      if (socket.isError()) {
-        LOG(FATAL) << "Failed to send, socket: " << socket.error();
-      }
-
-      int s = socket.get();
+      Socket socket;
+      int s = socket;
 
-      Try<Nothing> nonblock = os::nonblock(s);
-      if (nonblock.isError()) {
-        LOG(FATAL) << "Failed to send, nonblock: " << nonblock.error();
-      }
-
-      Try<Nothing> cloexec = os::cloexec(s);
-      if (cloexec.isError()) {
-        LOG(FATAL) << "Failed to send, cloexec: " << cloexec.error();
-      }
-
-      sockets[s] = Socket(s);
+      sockets[s] = socket;
       nodes[s] = node;
       temps[node] = s;
 
@@ -1696,35 +1736,8 @@ void SocketManager::send(Message* message)
       // Initialize the outgoing queue.
       outgoing[s];
 
-      // Create a message encoder to handle sending this message.
-      Encoder* encoder = new MessageEncoder(sockets[s], message);
-
-      // Read and ignore data from this socket. Note that we don't
-      // expect to receive anything other than HTTP '202 Accepted'
-      // responses which we just ignore.
-      io::poll(s, io::READ)
-        .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
-
-      // Try and connect to the node using this socket.
-      sockaddr_in addr;
-      memset(&addr, 0, sizeof(addr));
-      addr.sin_family = PF_INET;
-      addr.sin_port = htons(node.port);
-      addr.sin_addr.s_addr = node.ip;
-
-      if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
-        if (errno != EINPROGRESS) {
-          PLOG(FATAL) << "Failed to send, connect";
-        }
-
-        // Start polling in order to wait for being connected.
-        io::poll(s, io::WRITE)
-          .onAny(lambda::bind(&sending_connect, encoder));
-      } else {
-        // Start polling in order to send data.
-        io::poll(s, io::WRITE)
-          .onAny(lambda::bind(&send_data, encoder));
-      }
+      socket.connect(node)
+        .onAny(lambda::bind(&internal::send_connect, lambda::_1, message));
     }
   }
 }