You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/11/20 00:54:14 UTC
[03/15] mesos git commit: Used loop in SocketImpl.
Used loop in SocketImpl.
Review: https://reviews.apache.org/r/55317
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ececb45
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ececb45
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ececb45
Branch: refs/heads/master
Commit: 7ececb45ed1bba315f1afb65dce463b542e203d1
Parents: 66bd732
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Dec 18 18:19:01 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 19 16:33:47 2017 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/socket.cpp | 146 +++++++++++++-------------------
1 file changed, 61 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7ececb45/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 457c5ff..490bbd7 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -15,6 +15,7 @@
#include <boost/shared_array.hpp>
+#include <process/loop.hpp>
#include <process/network.hpp>
#include <process/owned.hpp>
#include <process/socket.hpp>
@@ -140,105 +141,80 @@ Try<Address> SocketImpl::bind(const Address& address)
}
-static Future<string> _recv(
- const std::shared_ptr<SocketImpl>& impl,
- const Option<ssize_t>& size,
- Owned<string> buffer,
- size_t chunk,
- boost::shared_array<char> data,
- size_t length)
-{
- if (length == 0) { // EOF.
- // Return everything we've received thus far, a subsequent receive
- // will return an empty string.
- return string(*buffer);
- }
-
- buffer->append(data.get(), length);
-
- if (size.isNone()) {
- // We've been asked just to return any data that we receive!
- return string(*buffer);
- } else if (size.get() < 0) {
- // We've been asked to receive until EOF so keep receiving since
- // according to the 'length == 0' check above we haven't reached
- // EOF yet.
- return impl->recv(data.get(), chunk)
- .then(lambda::bind(&_recv,
- impl,
- size,
- buffer,
- chunk,
- data,
- lambda::_1));
- } else if (static_cast<string::size_type>(size.get()) > buffer->size()) {
- // We've been asked to receive a particular amount of data and we
- // haven't yet received that much data so keep receiving.
- return impl->recv(data.get(), size.get() - buffer->size())
- .then(lambda::bind(&_recv,
- impl,
- size,
- buffer,
- chunk,
- data,
- lambda::_1));
- }
-
- // We've received as much data as requested, so return that data!
- return string(*buffer);
-}
-
-
Future<string> SocketImpl::recv(const Option<ssize_t>& size)
{
+ // Extend lifetime by holding onto a reference to ourself!
+ auto self = shared_from_this();
+
// Default chunk size to attempt to receive when nothing is
// specified represents roughly 16 pages.
static const size_t DEFAULT_CHUNK = 16 * os::pagesize();
- size_t chunk = (size.isNone() || size.get() < 0)
+ const size_t chunk = (size.isNone() || size.get() < 0)
? DEFAULT_CHUNK
: size.get();
- Owned<string> buffer(new string());
boost::shared_array<char> data(new char[chunk]);
-
- return recv(data.get(), chunk)
- .then(lambda::bind(&_recv,
- shared_from_this(),
- size,
- buffer,
- chunk,
- data,
- lambda::_1));
+ string buffer;
+
+ return loop(
+ None(),
+ [=]() {
+ return self->recv(data.get(), chunk);
+ },
+ [=](size_t length) -> ControlFlow<string> {
+ if (length == 0) { // EOF.
+ // Return everything we've received thus far, a subsequent
+ // receive will return an empty string.
+ return Break(std::move(buffer));
+ }
+
+ buffer.append(data.get(), length);
+
+ if (size.isNone()) {
+ // We've been asked just to return any data that we receive!
+ return Break(std::move(buffer));
+ } else if (size.get() < 0) {
+ // We've been asked to receive until EOF so keep receiving
+ // since according to the 'length == 0' check above we
+ // haven't reached EOF yet.
+ return Continue();
+ } else if (
+ static_cast<string::size_type>(size.get()) > buffer.size()) {
+ // We've been asked to receive a particular amount of data and we
+ // haven't yet received that much data so keep receiving.
+ return Continue();
+ }
+
+ // We've received as much data as requested, so return that data!
+ return Break(std::move(buffer));
+ });
}
-static Future<Nothing> _send(
- const std::shared_ptr<SocketImpl>& impl,
- Owned<string> data,
- size_t index,
- size_t length)
+Future<Nothing> SocketImpl::send(const string& data)
{
- // Increment the index into the data.
- index += length;
-
- // Check if we've sent all of the data.
- if (index == data->size()) {
- return Nothing();
- }
-
- // Keep sending!
- return impl->send(data->data() + index, data->size() - index)
- .then(lambda::bind(&_send, impl, data, index, lambda::_1));
-}
-
-
-Future<Nothing> SocketImpl::send(const string& _data)
-{
- Owned<string> data(new string(_data));
-
- return send(data->data(), data->size())
- .then(lambda::bind(&_send, shared_from_this(), data, 0, lambda::_1));
+ // Extend lifetime by holding onto a reference to ourself!
+ auto self = shared_from_this();
+
+ // We need to share the `index` between both lambdas below.
+ std::shared_ptr<size_t> index(new size_t(0));
+
+ // We store `data.size()` so that we won't make a copy of `data` in
+ // each lambda below since some `data` might be very big!
+ const size_t size = data.size();
+
+ return loop(
+ None(),
+ [=]() {
+ return self->send(data.data() + *index, size - *index);
+ },
+ [=](size_t length) -> ControlFlow<Nothing> {
+ if ((*index += length) != size) {
+ return Continue();
+ }
+ return Break();
+ });
}
} // namespace internal {