You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/09/18 18:00:21 UTC
[mesos] 02/06: Added
`AgentAPITest.LaunchNestedContainerSessionKillTask` test.
This is an automated email from the ASF dual-hosted git repository.
alexr pushed a commit to branch 1.6.x
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit a1798ae1fb2249280f4a4e9fec69eb9e37b95452
Author: Andrei Budnik <ab...@mesosphere.com>
AuthorDate: Tue Sep 18 19:10:01 2018 +0200
Added `AgentAPITest.LaunchNestedContainerSessionKillTask` test.
This test verifies that IOSwitchboard, which holds an open HTTP input
connection, terminates once IO redirects finish for the corresponding
nested container.
Review: https://reviews.apache.org/r/68230/
(cherry picked from commit e941d206f651bde861675a6517a89e44d1f61a34)
---
src/tests/api_tests.cpp | 248 ++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 248 insertions(+)
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index c2e57e9..0b01ed3 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -6548,6 +6548,254 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
}
+// This test verifies that IOSwitchboard, which holds an open HTTP input
+// connection, terminates once IO redirects finish for the corresponding
+// nested container:
+// 1. Launches a parent container `sleep 1000` via the default executor.
+// 2. Launches "sh" as a nested container session.
+// 3. Attaches to the nested container's input via `ATTACH_CONTAINER_INPUT`
+// call to send the "pkill sleep" command into "sh", which kills
+// the parent container.
+// 4. Checks that all containers have been terminated.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(
+ AgentAPITest,
+ ROOT_CGROUPS_LaunchNestedContainerSessionKillTask)
+{
+ const ContentType contentType = GetParam();
+
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.isolation = "filesystem/linux,namespaces/pid";
+
+ Fetcher fetcher(flags);
+
+ Try<MesosContainerizer*> _containerizer =
+ MesosContainerizer::create(flags, false, &fetcher);
+
+ ASSERT_SOME(_containerizer);
+ Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Try<Owned<cluster::Slave>> slave =
+ StartSlave(detector.get(), containerizer.get(), flags);
+
+ ASSERT_SOME(slave);
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ contentType,
+ scheduler);
+
+ AWAIT_READY(subscribed);
+ const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ const v1::Offer offer = offers->offers(0);
+ const v1::AgentID& agentId = offer.agent_id();
+
+ Future<v1::scheduler::Event::Update> updateStarting;
+ Future<v1::scheduler::Event::Update> updateRunning;
+ Future<v1::scheduler::Event::Update> updateFailed;
+
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&updateStarting),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&updateRunning),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&updateFailed),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ v1::Resources resources =
+ v1::Resources::parse(defaultTaskResourcesString).get();
+
+ v1::TaskInfo taskInfo =
+ v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+ v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+ v1::DEFAULT_EXECUTOR_ID,
+ None(),
+ resources,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
+
+ mesos.send(
+ v1::createCallAccept(
+ frameworkId,
+ offer,
+ {v1::LAUNCH_GROUP(
+ executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
+
+ AWAIT_READY(updateStarting);
+ ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state());
+ ASSERT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
+
+ AWAIT_READY(updateRunning);
+ ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state());
+ ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
+ ASSERT_TRUE(updateRunning->status().has_container_status());
+
+ v1::ContainerStatus status = updateRunning->status().container_status();
+
+ ASSERT_TRUE(status.has_container_id());
+ EXPECT_TRUE(status.container_id().has_parent());
+
+ // Launch "sh" as a nested container session under the task's container.
+ v1::ContainerID containerId;
+ containerId.mutable_parent()->CopyFrom(status.container_id());
+ containerId.set_value(id::UUID::random().toString());
+
+ Future<http::Response> sessionResponse;
+
+ {
+ v1::agent::Call call;
+ call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+ call.mutable_launch_nested_container_session()->mutable_container_id()
+ ->CopyFrom(containerId);
+
+ call.mutable_launch_nested_container_session()->mutable_command()
+ ->CopyFrom(v1::createCommandInfo("sh"));
+
+ http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ headers["Accept"] = stringify(ContentType::RECORDIO);
+ headers[MESSAGE_ACCEPT] = stringify(contentType);
+
+ sessionResponse = http::streaming::post(
+ slave.get()->pid,
+ "api/v1",
+ headers,
+ serialize(contentType, call),
+ stringify(contentType));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, sessionResponse);
+ }
+
+ // Send "pkill sleep" to "sh" via an `ATTACH_CONTAINER_INPUT` call.
+ // Note that we deliberately do not close the input connection, in order
+ // to emulate a user's interactive debug session.
+ http::Pipe pipe;
+ http::Pipe::Writer writer = pipe.writer();
+ http::Pipe::Reader reader = pipe.reader();
+
+ ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
+ serialize, contentType, lambda::_1));
+
+ {
+ v1::agent::Call call;
+ call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+ v1::agent::Call::AttachContainerInput* attach =
+ call.mutable_attach_container_input();
+
+ attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
+ attach->mutable_container_id()->CopyFrom(containerId);
+
+ writer.write(encoder.encode(call));
+ }
+
+ const std::string command = "pkill sleep\n";
+
+ {
+ v1::agent::Call call;
+ call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+ v1::agent::Call::AttachContainerInput* attach =
+ call.mutable_attach_container_input();
+
+ attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);
+
+ v1::agent::ProcessIO* processIO = attach->mutable_process_io();
+ processIO->set_type(v1::agent::ProcessIO::DATA);
+ processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
+ processIO->mutable_data()->set_data(command);
+
+ writer.write(encoder.encode(call));
+ }
+
+ {
+ // TODO(anand): Add a `post()` overload that handles request streaming.
+ http::URL agent = http::URL(
+ "http",
+ slave.get()->pid.address.ip,
+ slave.get()->pid.address.port,
+ slave.get()->pid.id +
+ "/api/v1");
+
+ Future<http::Connection> _connection = http::connect(agent);
+ AWAIT_READY(_connection);
+
+ http::Connection connection = _connection.get(); // Copy to remove const.
+
+ http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ headers["Content-Type"] = stringify(ContentType::RECORDIO);
+ headers[MESSAGE_CONTENT_TYPE] = stringify(contentType);
+
+ http::Request request;
+ request.url = agent;
+ request.method = "POST";
+ request.type = http::Request::PIPE;
+ request.reader = reader;
+ request.headers = headers;
+
+ Future<http::Response> response = connection.send(request);
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+ }
+
+ // Read the output of the `LAUNCH_NESTED_CONTAINER_SESSION` response.
+ ASSERT_SOME(sessionResponse->reader);
+
+ Option<http::Pipe::Reader> output = sessionResponse->reader.get();
+ ASSERT_SOME(output);
+
+ Future<tuple<string, string>> received =
+ getProcessIOData(contentType, output.get());
+
+ AWAIT_READY(received);
+
+ string stdoutReceived;
+ string stderrReceived;
+
+ tie(stdoutReceived, stderrReceived) = received.get();
+
+ ASSERT_TRUE(stdoutReceived.empty());
+ ASSERT_TRUE(stderrReceived.empty());
+
+ // Check the terminal status update of the task; `TASK_FAILED` is expected.
+ AWAIT_READY(updateFailed);
+
+ ASSERT_EQ(v1::TASK_FAILED, updateFailed->status().state());
+ ASSERT_EQ(taskInfo.task_id(), updateFailed->status().task_id());
+}
+
+
// This test verifies that attaching to the output of a container fails if the
// containerizer doesn't support the operation.
TEST_P(AgentAPITest, AttachContainerOutputFailure)