You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/12/11 04:49:24 UTC

mesos git commit: Added agent API test for attaching input to a nested container session.

Repository: mesos
Updated Branches:
  refs/heads/master e60d231b5 -> f5c69c4e1


Added agent API test for attaching input to a nested container session.

Review: https://reviews.apache.org/r/54632/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f5c69c4e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f5c69c4e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f5c69c4e

Branch: refs/heads/master
Commit: f5c69c4e199d4aa939ef7bcb9b86f049273e501b
Parents: e60d231
Author: Kevin Klues <kl...@gmail.com>
Authored: Sat Dec 10 20:49:09 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Dec 10 20:49:09 2016 -0800

----------------------------------------------------------------------
 src/tests/api_tests.cpp | 219 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 219 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f5c69c4e/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 7a22011..0988048 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -4222,6 +4222,225 @@ TEST_P(AgentAPIStreamingTest, AttachContainerInput)
   ASSERT_EQ(data, stdoutReceived);
 }
 
+
+// This test launches a nested container session with 'cat' as its
+// entrypoint and verifies that any data streamed to the container via
+// an ATTACH_CONTAINER_INPUT call is received by the client on the
+// output stream.
+TEST_P(AgentAPIStreamingTest, AttachInputToNestedContainerSession)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  Fetcher fetcher;
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get(), flags);
+
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+
+  const Offer& offer = offers.get()[0];
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  TaskInfo taskInfo = createTask(offer, "sleep 1000");
+
+  driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
+
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_RUNNING, status->state());
+
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+  EXPECT_EQ(1u, containerIds->size());
+
+  v1::ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
+  containerId.mutable_parent()->set_value(containerIds->begin()->value());
+
+  ContentType contentType = GetParam();
+
+  Future<http::Response> sessionResponse;
+
+  // Start a new LAUNCH_NESTED_CONTAINER_SESSION with `cat` as the
+  // command being launched.
+  {
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+    call.mutable_launch_nested_container_session()->mutable_container_id()
+      ->CopyFrom(containerId);
+
+    call.mutable_launch_nested_container_session()->mutable_command()
+      ->CopyFrom(v1::createCommandInfo("cat"));
+
+    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(contentType);
+
+    sessionResponse = http::streaming::post(
+        slave.get()->pid,
+        "api/v1",
+        headers,
+        serialize(ContentType::PROTOBUF, call),
+        stringify(ContentType::PROTOBUF));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, sessionResponse);
+  }
+
+  // Prepare the data to send to `cat` and send it over an
+  // `ATTACH_CONTAINER_INPUT` stream.
+  string data =
+    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
+    "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+    "aliquip ex ea commodo consequat. Duis aute irure dolor in "
+    "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
+    "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
+    "culpa qui officia deserunt mollit anim id est laborum.";
+
+  while (Bytes(data.size()) < Megabytes(1)) {
+    data.append(data);
+  }
+
+  http::Pipe pipe;
+  http::Pipe::Writer writer = pipe.writer();
+  http::Pipe::Reader reader = pipe.reader();
+
+  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
+      serialize, contentType, lambda::_1));
+
+  {
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+    v1::agent::Call::AttachContainerInput* attach =
+      call.mutable_attach_container_input();
+
+    attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
+    attach->mutable_container_id()->CopyFrom(containerId);
+
+    writer.write(encoder.encode(call));
+  }
+
+  size_t offset = 0;
+  size_t chunkSize = 4096;
+  while (offset < data.length()) {
+    string dataChunk = data.substr(offset, chunkSize);
+    offset += chunkSize;
+
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+    v1::agent::Call::AttachContainerInput* attach =
+      call.mutable_attach_container_input();
+
+    attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);
+
+    v1::agent::ProcessIO* processIO = attach->mutable_process_io();
+    processIO->set_type(v1::agent::ProcessIO::DATA);
+    processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
+    processIO->mutable_data()->set_data(dataChunk);
+
+    writer.write(encoder.encode(call));
+  }
+
+  // Signal `EOF` to the 'cat' command.
+  {
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+    v1::agent::Call::AttachContainerInput* attach =
+      call.mutable_attach_container_input();
+
+    attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);
+
+    v1::agent::ProcessIO* processIO = attach->mutable_process_io();
+    processIO->set_type(v1::agent::ProcessIO::DATA);
+    processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
+    processIO->mutable_data()->set_data("");
+
+    writer.write(encoder.encode(call));
+  }
+
+  writer.close();
+
+  {
+    // TODO(anand): Add a `post()` overload that handles request streaming.
+    http::URL agent = http::URL(
+        "http",
+        slave.get()->pid.address.ip,
+        slave.get()->pid.address.port,
+        slave.get()->pid.id +
+        "/api/v1");
+
+    Future<http::Connection> _connection = http::connect(agent);
+    AWAIT_READY(_connection);
+
+    http::Connection connection = _connection.get(); // Remove const.
+
+    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(contentType);
+    headers["Content-Type"] = stringify(contentType);
+
+    http::Request request;
+    request.url = agent;
+    request.method = "POST";
+    request.type = http::Request::PIPE;
+    request.reader = reader;
+    request.headers = headers;
+
+    Future<http::Response> response = connection.send(request);
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  }
+
+  // Read the output from the LAUNCH_NESTED_CONTAINER_SESSION.
+  ASSERT_SOME(sessionResponse->reader);
+
+  Option<http::Pipe::Reader> output = sessionResponse->reader.get();
+    ASSERT_SOME(output);
+
+  Future<tuple<string, string>> received =
+    getProcessIOData(contentType, output.get());
+
+  AWAIT_READY(received);
+
+
+  string stdoutReceived;
+  string stderrReceived;
+
+  tie(stdoutReceived, stderrReceived) = received.get();
+
+  // Verify the output matches what we sent.
+  ASSERT_TRUE(stderrReceived.empty());
+  ASSERT_EQ(data, stdoutReceived);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {