You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/11/20 00:54:16 UTC

[05/15] mesos git commit: Used loop in PollSocketImpl.

Used loop in PollSocketImpl.

Review: https://reviews.apache.org/r/55318


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf9d1aeb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf9d1aeb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf9d1aeb

Branch: refs/heads/master
Commit: bf9d1aeb1212fe10dfa13a91bd64981494b4ab1d
Parents: 7ececb4
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Jan 7 22:31:37 2017 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 19 16:33:47 2017 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/poll_socket.cpp | 174 +++++++++++++--------------
 1 file changed, 83 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bf9d1aeb/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 54e2c53..74acb69 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -18,6 +18,7 @@
 #endif // __WINDOWS__
 
 #include <process/io.hpp>
+#include <process/loop.hpp>
 #include <process/network.hpp>
 #include <process/socket.hpp>
 
@@ -172,113 +173,104 @@ Future<size_t> PollSocketImpl::recv(char* data, size_t size)
 }
 
 
-namespace internal {
-
-Future<size_t> socket_send_data(
-    const std::shared_ptr<PollSocketImpl>& impl,
-    const char* data, size_t size)
+Future<size_t> PollSocketImpl::send(const char* data, size_t size)
 {
-  CHECK(size > 0);
+  CHECK(size > 0); // TODO(benh): Just return 0 if `size` is 0?
 
-  while (true) {
-    ssize_t length = net::send(impl->get(), data, size, MSG_NOSIGNAL);
+  // Need to hold a copy of `this` so that the underlying socket
+  // doesn't end up getting reused before we return.
+  auto self = shared(this);
 
+  // TODO(benh): Reuse `io::write`? Or is `net::send` and
+  // `MSG_NOSIGNAL` critical here?
+  return loop(
+      None(),
+      [self, data, size]() -> Future<Option<size_t>> {
+        while (true) {
+          ssize_t length = net::send(self->get(), data, size, MSG_NOSIGNAL);
+
+          if (length < 0) {
 #ifdef __WINDOWS__
-    int error = WSAGetLastError();
+            int error = WSAGetLastError();
 #else
-    int error = errno;
+            int error = errno;
 #endif // __WINDOWS__
 
-    if (length < 0 && net::is_restartable_error(error)) {
-      // Interrupted, try again now.
-      continue;
-    } else if (length < 0 && net::is_retryable_error(error)) {
-      // Might block, try again later.
-      return io::poll(impl->get(), io::WRITE)
-        .then(lambda::bind(&internal::socket_send_data, impl, data, size));
-    } else if (length <= 0) {
-      // Socket error or closed.
-      if (length < 0) {
-        const string error = os::strerror(errno);
-        VLOG(1) << "Socket error while sending: " << error;
-        return Failure(ErrnoError("Socket send failed"));
-      } else {
-        VLOG(1) << "Socket closed while sending";
-        return length;
-      }
-    } else {
-      CHECK(length > 0);
+            if (net::is_restartable_error(error)) {
+              // Interrupted, try again now.
+              continue;
+            } else if (!net::is_retryable_error(error)) {
+              // TODO(benh): Confirm that `os::strerror` does the
+              // right thing for `error` on Windows.
+              VLOG(1) << "Socket error while sending: " << os::strerror(error);
+              return Failure(os::strerror(error));
+            }
+
+            return None();
+          }
 
-      return length;
-    }
-  }
+          return length;
+        }
+      },
+      [self](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
+        // Retry after we've polled if we don't yet have a result.
+        if (length.isNone()) {
+          return io::poll(self->get(), io::WRITE)
+            .then([](short event) -> ControlFlow<size_t> {
+              CHECK_EQ(io::WRITE, event);
+              return Continue();
+            });
+        }
+        return Break(length.get());
+      });
 }
 
 
-Future<size_t> socket_send_file(
-    const std::shared_ptr<PollSocketImpl>& impl,
-    int_fd fd,
-    off_t offset,
-    size_t size)
+Future<size_t> PollSocketImpl::sendfile(int_fd fd, off_t offset, size_t size)
 {
-  CHECK(size > 0);
-
-  while (true) {
-    Try<ssize_t, SocketError> length =
-      os::sendfile(impl->get(), fd, offset, size);
-
-    if (length.isSome()) {
-      CHECK(length.get() >= 0);
-      if (length.get() == 0) {
-        // Socket closed.
-        VLOG(1) << "Socket closed while sending";
-      }
-      return length.get();
-    }
-
-    if (net::is_restartable_error(length.error().code)) {
-      // Interrupted, try again now.
-      continue;
-    } else if (net::is_retryable_error(length.error().code)) {
-      // Might block, try again later.
-      return io::poll(impl->get(), io::WRITE)
-        .then(lambda::bind(
-            &internal::socket_send_file,
-            impl,
-            fd,
-            offset,
-            size));
-    } else {
-      // Socket error or closed.
-      VLOG(1) << length.error().message;
-      return Failure(length.error());
-    };
-  }
-}
-
-} // namespace internal {
+  CHECK(size > 0); // TODO(benh): Just return 0 if `size` is 0?
 
+  // Need to hold a copy of `this` so that the underlying socket
+  // doesn't end up getting reused before we return.
+  auto self = shared(this);
 
-Future<size_t> PollSocketImpl::send(const char* data, size_t size)
-{
-  return io::poll(get(), io::WRITE)
-    .then(lambda::bind(
-        &internal::socket_send_data,
-        shared(this),
-        data,
-        size));
-}
+  return loop(
+      None(),
+      [self, fd, offset, size]() -> Future<Option<size_t>> {
+        while (true) {
+          Try<ssize_t, SocketError> length = os::sendfile(
+              self->get(),
+              fd,
+              offset,
+              size);
+
+          if (length.isSome()) {
+            CHECK(length.get() >= 0);
+            return length.get();
+          }
 
+          if (net::is_restartable_error(length.error().code)) {
+            // Interrupted, try again now.
+            continue;
+          } else if (!net::is_retryable_error(length.error().code)) {
+            VLOG(1) << length.error().message;
+            return Failure(length.error());
+          }
 
-Future<size_t> PollSocketImpl::sendfile(int_fd fd, off_t offset, size_t size)
-{
-  return io::poll(get(), io::WRITE)
-    .then(lambda::bind(
-        &internal::socket_send_file,
-        shared(this),
-        fd,
-        offset,
-        size));
+          return None();
+        }
+      },
+      [self](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
+        // Retry after we've polled if we don't yet have a result.
+        if (length.isNone()) {
+          return io::poll(self->get(), io::WRITE)
+            .then([](short event) -> ControlFlow<size_t> {
+              CHECK_EQ(io::WRITE, event);
+              return Continue();
+            });
+        }
+        return Break(length.get());
+      });
 }
 
 } // namespace internal {