You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2018/05/01 00:44:19 UTC

mesos git commit: Reduced likelihood of a stack overflow in libprocess socket send path.

Repository: mesos
Updated Branches:
  refs/heads/master 7b768a912 -> 8a639ca63


Reduced likelihood of a stack overflow in libprocess socket send path.

Currently, the socket send path is implemented using an asynchronous
loop with callbacks. Written without `process::loop`, this pattern is
prone to a stack overflow when all of the asynchronous calls complete
synchronously, because each callback then issues the next send from
within the previous callback's stack frame. This is possible with
sockets if the socket is always ready for writing. Users have reported
the crash in both MESOS-8594 and MESOS-8834, so the stack overflow is
encountered in practice.

This patch updates the send path to leverage `process::loop`, which
is supposed to prevent stack overflows in asynchronous loops. However,
it is still possible for `process::loop` itself to overflow the stack
due to MESOS-8852. In practice, I expect that even before MESOS-8852
is fixed, users won't see any stack overflows in the send path.

Review: https://reviews.apache.org/r/66863


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8a639ca6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8a639ca6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8a639ca6

Branch: refs/heads/master
Commit: 8a639ca63bd8071245e270aecdda574aec6f8d3e
Parents: 7b768a9
Author: Benjamin Mahler <bm...@apache.org>
Authored: Sat Apr 28 18:28:39 2018 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Mon Apr 30 17:11:02 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 115 ++++++++++++++++---------------
 1 file changed, 59 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8a639ca6/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 21931db..ef85966 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -73,6 +73,7 @@
 #include <process/id.hpp>
 #include <process/io.hpp>
 #include <process/logging.hpp>
+#include <process/loop.hpp>
 #include <process/mime.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
@@ -1753,71 +1754,73 @@ void SocketManager::unproxy(const Socket& socket)
 
 namespace internal {
 
-void _send(
-    const Future<size_t>& result,
-    Socket socket,
-    Encoder* encoder,
-    size_t size);
-
+Future<Nothing> _send(Encoder* encoder, Socket socket);
 
 void send(Encoder* encoder, Socket socket)
 {
-  switch (encoder->kind()) {
-    case Encoder::DATA: {
-      size_t size;
-      const char* data = static_cast<DataEncoder*>(encoder)->next(&size);
-      socket.send(data, size)
-        .onAny(lambda::bind(
-            &internal::_send,
-            lambda::_1,
-            socket,
-            encoder,
-            size));
-      break;
-    }
-    case Encoder::FILE: {
-      off_t offset;
-      size_t size;
-      int_fd fd = static_cast<FileEncoder*>(encoder)->next(&offset, &size);
-      socket.sendfile(fd, offset, size)
-        .onAny(lambda::bind(
-            &internal::_send,
-            lambda::_1,
-            socket,
-            encoder,
-            size));
-      break;
-    }
-  }
+  _send(encoder, socket)
+    .then([socket] {
+      // Continue sending until this socket has no more
+      // queued outgoing messages.
+      return process::loop(
+          None(),
+          [=] { return socket_manager->next(socket); },
+          [=](Encoder* encoder) -> Future<ControlFlow<Nothing>> {
+            if (encoder == nullptr) {
+              return Break();
+            }
+
+            return _send(encoder, socket)
+              .then([]() -> ControlFlow<Nothing> { return Continue(); });
+        });
+    });
 }
 
 
-void _send(
-    const Future<size_t>& length,
-    Socket socket,
-    Encoder* encoder,
-    size_t size)
+Future<Nothing> _send(Encoder* encoder, Socket socket)
 {
-  if (length.isDiscarded() || length.isFailed()) {
-    socket_manager->close(socket);
-    delete encoder;
-  } else {
-    // Update the encoder with the amount sent.
-    encoder->backup(size - length.get());
+  // Loop until all of the data in the provided encoder is sent.
+  return process::loop(
+      None(),
+      [=] {
+        size_t size;
+        Future<size_t> send;
 
-    // See if there is any more of the message to send.
-    if (encoder->remaining() == 0) {
-      delete encoder;
+        switch (encoder->kind()) {
+          case Encoder::DATA: {
+            const char* data =
+              static_cast<DataEncoder*>(encoder)->next(&size);
+            send = socket.send(data, size);
+            break;
+          }
+          case Encoder::FILE: {
+            off_t offset;
+            int_fd fd =
+              static_cast<FileEncoder*>(encoder)->next(&offset, &size);
+            send = socket.sendfile(fd, offset, size);
+            break;
+          }
+        }
 
-      // Check for more stuff to send on socket.
-      Encoder* next = socket_manager->next(socket);
-      if (next != nullptr) {
-        send(next, socket);
-      }
-    } else {
-      send(encoder, socket);
-    }
-  }
+        return send
+          .then([=](size_t sent) {
+            // Update the encoder with the amount sent.
+            encoder->backup(size - sent);
+            return Nothing();
+          })
+          .recover([=](const Future<Nothing>& f) {
+            socket_manager->close(socket);
+            delete encoder;
+            return f; // Break the loop by propagating the "failure".
+          });
+      },
+      [=](Nothing) -> ControlFlow<Nothing> {
+        if (encoder->remaining() == 0) {
+          delete encoder;
+          return Break();
+        }
+        return Continue();
+      });
 }
 
 } // namespace internal {