You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/25 22:06:00 UTC

[09/12] mesos git commit: Add a factory pattern for constructing a Socket.

Add a factory pattern for constructing a Socket.

Add a factory pattern to allow construction of different
implementations of the socket interface. Remove default constructor
from Socket.

Review: https://reviews.apache.org/r/28670


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b3bb74c2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b3bb74c2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b3bb74c2

Branch: refs/heads/master
Commit: b3bb74c2359c6cf3d73bcb0e57a4732af804a15b
Parents: 62ad3f2
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 17:26:13 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |  1 +
 3rdparty/libprocess/include/process/socket.hpp  | 76 +++++-----------
 3rdparty/libprocess/src/poll_socket.cpp         |  6 +-
 3rdparty/libprocess/src/process.cpp             | 92 +++++++++++++-------
 3rdparty/libprocess/src/socket.cpp              | 56 ++++++++++++
 3rdparty/libprocess/src/tests/decoder_tests.cpp | 12 ++-
 6 files changed, 152 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index f401232..817355e 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -46,6 +46,7 @@ libprocess_la_SOURCES =		\
   src/process.cpp		\
   src/process_reference.hpp	\
   src/reap.cpp			\
+  src/socket.cpp		\
   src/subprocess.cpp		\
   src/synchronized.hpp		\
   src/timeseries.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 7e1e3f2..436761b 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -109,6 +109,20 @@ inline Try<Node> getsockname(int s, sa_family_t family)
 class Socket
 {
 public:
+  // Available kinds of implementations.
+  enum Kind {
+    POLL
+    // TODO(jmlvanre): Add libevent socket.
+  };
+
+  // Returns an instance of a Socket using the specified kind of
+  // implementation and potentially wrapping the specified file
+  // descriptor.
+  static Try<Socket> create(Kind kind = DEFAULT_KIND(), int s = -1);
+
+  // Returns the default kind of implementation of Socket.
+  static const Kind& DEFAULT_KIND();
+
   // Each socket is a reference counted, shared by default, concurrent
   // object. However, since we want to support multiple
   // implementations we use the Pimpl pattern (often called the
@@ -117,24 +131,21 @@ public:
   class Impl : public std::enable_shared_from_this<Impl>
   {
   public:
-    Impl() : s(-1) {}
-
-    explicit Impl(int _s) : s(_s) {}
+    explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
 
     ~Impl()
     {
-      if (s >= 0) {
-        Try<Nothing> close = os::close(s);
-        if (close.isError()) {
-          ABORT("Failed to close socket " +
-                stringify(s) + ": " + close.error());
-        }
+      CHECK(s >= 0);
+      Try<Nothing> close = os::close(s);
+      if (close.isError()) {
+        ABORT("Failed to close socket " +
+              stringify(s) + ": " + close.error());
       }
     }
 
     int get() const
     {
-      return s >= 0 ? s : create().get();
+      return s;
     }
 
     Future<Nothing> connect(const Node& node);
@@ -152,52 +163,9 @@ public:
     Future<Socket> accept();
 
   private:
-    const Impl& create() const
-    {
-      CHECK(s < 0);
-
-      // Supported in Linux >= 2.6.27.
-#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
-      Try<int> fd =
-        network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
-
-      if (fd.isError()) {
-        ABORT("Failed to create socket: " + fd.error());
-      }
-#else
-      Try<int> fd = network::socket(AF_INET, SOCK_STREAM, 0);
-      if (fd.isError()) {
-        ABORT("Failed to create socket: " + fd.error());
-      }
-
-      Try<Nothing> nonblock = os::nonblock(fd.get());
-      if (nonblock.isError()) {
-        ABORT("Failed to create socket, nonblock: " + nonblock.error());
-      }
-
-      Try<Nothing> cloexec = os::cloexec(fd.get());
-      if (cloexec.isError()) {
-        ABORT("Failed to create socket, cloexec: " + cloexec.error());
-      }
-#endif
-
-      s = fd.get();
-      return *this;
-    }
-
-    // Mutable so that the socket can be lazily created.
-    //
-    // TODO(benh): Create a factory for sockets and don't lazily
-    // create but instead return a Try<Socket> from the factory in
-    // order to eliminate the need for a mutable member or the call to
-    // ABORT above.
-    mutable int s;
+    int s;
   };
 
-  Socket() : impl(std::make_shared<Impl>()) {}
-
-  explicit Socket(int s) : impl(std::make_shared<Impl>(s)) {}
-
   bool operator == (const Socket& that) const
   {
     return impl == that.impl;

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 5202750..09cd5a2 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -200,7 +200,11 @@ Future<Socket> accept(int fd)
       "Failed to turn off the Nagle algorithm: " + stringify(error));
   }
 
-  return Socket(s);
+  Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
+  if (socket.isError()) {
+    return Failure("Failed to accept, create socket: " + socket.error());
+  }
+  return socket.get();
 }
 
 } // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 72c0ad4..25981ca 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -306,7 +306,7 @@ private:
   } links;
 
   // Collection of all actice sockets.
-  map<int, Socket> sockets;
+  map<int, Socket*> sockets;
 
   // Collection of sockets that should be disposed when they are
   // finished being used (e.g., when there is no more data to send on
@@ -452,7 +452,7 @@ static uint32_t __id__ = 0;
 static const int LISTEN_BACKLOG = 500000;
 
 // Local server socket.
-static Socket __s__;
+static Socket* __s__ = NULL;
 
 // Local node.
 static Node __node__;
@@ -721,7 +721,7 @@ void on_accept(const Future<Socket>& socket)
           decoder));
   }
 
-  __s__.accept()
+  __s__->accept()
     .onAny(lambda::bind(&on_accept, lambda::_1));
 }
 
@@ -853,14 +853,19 @@ void initialize(const string& delegate)
   }
 
   // Create a "server" socket for communicating with other nodes.
+  Try<Socket> create = Socket::create();
+  if (create.isError()) {
+    PLOG(FATAL) << "Failed to construct server socket:" << create.error();
+  }
+  __s__ = new Socket(create.get());
 
   // Allow address reuse.
   int on = 1;
-  if (setsockopt(__s__, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
+  if (setsockopt(__s__->get(), SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
     PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
   }
 
-  Try<Node> bind = __s__.bind(__node__);
+  Try<Node> bind = __s__->bind(__node__);
   if (bind.isError()) {
     PLOG(FATAL) << "Failed to initialize: " << bind.error();
   }
@@ -889,7 +894,7 @@ void initialize(const string& delegate)
     __node__.ip = ip.get();
   }
 
-  Try<Nothing> listen = __s__.listen(LISTEN_BACKLOG);
+  Try<Nothing> listen = __s__->listen(LISTEN_BACKLOG);
   if (listen.isError()) {
     PLOG(FATAL) << "Failed to initialize: " << listen.error();
   }
@@ -898,7 +903,7 @@ void initialize(const string& delegate)
   // 'spawn' below for the garbage collector.
   initializing = false;
 
-  __s__.accept()
+  __s__->accept()
     .onAny(lambda::bind(&internal::on_accept, lambda::_1));
 
   // TODO(benh): Make sure creating the garbage collector, logging
@@ -1232,7 +1237,7 @@ SocketManager::~SocketManager() {}
 void SocketManager::accepted(const Socket& socket)
 {
   synchronized (this) {
-    sockets[socket] = socket;
+    sockets[socket] = new Socket(socket);
   }
 }
 
@@ -1324,22 +1329,22 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
 
   CHECK(process != NULL);
 
-  Socket socket;
+  Option<Socket> socket = None();
   bool connect = false;
 
   synchronized (this) {
-    links.linkers[to].insert(process);
-    links.linkees[process].insert(to);
-    if (to.node != __node__) {
-      links.remotes[to.node].insert(to);
-    }
-
     // Check if node is remote and there isn't a persistant link.
     if (to.node != __node__  && persists.count(to.node) == 0) {
       // Okay, no link, let's create a socket.
-      int s = socket;
+      Try<Socket> create = Socket::create();
+      if (create.isError()) {
+        VLOG(1) << "Failed to link, create socket: " << create.error();
+        return;
+      }
+      socket = create.get();
+      int s = socket.get().get();
 
-      sockets[s] = socket;
+      sockets[s] = new Socket(socket.get());
       nodes[s] = to.node;
 
       persists[to.node] = s;
@@ -1353,14 +1358,21 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
 
       connect = true;
     }
+
+    links.linkers[to].insert(process);
+    links.linkees[process].insert(to);
+    if (to.node != __node__) {
+      links.remotes[to.node].insert(to);
+    }
   }
 
   if (connect) {
-    socket.connect(to.node)
+    CHECK_SOME(socket);
+    Socket(socket.get()).connect(to.node) // Copy to drop const.
       .onAny(lambda::bind(
           &internal::link_connect,
           lambda::_1,
-          new Socket(socket)));
+          new Socket(socket.get())));
   }
 }
 
@@ -1377,7 +1389,7 @@ PID<HttpProxy> SocketManager::proxy(const Socket& socket)
       if (proxies.count(socket) > 0) {
         return proxies[socket]->self();
       } else {
-        proxy = new HttpProxy(sockets[socket]);
+        proxy = new HttpProxy(*sockets[socket]);
         proxies[socket] = proxy;
       }
     }
@@ -1569,7 +1581,7 @@ void SocketManager::send(Message* message)
 
   const Node& node = message->to.node;
 
-  Socket socket;
+  Option<Socket> socket = None();
   bool connect = false;
 
   synchronized (this) {
@@ -1579,28 +1591,35 @@ void SocketManager::send(Message* message)
     if (persist || temp) {
       int s = persist ? persists[node] : temps[node];
       CHECK(sockets.count(s) > 0);
-      socket = sockets[s];
+      socket = *sockets[s];
 
       // Update whether or not this socket should get disposed after
       // there is no more data to send.
       if (!persist) {
-        dispose.insert(socket);
+        dispose.insert(socket.get());
       }
 
-      if (outgoing.count(socket) > 0) {
-        outgoing[socket].push(new MessageEncoder(socket, message));
+      if (outgoing.count(socket.get()) > 0) {
+        outgoing[socket.get()].push(new MessageEncoder(socket.get(), message));
         return;
       } else {
         // Initialize the outgoing queue.
-        outgoing[socket];
+        outgoing[socket.get()];
       }
 
     } else {
       // No peristent or temporary socket to the node currently
       // exists, so we create a temporary one.
-      int s = socket;
+      Try<Socket> create = Socket::create();
+      if (create.isError()) {
+        VLOG(1) << "Failed to send, create socket: " << create.error();
+        delete message;
+        return;
+      }
+      socket = create.get();
+      int s = socket.get();
 
-      sockets[s] = socket;
+      sockets[s] = new Socket(socket.get());
       nodes[s] = node;
       temps[node] = s;
 
@@ -1614,16 +1633,19 @@ void SocketManager::send(Message* message)
   }
 
   if (connect) {
-    socket.connect(node)
+    CHECK_SOME(socket);
+    Socket(socket.get()).connect(node) // Copy to drop const.
       .onAny(lambda::bind(
           &internal::send_connect,
           lambda::_1,
-          new Socket(socket),
+          new Socket(socket.get()),
           message));
   } else {
     // If we're not connecting and we haven't added the encoder to
     // the 'outgoing' queue then schedule it to be sent.
-    internal::send(new MessageEncoder(socket, message), new Socket(socket));
+    internal::send(
+        new MessageEncoder(socket.get(), message),
+        new Socket(socket.get()));
   }
 }
 
@@ -1675,7 +1697,9 @@ Encoder* SocketManager::next(int s)
           }
 
           dispose.erase(s);
-          sockets.erase(s);
+          auto iterator = sockets.find(s);
+          delete iterator->second;
+          sockets.erase(iterator);
 
           // We don't actually close the socket (we wait for the Socket
           // abstraction to close it once there are no more references),
@@ -1752,7 +1776,9 @@ void SocketManager::close(int s)
       shutdown(s, SHUT_RD);
 
       dispose.erase(s);
-      sockets.erase(s);
+      auto iterator = sockets.find(s);
+      delete iterator->second;
+      sockets.erase(iterator);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
new file mode 100644
index 0000000..842a6ec
--- /dev/null
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -0,0 +1,56 @@
+#include <process/socket.hpp>
+
+namespace process {
+namespace network {
+
+const Socket::Kind& Socket::DEFAULT_KIND()
+{
+  // TODO(jmlvanre): Change the default based on configure or
+  // environment flags.
+  static const Kind DEFAULT = POLL;
+  return DEFAULT;
+}
+
+Try<Socket> Socket::create(Kind kind, int s)
+{
+  if (s < 0) {
+    // Supported in Linux >= 2.6.27.
+#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
+    Try<int> fd =
+      network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+
+    if (fd.isError()) {
+      return Error("Failed to create socket: " + fd.error());
+    }
+#else
+    Try<int> fd = network::socket(AF_INET, SOCK_STREAM, 0);
+    if (fd.isError()) {
+      return Error("Failed to create socket: " + fd.error());
+    }
+
+    Try<Nothing> nonblock = os::nonblock(fd.get());
+    if (nonblock.isError()) {
+      return Error("Failed to create socket, nonblock: " + nonblock.error());
+    }
+
+    Try<Nothing> cloexec = os::cloexec(fd.get());
+    if (cloexec.isError()) {
+      return Error("Failed to create socket, cloexec: " + cloexec.error());
+    }
+#endif
+
+    s = fd.get();
+  }
+
+  switch (kind) {
+    case POLL: {
+      return Socket(std::make_shared<Socket::Impl>(s));
+    }
+    // By not setting a default we leverage the compiler errors when
+    // the enumeration is augmented to find all the cases we need to
+    // provide.
+  }
+}
+
+} // namespace network {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index ef52798..d65f5cf 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -19,7 +19,9 @@ using process::network::Socket;
 
 TEST(Decoder, Request)
 {
-  DataDecoder decoder = DataDecoder(Socket());
+  Try<Socket> socket = Socket::create();
+  ASSERT_SOME(socket);
+  DataDecoder decoder = DataDecoder(socket.get());
 
   const string& data =
     "GET /path/file.json?key1=value1&key2=value2#fragment HTTP/1.1\r\n"
@@ -55,7 +57,9 @@ TEST(Decoder, Request)
 
 TEST(Decoder, RequestHeaderContinuation)
 {
-  DataDecoder decoder = DataDecoder(Socket());
+  Try<Socket> socket = Socket::create();
+  ASSERT_SOME(socket);
+  DataDecoder decoder = DataDecoder(socket.get());
 
   const string& data =
     "GET /path/file.json HTTP/1.1\r\n"
@@ -79,7 +83,9 @@ TEST(Decoder, RequestHeaderContinuation)
 // This is expected to fail for now, see my TODO(bmahler) on http::Request.
 TEST(Decoder, DISABLED_RequestHeaderCaseInsensitive)
 {
-  DataDecoder decoder = DataDecoder(Socket());
+  Try<Socket> socket = Socket::create();
+  ASSERT_SOME(socket);
+  DataDecoder decoder = DataDecoder(socket.get());
 
   const string& data =
     "GET /path/file.json HTTP/1.1\r\n"