You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/11/20 00:54:15 UTC
[04/15] mesos git commit: Fixed bug where we fail to hold onto
PollSocketImpl reference.
Fixed bug where we fail to hold onto PollSocketImpl reference.
Also inlined the lambdas to simplify the code.
Review: https://reviews.apache.org/r/55315
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/66bd7327
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/66bd7327
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/66bd7327
Branch: refs/heads/master
Commit: 66bd732752d229c1a3e52331c1b30ab83ef46352
Parents: 178f59f
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Jan 7 17:59:06 2017 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 19 16:33:47 2017 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/poll_socket.cpp | 179 +++++++++++++--------------
1 file changed, 86 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/66bd7327/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 5410b07..54e2c53 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -49,121 +49,106 @@ Try<Nothing> PollSocketImpl::listen(int backlog)
}
-namespace internal {
-
-Future<int_fd> accept(int_fd fd)
+Future<std::shared_ptr<SocketImpl>> PollSocketImpl::accept()
{
- Try<int_fd> accepted = network::accept(fd);
- if (accepted.isError()) {
- return Failure(accepted.error());
- }
-
- int_fd s = accepted.get();
- Try<Nothing> nonblock = os::nonblock(s);
- if (nonblock.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
- << nonblock.error();
- os::close(s);
- return Failure("Failed to accept, nonblock: " + nonblock.error());
- }
+ // Need to hold a copy of `this` so that the underlying socket
+ // doesn't end up getting reused before we return from the call to
+ // `io::poll` and end up accepting a socket incorrectly.
+ auto self = shared(this);
- Try<Nothing> cloexec = os::cloexec(s);
- if (cloexec.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
- << cloexec.error();
- os::close(s);
- return Failure("Failed to accept, cloexec: " + cloexec.error());
- }
-
- Try<Address> address = network::address(s);
- if (address.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to get address: "
- << address.error();
- os::close(s);
- return Failure("Failed to get address: " + address.error());
- }
+ return io::poll(get(), io::READ)
+ .then([self]() -> Future<std::shared_ptr<SocketImpl>> {
+ Try<int_fd> accepted = network::accept(self->get());
+ if (accepted.isError()) {
+ return Failure(accepted.error());
+ }
- // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
- // NOTE: We cast to `char*` here because the function prototypes on Windows
- // use `char*` instead of `void*`.
- if (address->family() == Address::Family::INET4 ||
- address->family() == Address::Family::INET6) {
- int on = 1;
- if (::setsockopt(
- s,
- SOL_TCP,
- TCP_NODELAY,
- reinterpret_cast<const char*>(&on),
- sizeof(on)) < 0) {
- const string error = os::strerror(errno);
- VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
- os::close(s);
- return Failure(
- "Failed to turn off the Nagle algorithm: " + stringify(error));
- }
- }
+ int_fd s = accepted.get();
+ Try<Nothing> nonblock = os::nonblock(s);
+ if (nonblock.isError()) {
+ os::close(s);
+ return Failure("Failed to accept, nonblock: " + nonblock.error());
+ }
- return s;
-}
+ Try<Nothing> cloexec = os::cloexec(s);
+ if (cloexec.isError()) {
+ os::close(s);
+ return Failure("Failed to accept, cloexec: " + cloexec.error());
+ }
-} // namespace internal {
+ Try<Address> address = network::address(s);
+ if (address.isError()) {
+ os::close(s);
+ return Failure("Failed to get address: " + address.error());
+ }
+ // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+ // NOTE: We cast to `char*` here because the function prototypes
+ // on Windows use `char*` instead of `void*`.
+ if (address->family() == Address::Family::INET4 ||
+ address->family() == Address::Family::INET6) {
+ int on = 1;
+ if (::setsockopt(
+ s,
+ SOL_TCP,
+ TCP_NODELAY,
+ reinterpret_cast<const char*>(&on),
+ sizeof(on)) < 0) {
+ const string error = os::strerror(errno);
+ os::close(s);
+ return Failure(
+ "Failed to turn off the Nagle algorithm: " + stringify(error));
+ }
+ }
-Future<std::shared_ptr<SocketImpl>> PollSocketImpl::accept()
-{
- return io::poll(get(), io::READ)
- .then(lambda::bind(&internal::accept, get()))
- .then([](int_fd s) -> Future<std::shared_ptr<SocketImpl>> {
Try<std::shared_ptr<SocketImpl>> impl = create(s);
if (impl.isError()) {
os::close(s);
return Failure("Failed to create socket: " + impl.error());
}
+
return impl.get();
});
}
-namespace internal {
-
-Future<Nothing> connect(
- const std::shared_ptr<PollSocketImpl>& socket,
- const Address& to)
-{
- // Now check that a successful connection was made.
- int opt;
- socklen_t optlen = sizeof(opt);
- int_fd s = socket->get();
-
- // NOTE: We cast to `char*` here because the function prototypes on Windows
- // use `char*` instead of `void*`.
- if (::getsockopt(
- s,
- SOL_SOCKET,
- SO_ERROR,
- reinterpret_cast<char*>(&opt),
- &optlen) < 0) {
- return Failure(
- SocketError("Failed to get status of connection to " + stringify(to)));
- }
-
- if (opt != 0) {
- return Failure(SocketError(opt, "Failed to connect to " + stringify(to)));
- }
-
- return Nothing();
-}
-
-} // namespace internal {
-
-
Future<Nothing> PollSocketImpl::connect(const Address& address)
{
Try<Nothing, SocketError> connect = network::connect(get(), address);
if (connect.isError()) {
if (net::is_inprogress_error(connect.error().code)) {
+ // Need to hold a copy of `this` so that the underlying socket
+ // doesn't end up getting reused before we return from the call
+ // to `io::poll` and end up connecting incorrectly.
+ auto self = shared(this);
+
return io::poll(get(), io::WRITE)
- .then(lambda::bind(&internal::connect, shared(this), address));
+ .then([self, address]() -> Future<Nothing> {
+ // Now check that a successful connection was made.
+ int opt;
+ socklen_t optlen = sizeof(opt);
+
+ // NOTE: We cast to `char*` here because the function
+ // prototypes on Windows use `char*` instead of `void*`.
+ if (::getsockopt(
+ self->get(),
+ SOL_SOCKET,
+ SO_ERROR,
+ reinterpret_cast<char*>(&opt),
+ &optlen) < 0) {
+ return Failure(SocketError(
+ "Failed to get status of connect to " + stringify(address)));
+ }
+
+ if (opt != 0) {
+ return Failure(SocketError(
+ opt,
+ "Failed to connect to " +
+ stringify(address)));
+ }
+
+ return Nothing();
+ });
}
return Failure(connect.error());
@@ -175,7 +160,15 @@ Future<Nothing> PollSocketImpl::connect(const Address& address)
Future<size_t> PollSocketImpl::recv(char* data, size_t size)
{
- return io::read(get(), data, size);
+ // Need to hold a copy of `this` so that the underlying socket
+ // doesn't end up getting reused before we return from the call to
+ // `io::read` and end up reading data incorrectly.
+ auto self = shared(this);
+
+ return io::read(get(), data, size)
+ .then([self](size_t length) {
+ return length;
+ });
}