You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/09/18 18:00:21 UTC

[mesos] 02/06: Added `AgentAPITest.LaunchNestedContainerSessionKillTask` test.

This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch 1.6.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit a1798ae1fb2249280f4a4e9fec69eb9e37b95452
Author: Andrei Budnik <ab...@mesosphere.com>
AuthorDate: Tue Sep 18 19:10:01 2018 +0200

    Added `AgentAPITest.LaunchNestedContainerSessionKillTask` test.
    
    This test verifies that IOSwitchboard, which holds an open HTTP input
    connection, terminates once IO redirects finish for the corresponding
    nested container.
    
    Review: https://reviews.apache.org/r/68230/
    (cherry picked from commit e941d206f651bde861675a6517a89e44d1f61a34)
---
 src/tests/api_tests.cpp | 248 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 248 insertions(+)

diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index c2e57e9..0b01ed3 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -6548,6 +6548,254 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
 }
 
 
+// This test verifies that IOSwitchboard, which holds an open HTTP input
+// connection, terminates once IO redirects finish for the corresponding
+// nested container:
+//   1. Launches a parent container `sleep 1000` via the default executor.
+//   2. Launches "sh" as a nested container session.
+//   3. Attaches to nested container's input via `ATTACH_CONTAINER_INPUT`
+//      call to send "kill `pgrep sleep`" command into "sh" which kills
+//      the parent container.
+//   4. Check that all containers have been terminated.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(
+    AgentAPITest,
+    ROOT_CGROUPS_LaunchNestedContainerSessionKillTask)
+{
+  const ContentType contentType = GetParam();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "filesystem/linux,namespaces/pid";
+
+  Fetcher fetcher(flags);
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get(), flags);
+
+  ASSERT_SOME(slave);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      contentType,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Future<v1::scheduler::Event::Update> updateStarting;
+  Future<v1::scheduler::Event::Update> updateRunning;
+  Future<v1::scheduler::Event::Update> updateFailed;
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(
+      DoAll(
+          FutureArg<1>(&updateStarting),
+          v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(
+      DoAll(
+          FutureArg<1>(&updateRunning),
+          v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(
+      DoAll(
+          FutureArg<1>(&updateFailed),
+          v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  v1::Resources resources =
+    v1::Resources::parse(defaultTaskResourcesString).get();
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
+
+  AWAIT_READY(updateStarting);
+  ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state());
+  ASSERT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
+
+  AWAIT_READY(updateRunning);
+  ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state());
+  ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
+  ASSERT_TRUE(updateRunning->status().has_container_status());
+
+  v1::ContainerStatus status = updateRunning->status().container_status();
+
+  ASSERT_TRUE(status.has_container_id());
+  EXPECT_TRUE(status.container_id().has_parent());
+
+  // Launch "sh" command via `LAUNCH_NESTED_CONTAINER_SESSION` call.
+  v1::ContainerID containerId;
+  containerId.mutable_parent()->CopyFrom(status.container_id());
+  containerId.set_value(id::UUID::random().toString());
+
+  Future<http::Response> sessionResponse;
+
+  {
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+    call.mutable_launch_nested_container_session()->mutable_container_id()
+      ->CopyFrom(containerId);
+
+    call.mutable_launch_nested_container_session()->mutable_command()
+      ->CopyFrom(v1::createCommandInfo("sh"));
+
+    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(ContentType::RECORDIO);
+    headers[MESSAGE_ACCEPT] = stringify(contentType);
+
+    sessionResponse = http::streaming::post(
+        slave.get()->pid,
+        "api/v1",
+        headers,
+        serialize(contentType, call),
+        stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, sessionResponse);
+  }
+
+  // Send "pkill sleep" to "sh" via `ATTACH_CONTAINER_INPUT` call.
+  // Note, that we do not close input connection because we want to emulate
+  // user's interactive debug session.
+  http::Pipe pipe;
+  http::Pipe::Writer writer = pipe.writer();
+  http::Pipe::Reader reader = pipe.reader();
+
+  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
+      serialize, contentType, lambda::_1));
+
+  {
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+    v1::agent::Call::AttachContainerInput* attach =
+      call.mutable_attach_container_input();
+
+    attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
+    attach->mutable_container_id()->CopyFrom(containerId);
+
+    writer.write(encoder.encode(call));
+  }
+
+  const std::string command = "pkill sleep\n";
+
+  {
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+    v1::agent::Call::AttachContainerInput* attach =
+      call.mutable_attach_container_input();
+
+    attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);
+
+    v1::agent::ProcessIO* processIO = attach->mutable_process_io();
+    processIO->set_type(v1::agent::ProcessIO::DATA);
+    processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
+    processIO->mutable_data()->set_data(command);
+
+    writer.write(encoder.encode(call));
+  }
+
+  {
+    // TODO(anand): Add a `post()` overload that handles request streaming.
+    http::URL agent = http::URL(
+        "http",
+        slave.get()->pid.address.ip,
+        slave.get()->pid.address.port,
+        slave.get()->pid.id +
+        "/api/v1");
+
+    Future<http::Connection> _connection = http::connect(agent);
+    AWAIT_READY(_connection);
+
+    http::Connection connection = _connection.get(); // Remove const.
+
+    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Content-Type"] = stringify(ContentType::RECORDIO);
+    headers[MESSAGE_CONTENT_TYPE] = stringify(contentType);
+
+    http::Request request;
+    request.url = agent;
+    request.method = "POST";
+    request.type = http::Request::PIPE;
+    request.reader = reader;
+    request.headers = headers;
+
+    Future<http::Response> response = connection.send(request);
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  }
+
+  // Read the output from the LAUNCH_NESTED_CONTAINER_SESSION.
+  ASSERT_SOME(sessionResponse->reader);
+
+  Option<http::Pipe::Reader> output = sessionResponse->reader.get();
+  ASSERT_SOME(output);
+
+  Future<tuple<string, string>> received =
+    getProcessIOData(contentType, output.get());
+
+  AWAIT_READY(received);
+
+  string stdoutReceived;
+  string stderrReceived;
+
+  tie(stdoutReceived, stderrReceived) = received.get();
+
+  ASSERT_TRUE(stdoutReceived.empty());
+  ASSERT_TRUE(stderrReceived.empty());
+
+  // Check terminal status update of the task.
+  AWAIT_READY(updateFailed);
+
+  ASSERT_EQ(v1::TASK_FAILED, updateFailed->status().state());
+  ASSERT_EQ(taskInfo.task_id(), updateFailed->status().task_id());
+}
+
+
 // This test verifies that attaching to the output of a container fails if the
 // containerizer doesn't support the operation.
 TEST_P(AgentAPITest, AttachContainerOutputFailure)