You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/09/18 17:10:44 UTC
[mesos] 04/05: Fixed HTTP errors caused by dropped HTTP responses
by IOSwitchboard.
This is an automated email from the ASF dual-hosted git repository.
alexr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 5b95bb0f21852058d22703385f2c8e139881bf1a
Author: Andrei Budnik <ab...@mesosphere.com>
AuthorDate: Tue Sep 18 19:10:14 2018 +0200
Fixed HTTP errors caused by dropped HTTP responses by IOSwitchboard.
Previously, the IOSwitchboard process could terminate before all HTTP
responses had been sent to the agent. In the case of an
`ATTACH_CONTAINER_INPUT` call, we could drop the final HTTP `200 OK`
response, so the agent saw a broken HTTP connection for the call.
This patch introduces an acknowledgment for the received response
to the `ATTACH_CONTAINER_INPUT` call. This acknowledgment is a new
type of control message for the `ATTACH_CONTAINER_INPUT` call. When
the IOSwitchboard receives an acknowledgment and the IO redirects are
finished, it terminates itself. That guarantees that the agent always
receives a response for the `ATTACH_CONTAINER_INPUT` call.
Review: https://reviews.apache.org/r/65168/
---
src/slave/containerizer/mesos/io/switchboard.cpp | 52 ++++++++++++++++++------
src/slave/http.cpp | 28 ++++++++++++-
src/slave/http.hpp | 3 ++
src/tests/containerizer/io_switchboard_tests.cpp | 17 ++++++++
4 files changed, 85 insertions(+), 15 deletions(-)
diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index 0e4edb3..1982d9b 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -1004,6 +1004,9 @@ private:
// switchboard.
Option<Error> validate(const agent::Call::AttachContainerInput& call);
+ // Handle acknowledgment for `ATTACH_CONTAINER_INPUT` call.
+ Future<http::Response> acknowledgeContainerInputResponse();
+
// Handle `ATTACH_CONTAINER_INPUT` calls.
Future<http::Response> attachContainerInput(
const Owned<recordio::Reader<agent::Call>>& reader);
@@ -1029,6 +1032,10 @@ private:
bool waitForConnection;
Option<Duration> heartbeatInterval;
bool inputConnected;
+ // Each time the agent receives a response for `ATTACH_CONTAINER_INPUT`
+ // request it sends an acknowledgment. This counter is used to delay
+ // IOSwitchboard termination until all acknowledgments are received.
+ size_t numPendingAcknowledgments;
Future<unix::Socket> accept;
Promise<Nothing> promise;
Promise<Nothing> startRedirect;
@@ -1164,7 +1171,8 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess(
socket(_socket),
waitForConnection(_waitForConnection),
heartbeatInterval(_heartbeatInterval),
- inputConnected(false) {}
+ inputConnected(false),
+ numPendingAcknowledgments(0) {}
Future<Nothing> IOSwitchboardServerProcess::run()
@@ -1221,12 +1229,12 @@ Future<Nothing> IOSwitchboardServerProcess::run()
// containers with this behavior and we will exit out of the
// switchboard process early.
//
- // If our IO redirects are finished and there is an input connected,
- // then we set `redirectFinished` promise which triggers a callback for
+ // If our IO redirects are finished and there are pending
+ // acknowledgments for `ATTACH_CONTAINER_INPUT` requests, then
+ // we set `redirectFinished` promise which triggers a callback for
// `attachContainerInput()`. This callback returns a final `HTTP 200`
// response to the client, even if the client has not yet sent the EOF
- // message. So we postpone our termination until we send a final
- // response to the client.
+ // message.
//
// NOTE: We always call `terminate()` with `false` to ensure
// that our event queue is drained before actually terminating.
@@ -1257,7 +1265,7 @@ Future<Nothing> IOSwitchboardServerProcess::run()
collect(stdoutRedirect, stderrRedirect)
.then(defer(self(), [this]() {
- if (inputConnected) {
+ if (numPendingAcknowledgments > 0) {
redirectFinished.set(http::OK());
} else {
terminate(self(), false);
@@ -1367,6 +1375,10 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
{
CHECK_EQ("POST", request.method);
+ if (request.url.path == "/acknowledge_container_input_response") {
+ return acknowledgeContainerInputResponse();
+ }
+
Option<string> contentType_ = request.headers.get("Content-Type");
CHECK_SOME(contentType_);
@@ -1593,9 +1605,30 @@ Option<Error> IOSwitchboardServerProcess::validate(
}
+Future<http::Response>
+IOSwitchboardServerProcess::acknowledgeContainerInputResponse()
+{
+ // Check if this is an acknowledgment sent by the agent. This acknowledgment
+ // means that response for `ATTACH_CONTAINER_INPUT` call has been received by
+ // the agent.
+ CHECK_GT(numPendingAcknowledgments, 0u);
+ if (--numPendingAcknowledgments == 0) {
+ // If IO redirects are finished or writing to `stdin` failed we want to
+ // terminate ourselves (after flushing any outstanding messages from our
+ // message queue).
+ if (!redirectFinished.future().isPending() || failure.isSome()) {
+ terminate(self(), false);
+ }
+ }
+ return http::OK();
+}
+
+
Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
const Owned<recordio::Reader<agent::Call>>& reader)
{
+ ++numPendingAcknowledgments;
+
// Only allow a single input connection at a time.
if (inputConnected) {
return http::Conflict("Multiple input connections are not allowed");
@@ -1728,13 +1761,6 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
// Reset `inputConnected` to allow future input connections.
inputConnected = false;
- // If IO redirects are finished or writing to `stdin` failed we want
- // to terminate ourselves (after flushing any outstanding messages
- // from our message queue).
- if (!redirectFinished.future().isPending() || failure.isSome()) {
- terminate(self(), false);
- }
-
return response;
}));
}
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index fb92368..0a57741 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -3096,8 +3096,7 @@ Future<Response> Http::_attachContainerInput(
std::move(decoder), encoder, writer);
return slave->containerizer->attach(containerId)
- .then([mediaTypes, reader, writer, transform](
- Connection connection) mutable {
+ .then([=](Connection connection) mutable {
Request request;
request.method = "POST";
request.type = Request::PIPE;
@@ -3132,6 +3131,31 @@ Future<Response> Http::_attachContainerInput(
connection.disconnected()
.onAny([connection]() {});
+ return connection.send(request)
+ .onAny(defer(
+ slave->self(),
+ [=](const Future<Response>&) {
+ // After we have received a response for `ATTACH_CONTAINER_INPUT`
+ // call, we need to send an acknowledgment to the IOSwitchboard,
+ // so that the IOSwitchboard process can terminate itself. This is
+ // a workaround for the problem with dropping outstanding HTTP
+ // responses due to a lack of graceful shutdown in libprocess.
+ acknowledgeContainerInputResponse(containerId);
+ }));
+ });
+}
+
+
+Future<Response> Http::acknowledgeContainerInputResponse(
+ const ContainerID& containerId) const {
+ return slave->containerizer->attach(containerId)
+ .then([](Connection connection) {
+ Request request;
+ request.method = "POST";
+ request.type = Request::BODY;
+ request.url.domain = "";
+ request.url.path = "/acknowledge_container_input_response";
+
return connection.send(request);
});
}
diff --git a/src/slave/http.hpp b/src/slave/http.hpp
index 7820087..5b113fa 100644
--- a/src/slave/http.hpp
+++ b/src/slave/http.hpp
@@ -326,6 +326,9 @@ private:
process::Owned<recordio::Reader<agent::Call>>&& decoder,
const RequestMediaTypes& mediaTypes) const;
+ process::Future<process::http::Response> acknowledgeContainerInputResponse(
+ const ContainerID& containerId) const;
+
process::Future<process::http::Response> attachContainerOutput(
const mesos::agent::Call& call,
const RequestMediaTypes& mediaTypes,
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp
index c00f6a9..e443145 100644
--- a/src/tests/containerizer/io_switchboard_tests.cpp
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -121,6 +121,19 @@ protected:
return connection.send(request, true);
}
+ // Helper that sends an acknowledgment for the `ATTACH_CONTAINER_INPUT`
+ // request.
+ Future<http::Response> acknowledgeContainerInputResponse(
+ http::Connection connection) const {
+ http::Request request;
+ request.method = "POST";
+ request.type = http::Request::BODY;
+ request.url.domain = "";
+ request.url.path = "/acknowledge_container_input_response";
+
+ return connection.send(request);
+ }
+
// Reads `ProcessIO::Data` records from the pipe `reader` until EOF is reached
// and returns the merged stdout and stderr.
// NOTE: It ignores any `ProcessIO::Control` records.
@@ -578,6 +591,8 @@ TEST_F(IOSwitchboardServerTest, AttachInput)
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+ acknowledgeContainerInputResponse(connection);
+
AWAIT_READY(connection.disconnect());
AWAIT_READY(connection.disconnected());
@@ -689,6 +704,8 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat)
// result of receiving the heartbeats.
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+ acknowledgeContainerInputResponse(connection);
+
AWAIT_READY(connection.disconnect());
AWAIT_READY(connection.disconnected());