You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/09/18 18:00:20 UTC

[mesos] 01/06: Fixed IOSwitchboard waiting EOF from attach container input request.

This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch 1.6.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit e3a9eb3b473a10f210913d568c1d9923ed05d933
Author: Andrei Budnik <ab...@mesosphere.com>
AuthorDate: Tue Sep 18 19:09:31 2018 +0200

    Fixed IOSwitchboard waiting EOF from attach container input request.
    
    Previously, when the corresponding nested container terminated while
    the user was attached to the container's stdin via
    `ATTACH_CONTAINER_INPUT`, IOSwitchboard didn't terminate immediately.
    IOSwitchboard was waiting for an EOF message from the input HTTP
    connection. Since the IOSwitchboard was stuck, the corresponding
    nested container was also stuck in the `DESTROYING` state.
    
    This patch fixes the aforementioned issue by sending a `200 OK`
    response for the `ATTACH_CONTAINER_INPUT` call when the IO redirects
    have finished but reading from the HTTP input connection has not.
    
    Review: https://reviews.apache.org/r/68232/
    (cherry picked from commit 2fdc8f3cffc5eac91e5f2b0c6aef2254acfc2bd0)
---
 src/slave/containerizer/mesos/io/switchboard.cpp | 74 +++++++++++++++---------
 1 file changed, 48 insertions(+), 26 deletions(-)

diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index 5b6e903..747ffd3 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -1034,10 +1034,11 @@ private:
   bool waitForConnection;
   Option<Duration> heartbeatInterval;
   bool inputConnected;
-  bool redirectFinished;  // Set when both stdout and stderr redirects finish.
   Future<unix::Socket> accept;
   Promise<Nothing> promise;
   Promise<Nothing> startRedirect;
+  // Set when both stdout and stderr redirects finish.
+  Promise<http::Response> redirectFinished;
   // The following must be a `std::list`
   // for proper erase semantics later on.
   list<HttpConnection> outputConnections;
@@ -1168,8 +1169,7 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess(
     socket(_socket),
     waitForConnection(_waitForConnection),
     heartbeatInterval(_heartbeatInterval),
-    inputConnected(false),
-    redirectFinished(false) {}
+    inputConnected(false) {}
 
 
 Future<Nothing> IOSwitchboardServerProcess::run()
@@ -1227,10 +1227,11 @@ Future<Nothing> IOSwitchboardServerProcess::run()
       // switchboard process early.
       //
       // If our IO redirects are finished and there is an input connected,
-      // then we postpone our termination until either a container closes
-      // its `stdin` or a client closes the input connection so that we can
-      // guarantee returning a http response for `ATTACH_CONTAINER_INPUT`
-      // request before terminating ourselves.
+      // then we set `redirectFinished` promise which triggers a callback for
+      // `attachContainerInput()`. This callback returns a final `HTTP 200`
+      // response to the client, even if the client has not yet sent the EOF
+      // message. So we postpone our termination until we send a final
+      // response to the client.
       //
       // NOTE: We always call `terminate()` with `false` to ensure
       // that our event queue is drained before actually terminating.
@@ -1261,9 +1262,9 @@ Future<Nothing> IOSwitchboardServerProcess::run()
 
       collect(stdoutRedirect, stderrRedirect)
         .then(defer(self(), [this]() {
-          redirectFinished = true;
-
-          if (!inputConnected) {
+          if (inputConnected) {
+            redirectFinished.set(http::OK());
+          } else {
             terminate(self(), false);
           }
           return Nothing();
@@ -1613,7 +1614,7 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
 
   // Loop through each record and process it. Return a proper
   // response once the last record has been fully processed.
-  return loop(
+  auto readLoop = loop(
       self(),
       [=]() {
         return reader->read();
@@ -1704,22 +1705,43 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
             UNREACHABLE();
           }
         }
-      })
-    // We explicitly specify the return type to avoid a type deduction
-    // issue in some versions of clang. See MESOS-2943.
-    .then(defer(self(), [=](const http::Response& response) -> http::Response {
-      // Reset `inputConnected` to allow future input connections.
-      inputConnected = false;
-
-      // If IO redirects are finished or writing to `stdin` failed we want
-      // to terminate ourselves (after flushing any outstanding messages
-      // from our message queue).
-      if (redirectFinished || failure.isSome()) {
-        terminate(self(), false);
-      }
+      });
 
-      return response;
-    }));
+  // We create a new promise, which is transitioned to `READY` when either
+  // the read loop finishes or IO redirects finish. Once this promise is set,
+  // we return a final response to the client.
+  //
+  // TODO(abudnik): Ideally, we would have used `process::select()` to capture a
+  // transition into a terminal state for any of `{readLoop, redirectFinished}`.
+  // However, `select()` currently does not capture a future that has failed.
+  // Another alternative would be to allow `promise::associate()` to accept
+  // multiple source futures.
+  Owned<Promise<http::Response>> promise(new Promise<http::Response>());
+
+  auto setPromise = [promise](const Future<http::Response>& response) {
+    promise->set(response);
+  };
+
+  readLoop.onAny(defer(self(), setPromise));
+
+  redirectFinished.future().onAny(defer(self(), setPromise));
+
+  // We explicitly specify the return type to avoid a type deduction
+  // issue in some versions of clang. See MESOS-2943.
+  return promise->future().then(
+      defer(self(), [=](const http::Response& response) -> http::Response {
+        // Reset `inputConnected` to allow future input connections.
+        inputConnected = false;
+
+        // If IO redirects are finished or writing to `stdin` failed we want
+        // to terminate ourselves (after flushing any outstanding messages
+        // from our message queue).
+        if (!redirectFinished.future().isPending() || failure.isSome()) {
+          terminate(self(), false);
+        }
+
+        return response;
+      }));
 }