You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/12/05 18:12:10 UTC
[3/4] mesos git commit: Added support for 'ATTACH_CONTAINER_INPUT' call to the io switchboard.
Added support for 'ATTACH_CONTAINER_INPUT' call to the io switchboard.
Review: https://reviews.apache.org/r/54297/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/be45c94f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/be45c94f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/be45c94f
Branch: refs/heads/master
Commit: be45c94fc7009851f297ee0403a4bf09f00b6e25
Parents: 02d816e
Author: Kevin Klues <kl...@gmail.com>
Authored: Mon Dec 5 09:45:15 2016 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Mon Dec 5 09:45:15 2016 -0800
----------------------------------------------------------------------
.../containerizer/mesos/io/switchboard.cpp | 235 +++++++++++++++++--
.../containerizer/io_switchboard_tests.cpp | 136 ++++++++++-
2 files changed, 355 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/be45c94f/src/slave/containerizer/mesos/io/switchboard.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index d5211b9..3b6d326 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -29,8 +29,8 @@
#include <process/future.hpp>
#include <process/http.hpp>
#include <process/io.hpp>
+#include <process/loop.hpp>
#include <process/owned.hpp>
-
#include <process/process.hpp>
#include <process/reap.hpp>
#include <process/subprocess.hpp>
@@ -607,6 +607,13 @@ private:
// with the same format we receive them in.
Future<http::Response> handler(const http::Request& request);
+ // Validate `ATTACH_CONTAINER_INPUT` calls.
+ //
+ // TODO(klueska): Move this to `src/slave/validation.hpp` and make
+ // the agent validate all the calls before forwarding them to the
+ // switchboard.
+ Option<Error> validate(const agent::Call::AttachContainerInput& call);
+
// Handle `ATTACH_CONTAINER_INPUT` calls.
Future<http::Response> attachContainerInput(
const Owned<recordio::Reader<agent::Call>>& reader);
@@ -628,11 +635,12 @@ private:
int stderrToFd;
unix::Socket socket;
bool waitForConnection;
+ bool inputConnected;
Promise<Nothing> promise;
Promise<Nothing> startRedirect;
// The following must be a `std::list`
// for proper erase semantics later on.
- list<HttpConnection> connections;
+ list<HttpConnection> outputConnections;
Option<Failure> failure;
};
@@ -734,15 +742,12 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess(
stderrFromFd(_stderrFromFd),
stderrToFd(_stderrToFd),
socket(_socket),
- waitForConnection(_waitForConnection) {}
+ waitForConnection(_waitForConnection),
+ inputConnected(false) {}
Future<Nothing> IOSwitchboardServerProcess::run()
{
- // TODO(jieyu): This silence the compiler warning of private field
- // being not used. Remove this once it is used.
- stdinToFd = -1;
-
if (!waitForConnection) {
startRedirect.set(Nothing());
}
@@ -838,7 +843,7 @@ Future<Nothing> IOSwitchboardServerProcess::run()
void IOSwitchboardServerProcess::finalize()
{
- foreach (HttpConnection& connection, connections) {
+ foreach (HttpConnection& connection, outputConnections) {
connection.close();
}
@@ -934,14 +939,21 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
[=](const Result<agent::Call>& call) -> Future<http::Response> {
if (call.isNone()) {
return http::BadRequest(
- "Received EOF while reading request body");
+ "IOSwitchboard received EOF while reading request body");
}
if (call.isError()) {
return Failure(call.error());
}
+ // Should have already been validated by the agent.
+ CHECK(call->has_type());
CHECK_EQ(agent::Call::ATTACH_CONTAINER_INPUT, call->type());
+ CHECK(call->has_attach_container_input());
+ CHECK_EQ(mesos::agent::Call::AttachContainerInput::CONTAINER_ID,
+ call->attach_container_input().type());
+ CHECK(call->attach_container_input().has_container_id());
+ CHECK(call->attach_container_input().container_id().has_value());
return attachContainerInput(reader);
}));
@@ -957,6 +969,8 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
return http::BadRequest(call.error());
}
+ // Should have already been validated by the agent.
+ CHECK(call->has_type());
CHECK_EQ(agent::Call::ATTACH_CONTAINER_OUTPUT, call->type());
return attachContainerOutput(acceptType);
@@ -965,10 +979,203 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
}
+Option<Error> IOSwitchboardServerProcess::validate(
+ const agent::Call::AttachContainerInput& call)
+{
+ switch (call.type()) {
+ case agent::Call::AttachContainerInput::UNKNOWN:
+ case agent::Call::AttachContainerInput::CONTAINER_ID: {
+ return Error(
+ "Expecting 'attach_container_input.type' to be 'PROCESS_IO'"
+ " instead of: '" + stringify(call.type()) + "'");
+ }
+ case agent::Call::AttachContainerInput::PROCESS_IO: {
+ if (!call.has_process_io()) {
+ return Error(
+ "Expecting 'attach_container_input.process_io' to be present");
+ }
+
+ const agent::ProcessIO& message = call.process_io();
+
+ if (!message.has_type()) {
+ return Error("Expecting 'process_io.type' to be present");
+ }
+
+ switch (message.type()) {
+ case agent::ProcessIO::UNKNOWN: {
+ return Error("'process_io.type' is unknown");
+ }
+ case agent::ProcessIO::CONTROL: {
+ if (!message.has_control()) {
+ return Error("Expecting 'process_io.control' to be present");
+ }
+
+ if (!message.control().has_type()) {
+ return Error("Expecting 'process_io.control.type' to be present");
+ }
+
+ switch (message.control().type()) {
+ case agent::ProcessIO::Control::UNKNOWN: {
+ return Error("'process_io.control.type' is unknown");
+ }
+ case agent::ProcessIO::Control::TTY_INFO: {
+ if (!message.control().has_tty_info()) {
+ return Error(
+ "Expecting 'process_io.control.tty_info' to be present");
+ }
+
+ const TTYInfo& ttyInfo = message.control().tty_info();
+
+ if (!ttyInfo.has_window_size()) {
+ return Error("Expecting 'tty_info.window_size' to be present");
+ }
+ }
+ }
+
+ return None();
+ }
+ case agent::ProcessIO::DATA: {
+ if (!message.has_data()) {
+ return Error("Expecting 'process_io.data' to be present");
+ }
+
+ if (!message.data().has_type()) {
+ return Error("Expecting 'process_io.data.type' to be present");
+ }
+
+ if (message.data().type() != agent::ProcessIO::Data::STDIN) {
+ return Error("Expecting 'process_io.data.type' to be 'STDIN'");
+ }
+
+ if (!message.data().has_data()) {
+ return Error("Expecting 'process_io.data.data' to be present");
+ }
+
+ return None();
+ }
+ }
+ }
+ }
+
+ UNREACHABLE();
+}
+
+
Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
const Owned<recordio::Reader<agent::Call>>& reader)
{
- return http::NotImplemented("ATTACH_CONTAINER_INPUT");
+ // Only allow a single input connection at a time.
+ if (inputConnected) {
+ return http::Conflict("Multiple input connections are not allowed");
+ }
+
+ // We set `inputConnected` to true here and then reset it to false
+ // at the bottom of this function once our asynchronous loop has
+ // terminated. This way another connection can be established once
+ // the current one is complete.
+ inputConnected = true;
+
+ // Loop through each record and process it. Return a proper
+ // response once the last record has been fully processed.
+ Owned<http::Response> response(new http::Response());
+
+ return loop(
+ self(),
+ [=]() {
+ return reader->read();
+ },
+ [=](const Result<agent::Call>& record) -> Future<bool> {
+ if (record.isNone()) {
+ *response = http::OK();
+ return false;
+ }
+
+ if (record.isError()) {
+ *response = http::BadRequest(record.error());
+ return false;
+ }
+
+ // Should have already been validated by the agent.
+ CHECK(record->has_type());
+ CHECK_EQ(mesos::agent::Call::ATTACH_CONTAINER_INPUT, record->type());
+ CHECK(record->has_attach_container_input());
+
+ // Validate the rest of the `AttachContainerInput` message.
+ Option<Error> error = validate(record->attach_container_input());
+ if (error.isSome()) {
+ *response = http::BadRequest(error->message);
+ return false;
+ }
+
+ const agent::ProcessIO& message =
+ record->attach_container_input().process_io();
+
+ switch (message.type()) {
+ case agent::ProcessIO::CONTROL: {
+ // TODO(klueska): Return a failure if the container we are
+ // attaching to does not have a tty associated with it.
+
+ // Update the window size.
+ Try<Nothing> window = os::setWindowSize(
+ stdinToFd,
+ message.control().tty_info().window_size().rows(),
+ message.control().tty_info().window_size().columns());
+
+ if (window.isError()) {
+ *response = http::BadRequest(
+ "Unable to set the window size: " + window.error());
+ return false;
+ }
+
+ return true;
+ }
+ case agent::ProcessIO::DATA: {
+ // Write the STDIN data to `stdinToFd`. If there is a
+ // failure, we set the `failure` member variable and exit
+ // the loop. In the resulting `.then()` callback, we then
+ // terminate the process. We don't terminate the process
+ // here because we want to propagate an `InternalServerError`
+ // back to the client.
+ Owned<Promise<bool>> condition(new Promise<bool>());
+
+ process::io::write(stdinToFd, message.data().data())
+ .onAny(defer(self(), [this, condition, response](
+ const Future<Nothing>& future) {
+ if (future.isReady()) {
+ condition->set(true);
+ return;
+ }
+
+ failure = Failure(
+ "Failed writing to stdin:"
+ " " + (future.isFailed() ? future.failure() : "discarded"));
+
+ *response = http::InternalServerError(failure->message);
+
+ condition->set(false);
+ }));
+
+ return condition->future();
+ }
+ default: {
+ UNREACHABLE();
+ }
+ }
+ })
+ .then(defer(self(), [this, response](
+ const Future<Nothing> future) -> http::Response {
+ // Reset `inputConnected` to allow future input connections.
+ inputConnected = false;
+
+ // We only set `failure` if writing to `stdin` failed, in which
+ // case we want to terminate ourselves (after flushing any
+ // outstanding messages from our message queue).
+ if (failure.isSome()) {
+ terminate(self(), false);
+ }
+
+ return *response;
+ }));
}
@@ -987,7 +1194,7 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerOutput(
// connection. If we ever detect a connection has been closed,
// we remove it from this list.
HttpConnection connection(pipe.writer(), acceptType);
- auto iterator = connections.insert(connections.end(), connection);
+ auto iterator = outputConnections.insert(outputConnections.end(), connection);
// We use the `startRedirect` promise to indicate when we should
// begin reading data from our `stdoutFromFd` and `stderrFromFd`
@@ -1002,7 +1209,7 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerOutput(
.then(defer(self(), [this, iterator]() {
// Erasing from a `std::list` only invalidates the iterator of
// the object being erased. All other iterators remain valid.
- connections.erase(iterator);
+ outputConnections.erase(iterator);
return Nothing();
}));
@@ -1015,7 +1222,7 @@ void IOSwitchboardServerProcess::outputHook(
const agent::ProcessIO::Data::Type& type)
{
// Break early if there are no connections to send the data to.
- if (connections.size() == 0) {
+ if (outputConnections.size() == 0) {
return;
}
@@ -1032,7 +1239,7 @@ void IOSwitchboardServerProcess::outputHook(
// the `HttpConnection::closed()` call above. We might do a few
// unnecessary writes if we have a bunch of messages queued up,
// but that shouldn't be a problem.
- foreach (HttpConnection& connection, connections) {
+ foreach (HttpConnection& connection, outputConnections) {
connection.send(message);
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/be45c94f/src/tests/containerizer/io_switchboard_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp
index 131ca94..c8fe876 100644
--- a/src/tests/containerizer/io_switchboard_tests.cpp
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -47,6 +47,7 @@ namespace unix = process::network::unix;
#endif // __WINDOWS__
using mesos::agent::Call;
+using mesos::agent::ProcessIO;
using mesos::internal::slave::Fetcher;
using mesos::internal::slave::IOSwitchboardServer;
@@ -69,7 +70,7 @@ namespace tests {
class IOSwitchboardServerTest : public TemporaryDirectoryTest {};
-TEST_F(IOSwitchboardServerTest, ServerRedirectLog)
+TEST_F(IOSwitchboardServerTest, RedirectLog)
{
int stdoutPipe[2];
int stderrPipe[2];
@@ -158,7 +159,7 @@ TEST_F(IOSwitchboardServerTest, ServerRedirectLog)
}
-TEST_F(IOSwitchboardServerTest, ServerAttachOutput)
+TEST_F(IOSwitchboardServerTest, AttachOutput)
{
Try<int> nullFd = os::open("/dev/null", O_RDWR);
ASSERT_SOME(nullFd);
@@ -308,6 +309,136 @@ TEST_F(IOSwitchboardServerTest, ServerAttachOutput)
}
+TEST_F(IOSwitchboardServerTest, AttachInput)
+{
+ // We use a pipe in this test to prevent the switchboard from
+ // reading EOF on its `stdoutFromFd` until we are ready for the
+ // switchboard to terminate.
+ int stdoutPipe[2];
+
+ Try<Nothing> pipe = os::pipe(stdoutPipe);
+ ASSERT_SOME(pipe);
+
+ Try<int> nullFd = os::open("/dev/null", O_RDWR);
+ ASSERT_SOME(nullFd);
+
+ string stdinPath = path::join(sandbox.get(), "stdin");
+ Try<int> stdinFd = os::open(
+ stdinPath,
+ O_RDWR | O_CREAT,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+ ASSERT_SOME(stdinFd);
+
+ string socketPath = path::join(
+ sandbox.get(),
+ "mesos-io-switchboard-" + UUID::random().toString());
+
+ Try<Owned<IOSwitchboardServer>> server = IOSwitchboardServer::create(
+ false,
+ stdinFd.get(),
+ stdoutPipe[0],
+ nullFd.get(),
+ nullFd.get(),
+ nullFd.get(),
+ socketPath,
+ false);
+
+ ASSERT_SOME(server);
+
+ Future<Nothing> runServer = server.get()->run();
+
+ string data =
+ "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+ "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
+ "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+ "aliquip ex ea commodo consequat. Duis aute irure dolor in "
+ "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
+ "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
+ "culpa qui officia deserunt mollit anim id est laborum.";
+
+ while (Bytes(data.size()) < Megabytes(1)) {
+ data.append(data);
+ }
+
+ http::Pipe requestPipe;
+ http::Pipe::Reader reader = requestPipe.reader();
+ http::Pipe::Writer writer = requestPipe.writer();
+
+ http::Request request;
+ request.method = "POST";
+ request.type = http::Request::PIPE;
+ request.reader = reader;
+ request.url.domain = "";
+ request.url.path = "/";
+ request.keepAlive = true;
+ request.headers["Accept"] = APPLICATION_JSON;
+ request.headers["Content-Type"] = APPLICATION_STREAMING_JSON;
+
+ Try<unix::Address> address = unix::Address::create(socketPath);
+ ASSERT_SOME(address);
+
+ Future<http::Connection> _connection = http::connect(address.get());
+ AWAIT_READY(_connection);
+ http::Connection connection = _connection.get();
+
+ Future<http::Response> response = connection.send(request);
+
+ ::recordio::Encoder<mesos::agent::Call> encoder(lambda::bind(
+ serialize, ContentType::STREAMING_JSON, lambda::_1));
+
+ Call call;
+ call.set_type(Call::ATTACH_CONTAINER_INPUT);
+
+ Call::AttachContainerInput* attach = call.mutable_attach_container_input();
+ attach->set_type(Call::AttachContainerInput::CONTAINER_ID);
+ attach->mutable_container_id()->set_value(UUID::random().toString());
+
+ writer.write(encoder.encode(call));
+
+ size_t offset = 0;
+ size_t chunkSize = 4096;
+ while (offset < data.length()) {
+ string dataChunk = data.substr(offset, chunkSize);
+ offset += chunkSize;
+
+ Call call;
+ call.set_type(Call::ATTACH_CONTAINER_INPUT);
+
+ Call::AttachContainerInput* attach = call.mutable_attach_container_input();
+ attach->set_type(Call::AttachContainerInput::PROCESS_IO);
+
+ ProcessIO* message = attach->mutable_process_io();
+ message->set_type(ProcessIO::DATA);
+ message->mutable_data()->set_type(ProcessIO::Data::STDIN);
+ message->mutable_data()->set_data(dataChunk);
+
+ writer.write(encoder.encode(call));
+ }
+
+ writer.close();
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+
+ AWAIT_READY(connection.disconnect());
+ AWAIT_READY(connection.disconnected());
+
+ // Closing the write end of `stdoutPipe`
+ // will trigger the switchboard to exit.
+ os::close(stdoutPipe[1]);
+ AWAIT_ASSERT_READY(runServer);
+
+ os::close(stdoutPipe[0]);
+ os::close(nullFd.get());
+ os::close(stdinFd.get());
+
+ Try<string> stdinData = os::read(stdinPath);
+ ASSERT_SOME(stdinData);
+
+ EXPECT_EQ(data, stdinData.get());
+}
+
+
class IOSwitchboardTest
: public ContainerizerTest<slave::MesosContainerizer> {};
@@ -376,6 +507,7 @@ TEST_F(IOSwitchboardTest, OutputRedirectionWithTTY)
EXPECT_SOME_EQ("HelloWorld", os::read(path::join(directory.get(), "stdout")));
}
+
#endif // __WINDOWS__
} // namespace tests {