You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/12/02 05:55:17 UTC
[1/4] mesos git commit: Added API handler for
LAUNCH_NESTED_CONTAINER_SESSION.
Repository: mesos
Updated Branches:
refs/heads/master 800c629a8 -> 0e0fc463b
Added API handler for LAUNCH_NESTED_CONTAINER_SESSION.
In addition to launching the nested container, the API handler
ensures that the container is destroyed if the connection breaks.
Review: https://reviews.apache.org/r/54196
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0e0fc463
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0e0fc463
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0e0fc463
Branch: refs/heads/master
Commit: 0e0fc463bc18171db656e0b6892811d75644dbd1
Parents: ec009f4
Author: Vinod Kone <vi...@gmail.com>
Authored: Sun Nov 20 18:24:51 2016 +0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Dec 1 21:54:28 2016 -0800
----------------------------------------------------------------------
src/slave/http.cpp | 175 +++++++++++++++++++++++++++++++++++++++++-
src/slave/slave.hpp | 8 ++
src/slave/validation.cpp | 26 ++++++-
src/tests/api_tests.cpp | 84 ++++++++++++++++++++
4 files changed, 287 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e0fc463/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 5c300be..8b104ce 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -30,6 +30,8 @@
#include <mesos/executor/executor.hpp>
+#include <mesos/slave/containerizer.hpp>
+
#include <mesos/v1/agent/agent.hpp>
#include <mesos/v1/executor/executor.hpp>
@@ -71,6 +73,7 @@ using mesos::agent::ProcessIO;
using mesos::internal::recordio::Reader;
+using mesos::slave::ContainerClass;
using mesos::slave::ContainerTermination;
using process::AUTHENTICATION;
@@ -498,7 +501,8 @@ Future<Response> Slave::Http::_api(
return killNestedContainer(call, acceptType, principal);
case mesos::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION:
- return NotImplemented();
+ return launchNestedContainerSession(
+ call, contentType, acceptType, principal);
case mesos::agent::Call::ATTACH_CONTAINER_INPUT:
CHECK_SOME(reader);
@@ -2038,6 +2042,7 @@ Future<Response> Slave::Http::launchNestedContainer(
call.launch_nested_container().has_container()
? call.launch_nested_container().container()
: Option<ContainerInfo>::none(),
+ ContainerClass::DEFAULT,
acceptType,
approver);
}));
@@ -2048,6 +2053,7 @@ Future<Response> Slave::Http::_launchNestedContainer(
const ContainerID& containerId,
const CommandInfo& commandInfo,
const Option<ContainerInfo>& containerInfo,
+ const Option<ContainerClass>& containerClass,
ContentType acceptType,
const Owned<ObjectApprover>& approver) const
{
@@ -2111,7 +2117,8 @@ Future<Response> Slave::Http::_launchNestedContainer(
commandInfo,
containerInfo,
user,
- slave->info.id());
+ slave->info.id(),
+ containerClass);
// TODO(bmahler): The containerizers currently require that
// the caller calls destroy if the launch fails. See MESOS-6214.
@@ -2122,8 +2129,8 @@ Future<Response> Slave::Http::_launchNestedContainer(
slave->containerizer->destroy(containerId)
.onFailed([=](const string& failure) {
- LOG(ERROR) << "Failed to destroy nested container "
- << containerId << " after launch failure: " << failure;
+ LOG(ERROR) << "Failed to destroy nested container " << containerId
+ << " after launch failure: " << failure;
});
}));
@@ -2378,6 +2385,166 @@ Future<Response> Slave::Http::attachContainerInput(
}
+// Helper that reads data from `reader` and writes to `writer`.
+// Returns a failed future if there are any errors reading or writing.
+// The future is satisfied when we get an EOF.
+// TODO(vinod): Move this to libprocess if this is more generally useful.
+Future<Nothing> connect(Pipe::Reader reader, Pipe::Writer writer)
+{
+ return reader.read()
+ .then([reader, writer](const Future<string>& chunk) mutable
+ -> Future<Nothing> {
+ if (!chunk.isReady()) {
+ return process::Failure(
+ chunk.isFailed() ? chunk.failure() : "discarded");
+ }
+
+ if (chunk->empty()) {
+ // EOF case.
+ return Nothing();
+ }
+
+ if (!writer.write(chunk.get())) {
+ return process::Failure("Write failed to the pipe");
+ }
+
+ return connect(reader, writer);
+ });
+}
+
+
+Future<Response> Slave::Http::launchNestedContainerSession(
+ const mesos::agent::Call& call,
+ ContentType contentType,
+ ContentType acceptType,
+ const Option<string>& principal) const
+{
+ CHECK_EQ(mesos::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION, call.type());
+ CHECK(call.has_launch_nested_container_session());
+
+ const ContainerID& containerId =
+ call.launch_nested_container_session().container_id();
+
+ Future<Owned<ObjectApprover>> approver;
+
+ if (slave->authorizer.isSome()) {
+ authorization::Subject subject;
+ if (principal.isSome()) {
+ subject.set_value(principal.get());
+ }
+
+ approver = slave->authorizer.get()->getObjectApprover(
+ subject, authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+ } else {
+ approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ }
+
+ Future<Response> response = approver
+ .then(defer(slave->self(), [=](const Owned<ObjectApprover>& approver) {
+ return _launchNestedContainer(
+ call.launch_nested_container_session().container_id(),
+ call.launch_nested_container_session().command(),
+ call.launch_nested_container_session().has_container()
+ ? call.launch_nested_container_session().container()
+ : Option<ContainerInfo>::none(),
+ ContainerClass::DEBUG,
+ acceptType,
+ approver);
+ }));
+
+ // Helper to destroy the container.
+ auto destroy = [this](const ContainerID& containerId) {
+ slave->containerizer->destroy(containerId)
+ .onFailed([containerId](const string& failure) {
+ LOG(ERROR) << "Failed to destroy nested container "
+ << containerId << ": " << failure;
+ });
+ };
+
+ // If `response` has failed or is not `OK`, the container will be
+ // destroyed by `_launchNestedContainer`.
+ return response
+ .then(defer(slave->self(),
+ [=](const Response& response) -> Future<Response> {
+ if (response.status != OK().status) {
+ return response;
+ }
+
+ // If launch is successful, attach to the container output.
+ mesos::agent::Call call;
+ call.set_type(mesos::agent::Call::ATTACH_CONTAINER_OUTPUT);
+ call.mutable_attach_container_output()->mutable_container_id()
+ ->CopyFrom(containerId);
+
+ // Instead of directly returning the response of `attachContainerOutput`
+ // to the client, we use a level of indirection to make sure the container
+ // is destroyed when the client connection breaks.
+ return attachContainerOutput(call, contentType, acceptType, principal)
+ .then(defer(slave->self(),
+ [=](const Response& response) -> Future<Response> {
+ Pipe pipe;
+ Pipe::Writer writer = pipe.writer();
+
+ OK ok;
+ ok.headers["Content-Type"] = stringify(acceptType);
+ ok.type = Response::PIPE;
+ ok.reader = pipe.reader();
+
+ CHECK_EQ(Response::PIPE, response.type);
+ CHECK_SOME(response.reader);
+ Pipe::Reader reader = response.reader.get();
+
+ // Read from the `response` pipe and write to
+ // the client's response pipe.
+ // NOTE: Need to cast the lambda to std::function here because of a
+ // limitation of `defer`; `defer` does not work with `mutable` lambda.
+ std::function<void (const Future<Nothing>&)> _connect =
+ [=](const Future<Nothing>& future) mutable {
+ CHECK(!future.isDiscarded());
+
+ if (future.isFailed()) {
+ LOG(WARNING) << "Failed to send attach response for "
+ << containerId << ": " << future.failure();
+
+ writer.fail(future.failure());
+ reader.close();
+ } else {
+ // EOF case.
+ LOG(INFO) << "Received EOF attach response for " << containerId;
+
+ writer.close();
+ reader.close();
+ }
+
+ destroy(containerId);
+ };
+
+ connect(reader, writer)
+ .onAny(defer(slave->self(), _connect));
+
+ // Destroy the container if the connection to client is closed.
+ writer.readerClosed()
+ .onAny(defer(slave->self(), [=](const Future<Nothing>& future) {
+ LOG(WARNING)
+ << "Launch nested container session connection"
+ << " for container " << containerId << " closed"
+ << (future.isFailed() ? ": " + future.failure() : "");
+
+ destroy(containerId);
+ }));
+
+ return ok;
+ }))
+ .onFailed(defer(slave->self(), [=](const string& failure) {
+ LOG(WARNING) << "Failed to attach to nested container "
+ << containerId << ": " << failure;
+
+ destroy(containerId);
+ }));
+ }));
+}
+
+
Future<Response> Slave::Http::attachContainerOutput(
const mesos::agent::Call& call,
ContentType contentType,
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e0fc463/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index dacdbcf..4b94dff 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -38,6 +38,7 @@
#include <mesos/module/authenticatee.hpp>
+#include <mesos/slave/containerizer.hpp>
#include <mesos/slave/qos_controller.hpp>
#include <mesos/slave/resource_estimator.hpp>
@@ -644,6 +645,7 @@ private:
const ContainerID& containerId,
const CommandInfo& commandInfo,
const Option<ContainerInfo>& containerInfo,
+ const Option<mesos::slave::ContainerClass>& containerClass,
ContentType acceptType,
const Owned<ObjectApprover>& approver) const;
@@ -657,6 +659,12 @@ private:
ContentType acceptType,
const Option<std::string>& principal) const;
+ process::Future<process::http::Response> launchNestedContainerSession(
+ const mesos::agent::Call& call,
+ ContentType contentType,
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
+
process::Future<process::http::Response> attachContainerInput(
const mesos::agent::Call& call,
process::Owned<recordio::Reader<agent::Call>>&& decoder,
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e0fc463/src/slave/validation.cpp
----------------------------------------------------------------------
diff --git a/src/slave/validation.cpp b/src/slave/validation.cpp
index 4005cfc..15330ad 100644
--- a/src/slave/validation.cpp
+++ b/src/slave/validation.cpp
@@ -222,8 +222,30 @@ Option<Error> validate(
return None();
}
- case mesos::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION:
- return Error("Unsupported");
+ case mesos::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION: {
+ if (!call.has_launch_nested_container_session()) {
+ return Error(
+ "Expecting 'launch_nested_container_session' to be present");
+ }
+
+ Option<Error> error = validation::container::validateContainerId(
+ call.launch_nested_container_session().container_id());
+
+ if (error.isSome()) {
+ return Error("'launch_nested_container_session.container_id' is invalid"
+ ": " + error->message);
+ }
+
+ // The parent `ContainerID` is required, so that we know
+ // which container to place it underneath.
+ if (!call.launch_nested_container_session().container_id().has_parent()) {
+ return Error(
+ "Expecting 'launch_nested_container_session.container_id.parent'"
+ " to be present");
+ }
+
+ return None();
+ }
case mesos::agent::Call::ATTACH_CONTAINER_INPUT: {
if (!call.has_attach_container_input()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e0fc463/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index ea6e037..afae6a7 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -3611,6 +3611,90 @@ TEST_P(AgentAPITest, NestedContainerLaunch)
}
+// This test verifies that launch nested container session fails when
+// attaching to the output of the container fails. Consequently, the
+// launched container should be destroyed.
+TEST_P(AgentAPITest, LaunchNestedContainerSessionAttachFailure)
+{
+ ContentType contentType = GetParam();
+
+ Clock::pause();
+
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestContainerizer containerizer(&exec);
+
+ Try<Owned<cluster::Slave>> slave =
+ StartSlave(detector.get(), &containerizer);
+
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 0.1, 32, "*"))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ Future<Nothing> executorRegistered;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(FutureSatisfy(&executorRegistered));
+
+ EXPECT_CALL(exec, launchTask(_, _));
+
+ driver.start();
+
+ AWAIT_READY(executorRegistered);
+
+ Future<hashset<ContainerID>> containerIds = containerizer.containers();
+ AWAIT_READY(containerIds);
+ EXPECT_EQ(1u, containerIds->size());
+
+ v1::ContainerID containerId;
+ containerId.set_value(UUID::random().toString());
+ containerId.mutable_parent()->set_value(containerIds->begin()->value());
+
+ v1::agent::Call call;
+ call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+ call.mutable_launch_nested_container_session()->mutable_container_id()
+ ->CopyFrom(containerId);
+
+ Future<http::Response> response = http::streaming::post(
+ slave.get()->pid,
+ "api/v1",
+ createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+ serialize(contentType, call),
+ stringify(contentType));
+
+ // Launch should fail because test containerizer doesn't support `attach`.
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::InternalServerError().status, response);
+
+ // Settle the clock here to ensure any pending callbacks are executed.
+ Clock::settle();
+
+ // Attach failure should result in the destruction of nested container.
+ containerIds = containerizer.containers();
+ AWAIT_READY(containerIds);
+ EXPECT_EQ(1u, containerIds->size());
+ EXPECT_FALSE(containerIds->contains(devolve(containerId)));
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+}
+
+
// TODO(vinod): Update the test when mesos containerizer
// adds support for `attach`.
TEST_P(AgentAPITest, AttachContainerOutputFailure)
[4/4] mesos git commit: Updated signatures of v1 agent API handlers.
Posted by vi...@apache.org.
Updated signatures of v1 agent API handlers.
Moved the position of the `contentType` argument in the handlers and
renamed it to `acceptType` to be consistent.
Review: https://reviews.apache.org/r/54245
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/16765a3d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/16765a3d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/16765a3d
Branch: refs/heads/master
Commit: 16765a3df93284125e3bb9247b400475cfa7d1c6
Parents: 800c629
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed Nov 30 22:07:17 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Dec 1 21:54:28 2016 -0800
----------------------------------------------------------------------
src/slave/http.cpp | 166 +++++++++++++++++++++++------------------------
src/slave/slave.hpp | 64 +++++++++---------
2 files changed, 115 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/16765a3d/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index ace3575..029eead 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -450,52 +450,52 @@ Future<Response> Slave::Http::_api(
return NotImplemented();
case agent::Call::GET_HEALTH:
- return getHealth(call, principal, acceptType);
+ return getHealth(call, acceptType, principal);
case agent::Call::GET_FLAGS:
- return getFlags(call, principal, acceptType);
+ return getFlags(call, acceptType, principal);
case agent::Call::GET_VERSION:
- return getVersion(call, principal, acceptType);
+ return getVersion(call, acceptType, principal);
case agent::Call::GET_METRICS:
- return getMetrics(call, principal, acceptType);
+ return getMetrics(call, acceptType, principal);
case agent::Call::GET_LOGGING_LEVEL:
- return getLoggingLevel(call, principal, acceptType);
+ return getLoggingLevel(call, acceptType, principal);
case agent::Call::SET_LOGGING_LEVEL:
- return setLoggingLevel(call, principal, acceptType);
+ return setLoggingLevel(call, acceptType, principal);
case agent::Call::LIST_FILES:
- return listFiles(call, principal, acceptType);
+ return listFiles(call, acceptType, principal);
case agent::Call::READ_FILE:
- return readFile(call, principal, acceptType);
+ return readFile(call, acceptType, principal);
case agent::Call::GET_STATE:
- return getState(call, principal, acceptType);
+ return getState(call, acceptType, principal);
case agent::Call::GET_CONTAINERS:
- return getContainers(call, principal, acceptType);
+ return getContainers(call, acceptType, principal);
case agent::Call::GET_FRAMEWORKS:
- return getFrameworks(call, principal, acceptType);
+ return getFrameworks(call, acceptType, principal);
case agent::Call::GET_EXECUTORS:
- return getExecutors(call, principal, acceptType);
+ return getExecutors(call, acceptType, principal);
case agent::Call::GET_TASKS:
- return getTasks(call, principal, acceptType);
+ return getTasks(call, acceptType, principal);
case agent::Call::LAUNCH_NESTED_CONTAINER:
- return launchNestedContainer(call, principal, acceptType);
+ return launchNestedContainer(call, acceptType, principal);
case agent::Call::WAIT_NESTED_CONTAINER:
- return waitNestedContainer(call, principal, acceptType);
+ return waitNestedContainer(call, acceptType, principal);
case agent::Call::KILL_NESTED_CONTAINER:
- return killNestedContainer(call, principal, acceptType);
+ return killNestedContainer(call, acceptType, principal);
case mesos::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION:
return NotImplemented();
@@ -733,14 +733,14 @@ JSON::Object Slave::Http::_flags() const
Future<Response> Slave::Http::getFlags(
const agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(agent::Call::GET_FLAGS, call.type());
- return OK(serialize(contentType,
+ return OK(serialize(acceptType,
evolve<v1::agent::Response::GET_FLAGS>(_flags())),
- stringify(contentType));
+ stringify(acceptType));
}
@@ -764,8 +764,8 @@ Future<Response> Slave::Http::health(const Request& request) const
Future<Response> Slave::Http::getHealth(
const agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(agent::Call::GET_HEALTH, call.type());
@@ -773,28 +773,28 @@ Future<Response> Slave::Http::getHealth(
response.set_type(agent::Response::GET_HEALTH);
response.mutable_get_health()->set_healthy(true);
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return OK(serialize(acceptType, evolve(response)),
+ stringify(acceptType));
}
Future<Response> Slave::Http::getVersion(
const agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(agent::Call::GET_VERSION, call.type());
- return OK(serialize(contentType,
+ return OK(serialize(acceptType,
evolve<v1::agent::Response::GET_VERSION>(version())),
- stringify(contentType));
+ stringify(acceptType));
}
Future<Response> Slave::Http::getMetrics(
const agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(agent::Call::GET_METRICS, call.type());
CHECK(call.has_get_metrics());
@@ -805,7 +805,7 @@ Future<Response> Slave::Http::getMetrics(
}
return process::metrics::snapshot(timeout)
- .then([contentType](const hashmap<string, double>& metrics) -> Response {
+ .then([acceptType](const hashmap<string, double>& metrics) -> Response {
agent::Response response;
response.set_type(agent::Response::GET_METRICS);
agent::Response::GetMetrics* _getMetrics =
@@ -817,16 +817,16 @@ Future<Response> Slave::Http::getMetrics(
metric->set_value(value);
}
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return OK(serialize(acceptType, evolve(response)),
+ stringify(acceptType));
});
}
Future<Response> Slave::Http::getLoggingLevel(
const agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(agent::Call::GET_LOGGING_LEVEL, call.type());
@@ -834,15 +834,15 @@ Future<Response> Slave::Http::getLoggingLevel(
response.set_type(agent::Response::GET_LOGGING_LEVEL);
response.mutable_get_logging_level()->set_level(FLAGS_v);
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return OK(serialize(acceptType, evolve(response)),
+ stringify(acceptType));
}
Future<Response> Slave::Http::setLoggingLevel(
const agent::Call& call,
- const Option<string>& principal,
- ContentType /*contentType*/) const
+ ContentType /*contentType*/,
+ const Option<string>& principal) const
{
CHECK_EQ(agent::Call::SET_LOGGING_LEVEL, call.type());
CHECK(call.has_set_logging_level());
@@ -860,15 +860,15 @@ Future<Response> Slave::Http::setLoggingLevel(
Future<Response> Slave::Http::listFiles(
const mesos::agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(mesos::agent::Call::LIST_FILES, call.type());
const string& path = call.list_files().path();
return slave->files->browse(path, principal)
- .then([contentType](const Try<list<FileInfo>, FilesError>& result)
+ .then([acceptType](const Try<list<FileInfo>, FilesError>& result)
-> Future<Response> {
if (result.isError()) {
const FilesError& error = result.error();
@@ -900,8 +900,8 @@ Future<Response> Slave::Http::listFiles(
listFiles->add_file_infos()->CopyFrom(fileInfo);
}
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return OK(serialize(acceptType, evolve(response)),
+ stringify(acceptType));
});
}
@@ -1196,8 +1196,8 @@ Future<Response> Slave::Http::state(
Future<Response> Slave::Http::getFrameworks(
const agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(agent::Call::GET_FRAMEWORKS, call.type());
@@ -1219,15 +1219,15 @@ Future<Response> Slave::Http::getFrameworks(
return frameworksApprover
.then(defer(slave->self(),
- [this, contentType](const Owned<ObjectApprover>& frameworksApprover)
+ [this, acceptType](const Owned<ObjectApprover>& frameworksApprover)
-> Future<Response> {
agent::Response response;
response.set_type(agent::Response::GET_FRAMEWORKS);
response.mutable_get_frameworks()->CopyFrom(
_getFrameworks(frameworksApprover));
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return OK(serialize(acceptType, evolve(response)),
+ stringify(acceptType));
}));
}
@@ -1262,8 +1262,8 @@ agent::Response::GetFrameworks Slave::Http::_getFrameworks(
Future<Response> Slave::Http::getExecutors(
const agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(agent::Call::GET_EXECUTORS, call.type());
@@ -1288,7 +1288,7 @@ Future<Response> Slave::Http::getExecutors(
return collect(frameworksApprover, executorsApprover)
.then(defer(slave->self(),
- [this, contentType](const tuple<Owned<ObjectApprover>,
+ [this, acceptType](const tuple<Owned<ObjectApprover>,
Owned<ObjectApprover>>& approvers)
-> Future<Response> {
// Get approver from tuple.
@@ -1302,8 +1302,8 @@ Future<Response> Slave::Http::getExecutors(
response.mutable_get_executors()->CopyFrom(
_getExecutors(frameworksApprover, executorsApprover));
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return OK(serialize(acceptType, evolve(response)),
+ stringify(acceptType));
}));
}
@@ -1366,8 +1366,8 @@ agent::Response::GetExecutors Slave::Http::_getExecutors(
Future<Response> Slave::Http::getTasks(
const agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(agent::Call::GET_TASKS, call.type());
@@ -1397,7 +1397,7 @@ Future<Response> Slave::Http::getTasks(
return collect(frameworksApprover, tasksApprover, executorsApprover)
.then(defer(slave->self(),
- [this, contentType](const tuple<Owned<ObjectApprover>,
+ [this, acceptType](const tuple<Owned<ObjectApprover>,
Owned<ObjectApprover>,
Owned<ObjectApprover>>& approvers)
-> Future<Response> {
@@ -1415,8 +1415,8 @@ Future<Response> Slave::Http::getTasks(
tasksApprover,
executorsApprover));
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return OK(serialize(acceptType, evolve(response)),
+ stringify(acceptType));
}));
}
@@ -1547,8 +1547,8 @@ agent::Response::GetTasks Slave::Http::_getTasks(
Future<Response> Slave::Http::getState(
const agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(agent::Call::GET_STATE, call.type());
@@ -1595,8 +1595,8 @@ Future<Response> Slave::Http::getState(
tasksApprover,
executorsApprover));
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return OK(serialize(acceptType, evolve(response)),
+ stringify(acceptType));
}));
}
@@ -1805,13 +1805,13 @@ Future<Response> Slave::Http::containers(
Future<Response> Slave::Http::getContainers(
const agent::Call& call,
- const Option<string>& printcipal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& printcipal) const
{
CHECK_EQ(agent::Call::GET_CONTAINERS, call.type());
return __containers()
- .then([contentType](const Future<JSON::Array>& result)
+ .then([acceptType](const Future<JSON::Array>& result)
-> Future<Response> {
if (!result.isReady()) {
LOG(WARNING) << "Could not collect container status and statistics: "
@@ -1825,9 +1825,9 @@ Future<Response> Slave::Http::getContainers(
return OK(
serialize(
- contentType,
+ acceptType,
evolve<v1::agent::Response::GET_CONTAINERS>(result.get())),
- stringify(contentType));
+ stringify(acceptType));
});
}
@@ -1960,8 +1960,8 @@ Try<string> Slave::Http::extractEndpoint(const process::http::URL& url) const
Future<Response> Slave::Http::readFile(
const mesos::agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(mesos::agent::Call::READ_FILE, call.type());
@@ -1974,7 +1974,7 @@ Future<Response> Slave::Http::readFile(
}
return slave->files->read(offset, length, path, principal)
- .then([contentType](const Try<tuple<size_t, string>, FilesError>& result)
+ .then([acceptType](const Try<tuple<size_t, string>, FilesError>& result)
-> Future<Response> {
if (result.isError()) {
const FilesError& error = result.error();
@@ -2002,16 +2002,16 @@ Future<Response> Slave::Http::readFile(
response.mutable_read_file()->set_size(std::get<0>(result.get()));
response.mutable_read_file()->set_data(std::get<1>(result.get()));
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return OK(serialize(acceptType, evolve(response)),
+ stringify(acceptType));
});
}
Future<Response> Slave::Http::launchNestedContainer(
const mesos::agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(mesos::agent::Call::LAUNCH_NESTED_CONTAINER, call.type());
CHECK(call.has_launch_nested_container());
@@ -2129,8 +2129,8 @@ Future<Response> Slave::Http::launchNestedContainer(
Future<Response> Slave::Http::waitNestedContainer(
const mesos::agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(mesos::agent::Call::WAIT_NESTED_CONTAINER, call.type());
CHECK(call.has_wait_nested_container());
@@ -2150,7 +2150,7 @@ Future<Response> Slave::Http::waitNestedContainer(
}
return approver.then(defer(slave->self(),
- [this, call, contentType](const Owned<ObjectApprover>& waitApprover)
+ [this, call, acceptType](const Owned<ObjectApprover>& waitApprover)
-> Future<Response> {
const ContainerID& containerId =
call.wait_nested_container().container_id();
@@ -2194,7 +2194,7 @@ Future<Response> Slave::Http::waitNestedContainer(
slave->containerizer->wait(containerId);
return wait
- .then([containerId, contentType](
+ .then([containerId, acceptType](
const Option<ContainerTermination>& termination) -> Response {
if (termination.isNone()) {
return NotFound("Container " + stringify(containerId) +
@@ -2211,8 +2211,8 @@ Future<Response> Slave::Http::waitNestedContainer(
waitNestedContainer->set_exit_status(termination->status());
}
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return OK(serialize(acceptType, evolve(response)),
+ stringify(acceptType));
});
}));
}
@@ -2220,8 +2220,8 @@ Future<Response> Slave::Http::waitNestedContainer(
Future<Response> Slave::Http::killNestedContainer(
const mesos::agent::Call& call,
- const Option<string>& principal,
- ContentType contentType) const
+ ContentType acceptType,
+ const Option<string>& principal) const
{
CHECK_EQ(mesos::agent::Call::KILL_NESTED_CONTAINER, call.type());
CHECK(call.has_kill_nested_container());
http://git-wip-us.apache.org/repos/asf/mesos/blob/16765a3d/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index cfec9dd..0d0e990 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -555,61 +555,61 @@ private:
process::Future<process::http::Response> getFlags(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
process::Future<process::http::Response> getHealth(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
process::Future<process::http::Response> getVersion(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
process::Future<process::http::Response> getMetrics(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
process::Future<process::http::Response> getLoggingLevel(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
process::Future<process::http::Response> setLoggingLevel(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
process::Future<process::http::Response> listFiles(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
process::Future<process::http::Response> getContainers(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
process::Future<process::http::Response> readFile(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
process::Future<process::http::Response> getFrameworks(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
mesos::agent::Response::GetFrameworks _getFrameworks(
const process::Owned<ObjectApprover>& frameworksApprover) const;
process::Future<process::http::Response> getExecutors(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
mesos::agent::Response::GetExecutors _getExecutors(
const process::Owned<ObjectApprover>& frameworksApprover,
@@ -617,8 +617,8 @@ private:
process::Future<process::http::Response> getTasks(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
mesos::agent::Response::GetTasks _getTasks(
const process::Owned<ObjectApprover>& frameworksApprover,
@@ -627,8 +627,8 @@ private:
process::Future<process::http::Response> getState(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
mesos::agent::Response::GetState _getState(
const process::Owned<ObjectApprover>& frameworksApprover,
@@ -637,18 +637,18 @@ private:
process::Future<process::http::Response> launchNestedContainer(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
process::Future<process::http::Response> waitNestedContainer(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
process::Future<process::http::Response> killNestedContainer(
const mesos::agent::Call& call,
- const Option<std::string>& principal,
- ContentType contentType) const;
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
process::Future<process::http::Response> attachContainerInput(
const mesos::agent::Call& call,
[2/4] mesos git commit: Refactored
`Slave::Http::launchNestedContainer()`.
Posted by vi...@apache.org.
Refactored `Slave::Http::launchNestedContainer()`.
This is so that the code can be reused for `launchNestedContainerSession()`
that will be added later. No functional change.
Review: https://reviews.apache.org/r/54193
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e9c89abf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e9c89abf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e9c89abf
Branch: refs/heads/master
Commit: e9c89abf9ff4d7c8b6b73d687d7248d738b9ac10
Parents: 16765a3
Author: Vinod Kone <vi...@gmail.com>
Authored: Sun Nov 20 16:07:31 2016 +0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Dec 1 21:54:28 2016 -0800
----------------------------------------------------------------------
src/slave/http.cpp | 176 +++++++++++++++++++++++++----------------------
src/slave/slave.hpp | 7 ++
2 files changed, 100 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c89abf/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 029eead..5c300be 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -2030,100 +2030,110 @@ Future<Response> Slave::Http::launchNestedContainer(
approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
}
- return approver.then(defer(slave->self(),
- [this, call, contentType](const Owned<ObjectApprover>& launchApprover)
- -> Future<Response> {
- const ContainerID& containerId =
- call.launch_nested_container().container_id();
-
- // We do not yet support launching containers that are nested
- // two levels beneath the executor's container.
- if (containerId.parent().has_parent()) {
- return NotImplemented(
- "Only a single level of container nesting is supported currently,"
- " but 'launch_nested_container.container_id.parent.parent' is set");
- }
+ return approver
+ .then(defer(slave->self(), [=](const Owned<ObjectApprover>& approver) {
+ return _launchNestedContainer(
+ call.launch_nested_container().container_id(),
+ call.launch_nested_container().command(),
+ call.launch_nested_container().has_container()
+ ? call.launch_nested_container().container()
+ : Option<ContainerInfo>::none(),
+ acceptType,
+ approver);
+ }));
+}
- // Locate the executor (for now we just loop since we don't
- // index based on container id and this likely won't have a
- // significant performance impact due to the low number of
- // executors per-agent).
- // TODO(adam-mesos): Support more levels of nesting.
- Executor* executor = nullptr;
- Framework* framework = nullptr;
- foreachvalue (Framework* framework_, slave->frameworks) {
- foreachvalue (Executor* executor_, framework_->executors) {
- if (executor_->containerId == containerId.parent()) {
- framework = framework_;
- executor = executor_;
- break;
- }
- }
- }
- // Return a "Bad Request" here rather than "Not Found" since
- // the executor needs to set parent to its container id.
- if (executor == nullptr || framework == nullptr) {
- return BadRequest("Unable to locate executor for parent container"
- " " + stringify(containerId.parent()));
- }
+Future<Response> Slave::Http::_launchNestedContainer(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const Option<ContainerInfo>& containerInfo,
+ ContentType acceptType,
+ const Owned<ObjectApprover>& approver) const
+{
+ // We do not yet support launching containers that are nested
+ // two levels beneath the executor's container.
+ if (containerId.parent().has_parent()) {
+ return NotImplemented(
+ "Only a single level of container nesting is supported currently,"
+ " but 'launch_nested_container.container_id.parent.parent' is set");
+ }
- ObjectApprover::Object object;
- object.executor_info = &(executor->info);
- object.framework_info = &(framework->info);
- if (call.launch_nested_container().has_command()) {
- object.command_info = &(call.launch_nested_container().command());
+ // Locate the executor (for now we just loop since we don't
+ // index based on container id and this likely won't have a
+ // significant performance impact due to the low number of
+ // executors per-agent).
+ // TODO(adam-mesos): Support more levels of nesting.
+ Executor* executor = nullptr;
+ Framework* framework = nullptr;
+ foreachvalue (Framework* framework_, slave->frameworks) {
+ foreachvalue (Executor* executor_, framework_->executors) {
+ if (executor_->containerId == containerId.parent()) {
+ framework = framework_;
+ executor = executor_;
+ break;
}
+ }
+ }
- Try<bool> approved = launchApprover.get()->approved(object);
+ // Return a "Bad Request" here rather than "Not Found" since
+ // the executor needs to set parent to its container id.
+ if (executor == nullptr || framework == nullptr) {
+ return BadRequest("Unable to locate executor for parent container"
+ " " + stringify(containerId.parent()));
+ }
- if (approved.isError()) {
- return Failure(approved.error());
- } else if (!approved.get()) {
- return Forbidden();
- }
+ ObjectApprover::Object object;
+ object.executor_info = &(executor->info);
+ object.framework_info = &(framework->info);
+ object.command_info = &(commandInfo);
- // By default, we use the executor's user.
- // The command user overrides it if specified.
- Option<string> user = executor->user;
+ Try<bool> approved = approver.get()->approved(object);
+
+ if (approved.isError()) {
+ return Failure(approved.error());
+ } else if (!approved.get()) {
+ return Forbidden();
+ }
+
+ // By default, we use the executor's user.
+ // The command user overrides it if specified.
+ Option<string> user = executor->user;
#ifndef __WINDOWS__
- if (call.launch_nested_container().command().has_user()) {
- user = call.launch_nested_container().command().user();
- }
+ if (commandInfo.has_user()) {
+ user = commandInfo.user();
+ }
#endif
- Future<bool> launched = slave->containerizer->launch(
- containerId,
- call.launch_nested_container().command(),
- call.launch_nested_container().has_container()
- ? call.launch_nested_container().container()
- : Option<ContainerInfo>::none(),
- user,
- slave->info.id());
-
- // TODO(bmahler): The containerizers currently require that
- // the caller calls destroy if the launch fails. See MESOS-6214.
- launched
- .onFailed(defer(slave->self(), [=](const string& failure) {
- LOG(WARNING) << "Failed to launch nested container " << containerId
- << ": " << failure;
-
- slave->containerizer->destroy(containerId)
- .onFailed([=](const string& failure) {
- LOG(ERROR) << "Failed to destroy nested container "
- << containerId << " after launch failure: " << failure;
- });
- }));
-
- return launched
- .then([](bool launched) -> Response {
- if (!launched) {
- return BadRequest("The provided ContainerInfo is not supported");
- }
- return OK();
- });
+ Future<bool> launched = slave->containerizer->launch(
+ containerId,
+ commandInfo,
+ containerInfo,
+ user,
+ slave->info.id());
+
+ // TODO(bmahler): The containerizers currently require that
+ // the caller calls destroy if the launch fails. See MESOS-6214.
+ launched
+ .onFailed(defer(slave->self(), [=](const string& failure) {
+ LOG(WARNING) << "Failed to launch nested container " << containerId
+ << ": " << failure;
+
+ slave->containerizer->destroy(containerId)
+ .onFailed([=](const string& failure) {
+ LOG(ERROR) << "Failed to destroy nested container "
+ << containerId << " after launch failure: " << failure;
+ });
}));
+
+ return launched
+ .then([](bool launched) -> Response {
+ if (!launched) {
+ return BadRequest("The provided ContainerInfo is not supported");
+ }
+ return OK();
+ });
}
@@ -2241,7 +2251,7 @@ Future<Response> Slave::Http::killNestedContainer(
}
return approver.then(defer(slave->self(),
- [this, call, contentType](const Owned<ObjectApprover>& killApprover)
+ [this, call](const Owned<ObjectApprover>& killApprover)
-> Future<Response> {
const ContainerID& containerId =
call.kill_nested_container().container_id();
http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c89abf/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0d0e990..dacdbcf 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -640,6 +640,13 @@ private:
ContentType acceptType,
const Option<std::string>& principal) const;
+ process::Future<process::http::Response> _launchNestedContainer(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const Option<ContainerInfo>& containerInfo,
+ ContentType acceptType,
+ const Owned<ObjectApprover>& approver) const;
+
process::Future<process::http::Response> waitNestedContainer(
const mesos::agent::Call& call,
ContentType acceptType,
[3/4] mesos git commit: Added `devolve` helper for `ContainerID`.
Posted by vi...@apache.org.
Added `devolve` helper for `ContainerID`.
This will be used subsequently.
Review: https://reviews.apache.org/r/54194
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ec009f45
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ec009f45
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ec009f45
Branch: refs/heads/master
Commit: ec009f45538b6ecc330984a3a85b5e2361f3e5aa
Parents: e9c89ab
Author: Vinod Kone <vi...@gmail.com>
Authored: Tue Nov 29 19:51:41 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Dec 1 21:54:28 2016 -0800
----------------------------------------------------------------------
src/internal/devolve.cpp | 6 ++++++
src/internal/devolve.hpp | 1 +
2 files changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ec009f45/src/internal/devolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.cpp b/src/internal/devolve.cpp
index a94bafa..a49171a 100644
--- a/src/internal/devolve.cpp
+++ b/src/internal/devolve.cpp
@@ -54,6 +54,12 @@ CommandInfo devolve(const v1::CommandInfo& command)
}
+ContainerID devolve(const v1::ContainerID& containerId)
+{
+ return devolve<ContainerID>(containerId);
+}
+
+
Credential devolve(const v1::Credential& credential)
{
return devolve<Credential>(credential);
http://git-wip-us.apache.org/repos/asf/mesos/blob/ec009f45/src/internal/devolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.hpp b/src/internal/devolve.hpp
index 15347e1..6d9503d 100644
--- a/src/internal/devolve.hpp
+++ b/src/internal/devolve.hpp
@@ -46,6 +46,7 @@ namespace internal {
// Helpers for devolving types between versions. Please add as necessary!
CommandInfo devolve(const v1::CommandInfo& command);
+ContainerID devolve(const v1::ContainerID& containerId);
Credential devolve(const v1::Credential& credential);
ExecutorID devolve(const v1::ExecutorID& executorId);
FrameworkID devolve(const v1::FrameworkID& frameworkId);