You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/09/18 18:00:23 UTC

[mesos] 04/06: Fixed HTTP errors caused by dropped HTTP responses by IOSwitchboard.

This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch 1.6.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 2ddd6f07bebbe91e1e0d5165c4a5ae552b836303
Author: Andrei Budnik <ab...@mesosphere.com>
AuthorDate: Tue Sep 18 19:10:14 2018 +0200

    Fixed HTTP errors caused by dropped HTTP responses by IOSwitchboard.
    
    Previously, the IOSwitchboard process could terminate before all
    HTTP responses had been sent to the agent. In the case of an
    `ATTACH_CONTAINER_INPUT` call, we could drop the final HTTP `200 OK`
    response, so the agent got a broken HTTP connection for the call.
    This patch introduces an acknowledgment for the received response
    to the `ATTACH_CONTAINER_INPUT` call. This acknowledgment is a new
    type of control message for the `ATTACH_CONTAINER_INPUT` call. When
    the IOSwitchboard receives an acknowledgment and the IO redirects
    are finished, it terminates itself. That guarantees that the agent
    always receives a response for the `ATTACH_CONTAINER_INPUT` call.
    
    Review: https://reviews.apache.org/r/65168/
    (cherry picked from commit 5b95bb0f21852058d22703385f2c8e139881bf1a)
---
 src/slave/containerizer/mesos/io/switchboard.cpp | 52 ++++++++++++++++++------
 src/slave/http.cpp                               | 28 ++++++++++++-
 src/slave/http.hpp                               |  3 ++
 src/tests/containerizer/io_switchboard_tests.cpp | 17 ++++++++
 4 files changed, 85 insertions(+), 15 deletions(-)

diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index 747ffd3..10680ae 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -1009,6 +1009,9 @@ private:
   // switchboard.
   Option<Error> validate(const agent::Call::AttachContainerInput& call);
 
+  // Handle acknowledgment for `ATTACH_CONTAINER_INPUT` call.
+  Future<http::Response> acknowledgeContainerInputResponse();
+
   // Handle `ATTACH_CONTAINER_INPUT` calls.
   Future<http::Response> attachContainerInput(
       const Owned<recordio::Reader<agent::Call>>& reader);
@@ -1034,6 +1037,10 @@ private:
   bool waitForConnection;
   Option<Duration> heartbeatInterval;
   bool inputConnected;
+  // Each time the agent receives a response for `ATTACH_CONTAINER_INPUT`
+  // request it sends an acknowledgment. This counter is used to delay
+  // IOSwitchboard termination until all acknowledgments are received.
+  size_t numPendingAcknowledgments;
   Future<unix::Socket> accept;
   Promise<Nothing> promise;
   Promise<Nothing> startRedirect;
@@ -1169,7 +1176,8 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess(
     socket(_socket),
     waitForConnection(_waitForConnection),
     heartbeatInterval(_heartbeatInterval),
-    inputConnected(false) {}
+    inputConnected(false),
+    numPendingAcknowledgments(0) {}
 
 
 Future<Nothing> IOSwitchboardServerProcess::run()
@@ -1226,12 +1234,12 @@ Future<Nothing> IOSwitchboardServerProcess::run()
       // containers with this behavior and we will exit out of the
       // switchboard process early.
       //
-      // If our IO redirects are finished and there is an input connected,
-      // then we set `redirectFinished` promise which triggers a callback for
+      // If our IO redirects are finished and there are pending
+      // acknowledgments for `ATTACH_CONTAINER_INPUT` requests, then
+      // we set `redirectFinished` promise which triggers a callback for
       // `attachContainerInput()`. This callback returns a final `HTTP 200`
       // response to the client, even if the client has not yet sent the EOF
-      // message. So we postpone our termination until we send a final
-      // response to the client.
+      // message.
       //
       // NOTE: We always call `terminate()` with `false` to ensure
       // that our event queue is drained before actually terminating.
@@ -1262,7 +1270,7 @@ Future<Nothing> IOSwitchboardServerProcess::run()
 
       collect(stdoutRedirect, stderrRedirect)
         .then(defer(self(), [this]() {
-          if (inputConnected) {
+          if (numPendingAcknowledgments > 0) {
             redirectFinished.set(http::OK());
           } else {
             terminate(self(), false);
@@ -1372,6 +1380,10 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
 {
   CHECK_EQ("POST", request.method);
 
+  if (request.url.path == "/acknowledge_container_input_response") {
+    return acknowledgeContainerInputResponse();
+  }
+
   Option<string> contentType_ = request.headers.get("Content-Type");
   CHECK_SOME(contentType_);
 
@@ -1598,9 +1610,30 @@ Option<Error> IOSwitchboardServerProcess::validate(
 }
 
 
+Future<http::Response>
+IOSwitchboardServerProcess::acknowledgeContainerInputResponse()
+{
+  // Check if this is an acknowledgment sent by the agent. This acknowledgment
+  // means that response for `ATTACH_CONTAINER_INPUT` call has been received by
+  // the agent.
+  CHECK_GT(numPendingAcknowledgments, 0u);
+  if (--numPendingAcknowledgments == 0) {
+    // If IO redirects are finished or writing to `stdin` failed we want to
+    // terminate ourselves (after flushing any outstanding messages from our
+    // message queue).
+    if (!redirectFinished.future().isPending() || failure.isSome()) {
+      terminate(self(), false);
+    }
+  }
+  return http::OK();
+}
+
+
 Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
     const Owned<recordio::Reader<agent::Call>>& reader)
 {
+  ++numPendingAcknowledgments;
+
   // Only allow a single input connection at a time.
   if (inputConnected) {
     return http::Conflict("Multiple input connections are not allowed");
@@ -1733,13 +1766,6 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
         // Reset `inputConnected` to allow future input connections.
         inputConnected = false;
 
-        // If IO redirects are finished or writing to `stdin` failed we want
-        // to terminate ourselves (after flushing any outstanding messages
-        // from our message queue).
-        if (!redirectFinished.future().isPending() || failure.isSome()) {
-          terminate(self(), false);
-        }
-
         return response;
       }));
 }
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 9e4525b..0677eb4 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -3054,8 +3054,7 @@ Future<Response> Http::_attachContainerInput(
       std::move(decoder), encoder, writer);
 
   return slave->containerizer->attach(containerId)
-    .then([mediaTypes, reader, writer, transform](
-        Connection connection) mutable {
+    .then([=](Connection connection) mutable {
       Request request;
       request.method = "POST";
       request.type = Request::PIPE;
@@ -3090,6 +3089,31 @@ Future<Response> Http::_attachContainerInput(
       connection.disconnected()
         .onAny([connection]() {});
 
+      return connection.send(request)
+        .onAny(defer(
+            slave->self(),
+            [=](const Future<Response>&) {
+              // After we have received a response for `ATTACH_CONTAINER_INPUT`
+              // call, we need to send an acknowledgment to the IOSwitchboard,
+              // so that the IOSwitchboard process can terminate itself. This is
+              // a workaround for the problem with dropping outstanding HTTP
+              // responses due to a lack of graceful shutdown in libprocess.
+              acknowledgeContainerInputResponse(containerId);
+            }));
+    });
+}
+
+
+Future<Response> Http::acknowledgeContainerInputResponse(
+    const ContainerID& containerId) const {
+  return slave->containerizer->attach(containerId)
+    .then([](Connection connection) {
+      Request request;
+      request.method = "POST";
+      request.type = Request::BODY;
+      request.url.domain = "";
+      request.url.path = "/acknowledge_container_input_response";
+
       return connection.send(request);
     });
 }
diff --git a/src/slave/http.hpp b/src/slave/http.hpp
index dcfd0d9..6b74a23 100644
--- a/src/slave/http.hpp
+++ b/src/slave/http.hpp
@@ -327,6 +327,9 @@ private:
       process::Owned<recordio::Reader<agent::Call>>&& decoder,
       const RequestMediaTypes& mediaTypes) const;
 
+  process::Future<process::http::Response> acknowledgeContainerInputResponse(
+      const ContainerID& containerId) const;
+
   process::Future<process::http::Response> attachContainerOutput(
       const mesos::agent::Call& call,
       const RequestMediaTypes& mediaTypes,
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp
index 784386a..ece9cf1 100644
--- a/src/tests/containerizer/io_switchboard_tests.cpp
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -121,6 +121,19 @@ protected:
     return connection.send(request, true);
   }
 
+  // Helper that sends an acknowledgment for the `ATTACH_CONTAINER_INPUT`
+  // request.
+  Future<http::Response> acknowledgeContainerInputResponse(
+      http::Connection connection) const {
+    http::Request request;
+    request.method = "POST";
+    request.type = http::Request::BODY;
+    request.url.domain = "";
+    request.url.path = "/acknowledge_container_input_response";
+
+    return connection.send(request);
+  }
+
   // Reads `ProcessIO::Data` records from the pipe `reader` until EOF is reached
   // and returns the merged stdout and stderr.
   // NOTE: It ignores any `ProcessIO::Control` records.
@@ -578,6 +591,8 @@ TEST_F(IOSwitchboardServerTest, AttachInput)
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
 
+  acknowledgeContainerInputResponse(connection);
+
   AWAIT_READY(connection.disconnect());
   AWAIT_READY(connection.disconnected());
 
@@ -689,6 +704,8 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat)
   // result of receiving the heartbeats.
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
 
+  acknowledgeContainerInputResponse(connection);
+
   AWAIT_READY(connection.disconnect());
   AWAIT_READY(connection.disconnected());