You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/09/21 12:52:23 UTC

[mesos] 02/02: Fixed broken pipe error in IOSwitchboard.

This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit c3c77cbef818d497d8bd5e67fa72e55a7190e27a
Author: Andrei Budnik <ab...@mesosphere.com>
AuthorDate: Fri Sep 21 14:51:59 2018 +0200

    Fixed broken pipe error in IOSwitchboard.
    
    Previous attempt to fix `HTTP 500` "broken pipe" in review /r/62187/
    was not correct: after IOSwitchboard sends a response to the agent for
    the `ATTACH_CONTAINER_INPUT` call, the socket is closed immediately,
    thus causing the error on the agent. This patch adds a delay after
    IO redirects are finished and before IOSwitchboard forcibly send a
    response.
    
    Review: https://reviews.apache.org/r/68784/
---
 src/slave/containerizer/mesos/io/switchboard.cpp | 42 +++++++++++++-----------
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index 5bb21e7..498c008 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -1615,16 +1615,9 @@ IOSwitchboardServerProcess::acknowledgeContainerInputResponse()
   if (--numPendingAcknowledgments == 0) {
     // If IO redirects are finished or writing to `stdin` failed we want to
     // terminate ourselves (after flushing any outstanding messages from our
-    // message queue). Since IOSwitchboard might receive an acknowledgment for
-    // the `ATTACH_CONTAINER_INPUT` request before reading a final message from
-    // the corresponding connection, we need to delay our termination to give
-    // IOSwitchboard a chance to read the final message. Otherwise, the agent
-    // might get `HTTP 500` "broken pipe" while attempting to write the final
-    // message.
+    // message queue).
     if (!redirectFinished.future().isPending() || failure.isSome()) {
-      after(Seconds(1)).onAny(defer(self(), [=](const Future<Nothing>&) {
-        terminate(self(), false);
-      }));
+      terminate(self(), false);
     }
   }
   return http::OK();
@@ -1746,20 +1739,29 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
   // the read loop finishes or IO redirects finish. Once this promise is set,
   // we return a final response to the client.
   //
-  // TODO(abudnik): Ideally, we would have used `process::select()` to capture a
-  // transition into a terminal state for any of `{readLoop, redirectFinished}`.
-  // However, `select()` currently does not capture a future that has failed.
-  // Another alternative would be to allow `promise::associate()` to accept
-  // multiple source futures.
+  // We use `defer(self(), ...)` to use this process as a synchronization point
+  // when changing state of the promise.
   Owned<Promise<http::Response>> promise(new Promise<http::Response>());
 
-  auto setPromise = [promise](const Future<http::Response>& response) {
-    promise->set(response);
-  };
-
-  readLoop.onAny(defer(self(), setPromise));
+  readLoop.onAny(
+      defer(self(), [promise](const Future<http::Response>& response) {
+        promise->set(response);
+      }));
 
-  redirectFinished.future().onAny(defer(self(), setPromise));
+  // Since IOSwitchboard might receive an acknowledgment for the
+  // `ATTACH_CONTAINER_INPUT` request before reading a final message from
+  // the corresponding connection, we need to give IOSwitchboard a chance to
+  // read the final message. Otherwise, the agent might get `HTTP 500`
+  // "broken pipe" while attempting to write the final message.
+  redirectFinished.future().onAny(
+      defer(self(), [=](const Future<http::Response>& response) {
+        // TODO(abudnik): Ideally, we would have used `process::delay()` to
+        // delay a dispatch of the lambda to this process.
+        after(Seconds(1))
+          .onAny(defer(self(), [promise, response](const Future<Nothing>&) {
+            promise->set(response);
+          }));
+      }));
 
   // We explicitly specify the return type to avoid a type deduction
   // issue in some versions of clang. See MESOS-2943.