You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/12/16 18:38:41 UTC
[1/2] mesos git commit: Refactored IOSwitchboardServerTest.AttachOutput test.
Repository: mesos
Updated Branches:
refs/heads/master 28eaa8df7 -> 929c3c7b6
Refactored IOSwitchboardServerTest.AttachOutput test.
Pulled out some code into helpers that can be re-used in other tests.
I plan to use them in subsequent patches.
Review: https://reviews.apache.org/r/54727
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/929c3c7b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/929c3c7b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/929c3c7b
Branch: refs/heads/master
Commit: 929c3c7b6cd4681ac1d1d1f4d2c27c86318c1f7d
Parents: 8b09da9
Author: Vinod Kone <vi...@gmail.com>
Authored: Tue Dec 13 16:43:31 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 16 13:38:19 2016 -0500
----------------------------------------------------------------------
.../containerizer/io_switchboard_tests.cpp | 144 +++++++++++--------
1 file changed, 87 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/929c3c7b/src/tests/containerizer/io_switchboard_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp
index 2718071..930f98f 100644
--- a/src/tests/containerizer/io_switchboard_tests.cpp
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -16,6 +16,7 @@
#include <map>
#include <string>
+#include <tuple>
#include <vector>
#include <process/clock.hpp>
@@ -78,6 +79,7 @@ using testing::Eq;
using std::map;
using std::string;
+using std::tuple;
using std::vector;
namespace mesos {
@@ -85,7 +87,77 @@ namespace internal {
namespace tests {
#ifndef __WINDOWS__
-class IOSwitchboardServerTest : public TemporaryDirectoryTest {};
+class IOSwitchboardServerTest : public TemporaryDirectoryTest
+{
+protected:
+ // Helper that builds an `ATTACH_CONTAINER_OUTPUT` call for `containerId`,
+ // sends it as a JSON `POST` on the given `connection` and returns the
+ // (future) response.
+ // TODO(vinod): Make this function more generic (e.g., sends any `Call`).
+ Future<http::Response> attachOutput(
+ const ContainerID& containerId,
+ http::Connection connection)
+ {
+ Call call;
+ call.set_type(Call::ATTACH_CONTAINER_OUTPUT);
+
+ Call::AttachContainerOutput* attach =
+ call.mutable_attach_container_output();
+
+ attach->mutable_container_id()->CopyFrom(containerId);
+
+ http::Request request;
+ request.method = "POST";
+ request.url.domain = "";
+ request.url.path = "/";
+ request.keepAlive = true;
+ request.headers["Accept"] = APPLICATION_STREAMING_JSON; // Ask for a streaming JSON response.
+ request.headers["Content-Type"] = APPLICATION_JSON; // The call itself is sent as plain JSON.
+ request.body = stringify(JSON::protobuf(call));
+
+ return connection.send(request, true); // NOTE(review): `true` presumably requests a streamed response; confirm.
+ }
+
+ // Reads the pipe `reader` until EOF, decodes the bytes as a sequence of JSON
+ // `agent::ProcessIO` records and returns a `(stdout, stderr)` tuple, each
+ // the concatenation of the matching `Data` payloads. Fails on decode errors.
+ // NOTE: Records that are neither STDOUT nor STDERR data are skipped.
+ // TODO(vinod): Merge this with the identically named helper in api_tests.cpp.
+ Future<tuple<string, string>> getProcessIOData(http::Pipe::Reader reader)
+ {
+ return reader.readAll()
+ .then([](const string& data) -> Future<tuple<string, string>> {
+ string stdoutReceived;
+ string stderrReceived;
+
+ ::recordio::Decoder<agent::ProcessIO> decoder(lambda::bind(
+ deserialize<agent::ProcessIO>, ContentType::JSON, lambda::_1)); // Each record is deserialized as JSON.
+
+ Try<std::deque<Try<agent::ProcessIO>>> records = decoder.decode(data);
+
+ if (records.isError()) {
+ return process::Failure(records.error()); // Decoding the stream as a whole failed.
+ }
+
+ while(!records->empty()) { // Consume the records front to back.
+ Try<agent::ProcessIO> record = records->front();
+ records->pop_front();
+
+ if (record.isError()) {
+ return process::Failure(record.error()); // Propagate the per-record error.
+ }
+
+ if (record->data().type() == agent::ProcessIO::Data::STDOUT) {
+ stdoutReceived += record->data().data();
+ } else if (record->data().type() == agent::ProcessIO::Data::STDERR) {
+ stderrReceived += record->data().data();
+ } // Any other data type is skipped.
+ }
+
+ return std::make_tuple(stdoutReceived, stderrReceived);
+ });
+ }
+};
TEST_F(IOSwitchboardServerTest, RedirectLog)
@@ -244,76 +316,37 @@ TEST_F(IOSwitchboardServerTest, AttachOutput)
Future<Nothing> runServer = server.get()->run();
- Call call;
- call.set_type(Call::ATTACH_CONTAINER_OUTPUT);
-
- Call::AttachContainerOutput* attach = call.mutable_attach_container_output();
- attach->mutable_container_id()->set_value(UUID::random().toString());
-
- http::Request request;
- request.method = "POST";
- request.url.domain = "";
- request.url.path = "/";
- request.keepAlive = true;
- request.headers["Accept"] = APPLICATION_STREAMING_JSON;
- request.headers["Content-Type"] = APPLICATION_JSON;
- request.body = stringify(JSON::protobuf(call));
+ ContainerID containerId;
+ containerId.set_value(UUID::random().toString());
Try<unix::Address> address = unix::Address::create(socketPath);
ASSERT_SOME(address);
- Future<http::Connection> _connection = http::connect(
- address.get(), http::Scheme::HTTP);
+ Future<http::Connection> _connection =
+ http::connect(address.get(), http::Scheme::HTTP);
AWAIT_READY(_connection);
http::Connection connection = _connection.get();
- Future<http::Response> response = connection.send(request, true);
+ Future<http::Response> response = attachOutput(containerId, connection);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
ASSERT_EQ(http::Response::PIPE, response.get().type);
+ ASSERT_SOME(response.get().reader);
- Option<http::Pipe::Reader> reader = response.get().reader;
- ASSERT_SOME(reader);
-
- auto deserializer = [](const string& body) {
- Try<JSON::Value> value = JSON::parse(body);
- Try<agent::ProcessIO> parse =
- ::protobuf::parse<agent::ProcessIO>(value.get());
- return parse;
- };
-
- recordio::Reader<agent::ProcessIO> responseDecoder(
- ::recordio::Decoder<agent::ProcessIO>(deserializer),
- reader.get());
-
- string stdoutReceived = "";
- string stderrReceived = "";
-
- while (true) {
- Future<Result<agent::ProcessIO>> _message = responseDecoder.read();
- AWAIT_READY(_message);
-
- if (_message->isNone()) {
- break;
- }
+ Future<tuple<string, string>> received =
+ getProcessIOData(response->reader.get());
- ASSERT_SOME(_message.get());
-
- agent::ProcessIO message = _message.get().get();
+ AWAIT_READY(received);
- ASSERT_EQ(agent::ProcessIO::DATA, message.type());
+ string stdoutReceived;
+ string stderrReceived;
- ASSERT_TRUE(message.data().type() == agent::ProcessIO::Data::STDOUT ||
- message.data().type() == agent::ProcessIO::Data::STDERR);
+ tie(stdoutReceived, stderrReceived) = received.get();
- if (message.data().type() == agent::ProcessIO::Data::STDOUT) {
- stdoutReceived += message.data().data();
- } else if (message.data().type() == agent::ProcessIO::Data::STDERR) {
- stderrReceived += message.data().data();
- }
- }
+ EXPECT_EQ(data, stdoutReceived);
+ EXPECT_EQ(data, stderrReceived);
AWAIT_READY(connection.disconnect());
AWAIT_READY(connection.disconnected());
@@ -323,9 +356,6 @@ TEST_F(IOSwitchboardServerTest, AttachOutput)
os::close(nullFd.get());
os::close(stdoutFd.get());
os::close(stderrFd.get());
-
- EXPECT_EQ(data, stdoutReceived);
- EXPECT_EQ(data, stderrReceived);
}
[2/2] mesos git commit: Added a test that verifies container attach after agent restart.
Posted by vi...@apache.org.
Added a test that verifies container attach after agent restart.
Review: https://reviews.apache.org/r/54720
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b09da9a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b09da9a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b09da9a
Branch: refs/heads/master
Commit: 8b09da9aa6f6ed31c4fd727e546eb10afc2b7a34
Parents: 28eaa8d
Author: Vinod Kone <vi...@gmail.com>
Authored: Mon Dec 12 16:42:19 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 16 13:38:19 2016 -0500
----------------------------------------------------------------------
.../containerizer/io_switchboard_tests.cpp | 108 +++++++++++++++++++
1 file changed, 108 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b09da9a/src/tests/containerizer/io_switchboard_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp
index 074c32b..2718071 100644
--- a/src/tests/containerizer/io_switchboard_tests.cpp
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -1033,6 +1033,114 @@ TEST_F(IOSwitchboardTest, RecoverThenKillSwitchboardContainerDestroyed)
driver.stop();
driver.join();
}
+
+
+// This test verifies that a container launched with a tty can still be attached after the slave is restarted with a new containerizer.
+TEST_F(IOSwitchboardTest, ContainerAttachAfterSlaveRestart)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.launcher = "posix";
+ flags.isolation = "posix/cpu";
+#ifdef __linux__
+ flags.agent_subsystems = None();
+#endif
+
+ Fetcher fetcher;
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Try<MesosContainerizer*> create = MesosContainerizer::create(
+ flags,
+ false,
+ &fetcher);
+
+ ASSERT_SOME(create);
+
+ Owned<MesosContainerizer> containerizer(create.get());
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(
+ detector.get(),
+ containerizer.get(),
+ flags);
+
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+
+ // Enable checkpointing for the framework (presumably needed for recovery).
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true);
+
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ Future<TaskStatus> statusRunning;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&statusRunning));
+
+ Future<Nothing> _ack =
+ FUTURE_DISPATCH(_, &slave::Slave::_statusUpdateAcknowledgement);
+
+ // Launch a task with tty info set, which starts the switchboard server.
+ TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+ task.mutable_container()->set_type(ContainerInfo::MESOS);
+ task.mutable_container()->mutable_tty_info();
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(statusRunning);
+
+ // Wait for the ACK to be checkpointed before terminating the slave.
+ AWAIT_READY(_ack);
+
+ // Restart the slave with a newly created containerizer.
+ slave.get()->terminate();
+
+ Future<Nothing> _recover = FUTURE_DISPATCH(_, &slave::Slave::_recover);
+
+ create = MesosContainerizer::create(
+ flags,
+ false,
+ &fetcher);
+
+ ASSERT_SOME(create);
+
+ containerizer.reset(create.get());
+
+ slave = StartSlave(detector.get(), containerizer.get(), flags);
+ ASSERT_SOME(slave);
+
+ // Wait for the `_recover` dispatch that was armed before the slave restart.
+ AWAIT_READY(_recover);
+
+ Future<hashset<ContainerID>> containers = containerizer.get()->containers();
+ AWAIT_READY(containers);
+ ASSERT_EQ(1u, containers.get().size()); // Exactly one container is expected.
+
+ ContainerID containerId;
+ containerId.set_value(containers->begin()->value());
+
+ Future<http::Connection> connection = containerizer->attach(containerId);
+ AWAIT_READY(connection); // The attach must succeed after the restart.
+
+ driver.stop();
+ driver.join();
+}
+
#endif // __WINDOWS__
} // namespace tests {