You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/25 22:06:03 UTC

[12/12] mesos git commit: Move poll based socket implementation into poll_socket.cpp.

Move poll based socket implementation into poll_socket.cpp.

Review: https://reviews.apache.org/r/28467


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/62ad3f27
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/62ad3f27
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/62ad3f27

Branch: refs/heads/master
Commit: 62ad3f27c44dfbad8cb68f952ed2d076e349fd6a
Parents: f4a4180
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:44:46 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am         |   1 +
 3rdparty/libprocess/src/poll_socket.cpp | 216 +++++++++++++++++++++++++++
 3rdparty/libprocess/src/process.cpp     | 203 -------------------------
 3 files changed, 217 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/62ad3f27/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 09fce46..f401232 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -42,6 +42,7 @@ libprocess_la_SOURCES =		\
   src/latch.cpp			\
   src/metrics/metrics.cpp	\
   src/pid.cpp			\
+  src/poll_socket.cpp		\
   src/process.cpp		\
   src/process_reference.hpp	\
   src/reap.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/62ad3f27/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
new file mode 100644
index 0000000..5202750
--- /dev/null
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -0,0 +1,216 @@
+#include <netinet/tcp.h>
+
+#include <process/io.hpp>
+#include <process/socket.hpp>
+
+#include "config.hpp"
+
+using std::string;
+
+namespace process {
+namespace network {
+
+namespace internal {
+// Checks SO_ERROR on 'socket'; yields Nothing if clear, else a Failure.
+Future<Nothing> connect(const Socket& socket)
+{
+  // Read the socket's pending error (SO_ERROR) to see if connecting worked.
+  int opt;
+  socklen_t optlen = sizeof(opt);
+  int s = socket.get();
+
+  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+    // Query failed or an error is pending; 'opt' itself is not reported.
+    VLOG(1) << "Socket error while connecting";
+    return Failure("Socket error while connecting");
+  }
+
+  return Nothing();
+}
+
+} // namespace internal {
+
+// Starts connecting this socket to 'node'; may complete asynchronously.
+Future<Nothing> Socket::Impl::connect(const Node& node)
+{
+  Try<int> connect = network::connect(get(), node);
+  if (connect.isError()) {  // NOTE(review): errno assumed intact here.
+    if (errno == EINPROGRESS) {  // Not finished yet: wait until writable.
+      return io::poll(get(), io::WRITE)
+        .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+    }
+
+    return Failure(connect.error());
+  }
+
+  return Nothing();
+}
+
+// Receives into 'data' by delegating to io::read() with this socket's get().
+Future<size_t> Socket::Impl::recv(char* data, size_t size)
+{
+  return io::read(get(), data, size);
+}
+
+
+namespace internal {
+// Tries to send() 'data' on 's', polling for writability if it would block.
+Future<size_t> socket_send_data(int s, const char* data, size_t size)
+{
+  CHECK(size > 0);
+
+  while (true) {
+    ssize_t length = send(s, data, size, MSG_NOSIGNAL);  // No SIGPIPE.
+
+    if (length < 0 && (errno == EINTR)) {
+      // A signal interrupted send(); retry immediately.
+      continue;
+    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+      // Would block: wait until 's' is writable, then re-run this function.
+      return io::poll(s, io::WRITE)
+        .then(lambda::bind(&internal::socket_send_data, s, data, size));
+    } else if (length <= 0) {
+      // send() failed (length < 0) or returned 0; log at VLOG(1) first.
+      if (length < 0) {
+        const char* error = strerror(errno);
+        VLOG(1) << "Socket error while sending: " << error;
+      } else {
+        VLOG(1) << "Socket closed while sending";
+      }
+      if (length == 0) {
+        return length;
+      } else {
+        return Failure(ErrnoError("Socket send failed"));
+      }
+    } else {
+      CHECK(length > 0);
+
+      return length;  // The byte count reported by send().
+    }
+  }
+}
+
+// Like socket_send_data(), but the bytes come from 'fd' via os::sendfile().
+Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
+{
+  CHECK(size > 0);
+
+  while (true) {
+    ssize_t length = os::sendfile(s, fd, offset, size);
+
+    if (length < 0 && (errno == EINTR)) {
+      // The call was interrupted; retry immediately.
+      continue;
+    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+      // Would block: wait until 's' is writable, then re-run this function.
+      return io::poll(s, io::WRITE)
+        .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size));
+    } else if (length <= 0) {
+      // The call failed (length < 0) or returned 0; log at VLOG(1) first.
+      if (length < 0) {
+        const char* error = strerror(errno);
+        VLOG(1) << "Socket error while sending: " << error;
+      } else {
+        VLOG(1) << "Socket closed while sending";
+      }
+      if (length == 0) {
+        return length;
+      } else {
+        return Failure(ErrnoError("Socket sendfile failed"));
+      }
+    } else {
+      CHECK(length > 0);
+
+      return length;  // The byte count reported by os::sendfile().
+    }
+  }
+}
+
+} // namespace internal {
+
+// Waits until the socket is writable, then runs internal::socket_send_data.
+Future<size_t> Socket::Impl::send(const char* data, size_t size)
+{
+  return io::poll(get(), io::WRITE)  // 'data' is bound as a raw pointer.
+    .then(lambda::bind(&internal::socket_send_data, get(), data, size));
+}
+
+// Waits until the socket is writable, then runs internal::socket_send_file.
+Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
+{
+  return io::poll(get(), io::WRITE)
+    .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
+}
+
+// Binds this socket to 'node' and returns what network::getsockname() gives.
+Try<Node> Socket::Impl::bind(const Node& node)
+{
+  Try<int> bind = network::bind(get(), node);
+  if (bind.isError()) {
+    return Error(bind.error());
+  }
+
+  // Look up the assigned ip and port and return them (nothing is stored).
+  return network::getsockname(get(), AF_INET);
+}
+
+// Calls ::listen() with 'backlog'; a negative result becomes an ErrnoError.
+Try<Nothing> Socket::Impl::listen(int backlog)
+{
+  if (::listen(get(), backlog) < 0) {
+    return ErrnoError();
+  }
+  return Nothing();
+}
+
+
+namespace internal {
+// Accepts one connection on 'fd' and configures the newly accepted socket.
+Future<Socket> accept(int fd)
+{
+  Try<int> accepted = network::accept(fd, AF_INET);
+  if (accepted.isError()) {
+    return Failure(accepted.error());
+  }
+
+  int s = accepted.get();
+  Try<Nothing> nonblock = os::nonblock(s);
+  if (nonblock.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
+                                << nonblock.error();
+    os::close(s);  // Avoid leaking the accepted descriptor.
+    return Failure("Failed to accept, nonblock: " + nonblock.error());
+  }
+
+  Try<Nothing> cloexec = os::cloexec(s);
+  if (cloexec.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
+                                << cloexec.error();
+    os::close(s);  // Avoid leaking the accepted descriptor.
+    return Failure("Failed to accept, cloexec: " + cloexec.error());
+  }
+
+  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+  int on = 1;
+  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+    const char* error = strerror(errno);
+    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+    os::close(s);  // Avoid leaking the accepted descriptor.
+    return Failure(
+      "Failed to turn off the Nagle algorithm: " + stringify(error));
+  }
+
+  return Socket(s);
+}
+
+} // namespace internal {
+
+// Waits until the socket is readable, then runs internal::accept on it.
+Future<Socket> Socket::Impl::accept()
+{
+  return io::poll(get(), io::READ)
+    .then(lambda::bind(&internal::accept, get()));
+}
+
+} // namespace network {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/62ad3f27/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 441ce48..72c0ad4 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1220,209 +1220,6 @@ void HttpProxy::stream(const Future<short>& poll, const Request& request)
 }
 
 
-namespace internal {
-
-Future<Nothing> connect(const Socket& socket)
-{
-  // Now check that a successful connection was made.
-  int opt;
-  socklen_t optlen = sizeof(opt);
-  int s = socket.get();
-
-  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
-    // Connect failure.
-    VLOG(1) << "Socket error while connecting";
-    return Failure("Socket error while connecting");
-  }
-
-  return Nothing();
-}
-
-} // namespace internal {
-
-
-Future<Nothing> Socket::Impl::connect(const Node& node)
-{
-  Try<int> connect = network::connect(get(), node);
-  if (connect.isError()) {
-    if (errno == EINPROGRESS) {
-      return io::poll(get(), io::WRITE)
-        .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
-    }
-
-    return Failure(connect.error());
-  }
-
-  return Nothing();
-}
-
-
-Future<size_t> Socket::Impl::recv(char* data, size_t size)
-{
-  return io::read(get(), data, size);
-}
-
-
-namespace internal {
-
-Future<size_t> socket_send_data(int s, const char* data, size_t size)
-{
-  CHECK(size > 0);
-
-  while (true) {
-    ssize_t length = send(s, data, size, MSG_NOSIGNAL);
-
-    if (length < 0 && (errno == EINTR)) {
-      // Interrupted, try again now.
-      continue;
-    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-      // Might block, try again later.
-      return io::poll(s, io::WRITE)
-        .then(lambda::bind(&internal::socket_send_data, s, data, size));
-    } else if (length <= 0) {
-      // Socket error or closed.
-      if (length < 0) {
-        const char* error = strerror(errno);
-        VLOG(1) << "Socket error while sending: " << error;
-      } else {
-        VLOG(1) << "Socket closed while sending";
-      }
-      if (length == 0) {
-        return length;
-      } else {
-        return Failure(ErrnoError("Socket send failed"));
-      }
-    } else {
-      CHECK(length > 0);
-
-      return length;
-    }
-  }
-}
-
-
-Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
-{
-  CHECK(size > 0);
-
-  while (true) {
-    ssize_t length = os::sendfile(s, fd, offset, size);
-
-    if (length < 0 && (errno == EINTR)) {
-      // Interrupted, try again now.
-      continue;
-    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-      // Might block, try again later.
-      return io::poll(s, io::WRITE)
-        .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size));
-    } else if (length <= 0) {
-      // Socket error or closed.
-      if (length < 0) {
-        const char* error = strerror(errno);
-        VLOG(1) << "Socket error while sending: " << error;
-      } else {
-        VLOG(1) << "Socket closed while sending";
-      }
-      if (length == 0) {
-        return length;
-      } else {
-        return Failure(ErrnoError("Socket sendfile failed"));
-      }
-    } else {
-      CHECK(length > 0);
-
-      return length;
-    }
-  }
-}
-
-} // namespace internal {
-
-
-Future<size_t> Socket::Impl::send(const char* data, size_t size)
-{
-  return io::poll(get(), io::WRITE)
-    .then(lambda::bind(&internal::socket_send_data, get(), data, size));
-}
-
-
-Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
-{
-  return io::poll(get(), io::WRITE)
-    .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
-}
-
-
-Try<Node> Socket::Impl::bind(const Node& node)
-{
-  Try<int> bind = network::bind(get(), node);
-  if (bind.isError()) {
-    return Error(bind.error());
-  }
-
-  // Lookup and store assigned ip and assigned port.
-  return network::getsockname(get(), AF_INET);
-}
-
-
-Try<Nothing> Socket::Impl::listen(int backlog)
-{
-  if (::listen(get(), backlog) < 0) {
-    return ErrnoError();
-  }
-  return Nothing();
-}
-
-
-namespace internal {
-
-Future<Socket> accept(int fd)
-{
-  Try<int> accepted = network::accept(fd, AF_INET);
-  if (accepted.isError()) {
-    return Failure(accepted.error());
-  }
-
-  int s = accepted.get();
-  Try<Nothing> nonblock = os::nonblock(s);
-  if (nonblock.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
-                                << nonblock.error();
-    os::close(s);
-    return Failure("Failed to accept, nonblock: " + nonblock.error());
-  }
-
-  Try<Nothing> cloexec = os::cloexec(s);
-  if (cloexec.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
-                                << cloexec.error();
-    os::close(s);
-    return Failure("Failed to accept, cloexec: " + cloexec.error());
-  }
-
-  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
-  int on = 1;
-  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
-    const char* error = strerror(errno);
-    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
-    os::close(s);
-    return Failure(
-      "Failed to turn off the Nagle algorithm: " + stringify(error));
-  }
-
-  return Socket(s);
-}
-
-} // namespace internal {
-
-
-Future<Socket> Socket::Impl::accept()
-{
-  return io::poll(get(), io::READ)
-    .then(lambda::bind(&internal::accept, get()));
-}
-
-
 SocketManager::SocketManager()
 {
   synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;