You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/11/20 00:54:16 UTC
[05/15] mesos git commit: Used loop in PollSocketImpl.
Used loop in PollSocketImpl.
Review: https://reviews.apache.org/r/55318
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf9d1aeb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf9d1aeb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf9d1aeb
Branch: refs/heads/master
Commit: bf9d1aeb1212fe10dfa13a91bd64981494b4ab1d
Parents: 7ececb4
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Jan 7 22:31:37 2017 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 19 16:33:47 2017 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/poll_socket.cpp | 174 +++++++++++++--------------
1 file changed, 83 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bf9d1aeb/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 54e2c53..74acb69 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -18,6 +18,7 @@
#endif // __WINDOWS__
#include <process/io.hpp>
+#include <process/loop.hpp>
#include <process/network.hpp>
#include <process/socket.hpp>
@@ -172,113 +173,104 @@ Future<size_t> PollSocketImpl::recv(char* data, size_t size)
}
-namespace internal {
-
-Future<size_t> socket_send_data(
- const std::shared_ptr<PollSocketImpl>& impl,
- const char* data, size_t size)
+Future<size_t> PollSocketImpl::send(const char* data, size_t size)
{
- CHECK(size > 0);
+ CHECK(size > 0); // TODO(benh): Just return 0 if `size` is 0?
- while (true) {
- ssize_t length = net::send(impl->get(), data, size, MSG_NOSIGNAL);
+ // Need to hold a copy of `this` so that the underlying socket
+ // doesn't end up getting reused before we return.
+ auto self = shared(this);
+ // TODO(benh): Reuse `io::write`? Or is `net::send` and
+ // `MSG_NOSIGNAL` critical here?
+ return loop(
+ None(),
+ [self, data, size]() -> Future<Option<size_t>> {
+ while (true) {
+ ssize_t length = net::send(self->get(), data, size, MSG_NOSIGNAL);
+
+ if (length < 0) {
#ifdef __WINDOWS__
- int error = WSAGetLastError();
+ int error = WSAGetLastError();
#else
- int error = errno;
+ int error = errno;
#endif // __WINDOWS__
- if (length < 0 && net::is_restartable_error(error)) {
- // Interrupted, try again now.
- continue;
- } else if (length < 0 && net::is_retryable_error(error)) {
- // Might block, try again later.
- return io::poll(impl->get(), io::WRITE)
- .then(lambda::bind(&internal::socket_send_data, impl, data, size));
- } else if (length <= 0) {
- // Socket error or closed.
- if (length < 0) {
- const string error = os::strerror(errno);
- VLOG(1) << "Socket error while sending: " << error;
- return Failure(ErrnoError("Socket send failed"));
- } else {
- VLOG(1) << "Socket closed while sending";
- return length;
- }
- } else {
- CHECK(length > 0);
+ if (net::is_restartable_error(error)) {
+ // Interrupted, try again now.
+ continue;
+ } else if (!net::is_retryable_error(error)) {
+ // TODO(benh): Confirm that `os::strerror` does the
+ // right thing for `error` on Windows.
+ VLOG(1) << "Socket error while sending: " << os::strerror(error);
+ return Failure(os::strerror(error));
+ }
+
+ return None();
+ }
- return length;
- }
- }
+ return length;
+ }
+ },
+ [self](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
+ // Retry after we've polled if we don't yet have a result.
+ if (length.isNone()) {
+ return io::poll(self->get(), io::WRITE)
+ .then([](short event) -> ControlFlow<size_t> {
+ CHECK_EQ(io::WRITE, event);
+ return Continue();
+ });
+ }
+ return Break(length.get());
+ });
}
-Future<size_t> socket_send_file(
- const std::shared_ptr<PollSocketImpl>& impl,
- int_fd fd,
- off_t offset,
- size_t size)
+Future<size_t> PollSocketImpl::sendfile(int_fd fd, off_t offset, size_t size)
{
- CHECK(size > 0);
-
- while (true) {
- Try<ssize_t, SocketError> length =
- os::sendfile(impl->get(), fd, offset, size);
-
- if (length.isSome()) {
- CHECK(length.get() >= 0);
- if (length.get() == 0) {
- // Socket closed.
- VLOG(1) << "Socket closed while sending";
- }
- return length.get();
- }
-
- if (net::is_restartable_error(length.error().code)) {
- // Interrupted, try again now.
- continue;
- } else if (net::is_retryable_error(length.error().code)) {
- // Might block, try again later.
- return io::poll(impl->get(), io::WRITE)
- .then(lambda::bind(
- &internal::socket_send_file,
- impl,
- fd,
- offset,
- size));
- } else {
- // Socket error or closed.
- VLOG(1) << length.error().message;
- return Failure(length.error());
- };
- }
-}
-
-} // namespace internal {
+ CHECK(size > 0); // TODO(benh): Just return 0 if `size` is 0?
+ // Need to hold a copy of `this` so that the underlying socket
+ // doesn't end up getting reused before we return.
+ auto self = shared(this);
-Future<size_t> PollSocketImpl::send(const char* data, size_t size)
-{
- return io::poll(get(), io::WRITE)
- .then(lambda::bind(
- &internal::socket_send_data,
- shared(this),
- data,
- size));
-}
+ return loop(
+ None(),
+ [self, fd, offset, size]() -> Future<Option<size_t>> {
+ while (true) {
+ Try<ssize_t, SocketError> length = os::sendfile(
+ self->get(),
+ fd,
+ offset,
+ size);
+
+ if (length.isSome()) {
+ CHECK(length.get() >= 0);
+ return length.get();
+ }
+ if (net::is_restartable_error(length.error().code)) {
+ // Interrupted, try again now.
+ continue;
+ } else if (!net::is_retryable_error(length.error().code)) {
+ VLOG(1) << length.error().message;
+ return Failure(length.error());
+ }
-Future<size_t> PollSocketImpl::sendfile(int_fd fd, off_t offset, size_t size)
-{
- return io::poll(get(), io::WRITE)
- .then(lambda::bind(
- &internal::socket_send_file,
- shared(this),
- fd,
- offset,
- size));
+ return None();
+ }
+ },
+ [self](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
+ // Retry after we've polled if we don't yet have a result.
+ if (length.isNone()) {
+ return io::poll(self->get(), io::WRITE)
+ .then([](short event) -> ControlFlow<size_t> {
+ CHECK_EQ(io::WRITE, event);
+ return Continue();
+ });
+ }
+ return Break(length.get());
+ });
}
} // namespace internal {