You are viewing a plain-text version of this content; the link to its canonical version is not included in this copy.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/06/11 20:36:21 UTC
[2/6] mesos git commit: Refactored the ResourceMonitor to get statistics from the Slave.
Refactored the ResourceMonitor to get statistics from the Slave.
Review: https://reviews.apache.org/r/35260
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8274c5e9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8274c5e9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8274c5e9
Branch: refs/heads/master
Commit: 8274c5e9ef46e297337999760e741d93710832f8
Parents: eaa992a
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Jun 9 11:51:22 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Jun 11 11:31:01 2015 -0700
----------------------------------------------------------------------
include/mesos/mesos.proto | 22 +-
include/mesos/slave/resource_estimator.hpp | 3 +-
src/slave/monitor.cpp | 296 ++++++++----------------
src/slave/monitor.hpp | 99 +-------
src/slave/resource_estimators/fixed.cpp | 16 +-
src/slave/resource_estimators/noop.cpp | 6 +-
src/slave/resource_estimators/noop.hpp | 3 +-
src/slave/slave.cpp | 121 +++++-----
src/slave/slave.hpp | 3 +
src/tests/mesos.hpp | 4 +-
src/tests/monitor_tests.cpp | 137 ++++-------
11 files changed, 236 insertions(+), 474 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 7457ff1..2034009 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -620,14 +620,24 @@ message ResourceStatistics {
/**
- * Describes a snapshot of the resource usage for an executor.
+ * Describes a snapshot of the resource usage for executors.
*/
message ResourceUsage {
- // Source of the collected statistics.
- optional ExecutorInfo executor_info = 1;
- // Current resource usage.
- // If missing, the isolation module cannot provide resource usage.
- optional ResourceStatistics statistics = 2;
+ message Executor {
+ required ExecutorInfo executor_info = 1;
+
+ // This includes resources used by the executor itself
+ // as well as its active tasks.
+ repeated Resource allocated = 2;
+
+ // Current resource usage. If absent, the containerizer
+ // cannot provide resource usage.
+ optional ResourceStatistics statistics = 3;
+ }
+
+ repeated Executor executors = 1;
+
+ // TODO(jieyu): Include slave's total resources here.
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/include/mesos/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp
index 7f78fd8..731ec3a 100644
--- a/include/mesos/slave/resource_estimator.hpp
+++ b/include/mesos/slave/resource_estimator.hpp
@@ -52,8 +52,7 @@ public:
// resource estimator to fetch the current resource usage for each
// executor on slave.
virtual Try<Nothing> initialize(
- const lambda::function<
- process::Future<std::list<ResourceUsage>>()>& usages) = 0;
+ const lambda::function<process::Future<ResourceUsage>()>& usage) = 0;
// Returns the current estimation about the *maximum* amount of
// resources that can be oversubscribed on the slave. A new
http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index 8f7ff63..82aa659 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -16,256 +16,156 @@
* limitations under the License.
*/
-#include <list>
-#include <map>
#include <string>
-#include <mesos/mesos.hpp>
+#include <glog/logging.h>
-#include <process/clock.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
-#include <process/delay.hpp>
#include <process/future.hpp>
#include <process/help.hpp>
#include <process/http.hpp>
+#include <process/limiter.hpp>
#include <process/process.hpp>
-#include <process/statistics.hpp>
#include <stout/json.hpp>
#include <stout/lambda.hpp>
#include <stout/protobuf.hpp>
-#include "slave/containerizer/containerizer.hpp"
#include "slave/monitor.hpp"
using namespace process;
-using std::list;
-using std::make_pair;
-using std::map;
using std::string;
namespace mesos {
namespace internal {
namespace slave {
-using process::wait; // Necessary on some OS's to disambiguate.
-
-
-Future<Nothing> ResourceMonitorProcess::start(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo)
+static const string STATISTICS_HELP()
{
- if (monitored.contains(containerId)) {
- return Failure("Already monitored");
- }
-
- monitored[containerId] = executorInfo;
-
- return Nothing();
+ return HELP(
+ TLDR(
+ "Retrieve resource monitoring information."),
+ USAGE(
+ "/statistics.json"),
+ DESCRIPTION(
+ "Returns the current resource consumption data for containers",
+ "running under this slave.",
+ "",
+ "Example:",
+ "",
+ "```",
+ "[{",
+ " \"executor_id\":\"executor\",",
+ " \"executor_name\":\"name\",",
+ " \"framework_id\":\"framework\",",
+ " \"source\":\"source\",",
+ " \"statistics\":",
+ " {",
+ " \"cpus_limit\":8.25,",
+ " \"cpus_nr_periods\":769021,",
+ " \"cpus_nr_throttled\":1046,",
+ " \"cpus_system_time_secs\":34501.45,",
+ " \"cpus_throttled_time_secs\":352.597023453,",
+ " \"cpus_user_time_secs\":96348.84,",
+ " \"mem_anon_bytes\":4845449216,",
+ " \"mem_file_bytes\":260165632,",
+ " \"mem_limit_bytes\":7650410496,",
+ " \"mem_mapped_file_bytes\":7159808,",
+ " \"mem_rss_bytes\":5105614848,",
+ " \"timestamp\":1388534400.0",
+ " }",
+ "}]",
+ "```"));
}
-Future<Nothing> ResourceMonitorProcess::stop(
- const ContainerID& containerId)
+class ResourceMonitorProcess : public Process<ResourceMonitorProcess>
{
- if (!monitored.contains(containerId)) {
- return Failure("Not monitored");
+public:
+ explicit ResourceMonitorProcess(
+ const lambda::function<Future<ResourceUsage>()>& _usage)
+ : ProcessBase("monitor"),
+ usage(_usage),
+ limiter(2, Seconds(1)) {} // 2 permits per second.
+
+ virtual ~ResourceMonitorProcess() {}
+
+protected:
+ virtual void initialize()
+ {
+ route("/statistics.json",
+ STATISTICS_HELP(),
+ &ResourceMonitorProcess::statistics);
}
- monitored.erase(containerId);
-
- return Nothing();
-}
-
-
-Future<list<ResourceUsage>> ResourceMonitorProcess::usages()
-{
- list<Future<ResourceUsage>> futures;
-
- foreachkey (const ContainerID& containerId, monitored) {
- futures.push_back(usage(containerId));
+private:
+ // Returns the monitoring statistics. Requests have no parameters.
+ Future<http::Response> statistics(const http::Request& request)
+ {
+ return limiter.acquire()
+ .then(defer(self(), &Self::_statistics, request));
}
- return await(futures)
- .then(defer(self(), &ResourceMonitorProcess::_usages, lambda::_1));
-}
-
-
-list<ResourceUsage> ResourceMonitorProcess::_usages(
- list<Future<ResourceUsage>> futures)
-{
- list<ResourceUsage> result;
- foreach(const Future<ResourceUsage>& future, futures) {
- if (future.isReady()) {
- result.push_back(future.get());
- }
+ Future<http::Response> _statistics(const http::Request& request)
+ {
+ return usage()
+ .then(defer(self(), &Self::__statistics, lambda::_1, request));
}
- return result;
-}
-
-
-Future<ResourceUsage> ResourceMonitorProcess::usage(
- ContainerID containerId)
-{
- if (!monitored.contains(containerId)) {
- return Failure("Not monitored");
- }
-
- ExecutorInfo executorInfo = monitored[containerId];
-
- return containerizer->usage(containerId)
- .then(defer(
- self(),
- &ResourceMonitorProcess::_usage,
- executorInfo,
- lambda::_1))
- .onFailed([containerId, executorInfo](const string& failure) {
- LOG(WARNING) << "Failed to get resource usage for "
- << " container " << containerId
- << " for executor " << executorInfo.executor_id()
- << " of framework " << executorInfo.framework_id()
- << ": " << failure;
- })
- .onDiscarded([containerId, executorInfo]() {
- LOG(WARNING) << "Failed to get resource usage for "
- << " container " << containerId
- << " for executor " << executorInfo.executor_id()
- << " of framework " << executorInfo.framework_id()
- << ": future discarded";
- });
-}
-
-
-ResourceUsage ResourceMonitorProcess::_usage(
- const ExecutorInfo& executorInfo,
- const ResourceStatistics& statistics)
-{
- ResourceUsage usage;
- usage.mutable_executor_info()->CopyFrom(executorInfo);
- usage.mutable_statistics()->CopyFrom(statistics);
-
- return usage;
-}
-
-
-Future<http::Response> ResourceMonitorProcess::statistics(
- const http::Request& request)
-{
- return limiter.acquire()
- .then(defer(self(), &Self::_statistics, request));
-}
-
+ Future<http::Response> __statistics(
+ const Future<ResourceUsage>& future,
+ const http::Request& request)
+ {
+ if (!future.isReady()) {
+ LOG(WARNING) << "Could not collect resource usage: "
+ << (future.isFailed() ? future.failure() : "discarded");
-Future<http::Response> ResourceMonitorProcess::_statistics(
- const http::Request& request)
-{
- return usages()
- .then(defer(self(), &Self::__statistics, lambda::_1, request));
-}
+ return http::InternalServerError();
+ }
+ JSON::Array result;
-Future<http::Response> ResourceMonitorProcess::__statistics(
- const Future<list<ResourceUsage>>& futures,
- const http::Request& request)
-{
- if (!futures.isReady()) {
- LOG(WARNING) << "Could not collect usage statistics";
- return http::InternalServerError();
- }
+ foreach (const ResourceUsage::Executor& executor,
+ future.get().executors()) {
+ if (executor.has_statistics()) {
+ const ExecutorInfo info = executor.executor_info();
- JSON::Array result;
+ JSON::Object entry;
+ entry.values["framework_id"] = info.framework_id().value();
+ entry.values["executor_id"] = info.executor_id().value();
+ entry.values["executor_name"] = info.name();
+ entry.values["source"] = info.source();
+ entry.values["statistics"] = JSON::Protobuf(executor.statistics());
- foreach (const ResourceUsage& usage, futures.get()) {
- JSON::Object entry;
- entry.values["framework_id"] = usage.executor_info().framework_id().value();
- entry.values["executor_id"] = usage.executor_info().executor_id().value();
- entry.values["executor_name"] = usage.executor_info().name();
- entry.values["source"] = usage.executor_info().source();
- entry.values["statistics"] = JSON::Protobuf(usage.statistics());
+ result.values.push_back(entry);
+ }
+ }
- result.values.push_back(entry);
+ return http::OK(result, request.query.get("jsonp"));
}
- return http::OK(result, request.query.get("jsonp"));
-}
+ // Callback used to retrieve resource usage information from slave.
+ const lambda::function<Future<ResourceUsage>()> usage;
+ // Used to rate limit the statistics.json endpoint.
+ RateLimiter limiter;
+};
-const string ResourceMonitorProcess::STATISTICS_HELP = HELP(
- TLDR(
- "Retrieve resource monitoring information."),
- USAGE(
- "/statistics.json"),
- DESCRIPTION(
- "Returns the current resource consumption data for containers",
- "running under this slave.",
- "",
- "Example:",
- "",
- "```",
- "[{",
- " \"executor_id\":\"executor\",",
- " \"executor_name\":\"name\",",
- " \"framework_id\":\"framework\",",
- " \"source\":\"source\",",
- " \"statistics\":",
- " {",
- " \"cpus_limit\":8.25,",
- " \"cpus_nr_periods\":769021,",
- " \"cpus_nr_throttled\":1046,",
- " \"cpus_system_time_secs\":34501.45,",
- " \"cpus_throttled_time_secs\":352.597023453,",
- " \"cpus_user_time_secs\":96348.84,",
- " \"mem_anon_bytes\":4845449216,",
- " \"mem_file_bytes\":260165632,",
- " \"mem_limit_bytes\":7650410496,",
- " \"mem_mapped_file_bytes\":7159808,",
- " \"mem_rss_bytes\":5105614848,",
- " \"timestamp\":1388534400.0",
- " }",
- "}]",
- "```"));
-
-ResourceMonitor::ResourceMonitor(Containerizer* containerizer)
+ResourceMonitor::ResourceMonitor(
+ const lambda::function<Future<ResourceUsage>()>& usage)
+ : process(new ResourceMonitorProcess(usage))
{
- process = new ResourceMonitorProcess(containerizer);
- spawn(process);
+ spawn(process.get());
}
ResourceMonitor::~ResourceMonitor()
{
- terminate(process);
- wait(process);
- delete process;
-}
-
-
-Future<Nothing> ResourceMonitor::start(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo)
-{
- return dispatch(
- process,
- &ResourceMonitorProcess::start,
- containerId,
- executorInfo);
-}
-
-
-Future<Nothing> ResourceMonitor::stop(
- const ContainerID& containerId)
-{
- return dispatch(process, &ResourceMonitorProcess::stop, containerId);
-}
-
-
-Future<list<ResourceUsage>> ResourceMonitor::usages()
-{
- return dispatch(process, &ResourceMonitorProcess::usages);
+ terminate(process.get());
+ wait(process.get());
}
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/monitor.hpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.hpp b/src/slave/monitor.hpp
index 6a51eee..0dff109 100644
--- a/src/slave/monitor.hpp
+++ b/src/slave/monitor.hpp
@@ -19,119 +19,32 @@
#ifndef __SLAVE_MONITOR_HPP__
#define __SLAVE_MONITOR_HPP__
-#include <map>
-#include <string>
-
-#include <boost/circular_buffer.hpp>
-
#include <mesos/mesos.hpp>
-#include <mesos/type_utils.hpp>
#include <process/future.hpp>
-#include <process/limiter.hpp>
#include <process/owned.hpp>
-#include <process/statistics.hpp>
-#include <stout/cache.hpp>
-#include <stout/duration.hpp>
-#include <stout/hashmap.hpp>
-#include <stout/nothing.hpp>
-#include <stout/option.hpp>
-#include <stout/try.hpp>
+#include <stout/lambda.hpp>
namespace mesos {
namespace internal {
namespace slave {
// Forward declarations.
-class Containerizer;
class ResourceMonitorProcess;
-// Provides resource monitoring for containers. Usage information is
-// also exported via a JSON endpoint.
-// TODO(bmahler): Forward usage information to the master.
+// Exposes resources usage information via a JSON endpoint.
class ResourceMonitor
{
public:
- explicit ResourceMonitor(Containerizer* containerizer);
- ~ResourceMonitor();
-
- // Starts monitoring resources for the given container.
- // Returns a failure if the container is already being watched.
- process::Future<Nothing> start(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo);
-
- // Stops monitoring resources for the given container.
- // Returns a failure if the container is unknown to the monitor.
- process::Future<Nothing> stop(
- const ContainerID& containerId);
-
- process::Future<std::list<ResourceUsage>> usages();
-
-private:
- ResourceMonitorProcess* process;
-};
-
-
-class ResourceMonitorProcess : public process::Process<ResourceMonitorProcess>
-{
-public:
- explicit ResourceMonitorProcess(Containerizer* _containerizer)
- : ProcessBase("monitor"),
- containerizer(_containerizer),
- limiter(2, Seconds(1)) {} // 2 permits per second.
+ explicit ResourceMonitor(
+ const lambda::function<process::Future<ResourceUsage>()>& usage);
- virtual ~ResourceMonitorProcess() {}
-
- process::Future<Nothing> start(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo);
-
- process::Future<Nothing> stop(
- const ContainerID& containerId);
-
- process::Future<std::list<ResourceUsage>> usages();
-
-protected:
- virtual void initialize()
- {
- route("/statistics.json",
- STATISTICS_HELP,
- &ResourceMonitorProcess::statistics);
- }
+ ~ResourceMonitor();
private:
- // Helper for returning the usage for a particular executor.
- process::Future<ResourceUsage> usage(ContainerID containerId);
-
- ResourceUsage _usage(
- const ExecutorInfo& executorInfo,
- const ResourceStatistics& statistics);
-
- std::list<ResourceUsage> _usages(
- std::list<process::Future<ResourceUsage>> future);
-
- // HTTP Endpoints.
- // Returns the monitoring statistics. Requests have no parameters.
- process::Future<process::http::Response> statistics(
- const process::http::Request& request);
- process::Future<process::http::Response> _statistics(
- const process::http::Request& request);
- process::Future<process::http::Response> __statistics(
- const process::Future<std::list<ResourceUsage>>& futures,
- const process::http::Request& request);
-
- static const std::string STATISTICS_HELP;
-
- Containerizer* containerizer;
-
- // Used to rate limit the statistics.json endpoint.
- process::RateLimiter limiter;
-
- // The executor info is stored for each monitored container.
- hashmap<ContainerID, ExecutorInfo> monitored;
+ process::Owned<ResourceMonitorProcess> process;
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/resource_estimators/fixed.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimators/fixed.cpp b/src/slave/resource_estimators/fixed.cpp
index 3efa18d..08a712f 100644
--- a/src/slave/resource_estimators/fixed.cpp
+++ b/src/slave/resource_estimators/fixed.cpp
@@ -16,10 +16,6 @@
* limitations under the License.
*/
-#include <list>
-
-#include <mesos/resources.hpp>
-
#include <mesos/module/resource_estimator.hpp>
#include <mesos/slave/resource_estimator.hpp>
@@ -37,17 +33,15 @@ using mesos::modules::Module;
using mesos::slave::ResourceEstimator;
-using std::list;
-
class FixedResourceEstimatorProcess
: public Process<FixedResourceEstimatorProcess>
{
public:
FixedResourceEstimatorProcess(
- const lambda::function<Future<list<ResourceUsage>>()>& _usages,
+ const lambda::function<Future<ResourceUsage>()>& _usage,
const Resources& _resources)
- : usages(_usages),
+ : usage(_usage),
resources(_resources) {}
Future<Resources> oversubscribable()
@@ -57,7 +51,7 @@ public:
}
protected:
- const lambda::function<Future<list<ResourceUsage>>()>& usages;
+ const lambda::function<Future<ResourceUsage>()>& usage;
const Resources resources;
};
@@ -83,13 +77,13 @@ public:
}
virtual Try<Nothing> initialize(
- const lambda::function<Future<list<ResourceUsage>>()>& usages)
+ const lambda::function<Future<ResourceUsage>()>& usage)
{
if (process.get() != NULL) {
return Error("Fixed resource estimator has already been initialized");
}
- process.reset(new FixedResourceEstimatorProcess(usages, resources));
+ process.reset(new FixedResourceEstimatorProcess(usage, resources));
spawn(process.get());
return Nothing();
http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/resource_estimators/noop.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimators/noop.cpp b/src/slave/resource_estimators/noop.cpp
index 5f135ff..e450b44 100644
--- a/src/slave/resource_estimators/noop.cpp
+++ b/src/slave/resource_estimators/noop.cpp
@@ -16,8 +16,6 @@
* limitations under the License.
*/
-#include <list>
-
#include <process/dispatch.hpp>
#include <process/process.hpp>
@@ -27,8 +25,6 @@
using namespace process;
-using std::list;
-
namespace mesos {
namespace internal {
namespace slave {
@@ -54,7 +50,7 @@ NoopResourceEstimator::~NoopResourceEstimator()
Try<Nothing> NoopResourceEstimator::initialize(
- const lambda::function<Future<list<ResourceUsage>>()>& usages)
+ const lambda::function<Future<ResourceUsage>()>& usage)
{
if (process.get() != NULL) {
return Error("Noop resource estimator has already been initialized");
http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/resource_estimators/noop.hpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimators/noop.hpp b/src/slave/resource_estimators/noop.hpp
index 7a44e6d..46f64c7 100644
--- a/src/slave/resource_estimators/noop.hpp
+++ b/src/slave/resource_estimators/noop.hpp
@@ -42,8 +42,7 @@ public:
virtual ~NoopResourceEstimator();
virtual Try<Nothing> initialize(
- const lambda::function<
- process::Future<std::list<ResourceUsage>>()>& usages);
+ const lambda::function<process::Future<ResourceUsage>()>& usage);
virtual process::Future<Resources> oversubscribable();
http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 0a2cd16..5f00e6e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -36,6 +36,7 @@
#include <process/async.hpp>
#include <process/check.hpp>
+#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
@@ -130,7 +131,7 @@ Slave::Slave(const slave::Flags& _flags,
files(_files),
metrics(*this),
gc(_gc),
- monitor(containerizer),
+ monitor(defer(self(), &Self::usage)),
statusUpdateManager(_statusUpdateManager),
metaDir(paths::getMetaRootDir(flags.work_dir)),
recoveryErrors(0),
@@ -327,7 +328,7 @@ void Slave::initialize()
}
Try<Nothing> initialize = resourceEstimator->initialize(
- lambda::bind(&ResourceMonitor::usages, &monitor));
+ defer(self(), &Self::usage));
if (initialize.isError()) {
EXIT(1) << "Failed to initialize the resource estimator: "
@@ -2327,21 +2328,6 @@ void Slave::registerExecutor(
}
-void _monitor(
- const Future<Nothing>& monitor,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const ContainerID& containerId)
-{
- if (!monitor.isReady()) {
- LOG(ERROR) << "Failed to monitor container '" << containerId
- << "' for executor '" << executorId
- << "' of framework '" << frameworkId
- << ":" << (monitor.isFailed() ? monitor.failure() : "discarded");
- }
-}
-
-
void Slave::reregisterExecutor(
const UPID& from,
const FrameworkID& frameworkId,
@@ -2428,18 +2414,6 @@ void Slave::reregisterExecutor(
executorId,
executor->containerId));
- // Monitor the executor.
- // TODO(jieyu): Do not start the monitor if the containerizer
- // update fails.
- monitor.start(
- executor->containerId,
- executor->info)
- .onAny(lambda::bind(_monitor,
- lambda::_1,
- framework->id(),
- executor->id,
- executor->containerId));
-
hashmap<TaskID, TaskInfo> unackedTasks;
foreach (const TaskInfo& task, tasks) {
unackedTasks[task.task_id()] = task;
@@ -3208,18 +3182,6 @@ void Slave::executorLaunched(
break;
case Executor::REGISTERING:
case Executor::RUNNING:
- LOG(INFO) << "Monitoring executor '" << executorId
- << "' of framework '" << frameworkId
- << "' in container '" << containerId << "'";
- // Start monitoring the container's resources.
- monitor.start(
- containerId,
- executor->info)
- .onAny(lambda::bind(_monitor,
- lambda::_1,
- frameworkId,
- executorId,
- containerId));
break;
case Executor::TERMINATED:
default:
@@ -3231,12 +3193,6 @@ void Slave::executorLaunched(
}
-void _unmonitor(
- const Future<Nothing>& watch,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId);
-
-
// Called by the isolator when an executor process terminates.
void Slave::executorTerminated(
const FrameworkID& frameworkId,
@@ -3298,10 +3254,6 @@ void Slave::executorTerminated(
executor->state = Executor::TERMINATED;
- // Stop monitoring the executor's container.
- monitor.stop(executor->containerId)
- .onAny(lambda::bind(_unmonitor, lambda::_1, frameworkId, executorId));
-
// Transition all live tasks to TASK_LOST/TASK_FAILED.
// If the containerizer killed the executor (e.g., due to OOM event)
// or if this is a command executor, we send TASK_FAILED status updates
@@ -3506,19 +3458,6 @@ void Slave::removeFramework(Framework* framework)
}
-void _unmonitor(
- const Future<Nothing>& unmonitor,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId)
-{
- if (!unmonitor.isReady()) {
- LOG(ERROR) << "Failed to unmonitor container for executor " << executorId
- << " of framework " << frameworkId << ": "
- << (unmonitor.isFailed() ? unmonitor.failure() : "discarded");
- }
-}
-
-
void Slave::shutdownExecutor(
const UPID& from,
const FrameworkID& frameworkId,
@@ -4196,6 +4135,54 @@ void Slave::qosCorrections(
}
+Future<ResourceUsage> Slave::usage()
+{
+ // NOTE: We use 'Owned' here trying to avoid the expensive copy.
+ // C++11 lambda only supports capturing variables that have copy
+ // constructors. Revisit once we remove the copy constructor for
+ // Owned (or C++14 lambda generalized capture is supported).
+ Owned<ResourceUsage> usage(new ResourceUsage());
+ list<Future<ResourceStatistics>> futures;
+
+ foreachvalue (const Framework* framework, frameworks) {
+ foreachvalue (const Executor* executor, framework->executors) {
+ ResourceUsage::Executor* entry = usage->add_executors();
+ entry->mutable_executor_info()->CopyFrom(executor->info);
+ entry->mutable_allocated()->CopyFrom(executor->resources);
+
+ futures.push_back(containerizer->usage(executor->containerId));
+ }
+ }
+
+ return await(futures).then(
+ [usage](const list<Future<ResourceStatistics>>& futures)
+ -> Future<ResourceUsage> {
+ // NOTE: We add ResourceUsage::Executor to 'usage' the same
+ // order as we push future to 'futures'. So the variables
+ // 'future' and 'executor' below should be in sync.
+ CHECK_EQ(futures.size(), usage->executors_size());
+
+ size_t i = 0;
+ foreach (const Future<ResourceStatistics>& future, futures) {
+ ResourceUsage::Executor* executor = usage->mutable_executors(i++);
+
+ if (future.isReady()) {
+ executor->mutable_statistics()->CopyFrom(future.get());
+ } else {
+ LOG(WARNING) << "Failed to get resource statistics for executor '"
+ << executor->executor_info().executor_id() << "'"
+ << " of framework "
+ << executor->executor_info().framework_id() << ": "
+ << (future.isFailed() ? future.failure()
+ : "discarded");
+ }
+ }
+
+ return *usage;
+ });
+}
+
+
// TODO(dhamon): Move these to their own metrics.hpp|cpp.
double Slave::_tasks_staging()
{
@@ -4465,9 +4452,9 @@ Executor* Framework::launchExecutor(
const TaskInfo& taskInfo)
{
// Generate an ID for the executor's container.
- // TODO(idownes) This should be done by the containerizer but we need the
- // ContainerID to create the executor's directory and to set up monitoring.
- // Fix this when 'launchExecutor()' is handled asynchronously.
+ // TODO(idownes) This should be done by the containerizer but we
+ // need the ContainerID to create the executor's directory. Fix
+ // this when 'launchExecutor()' is handled asynchronously.
ContainerID containerId;
containerId.set_value(UUID::random().toString());
http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 98c64f6..0df1b55 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -358,6 +358,9 @@ public:
const process::Future<std::list<
mesos::slave::QoSCorrection>>& correction);
+ // Returns the resource usage information for all executors.
+ process::Future<ResourceUsage> usage();
+
private:
void _authenticate();
void authenticationTimeout(process::Future<bool> future);
http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 50b0061..e19ef98 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -738,9 +738,7 @@ public:
MOCK_METHOD1(
initialize,
- Try<Nothing>(
- const lambda::function<
- process::Future<std::list<ResourceUsage>>()>&));
+ Try<Nothing>(const lambda::function<process::Future<ResourceUsage>()>&));
MOCK_METHOD0(
oversubscribable,
http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 6de8b1f..197c153 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -17,13 +17,10 @@
*/
#include <limits>
-#include <map>
-
-#include <gmock/gmock.h>
#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
-#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
@@ -33,32 +30,19 @@
#include <stout/nothing.hpp>
-#include "slave/constants.hpp"
#include "slave/monitor.hpp"
-#include "tests/containerizer.hpp"
-#include "tests/mesos.hpp"
-
-using process::Clock;
-using process::Future;
+using namespace process;
-using process::http::BadRequest;
-using process::http::NotFound;
-using process::http::OK;
-using process::http::Response;
+using mesos::internal::slave::ResourceMonitor;
using std::numeric_limits;
using std::string;
-using testing::_;
-using testing::DoAll;
-using testing::Return;
-
namespace mesos {
namespace internal {
namespace tests {
-
TEST(MonitorTest, Statistics)
{
FrameworkID frameworkId;
@@ -67,9 +51,6 @@ TEST(MonitorTest, Statistics)
ExecutorID executorId;
executorId.set_value("executor");
- ContainerID containerId;
- containerId.set_value("container");
-
ExecutorInfo executorInfo;
executorInfo.mutable_executor_id()->CopyFrom(executorId);
executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
@@ -90,37 +71,24 @@ TEST(MonitorTest, Statistics)
statistics.set_mem_limit_bytes(2048);
statistics.set_timestamp(0);
- TestContainerizer containerizer;
-
- Future<Nothing> usage;
- EXPECT_CALL(containerizer, usage(containerId))
- .WillOnce(DoAll(FutureSatisfy(&usage),
- Return(statistics)));
-
- slave::ResourceMonitor monitor(&containerizer);
-
- // We pause the clock first to ensure unexpected collections
- // are avoided.
- process::Clock::pause();
+ ResourceMonitor monitor([=]() -> Future<ResourceUsage> {
+ Resources resources = Resources::parse("cpus:1;mem:2").get();
- monitor.start(
- containerId,
- executorInfo);
+ ResourceUsage usage;
+ ResourceUsage::Executor* executor = usage.add_executors();
+ executor->mutable_executor_info()->CopyFrom(executorInfo);
+ executor->mutable_allocated()->CopyFrom(resources);
+ executor->mutable_statistics()->CopyFrom(statistics);
- // Now wait for ResouorceMonitorProcess::watch to finish.
- process::Clock::settle();
+ return usage;
+ });
- process::UPID upid("monitor", process::address());
-
- // Request the statistics, this will ask the isolator.
- Future<Response> response = process::http::get(upid, "statistics.json");
+ UPID upid("monitor", address());
+ Future<http::Response> response = http::get(upid, "statistics.json");
AWAIT_READY(response);
- // The collection should have occurred on the isolator.
- ASSERT_TRUE(usage.isReady());
-
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ(
"application/json",
"Content-Type",
@@ -164,24 +132,23 @@ TEST(MonitorTest, Statistics)
statistics.mem_rss_bytes(),
statistics.timestamp()).get(),
response);
+}
- // Ensure the monitor stops polling the isolator.
- monitor.stop(containerId);
-
- // Wait until ResourceMonitorProcess::stop has completed.
- process::Clock::settle();
- // This time, Containerizer::usage should not get called.
- EXPECT_CALL(containerizer, usage(containerId))
- .Times(0);
+// This test verifies the correct handling of the statistics.json
+// endpoint when there is no executor running.
+TEST(MonitorTest, NoExecutor)
+{
+ ResourceMonitor monitor([]() -> Future<ResourceUsage> {
+ return ResourceUsage();
+ });
- response = process::http::get(upid, "statistics.json");
+ UPID upid("monitor", address());
- // Ensure the rate limiter acquires its permit.
- process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
- process::Clock::settle();
+ Future<http::Response> response = http::get(upid, "statistics.json");
+ AWAIT_READY(response);
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ(
"application/json",
"Content-Type",
@@ -190,43 +157,39 @@ TEST(MonitorTest, Statistics)
}
-// Test for correct handling of the statistics.json endpoint when
-// monitoring of a container is stopped.
-TEST(MonitorTest, UsageFailure)
+// This test verifies the correct handling of the statistics.json
+// endpoint when statistics is missing in ResourceUsage.
+TEST(MonitorTest, MissingStatistics)
{
- TestContainerizer containerizer;
-
- // Test containerizer is set up to:
- // 1) Synchronize test with Containerizer::usage()
- // 2) After that, stop monitoring the container.
- Future<Nothing> usage;
- process::Promise<ResourceStatistics> failPromise;
- EXPECT_CALL(containerizer, usage(DEFAULT_CONTAINER_ID))
- .WillOnce(DoAll(FutureSatisfy(&usage),
- Return(failPromise.future())));
+ ResourceMonitor monitor([]() -> Future<ResourceUsage> {
+ FrameworkID frameworkId;
+ frameworkId.set_value("framework");
- slave::ResourceMonitor monitor(&containerizer);
+ ExecutorID executorId;
+ executorId.set_value("executor");
- AWAIT_READY(monitor.start(DEFAULT_CONTAINER_ID, DEFAULT_EXECUTOR_INFO));
+ ExecutorInfo executorInfo;
+ executorInfo.mutable_executor_id()->CopyFrom(executorId);
+ executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+ executorInfo.set_name("name");
+ executorInfo.set_source("source");
- // Induce a call to usage().
- process::UPID upid("monitor", process::address());
- Future<Response> response = process::http::get(upid, "statistics.json");
+ Resources resources = Resources::parse("cpus:1;mem:2").get();
- // Usage was called, but Future<ResourceStatistics> is still
- // unsatisfied and monitor is blocked.
- AWAIT_READY(usage);
+ ResourceUsage usage;
+ ResourceUsage::Executor* executor = usage.add_executors();
+ executor->mutable_executor_info()->CopyFrom(executorInfo);
+ executor->mutable_allocated()->CopyFrom(resources);
- // Stop monitoring the container.
- AWAIT_READY(monitor.stop(DEFAULT_CONTAINER_ID));
+ return usage;
+ });
- // Fail the future to the collected container statistic.
- failPromise.set(process::Failure("Injected failure"));
+ UPID upid("monitor", address());
- // Verify an empty response.
+ Future<http::Response> response = http::get(upid, "statistics.json");
AWAIT_READY(response);
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ(
"application/json",
"Content-Type",