You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/11/20 00:54:15 UTC

[04/15] mesos git commit: Fixed bug where we fail to hold onto PollSocketImpl reference.

Fixed bug where we fail to hold onto PollSocketImpl reference.

Also inlined the lambdas to simplify the code.

Review: https://reviews.apache.org/r/55315


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/66bd7327
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/66bd7327
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/66bd7327

Branch: refs/heads/master
Commit: 66bd732752d229c1a3e52331c1b30ab83ef46352
Parents: 178f59f
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Jan 7 17:59:06 2017 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 19 16:33:47 2017 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/poll_socket.cpp | 179 +++++++++++++--------------
 1 file changed, 86 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/66bd7327/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 5410b07..54e2c53 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -49,121 +49,106 @@ Try<Nothing> PollSocketImpl::listen(int backlog)
 }
 
 
-namespace internal {
-
-Future<int_fd> accept(int_fd fd)
+Future<std::shared_ptr<SocketImpl>> PollSocketImpl::accept()
 {
-  Try<int_fd> accepted = network::accept(fd);
-  if (accepted.isError()) {
-    return Failure(accepted.error());
-  }
-
-  int_fd s = accepted.get();
-  Try<Nothing> nonblock = os::nonblock(s);
-  if (nonblock.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
-                                << nonblock.error();
-    os::close(s);
-    return Failure("Failed to accept, nonblock: " + nonblock.error());
-  }
+  // Need to hold a copy of `this` so that the underlying socket
+  // doesn't end up getting reused before we return from the call to
+  // `io::poll` and end up accepting a socket incorrectly.
+  auto self = shared(this);
 
-  Try<Nothing> cloexec = os::cloexec(s);
-  if (cloexec.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
-                                << cloexec.error();
-    os::close(s);
-    return Failure("Failed to accept, cloexec: " + cloexec.error());
-  }
-
-  Try<Address> address = network::address(s);
-  if (address.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to get address: "
-                                << address.error();
-    os::close(s);
-    return Failure("Failed to get address: " + address.error());
-  }
+  return io::poll(get(), io::READ)
+    .then([self]() -> Future<std::shared_ptr<SocketImpl>> {
+      Try<int_fd> accepted = network::accept(self->get());
+      if (accepted.isError()) {
+        return Failure(accepted.error());
+      }
 
-  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
-  // NOTE: We cast to `char*` here because the function prototypes on Windows
-  // use `char*` instead of `void*`.
-  if (address->family() == Address::Family::INET4 ||
-      address->family() == Address::Family::INET6) {
-    int on = 1;
-    if (::setsockopt(
-            s,
-            SOL_TCP,
-            TCP_NODELAY,
-            reinterpret_cast<const char*>(&on),
-            sizeof(on)) < 0) {
-      const string error = os::strerror(errno);
-      VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
-      os::close(s);
-      return Failure(
-          "Failed to turn off the Nagle algorithm: " + stringify(error));
-    }
-  }
+      int_fd s = accepted.get();
+      Try<Nothing> nonblock = os::nonblock(s);
+      if (nonblock.isError()) {
+        os::close(s);
+        return Failure("Failed to accept, nonblock: " + nonblock.error());
+      }
 
-  return s;
-}
+      Try<Nothing> cloexec = os::cloexec(s);
+      if (cloexec.isError()) {
+        os::close(s);
+        return Failure("Failed to accept, cloexec: " + cloexec.error());
+      }
 
-} // namespace internal {
+      Try<Address> address = network::address(s);
+      if (address.isError()) {
+        os::close(s);
+        return Failure("Failed to get address: " + address.error());
+      }
 
+      // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+      // NOTE: We cast to `char*` here because the function prototypes
+      // on Windows use `char*` instead of `void*`.
+      if (address->family() == Address::Family::INET4 ||
+          address->family() == Address::Family::INET6) {
+        int on = 1;
+        if (::setsockopt(
+                s,
+                SOL_TCP,
+                TCP_NODELAY,
+                reinterpret_cast<const char*>(&on),
+                sizeof(on)) < 0) {
+          const string error = os::strerror(errno);
+          os::close(s);
+          return Failure(
+              "Failed to turn off the Nagle algorithm: " + stringify(error));
+        }
+      }
 
-Future<std::shared_ptr<SocketImpl>> PollSocketImpl::accept()
-{
-  return io::poll(get(), io::READ)
-    .then(lambda::bind(&internal::accept, get()))
-    .then([](int_fd s) -> Future<std::shared_ptr<SocketImpl>> {
       Try<std::shared_ptr<SocketImpl>> impl = create(s);
       if (impl.isError()) {
         os::close(s);
         return Failure("Failed to create socket: " + impl.error());
       }
+
       return impl.get();
     });
 }
 
 
-namespace internal {
-
-Future<Nothing> connect(
-    const std::shared_ptr<PollSocketImpl>& socket,
-    const Address& to)
-{
-  // Now check that a successful connection was made.
-  int opt;
-  socklen_t optlen = sizeof(opt);
-  int_fd s = socket->get();
-
-  // NOTE: We cast to `char*` here because the function prototypes on Windows
-  // use `char*` instead of `void*`.
-  if (::getsockopt(
-          s,
-          SOL_SOCKET,
-          SO_ERROR,
-          reinterpret_cast<char*>(&opt),
-          &optlen) < 0) {
-    return Failure(
-        SocketError("Failed to get status of connection to " + stringify(to)));
-  }
-
-  if (opt != 0) {
-    return Failure(SocketError(opt, "Failed to connect to " + stringify(to)));
-  }
-
-  return Nothing();
-}
-
-} // namespace internal {
-
-
 Future<Nothing> PollSocketImpl::connect(const Address& address)
 {
   Try<Nothing, SocketError> connect = network::connect(get(), address);
   if (connect.isError()) {
     if (net::is_inprogress_error(connect.error().code)) {
+      // Need to hold a copy of `this` so that the underlying socket
+      // doesn't end up getting reused before we return from the call
+      // to `io::poll` and end up connecting incorrectly.
+      auto self = shared(this);
+
       return io::poll(get(), io::WRITE)
-        .then(lambda::bind(&internal::connect, shared(this), address));
+        .then([self, address]() -> Future<Nothing> {
+          // Now check that a successful connection was made.
+          int opt;
+          socklen_t optlen = sizeof(opt);
+
+          // NOTE: We cast to `char*` here because the function
+          // prototypes on Windows use `char*` instead of `void*`.
+          if (::getsockopt(
+                  self->get(),
+                  SOL_SOCKET,
+                  SO_ERROR,
+                  reinterpret_cast<char*>(&opt),
+                  &optlen) < 0) {
+            return Failure(SocketError(
+                "Failed to get status of connect to " + stringify(address)));
+          }
+
+          if (opt != 0) {
+            return Failure(SocketError(
+                opt,
+                "Failed to connect to " +
+                stringify(address)));
+          }
+
+          return Nothing();
+        });
     }
 
     return Failure(connect.error());
@@ -175,7 +160,15 @@ Future<Nothing> PollSocketImpl::connect(const Address& address)
 
 Future<size_t> PollSocketImpl::recv(char* data, size_t size)
 {
-  return io::read(get(), data, size);
+  // Need to hold a copy of `this` so that the underlying socket
+  // doesn't end up getting reused before we return from the call to
+  // `io::read` and end up reading data incorrectly.
+  auto self = shared(this);
+
+  return io::read(get(), data, size)
+    .then([self](size_t length) {
+      return length;
+    });
 }