You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/11/20 00:54:14 UTC

[03/15] mesos git commit: Used loop in SocketImpl.

Used loop in SocketImpl.

Review: https://reviews.apache.org/r/55317


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ececb45
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ececb45
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ececb45

Branch: refs/heads/master
Commit: 7ececb45ed1bba315f1afb65dce463b542e203d1
Parents: 66bd732
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Dec 18 18:19:01 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Nov 19 16:33:47 2017 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/socket.cpp | 146 +++++++++++++-------------------
 1 file changed, 61 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7ececb45/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 457c5ff..490bbd7 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -15,6 +15,7 @@
 
 #include <boost/shared_array.hpp>
 
+#include <process/loop.hpp>
 #include <process/network.hpp>
 #include <process/owned.hpp>
 #include <process/socket.hpp>
@@ -140,105 +141,80 @@ Try<Address> SocketImpl::bind(const Address& address)
 }
 
 
-static Future<string> _recv(
-    const std::shared_ptr<SocketImpl>& impl,
-    const Option<ssize_t>& size,
-    Owned<string> buffer,
-    size_t chunk,
-    boost::shared_array<char> data,
-    size_t length)
-{
-  if (length == 0) { // EOF.
-    // Return everything we've received thus far, a subsequent receive
-    // will return an empty string.
-    return string(*buffer);
-  }
-
-  buffer->append(data.get(), length);
-
-  if (size.isNone()) {
-    // We've been asked just to return any data that we receive!
-    return string(*buffer);
-  } else if (size.get() < 0) {
-    // We've been asked to receive until EOF so keep receiving since
-    // according to the 'length == 0' check above we haven't reached
-    // EOF yet.
-    return impl->recv(data.get(), chunk)
-      .then(lambda::bind(&_recv,
-                         impl,
-                         size,
-                         buffer,
-                         chunk,
-                         data,
-                         lambda::_1));
-  } else if (static_cast<string::size_type>(size.get()) > buffer->size()) {
-    // We've been asked to receive a particular amount of data and we
-    // haven't yet received that much data so keep receiving.
-    return impl->recv(data.get(), size.get() - buffer->size())
-      .then(lambda::bind(&_recv,
-                         impl,
-                         size,
-                         buffer,
-                         chunk,
-                         data,
-                         lambda::_1));
-  }
-
-  // We've received as much data as requested, so return that data!
-  return string(*buffer);
-}
-
-
 Future<string> SocketImpl::recv(const Option<ssize_t>& size)
 {
+  // Extend lifetime by holding onto a reference to ourself!
+  auto self = shared_from_this();
+
   // Default chunk size to attempt to receive when nothing is
   // specified represents roughly 16 pages.
   static const size_t DEFAULT_CHUNK = 16 * os::pagesize();
 
-  size_t chunk = (size.isNone() || size.get() < 0)
+  const size_t chunk = (size.isNone() || size.get() < 0)
     ? DEFAULT_CHUNK
     : size.get();
 
-  Owned<string> buffer(new string());
   boost::shared_array<char> data(new char[chunk]);
-
-  return recv(data.get(), chunk)
-    .then(lambda::bind(&_recv,
-                       shared_from_this(),
-                       size,
-                       buffer,
-                       chunk,
-                       data,
-                       lambda::_1));
+  string buffer;
+
+  return loop(
+      None(),
+      [=]() {
+        return self->recv(data.get(), chunk);
+      },
+      [=](size_t length) mutable -> ControlFlow<string> {
+        if (length == 0) { // EOF.
+          // Return everything we've received thus far, a subsequent
+          // receive will return an empty string.
+          return Break(std::move(buffer));
+        }
+
+        buffer.append(data.get(), length);
+
+        if (size.isNone()) {
+          // We've been asked just to return any data that we receive!
+          return Break(std::move(buffer));
+        } else if (size.get() < 0) {
+          // We've been asked to receive until EOF so keep receiving
+          // since according to the 'length == 0' check above we
+          // haven't reached EOF yet.
+          return Continue();
+        } else if (
+            static_cast<string::size_type>(size.get()) > buffer.size()) {
+          // We've been asked to receive a particular amount of data and we
+          // haven't yet received that much data so keep receiving.
+          return Continue();
+        }
+
+        // We've received as much data as requested, so return that data!
+        return Break(std::move(buffer));
+      });
 }
 
 
-static Future<Nothing> _send(
-    const std::shared_ptr<SocketImpl>& impl,
-    Owned<string> data,
-    size_t index,
-    size_t length)
+Future<Nothing> SocketImpl::send(const string& data)
 {
-  // Increment the index into the data.
-  index += length;
-
-  // Check if we've sent all of the data.
-  if (index == data->size()) {
-    return Nothing();
-  }
-
-  // Keep sending!
-  return impl->send(data->data() + index, data->size() - index)
-    .then(lambda::bind(&_send, impl, data, index, lambda::_1));
-}
-
-
-Future<Nothing> SocketImpl::send(const string& _data)
-{
-  Owned<string> data(new string(_data));
-
-  return send(data->data(), data->size())
-    .then(lambda::bind(&_send, shared_from_this(), data, 0, lambda::_1));
+  // Extend lifetime by holding onto a reference to ourself!
+  auto self = shared_from_this();
+
+  // We need to share the `index` between both lambdas below.
+  std::shared_ptr<size_t> index(new size_t(0));
+
+  // Cache `data.size()` so that only the first lambda below copies
+  // `data` (it may be very big); the second one just needs `size`.
+  const size_t size = data.size();
+
+  return loop(
+      None(),
+      [=]() {
+        return self->send(data.data() + *index, size - *index);
+      },
+      [=](size_t length) -> ControlFlow<Nothing> {
+        if ((*index += length) != size) {
+          return Continue();
+        }
+        return Break();
+      });
 }
 
 } // namespace internal {