You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/15 22:06:59 UTC
[3/8] mesos git commit: Supported getting all containers in the agent
API.
Supported getting all containers in the agent API.
Review: https://reviews.apache.org/r/64639
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7de21b1c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7de21b1c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7de21b1c
Branch: refs/heads/master
Commit: 7de21b1ccaf0da00eba42a4a4a687ba2ef6f07b4
Parents: a814362
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Dec 15 08:37:05 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Dec 15 13:56:43 2017 -0800
----------------------------------------------------------------------
src/internal/evolve.cpp | 18 +-
src/slave/containerizer/mesos/containerizer.cpp | 4 -
src/slave/http.cpp | 278 +++++++++++++------
src/slave/http.hpp | 5 +-
src/tests/agent_container_api_tests.cpp | 48 ++++
5 files changed, 258 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7de21b1c/src/internal/evolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index 6ce6150..7758c9b 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -685,19 +685,25 @@ v1::agent::Response evolve<v1::agent::Response::GET_CONTAINERS>(
Result<JSON::String> framework_id =
object.find<JSON::String>("framework_id");
- CHECK_SOME(framework_id);
- container->mutable_framework_id()->set_value(framework_id.get().value);
+ CHECK(!framework_id.isError());
+ if (framework_id.isSome()) {
+ container->mutable_framework_id()->set_value(framework_id.get().value);
+ }
Result<JSON::String> executor_id = object.find<JSON::String>("executor_id");
- CHECK_SOME(executor_id);
- container->mutable_executor_id()->set_value(executor_id.get().value);
+ CHECK(!executor_id.isError());
+ if (executor_id.isSome()) {
+ container->mutable_executor_id()->set_value(executor_id.get().value);
+ }
Result<JSON::String> executor_name =
object.find<JSON::String>("executor_name");
- CHECK_SOME(executor_name);
- container->set_executor_name(executor_name.get().value);
+ CHECK(!executor_name.isError());
+ if (executor_name.isSome()) {
+ container->set_executor_name(executor_name.get().value);
+ }
Result<JSON::Object> container_status = object.find<JSON::Object>("status");
if (container_status.isSome()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/7de21b1c/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index 616873e..1a398a8 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -2194,8 +2194,6 @@ Future<ResourceStatistics> _usage(
const Option<Resources>& resources,
const list<Future<ResourceStatistics>>& statistics)
{
- CHECK(!containerId.has_parent());
-
ResourceStatistics result;
// Set the timestamp now we have all statistics.
@@ -2232,8 +2230,6 @@ Future<ResourceStatistics> _usage(
Future<ResourceStatistics> MesosContainerizerProcess::usage(
const ContainerID& containerId)
{
- CHECK(!containerId.has_parent());
-
if (!containers_.contains(containerId)) {
return Failure("Unknown container " + stringify(containerId));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/7de21b1c/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index ed22b9f..3e20c90 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -72,6 +72,8 @@
#include "slave/slave.hpp"
#include "slave/validation.hpp"
+#include "slave/containerizer/mesos/paths.hpp"
+
#include "version/version.hpp"
using mesos::agent::ProcessIO;
@@ -2166,10 +2168,26 @@ Future<Response> Http::getContainers(
AuthorizationAcceptor::create(
principal, slave->authorizer, authorization::VIEW_CONTAINER);
- return authorizeContainer.then(defer(slave->self(),
- [this](const Owned<AuthorizationAcceptor>& authorizeContainer) {
- // Use an empty container ID filter.
- return __containers(authorizeContainer, None());
+ Future<Owned<AuthorizationAcceptor>> authorizeStandaloneContainer =
+ AuthorizationAcceptor::create(
+ principal, slave->authorizer, authorization::VIEW_STANDALONE_CONTAINER);
+
+ return collect(authorizeContainer, authorizeStandaloneContainer)
+ .then(defer(
+ slave->self(),
+ [this, call](const tuple<Owned<AuthorizationAcceptor>,
+ Owned<AuthorizationAcceptor>>& acceptors) {
+ Owned<AuthorizationAcceptor> authorizeContainer;
+ Owned<AuthorizationAcceptor> authorizeStandaloneContainer;
+ tie(authorizeContainer, authorizeStandaloneContainer) = acceptors;
+
+ // Use an empty container ID filter.
+ return __containers(
+ authorizeContainer,
+ authorizeStandaloneContainer,
+ None(),
+ call.get_containers().show_nested(),
+ call.get_containers().show_standalone());
})).then([acceptType](const Future<JSON::Array>& result)
-> Future<Response> {
if (!result.isReady()) {
@@ -2198,19 +2216,36 @@ Future<Response> Http::_containers(
Future<Owned<AuthorizationAcceptor>> authorizeContainer =
AuthorizationAcceptor::create(
principal, slave->authorizer, authorization::VIEW_CONTAINER);
+
+ Future<Owned<AuthorizationAcceptor>> authorizeStandaloneContainer =
+ AuthorizationAcceptor::create(
+ principal, slave->authorizer, authorization::VIEW_STANDALONE_CONTAINER);
+
Future<IDAcceptor<ContainerID>> selectContainerId =
IDAcceptor<ContainerID>(request.url.query.get("container_id"));
- return collect(authorizeContainer, selectContainerId)
+ return collect(authorizeContainer,
+ authorizeStandaloneContainer,
+ selectContainerId)
.then(defer(
slave->self(),
[this](const tuple<Owned<AuthorizationAcceptor>,
+ Owned<AuthorizationAcceptor>,
IDAcceptor<ContainerID>>& acceptors) {
Owned<AuthorizationAcceptor> authorizeContainer;
+ Owned<AuthorizationAcceptor> authorizeStandaloneContainer;
Option<IDAcceptor<ContainerID>> selectContainerId;
- tie(authorizeContainer, selectContainerId) = acceptors;
- return __containers(authorizeContainer, selectContainerId);
+ tie(authorizeContainer,
+ authorizeStandaloneContainer,
+ selectContainerId) = acceptors;
+
+ return __containers(
+ authorizeContainer,
+ authorizeStandaloneContainer,
+ selectContainerId,
+ false,
+ false);
})).then([request](const Future<JSON::Array>& result) -> Future<Response> {
if (!result.isReady()) {
LOG(WARNING) << "Could not collect container status and statistics: "
@@ -2231,96 +2266,171 @@ Future<Response> Http::_containers(
Future<JSON::Array> Http::__containers(
Owned<AuthorizationAcceptor> authorizeContainer,
- Option<IDAcceptor<ContainerID>> selectContainerId) const
+ Owned<AuthorizationAcceptor> authorizeStandaloneContainer,
+ Option<IDAcceptor<ContainerID>> selectContainerId,
+ bool showNestedContainers,
+ bool showStandaloneContainers) const
{
- Owned<list<JSON::Object>> metadata(new list<JSON::Object>());
- list<Future<ContainerStatus>> statusFutures;
- list<Future<ResourceStatistics>> statsFutures;
+ return slave->containerizer->containers()
+ .then(defer(slave->self(), [=](const hashset<ContainerID> containerIds) {
+ Owned<list<JSON::Object>> metadata(new list<JSON::Object>());
+ list<Future<ContainerStatus>> statusFutures;
+ list<Future<ResourceStatistics>> statsFutures;
+
+ hashset<ContainerID> executorContainerIds;
+ hashset<ContainerID> authorizedExecutorContainerIds;
+
+ foreachvalue (const Framework* framework, slave->frameworks) {
+ foreachvalue (const Executor* executor, framework->executors) {
+ // No need to get statistics and status if we know that the
+ // executor has already terminated.
+ if (executor->state == Executor::TERMINATED) {
+ continue;
+ }
- foreachvalue (const Framework* framework, slave->frameworks) {
- foreachvalue (const Executor* executor, framework->executors) {
- // No need to get statistics and status if we know that the
- // executor has already terminated.
- if (executor->state == Executor::TERMINATED) {
- continue;
- }
+ const ExecutorInfo& info = executor->info;
+ const ContainerID& containerId = executor->containerId;
- const ExecutorInfo& info = executor->info;
- const ContainerID& containerId = executor->containerId;
+ executorContainerIds.insert(containerId);
- if ((selectContainerId.isSome() &&
- !selectContainerId->accept(containerId)) ||
- !authorizeContainer->accept(info, framework->info)) {
- continue;
+ if ((selectContainerId.isSome() &&
+ !selectContainerId->accept(containerId)) ||
+ !authorizeContainer->accept(info, framework->info)) {
+ continue;
+ }
+
+ authorizedExecutorContainerIds.insert(containerId);
+
+ JSON::Object entry;
+ entry.values["framework_id"] = info.framework_id().value();
+ entry.values["executor_id"] = info.executor_id().value();
+ entry.values["executor_name"] = info.name();
+ entry.values["source"] = info.source();
+ entry.values["container_id"] = containerId.value();
+
+ metadata->push_back(entry);
+ statusFutures.push_back(slave->containerizer->status(containerId));
+ statsFutures.push_back(slave->containerizer->usage(containerId));
+ }
}
- JSON::Object entry;
- entry.values["framework_id"] = info.framework_id().value();
- entry.values["executor_id"] = info.executor_id().value();
- entry.values["executor_name"] = info.name();
- entry.values["source"] = info.source();
- entry.values["container_id"] = containerId.value();
+ foreach (const ContainerID& containerId, containerIds) {
+ if (executorContainerIds.contains(containerId)) {
+ continue;
+ }
- metadata->push_back(entry);
- statusFutures.push_back(slave->containerizer->status(containerId));
- statsFutures.push_back(slave->containerizer->usage(containerId));
- }
- }
+ if (selectContainerId.isSome() &&
+ !selectContainerId->accept(containerId)) {
+ continue;
+ }
- return await(await(statusFutures), await(statsFutures)).then(
- [metadata](const tuple<
- Future<list<Future<ContainerStatus>>>,
- Future<list<Future<ResourceStatistics>>>>& t)
- -> Future<JSON::Array> {
- const list<Future<ContainerStatus>>& status = std::get<0>(t).get();
- const list<Future<ResourceStatistics>>& stats = std::get<1>(t).get();
- CHECK_EQ(status.size(), stats.size());
- CHECK_EQ(status.size(), metadata->size());
-
- JSON::Array result;
-
- auto statusIter = status.begin();
- auto statsIter = stats.begin();
- auto metadataIter = metadata->begin();
-
- while (statusIter != status.end() &&
- statsIter != stats.end() &&
- metadataIter != metadata->end()) {
- JSON::Object& entry = *metadataIter;
-
- if (statusIter->isReady()) {
- entry.values["status"] = JSON::protobuf(statusIter->get());
- } else {
- LOG(WARNING) << "Failed to get container status for executor '"
- << entry.values["executor_id"] << "'"
- << " of framework "
- << entry.values["framework_id"] << ": "
- << (statusIter->isFailed()
- ? statusIter->failure()
- : "discarded");
- }
+ const bool isNestedContainer = containerId.has_parent();
- if (statsIter->isReady()) {
- entry.values["statistics"] = JSON::protobuf(statsIter->get());
- } else {
- LOG(WARNING) << "Failed to get resource statistics for executor '"
- << entry.values["executor_id"] << "'"
- << " of framework "
- << entry.values["framework_id"] << ": "
- << (statsIter->isFailed()
- ? statsIter->failure()
- : "discarded");
- }
+ // TODO(jieyu): Only MesosContainerizer supports standalone
+ // container currently. Thus it's ok to call
+ // MesosContainerizer-specific method here. If we want to
+ // support other Containerizers, we should make this a
+ // Containerizer interface.
+ const bool isStandaloneContainer =
+ containerizer::paths::isStandaloneContainer(
+ slave->flags.runtime_dir,
+ containerId);
+
+ // For nested containers, authorization is always based on
+ // its root container.
+ ContainerID rootContainerId = protobuf::getRootContainerId(containerId);
- result.values.push_back(entry);
+ const bool isRootContainerStandalone =
+ containerizer::paths::isStandaloneContainer(
+ slave->flags.runtime_dir,
+ rootContainerId);
- statusIter++;
- statsIter++;
- metadataIter++;
+ if (isNestedContainer && !showNestedContainers) {
+ continue;
}
- return result;
- });
+ if (isStandaloneContainer && !showStandaloneContainers) {
+ continue;
+ }
+
+ if (isRootContainerStandalone &&
+ !authorizeStandaloneContainer->accept()) {
+ continue;
+ }
+
+ if (!isRootContainerStandalone &&
+ !authorizedExecutorContainerIds.contains(rootContainerId)) {
+ continue;
+ }
+
+ JSON::Object entry;
+ entry.values["container_id"] = containerId.value();
+
+ metadata->push_back(entry);
+ statusFutures.push_back(slave->containerizer->status(containerId));
+ statsFutures.push_back(slave->containerizer->usage(containerId));
+ }
+
+ return await(await(statusFutures), await(statsFutures)).then(
+ [metadata](const tuple<
+ Future<list<Future<ContainerStatus>>>,
+ Future<list<Future<ResourceStatistics>>>>& t)
+ -> Future<JSON::Array> {
+ const list<Future<ContainerStatus>>& status =
+ std::get<0>(t).get();
+
+ const list<Future<ResourceStatistics>>& stats =
+ std::get<1>(t).get();
+
+ CHECK_EQ(status.size(), stats.size());
+ CHECK_EQ(status.size(), metadata->size());
+
+ JSON::Array result;
+
+ auto statusIter = status.begin();
+ auto statsIter = stats.begin();
+ auto metadataIter = metadata->begin();
+
+ while (statusIter != status.end() &&
+ statsIter != stats.end() &&
+ metadataIter != metadata->end()) {
+ JSON::Object& entry = *metadataIter;
+
+ if (statusIter->isReady()) {
+ entry.values["status"] = JSON::protobuf(statusIter->get());
+ } else {
+ LOG(WARNING) << "Failed to get container status for executor '"
+ << entry.values["executor_id"] << "'"
+ << " of framework "
+ << entry.values["framework_id"] << ": "
+ << (statusIter->isFailed()
+ ? statusIter->failure()
+ : "discarded");
+ }
+
+ if (statsIter->isReady()) {
+ entry.values["statistics"] = JSON::protobuf(statsIter->get());
+ } else {
+ LOG(WARNING)
+ << "Failed to get resource statistics for executor '"
+ << entry.values["executor_id"] << "'"
+ << " of framework "
+ << entry.values["framework_id"] << ": "
+ << (statsIter->isFailed()
+ ? statsIter->failure()
+ : "discarded");
+ }
+
+ result.values.push_back(entry);
+
+ statusIter++;
+ statsIter++;
+ metadataIter++;
+ }
+
+ return result;
+ });
+ }));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/7de21b1c/src/slave/http.hpp
----------------------------------------------------------------------
diff --git a/src/slave/http.hpp b/src/slave/http.hpp
index 9ca0617..3cdbf98 100644
--- a/src/slave/http.hpp
+++ b/src/slave/http.hpp
@@ -117,7 +117,10 @@ private:
// Helper function to collect containers status and resource statistics.
process::Future<JSON::Array> __containers(
process::Owned<AuthorizationAcceptor> authorizeContainer,
- Option<IDAcceptor<ContainerID>> selectContainerId) const;
+ process::Owned<AuthorizationAcceptor> authorizeStandaloneContainer,
+ Option<IDAcceptor<ContainerID>> selectContainerId,
+ bool showNestedContainers,
+ bool showStandaloneContainers) const;
// Helper routines for endpoint authorization.
Try<std::string> extractEndpoint(const process::http::URL& url) const;
http://git-wip-us.apache.org/repos/asf/mesos/blob/7de21b1c/src/tests/agent_container_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/agent_container_api_tests.cpp b/src/tests/agent_container_api_tests.cpp
index c2c15eb..6185692 100644
--- a/src/tests/agent_container_api_tests.cpp
+++ b/src/tests/agent_container_api_tests.cpp
@@ -421,6 +421,17 @@ public:
return post(slave, call);
}
+ Future<http::Response> getContainers(
+ const process::PID<slave::Slave>& slave)
+ {
+ v1::agent::Call call;
+ call.set_type(v1::agent::Call::GET_CONTAINERS);
+ call.mutable_get_containers()->set_show_nested(true);
+ call.mutable_get_containers()->set_show_standalone(true);
+
+ return post(slave, call);
+ }
+
protected:
virtual void TearDown()
{
@@ -836,6 +847,43 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
EXPECT_TRUE(checkWaitContainerResponse(wait, SIGKILL));
}
+
+// This test verifies the GET_CONTAINERS API call.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentContainerAPITest, GetContainers)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.launcher = std::get<1>(std::get<3>(GetParam()));
+ slaveFlags.isolation = std::get<0>(std::get<3>(GetParam()));
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ Try<v1::ContainerID> parentContainerId =
+ launchParentContainer(master.get()->pid, slave.get()->pid);
+
+ ASSERT_SOME(parentContainerId);
+
+ // Launch a nested container and wait for it to finish.
+ v1::ContainerID containerId;
+ containerId.set_value(id::UUID::random().toString());
+ containerId.mutable_parent()->CopyFrom(parentContainerId.get());
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+ http::OK().status,
+ launchNestedContainer(slave.get()->pid, containerId));
+
+ Future<v1::agent::Response> response =
+ deserialize(getContainers(slave.get()->pid));
+
+ ASSERT_EQ(v1::agent::Response::GET_CONTAINERS, response->type());
+ EXPECT_EQ(2, response->get_containers().containers_size());
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {