You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/12/11 04:49:24 UTC
mesos git commit: Added agent API test for attaching input to a
nested container session.
Repository: mesos
Updated Branches:
refs/heads/master e60d231b5 -> f5c69c4e1
Added agent API test for attaching input to a nested container session.
Review: https://reviews.apache.org/r/54632/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f5c69c4e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f5c69c4e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f5c69c4e
Branch: refs/heads/master
Commit: f5c69c4e199d4aa939ef7bcb9b86f049273e501b
Parents: e60d231
Author: Kevin Klues <kl...@gmail.com>
Authored: Sat Dec 10 20:49:09 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Dec 10 20:49:09 2016 -0800
----------------------------------------------------------------------
src/tests/api_tests.cpp | 219 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 219 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f5c69c4e/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 7a22011..0988048 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -4222,6 +4222,225 @@ TEST_P(AgentAPIStreamingTest, AttachContainerInput)
ASSERT_EQ(data, stdoutReceived);
}
+
+// This test launches a nested container session with 'cat' as its
+// entrypoint and verifies that any data streamed to the container via
+// an ATTACH_CONTAINER_INPUT call is received by the client on the
+// output stream.
+TEST_P(AgentAPIStreamingTest, AttachInputToNestedContainerSession)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+ Fetcher fetcher;
+
+ Try<MesosContainerizer*> _containerizer =
+ MesosContainerizer::create(flags, false, &fetcher);
+
+ ASSERT_SOME(_containerizer);
+
+ Owned<slave::Containerizer> containerizer(_containerizer.get());
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Try<Owned<cluster::Slave>> slave =
+ StartSlave(detector.get(), containerizer.get(), flags);
+
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers->size());
+
+ const Offer& offer = offers.get()[0];
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ TaskInfo taskInfo = createTask(offer, "sleep 1000");
+
+ driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
+
+ AWAIT_READY(status);
+ ASSERT_EQ(TASK_RUNNING, status->state());
+
+ Future<hashset<ContainerID>> containerIds = containerizer->containers();
+ AWAIT_READY(containerIds);
+ EXPECT_EQ(1u, containerIds->size());
+
+ v1::ContainerID containerId;
+ containerId.set_value(UUID::random().toString());
+ containerId.mutable_parent()->set_value(containerIds->begin()->value());
+
+ ContentType contentType = GetParam();
+
+ Future<http::Response> sessionResponse;
+
+ // Start a new LAUNCH_NESTED_CONTAINER_SESSION with `cat` as the
+ // command being launched.
+ {
+ v1::agent::Call call;
+ call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+ call.mutable_launch_nested_container_session()->mutable_container_id()
+ ->CopyFrom(containerId);
+
+ call.mutable_launch_nested_container_session()->mutable_command()
+ ->CopyFrom(v1::createCommandInfo("cat"));
+
+ http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ headers["Accept"] = stringify(contentType);
+
+ sessionResponse = http::streaming::post(
+ slave.get()->pid,
+ "api/v1",
+ headers,
+ serialize(ContentType::PROTOBUF, call),
+ stringify(ContentType::PROTOBUF));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, sessionResponse);
+ }
+
+ // Prepare the data to send to `cat` and send it over an
+ // `ATTACH_CONTAINER_INPUT` stream.
+ string data =
+ "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+ "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
+ "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+ "aliquip ex ea commodo consequat. Duis aute irure dolor in "
+ "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
+ "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
+ "culpa qui officia deserunt mollit anim id est laborum.";
+
+ while (Bytes(data.size()) < Megabytes(1)) {
+ data.append(data);
+ }
+
+ http::Pipe pipe;
+ http::Pipe::Writer writer = pipe.writer();
+ http::Pipe::Reader reader = pipe.reader();
+
+ ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
+ serialize, contentType, lambda::_1));
+
+ {
+ v1::agent::Call call;
+ call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+ v1::agent::Call::AttachContainerInput* attach =
+ call.mutable_attach_container_input();
+
+ attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
+ attach->mutable_container_id()->CopyFrom(containerId);
+
+ writer.write(encoder.encode(call));
+ }
+
+ size_t offset = 0;
+ size_t chunkSize = 4096;
+ while (offset < data.length()) {
+ string dataChunk = data.substr(offset, chunkSize);
+ offset += chunkSize;
+
+ v1::agent::Call call;
+ call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+ v1::agent::Call::AttachContainerInput* attach =
+ call.mutable_attach_container_input();
+
+ attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);
+
+ v1::agent::ProcessIO* processIO = attach->mutable_process_io();
+ processIO->set_type(v1::agent::ProcessIO::DATA);
+ processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
+ processIO->mutable_data()->set_data(dataChunk);
+
+ writer.write(encoder.encode(call));
+ }
+
+ // Signal `EOF` to the 'cat' command.
+ {
+ v1::agent::Call call;
+ call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+ v1::agent::Call::AttachContainerInput* attach =
+ call.mutable_attach_container_input();
+
+ attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);
+
+ v1::agent::ProcessIO* processIO = attach->mutable_process_io();
+ processIO->set_type(v1::agent::ProcessIO::DATA);
+ processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
+ processIO->mutable_data()->set_data("");
+
+ writer.write(encoder.encode(call));
+ }
+
+ writer.close();
+
+ {
+ // TODO(anand): Add a `post()` overload that handles request streaming.
+ http::URL agent = http::URL(
+ "http",
+ slave.get()->pid.address.ip,
+ slave.get()->pid.address.port,
+ slave.get()->pid.id +
+ "/api/v1");
+
+ Future<http::Connection> _connection = http::connect(agent);
+ AWAIT_READY(_connection);
+
+ http::Connection connection = _connection.get(); // Remove const.
+
+ http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ headers["Accept"] = stringify(contentType);
+ headers["Content-Type"] = stringify(contentType);
+
+ http::Request request;
+ request.url = agent;
+ request.method = "POST";
+ request.type = http::Request::PIPE;
+ request.reader = reader;
+ request.headers = headers;
+
+ Future<http::Response> response = connection.send(request);
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+ }
+
+ // Read the output from the LAUNCH_NESTED_CONTAINER_SESSION.
+ ASSERT_SOME(sessionResponse->reader);
+
+ Option<http::Pipe::Reader> output = sessionResponse->reader.get();
+ ASSERT_SOME(output);
+
+ Future<tuple<string, string>> received =
+ getProcessIOData(contentType, output.get());
+
+ AWAIT_READY(received);
+
+
+ string stdoutReceived;
+ string stderrReceived;
+
+ tie(stdoutReceived, stderrReceived) = received.get();
+
+ // Verify the output matches what we sent.
+ ASSERT_TRUE(stderrReceived.empty());
+ ASSERT_EQ(data, stdoutReceived);
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {