You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:39:23 UTC

[26/30] mesos git commit: Add bind(), listen(), and accept() to Socket interface.

Add bind(), listen(), and accept() to Socket interface.

Review: https://reviews.apache.org/r/27966


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8279b45e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8279b45e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8279b45e

Branch: refs/heads/master
Commit: 8279b45ed325375bbf8cc5919fd7009c7d2335cf
Parents: 8edab65
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:55:07 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:22 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp |  21 ++
 3rdparty/libprocess/src/libev.cpp              |   1 -
 3rdparty/libprocess/src/libev.hpp              |   3 -
 3rdparty/libprocess/src/process.cpp            | 235 +++++++++++---------
 4 files changed, 153 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 5fd8d1b..c022924 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -82,6 +82,12 @@ public:
 
     Future<size_t> sendfile(int fd, off_t offset, size_t size);
 
+    Try<Node> bind(const Node& node);
+
+    Try<Nothing> listen(int backlog);
+
+    Future<Socket> accept();
+
   private:
     const Impl& create() const
     {
@@ -143,6 +149,21 @@ public:
     return impl->sendfile(fd, offset, size);
   }
 
+  Try<Node> bind(const Node& node)
+  {
+    return impl->bind(node);
+  }
+
+  Try<Nothing> listen(int backlog)
+  {
+    return impl->listen(backlog);
+  }
+
+  Future<Socket> accept()
+  {
+    return impl->accept();
+  }
+
 private:
   explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index efc89d8..6560050 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -12,7 +12,6 @@ namespace process {
 // libev.hpp (since these need to live in the static data space).
 struct ev_loop* loop = NULL;
 ev_async async_watcher;
-ev_io server_watcher;
 std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
 synchronizable(watchers);
 std::queue<lambda::function<void(void)>>* functions =

http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
index bac8b6a..04847e3 100644
--- a/3rdparty/libprocess/src/libev.hpp
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -18,9 +18,6 @@ extern struct ev_loop* loop;
 // with IO watchers and functions (via run_in_event_loop).
 extern ev_async async_watcher;
 
-// Server watcher for accepting connections.
-extern ev_io server_watcher;
-
 // Queue of I/O watchers to be asynchronously added to the event loop
 // (protected by 'watchers' below).
 // TODO(benh): Replace this queue with functions that we put in

http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 9f91020..5d3b947 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -267,7 +267,7 @@ public:
   SocketManager();
   ~SocketManager();
 
-  Socket accepted(int s);
+  void accepted(const Socket& socket);
 
   void link(ProcessBase* process, const UPID& to);
 
@@ -433,8 +433,11 @@ const string Profiler::STOP_HELP = HELP(
 // Unique id that can be assigned to each process.
 static uint32_t __id__ = 0;
 
+// Server socket listen backlog.
+static const int LISTEN_BACKLOG = 500000;
+
 // Local server socket.
-static int __s__ = -1;
+static Socket __s__;
 
 // Local node.
 static Node __node__;
@@ -640,66 +643,6 @@ void decode_read(
 } // namespace internal {
 
 
-void accept(struct ev_loop* loop, ev_io* watcher, int revents)
-{
-  CHECK_EQ(__s__, watcher->fd);
-
-  sockaddr_in addr;
-  socklen_t addrlen = sizeof(addr);
-
-  int s = ::accept(__s__, (sockaddr*) &addr, &addrlen);
-
-  if (s < 0) {
-    return;
-  }
-
-  Try<Nothing> nonblock = os::nonblock(s);
-  if (nonblock.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
-                                << nonblock.error();
-    os::close(s);
-    return;
-  }
-
-  Try<Nothing> cloexec = os::cloexec(s);
-  if (cloexec.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
-                                << cloexec.error();
-    os::close(s);
-    return;
-  }
-
-  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
-  int on = 1;
-  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
-    const char* error = strerror(errno);
-    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
-    os::close(s);
-  } else {
-    // Inform the socket manager for proper bookkeeping.
-    Socket socket = socket_manager->accepted(s);
-
-    // Allocate a buffer to read into. This can be replaced later
-    // when socket supports a read function that provides the
-    // buffered data in the resulting callback.
-    const size_t size = 80 * 1024;
-    char* data = new char[size];
-    memset(data, 0, size);
-
-    DataDecoder* decoder = new DataDecoder(socket);
-
-    socket.read(data, size)
-      .onAny(lambda::bind(
-          &internal::decode_read,
-          lambda::_1,
-          data,
-          size,
-          new Socket(socket),
-          decoder));
-  }
-}
-
-
 void* serve(void* arg)
 {
   ev_loop(((struct ev_loop*) arg), 0);
@@ -765,6 +708,37 @@ void timedout(list<Timer>&& timers)
 // }
 
 
+namespace internal {
+
+// Handles a completed '__s__.accept()' and then accepts the next one.
+void on_accept(const Future<Socket>& socket)
+{
+  if (socket.isReady()) {
+    const Socket& accepted = socket.get();
+    // Inform the socket manager for proper bookkeeping.
+    socket_manager->accepted(accepted);
+
+    const size_t size = 80 * 1024;
+    char* data = new char[size];
+    memset(data, 0, size);
+    DataDecoder* decoder = new DataDecoder(accepted);
+    accepted.read(data, size)
+      .onAny(lambda::bind(
+          &internal::decode_read, lambda::_1, data, size,
+          new Socket(accepted), decoder));
+  } else {
+    // Failed or discarded: say why instead of dropping it silently.
+    VLOG(1) << "Failed to accept connection: "
+            << (socket.isFailed() ? socket.failure() : "discarded");
+  }
+
+  __s__.accept()
+    .onAny(lambda::bind(&on_accept, lambda::_1));
+}
+
+} // namespace internal {
+
+
 void initialize(const string& delegate)
 {
   // TODO(benh): Return an error if attempting to initialize again
@@ -866,21 +840,6 @@ void initialize(const string& delegate)
   }
 
   // Create a "server" socket for communicating with other nodes.
-  if ((__s__ = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-    PLOG(FATAL) << "Failed to initialize, socket";
-  }
-
-  // Make socket non-blocking.
-  Try<Nothing> nonblock = os::nonblock(__s__);
-  if (nonblock.isError()) {
-    LOG(FATAL) << "Failed to initialize, nonblock: " << nonblock.error();
-  }
-
-  // Set FD_CLOEXEC flag.
-  Try<Nothing> cloexec = os::cloexec(__s__);
-  if (cloexec.isError()) {
-    LOG(FATAL) << "Failed to initialize, cloexec: " << cloexec.error();
-  }
 
   // Allow address reuse.
   int on = 1;
@@ -888,25 +847,12 @@ void initialize(const string& delegate)
     PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
   }
 
-  // Set up socket.
-  sockaddr_in addr;
-  memset(&addr, 0, sizeof(addr));
-  addr.sin_family = PF_INET;
-  addr.sin_addr.s_addr = __node__.ip;
-  addr.sin_port = htons(__node__.port);
-
-  if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
-    PLOG(FATAL) << "Failed to initialize, bind " << __node__;
-  }
-
-  // Lookup and store assigned ip and assigned port.
-  socklen_t addrlen = sizeof(addr);
-  if (getsockname(__s__, (sockaddr*) &addr, &addrlen) < 0) {
-    PLOG(FATAL) << "Failed to initialize, getsockname";
+  Try<Node> bind = __s__.bind(__node__);
+  if (bind.isError()) {
+    PLOG(FATAL) << "Failed to initialize: " << bind.error();
   }
 
-  __node__.ip = addr.sin_addr.s_addr;
-  __node__.port = ntohs(addr.sin_port);
+  __node__ = bind.get();
 
   // Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
   // actually have a valid external ip address. Note that we need only
@@ -931,8 +877,9 @@ void initialize(const string& delegate)
     __node__.ip = *((uint32_t *) he->h_addr_list[0]);
   }
 
-  if (listen(__s__, 500000) < 0) {
-    PLOG(FATAL) << "Failed to initialize, listen";
+  Try<Nothing> listen = __s__.listen(LISTEN_BACKLOG);
+  if (listen.isError()) {
+    PLOG(FATAL) << "Failed to initialize: " << listen.error();
   }
 
   // Initialize libev.
@@ -950,9 +897,6 @@ void initialize(const string& delegate)
   ev_async_init(&async_watcher, handle_async);
   ev_async_start(loop, &async_watcher);
 
-  ev_io_init(&server_watcher, accept, __s__, EV_READ);
-  ev_io_start(loop, &server_watcher);
-
   Clock::initialize(lambda::bind(&timedout, lambda::_1));
 
 //   ev_child_init(&child_watcher, child_exited, pid, 0);
@@ -979,6 +923,9 @@ void initialize(const string& delegate)
   // 'spawn' below for the garbage collector.
   initializing = false;
 
+  __s__.accept()
+    .onAny(lambda::bind(&internal::on_accept, lambda::_1));
+
   // TODO(benh): Make sure creating the garbage collector, logging
   // process, and profiler always succeeds and use supervisors to make
   // sure that none terminate.
@@ -1436,6 +1383,90 @@ Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
 }
 
 
+Try<Node> Socket::Impl::bind(const Node& node)
+{
+  // Binds to 'node'; on failure the error carries the errno description.
+  sockaddr_in addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = AF_INET;
+  addr.sin_addr.s_addr = node.ip;
+  addr.sin_port = htons(node.port);
+
+  if (::bind(get(), (sockaddr*) &addr, sizeof(addr)) < 0) {
+    return ErrnoError("Failed to bind: " + string(inet_ntoa(addr.sin_addr)) +
+                      ":" + stringify(node.port));
+  }
+
+  // Look up the ip and port that were actually assigned.
+  socklen_t addrlen = sizeof(addr);
+  if (getsockname(get(), (sockaddr*) &addr, &addrlen) < 0) {
+    return ErrnoError("Failed to bind, getsockname");
+  }
+  return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port));
+}
+
+
+Try<Nothing> Socket::Impl::listen(int backlog)
+{
+  if (::listen(get(), backlog) < 0) {
+    return ErrnoError();
+  }
+  return Nothing();
+}
+
+
+namespace internal {
+
+// Accepts a pending connection on 'fd' and returns it as a Socket once
+// it is non-blocking, close-on-exec, and has TCP_NODELAY set.
+Future<Socket> accept(int fd)
+{
+  sockaddr_in peer;
+  socklen_t length = sizeof(peer);
+  const int connection = ::accept(fd, (sockaddr*) &peer, &length);
+  if (connection < 0) {
+    return Failure(ErrnoError("Failed to accept"));
+  }
+
+  Try<Nothing> nonblock = os::nonblock(connection);
+  if (nonblock.isError()) {
+    const string message = "Failed to accept, nonblock: " + nonblock.error();
+    LOG_IF(INFO, VLOG_IS_ON(1)) << message;
+    os::close(connection);
+    return Failure(message);
+  }
+
+  Try<Nothing> cloexec = os::cloexec(connection);
+  if (cloexec.isError()) {
+    const string message = "Failed to accept, cloexec: " + cloexec.error();
+    LOG_IF(INFO, VLOG_IS_ON(1)) << message;
+    os::close(connection);
+    return Failure(message);
+  }
+
+  // Disable Nagle's algorithm so pipelined requests are not delayed.
+  int on = 1;
+  if (setsockopt(connection, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+    const string message =
+      "Failed to turn off the Nagle algorithm: " + stringify(strerror(errno));
+    VLOG(1) << message;
+    os::close(connection);
+    return Failure(message);
+  }
+
+  return Socket(connection);
+}
+
+} // namespace internal {
+
+
+Future<Socket> Socket::Impl::accept()
+{
+  return io::poll(get(), io::READ)
+    .then(lambda::bind(&internal::accept, get()));
+}
+
+
 SocketManager::SocketManager()
 {
   synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1445,13 +1476,11 @@ SocketManager::SocketManager()
 SocketManager::~SocketManager() {}
 
 
-Socket SocketManager::accepted(int s)
+void SocketManager::accepted(const Socket& socket)
 {
   synchronized (this) {
-    return sockets[s] = Socket(s);
+    sockets[socket] = socket;
   }
-
-  UNREACHABLE();
 }