You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/25 22:06:02 UTC

[11/12] mesos git commit: Virtualize Socket::Impl interface.

Virtualize Socket::Impl interface.

Review: https://reviews.apache.org/r/28671


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d46e853
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d46e853
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d46e853

Branch: refs/heads/master
Commit: 2d46e853321469bcd7e2d3727254205c19809b7f
Parents: b3bb74c
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Dec 25 10:55:06 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                |   1 +
 3rdparty/libprocess/include/process/socket.hpp |  62 ++++----
 3rdparty/libprocess/src/poll_socket.cpp        | 153 ++++++++++----------
 3rdparty/libprocess/src/poll_socket.hpp        |  29 ++++
 3rdparty/libprocess/src/socket.cpp             |  37 +++--
 5 files changed, 163 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 817355e..6ab9cb8 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -43,6 +43,7 @@ libprocess_la_SOURCES =		\
   src/metrics/metrics.cpp	\
   src/pid.cpp			\
   src/poll_socket.cpp		\
+  src/poll_socket.hpp		\
   src/process.cpp		\
   src/process_reference.hpp	\
   src/reap.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 436761b..ddb9e36 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -111,8 +111,8 @@ class Socket
 public:
   // Available kinds of implementations.
   enum Kind {
-    POLL
-    // TODO(jmlvanre): Add libevent socket.
+    POLL,
+    // TODO(jmlvanre): Add libevent SSL socket.
   };
 
   // Returns an instance of a Socket using the specified kind of
@@ -131,9 +131,7 @@ public:
   class Impl : public std::enable_shared_from_this<Impl>
   {
   public:
-    explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
-
-    ~Impl()
+    virtual ~Impl()
     {
       CHECK(s >= 0);
       Try<Nothing> close = os::close(s);
@@ -148,21 +146,21 @@ public:
       return s;
     }
 
-    Future<Nothing> connect(const Node& node);
-
-    Future<size_t> recv(char* data, size_t size);
-
-    Future<size_t> send(const char* data, size_t size);
-
-    Future<size_t> sendfile(int fd, off_t offset, size_t size);
+    // Socket::Impl interface.
+    virtual Try<Node> bind(const Node& node);
+    virtual Try<Nothing> listen(int backlog) = 0;
+    virtual Future<Socket> accept() = 0;
+    virtual Future<Nothing> connect(const Node& node) = 0;
+    virtual Future<size_t> recv(char* data, size_t size) = 0;
+    virtual Future<size_t> send(const char* data, size_t size) = 0;
+    virtual Future<size_t> sendfile(int fd, off_t offset, size_t size) = 0;
 
-    Try<Node> bind(const Node& node);
-
-    Try<Nothing> listen(int backlog);
+  protected:
+    explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
 
-    Future<Socket> accept();
+    // Construct a Socket wrapper from this implementation.
+    Socket socket() { return Socket(shared_from_this()); }
 
-  private:
     int s;
   };
 
@@ -181,44 +179,46 @@ public:
     return impl->get();
   }
 
-  Future<Nothing> connect(const Node& node)
+  Try<Node> bind(const Node& node)
   {
-    return impl->connect(node);
+    return impl->bind(node);
   }
 
-  Future<size_t> recv(char* data, size_t size) const
+  Try<Nothing> listen(int backlog)
   {
-    return impl->recv(data, size);
+    return impl->listen(backlog);
   }
 
-  Future<size_t> send(const char* data, size_t size) const
+  Future<Socket> accept()
   {
-    return impl->send(data, size);
+    return impl->accept();
   }
 
-  Future<size_t> sendfile(int fd, off_t offset, size_t size) const
+  Future<Nothing> connect(const Node& node)
   {
-    return impl->sendfile(fd, offset, size);
+    return impl->connect(node);
   }
 
-  Try<Node> bind(const Node& node)
+  Future<size_t> recv(char* data, size_t size) const
   {
-    return impl->bind(node);
+    return impl->recv(data, size);
   }
 
-  Try<Nothing> listen(int backlog)
+  Future<size_t> send(const char* data, size_t size) const
   {
-    return impl->listen(backlog);
+    return impl->send(data, size);
   }
 
-  Future<Socket> accept()
+  Future<size_t> sendfile(int fd, off_t offset, size_t size) const
   {
-    return impl->accept();
+    return impl->sendfile(fd, offset, size);
   }
 
 private:
   explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
 
+  explicit Socket(const std::shared_ptr<Impl>& that) : impl(that) {}
+
   std::shared_ptr<Impl> impl;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 09cd5a2..2e70c6c 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -4,12 +4,81 @@
 #include <process/socket.hpp>
 
 #include "config.hpp"
+#include "poll_socket.hpp"
 
 using std::string;
 
 namespace process {
 namespace network {
 
+Try<std::shared_ptr<Socket::Impl>> PollSocketImpl::create(int s)
+{
+  return std::make_shared<PollSocketImpl>(s);
+}
+
+
+Try<Nothing> PollSocketImpl::listen(int backlog)
+{
+  if (::listen(get(), backlog) < 0) {
+    return ErrnoError();
+  }
+  return Nothing();
+}
+
+
+namespace internal {
+
+Future<Socket> accept(int fd)
+{
+  Try<int> accepted = network::accept(fd, AF_INET);
+  if (accepted.isError()) {
+    return Failure(accepted.error());
+  }
+
+  int s = accepted.get();
+  Try<Nothing> nonblock = os::nonblock(s);
+  if (nonblock.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
+                                << nonblock.error();
+    os::close(s);
+    return Failure("Failed to accept, nonblock: " + nonblock.error());
+  }
+
+  Try<Nothing> cloexec = os::cloexec(s);
+  if (cloexec.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
+                                << cloexec.error();
+    os::close(s);
+    return Failure("Failed to accept, cloexec: " + cloexec.error());
+  }
+
+  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+  int on = 1;
+  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+    const char* error = strerror(errno);
+    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+    os::close(s);
+    return Failure(
+      "Failed to turn off the Nagle algorithm: " + stringify(error));
+  }
+
+  Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
+  if (socket.isError()) {
+    return Failure("Failed to accept, create socket: " + socket.error());
+  }
+  return socket.get();
+}
+
+} // namespace internal {
+
+
+Future<Socket> PollSocketImpl::accept()
+{
+  return io::poll(get(), io::READ)
+    .then(lambda::bind(&internal::accept, get()));
+}
+
+
 namespace internal {
 
 Future<Nothing> connect(const Socket& socket)
@@ -31,13 +100,13 @@ Future<Nothing> connect(const Socket& socket)
 } // namespace internal {
 
 
-Future<Nothing> Socket::Impl::connect(const Node& node)
+Future<Nothing> PollSocketImpl::connect(const Node& node)
 {
   Try<int> connect = network::connect(get(), node);
   if (connect.isError()) {
     if (errno == EINPROGRESS) {
       return io::poll(get(), io::WRITE)
-        .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+        .then(lambda::bind(&internal::connect, socket()));
     }
 
     return Failure(connect.error());
@@ -47,7 +116,7 @@ Future<Nothing> Socket::Impl::connect(const Node& node)
 }
 
 
-Future<size_t> Socket::Impl::recv(char* data, size_t size)
+Future<size_t> PollSocketImpl::recv(char* data, size_t size)
 {
   return io::read(get(), data, size);
 }
@@ -129,92 +198,18 @@ Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
 } // namespace internal {
 
 
-Future<size_t> Socket::Impl::send(const char* data, size_t size)
+Future<size_t> PollSocketImpl::send(const char* data, size_t size)
 {
   return io::poll(get(), io::WRITE)
     .then(lambda::bind(&internal::socket_send_data, get(), data, size));
 }
 
 
-Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
+Future<size_t> PollSocketImpl::sendfile(int fd, off_t offset, size_t size)
 {
   return io::poll(get(), io::WRITE)
     .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
 }
 
-
-Try<Node> Socket::Impl::bind(const Node& node)
-{
-  Try<int> bind = network::bind(get(), node);
-  if (bind.isError()) {
-    return Error(bind.error());
-  }
-
-  // Lookup and store assigned ip and assigned port.
-  return network::getsockname(get(), AF_INET);
-}
-
-
-Try<Nothing> Socket::Impl::listen(int backlog)
-{
-  if (::listen(get(), backlog) < 0) {
-    return ErrnoError();
-  }
-  return Nothing();
-}
-
-
-namespace internal {
-
-Future<Socket> accept(int fd)
-{
-  Try<int> accepted = network::accept(fd, AF_INET);
-  if (accepted.isError()) {
-    return Failure(accepted.error());
-  }
-
-  int s = accepted.get();
-  Try<Nothing> nonblock = os::nonblock(s);
-  if (nonblock.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
-                                << nonblock.error();
-    os::close(s);
-    return Failure("Failed to accept, nonblock: " + nonblock.error());
-  }
-
-  Try<Nothing> cloexec = os::cloexec(s);
-  if (cloexec.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
-                                << cloexec.error();
-    os::close(s);
-    return Failure("Failed to accept, cloexec: " + cloexec.error());
-  }
-
-  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
-  int on = 1;
-  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
-    const char* error = strerror(errno);
-    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
-    os::close(s);
-    return Failure(
-      "Failed to turn off the Nagle algorithm: " + stringify(error));
-  }
-
-  Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
-  if (socket.isError()) {
-    return Failure("Failed to accept, create socket: " + socket.error());
-  }
-  return socket.get();
-}
-
-} // namespace internal {
-
-
-Future<Socket> Socket::Impl::accept()
-{
-  return io::poll(get(), io::READ)
-    .then(lambda::bind(&internal::accept, get()));
-}
-
 } // namespace network {
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/src/poll_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.hpp b/3rdparty/libprocess/src/poll_socket.hpp
new file mode 100644
index 0000000..f7ca08e
--- /dev/null
+++ b/3rdparty/libprocess/src/poll_socket.hpp
@@ -0,0 +1,29 @@
+#include <memory>
+
+#include <process/socket.hpp>
+
+#include <stout/try.hpp>
+
+namespace process {
+namespace network {
+
+class PollSocketImpl : public Socket::Impl
+{
+public:
+  static Try<std::shared_ptr<Socket::Impl>> create(int s);
+
+  PollSocketImpl(int s) : Socket::Impl(s) {}
+
+  virtual ~PollSocketImpl() {}
+
+  // Implementation of the Socket::Impl interface.
+  virtual Try<Nothing> listen(int backlog);
+  virtual Future<Socket> accept();
+  virtual Future<Nothing> connect(const Node& node);
+  virtual Future<size_t> recv(char* data, size_t size);
+  virtual Future<size_t> send(const char* data, size_t size);
+  virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
+};
+
+} // namespace network {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 842a6ec..4b0f6be 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -1,16 +1,10 @@
 #include <process/socket.hpp>
 
+#include "poll_socket.hpp"
+
 namespace process {
 namespace network {
 
-const Socket::Kind& Socket::DEFAULT_KIND()
-{
-  // TODO(jmlvanre): Change the default based on configure or
-  // environment flags.
-  static const Kind DEFAULT = POLL;
-  return DEFAULT;
-}
-
 Try<Socket> Socket::create(Kind kind, int s)
 {
   if (s < 0) {
@@ -44,7 +38,11 @@ Try<Socket> Socket::create(Kind kind, int s)
 
   switch (kind) {
     case POLL: {
-      return Socket(std::make_shared<Socket::Impl>(s));
+      Try<std::shared_ptr<Socket::Impl>> socket = PollSocketImpl::create(s);
+      if (socket.isError()) {
+        return Error(socket.error());
+      }
+      return Socket(socket.get());
     }
     // By not setting a default we leverage the compiler errors when
     // the enumeration is augmented to find all the cases we need to
@@ -52,5 +50,26 @@ Try<Socket> Socket::create(Kind kind, int s)
   }
 }
 
+
+const Socket::Kind& Socket::DEFAULT_KIND()
+{
+  // TODO(jmlvanre): Change the default based on configure or
+  // environment flags.
+  static const Kind DEFAULT = POLL;
+  return DEFAULT;
+}
+
+
+Try<Node> Socket::Impl::bind(const Node& node)
+{
+  Try<int> bind = network::bind(get(), node);
+  if (bind.isError()) {
+    return Error(bind.error());
+  }
+
+  // Lookup and store assigned IP and assigned port.
+  return network::getsockname(get(), AF_INET);
+}
+
 } // namespace network {
 } // namespace process {