You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by nn...@apache.org on 2015/05/14 01:43:58 UTC
[2/2] mesos git commit: Removed unused collect cycle in resource monitor.
Removed unused collect cycle in resource monitor.
Review: https://reviews.apache.org/r/33875
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/791b7247
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/791b7247
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/791b7247
Branch: refs/heads/master
Commit: 791b7247f3f973f87575bede0c9ffdafa1ed4967
Parents: 2c6d487
Author: Niklas Nielsen <ni...@qni.dk>
Authored: Wed May 13 16:07:30 2015 -0700
Committer: Niklas Q. Nielsen <ni...@qni.dk>
Committed: Wed May 13 16:42:26 2015 -0700
----------------------------------------------------------------------
src/slave/flags.hpp | 3 ++
src/slave/monitor.cpp | 98 +++-------------------------------
src/slave/monitor.hpp | 57 +++-----------------
src/slave/slave.cpp | 6 +--
src/tests/monitor_tests.cpp | 112 +--------------------------------------
5 files changed, 19 insertions(+), 257 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/791b7247/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index d3b1ce1..4c50be3 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -55,6 +55,9 @@ public:
Duration gc_delay;
double gc_disk_headroom;
Duration disk_watch_interval;
+
+ // TODO(nnielsen): Deprecate resource_monitoring_interval flag after
+ // Mesos 0.23.0.
Duration resource_monitoring_interval;
std::string recover;
http://git-wip-us.apache.org/repos/asf/mesos/blob/791b7247/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index 398af01..ba31206 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -53,29 +53,15 @@ namespace slave {
using process::wait; // Necessary on some OS's to disambiguate.
-// TODO(bmahler): Consider exposing these as flags should the
-// need arise. These are conservative for the initial version.
-const Duration MONITORING_TIME_SERIES_WINDOW = Weeks(2);
-const size_t MONITORING_TIME_SERIES_CAPACITY = 1000;
-const size_t MONITORING_ARCHIVED_TIME_SERIES = 25;
-
-
Future<Nothing> ResourceMonitorProcess::start(
const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const Duration& interval)
+ const ExecutorInfo& executorInfo)
{
if (monitored.contains(containerId)) {
return Failure("Already monitored");
}
- monitored[containerId] =
- MonitoringInfo(executorInfo,
- MONITORING_TIME_SERIES_WINDOW,
- MONITORING_TIME_SERIES_CAPACITY);
-
- // Schedule the resource collection.
- delay(interval, self(), &Self::collect, containerId, interval);
+ monitored[containerId] = executorInfo;
return Nothing();
}
@@ -88,82 +74,12 @@ Future<Nothing> ResourceMonitorProcess::stop(
return Failure("Not monitored");
}
- // Add the monitoring information to the archive.
- archive.push_back(
- process::Owned<MonitoringInfo>(
- new MonitoringInfo(monitored[containerId])));
monitored.erase(containerId);
return Nothing();
}
-void ResourceMonitorProcess::collect(
- const ContainerID& containerId,
- const Duration& interval)
-{
- // Has monitoring stopped?
- if (!monitored.contains(containerId)) {
- return;
- }
-
- containerizer->usage(containerId)
- .onAny(defer(self(),
- &Self::_collect,
- lambda::_1,
- containerId,
- interval));
-}
-
-
-void ResourceMonitorProcess::_collect(
- const Future<ResourceStatistics>& statistics,
- const ContainerID& containerId,
- const Duration& interval)
-{
- // Has monitoring been stopped?
- if (!monitored.contains(containerId)) {
- return;
- }
-
- const ExecutorID& executorId =
- monitored[containerId].executorInfo.executor_id();
- const FrameworkID& frameworkId =
- monitored[containerId].executorInfo.framework_id();
-
- if (statistics.isDiscarded()) {
- VLOG(1) << "Ignoring discarded future collecting resource usage for"
- << " container '" << containerId
- << "' for executor '" << executorId
- << "' of framework '" << frameworkId << "'";
- } else if (statistics.isFailed()) {
- // TODO(bmahler): Have the Containerizer discard the result when the
- // executor was killed or completed.
- VLOG(1)
- << "Failed to collect resource usage for"
- << " container '" << containerId
- << "' for executor '" << executorId
- << "' of framework '" << frameworkId << "': " << statistics.failure();
- } else {
- Try<Time> time = Time::create(statistics.get().timestamp());
-
- if (time.isError()) {
- LOG(ERROR) << "Invalid timestamp " << statistics.get().timestamp()
- << " for container '" << containerId
- << "' for executor '" << executorId
- << "' of framework '" << frameworkId << ": " << time.error();
- } else {
- // Add the statistics to the time series.
- monitored[containerId].statistics.set(
- statistics.get(), time.get());
- }
- }
-
- // Schedule the next collection.
- delay(interval, self(), &Self::collect, containerId, interval);
-}
-
-
ResourceMonitorProcess::Usage ResourceMonitorProcess::usage(
const ContainerID& containerId,
const ExecutorInfo& executorInfo)
@@ -192,10 +108,10 @@ Future<http::Response> ResourceMonitorProcess::_statistics(
list<Future<ResourceStatistics> > futures;
foreachpair (const ContainerID& containerId,
- const MonitoringInfo& info,
+ const ExecutorInfo& info,
monitored) {
// TODO(bmahler): Consider a batch usage API on the Containerizer.
- usages.push_back(usage(containerId, info.executorInfo));
+ usages.push_back(usage(containerId, info));
futures.push_back(usages.back().statistics);
}
@@ -289,15 +205,13 @@ ResourceMonitor::~ResourceMonitor()
Future<Nothing> ResourceMonitor::start(
const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const Duration& interval)
+ const ExecutorInfo& executorInfo)
{
return dispatch(
process,
&ResourceMonitorProcess::start,
containerId,
- executorInfo,
- interval);
+ executorInfo);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/791b7247/src/slave/monitor.hpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.hpp b/src/slave/monitor.hpp
index 69c60a1..0d09274 100644
--- a/src/slave/monitor.hpp
+++ b/src/slave/monitor.hpp
@@ -51,21 +51,10 @@ class ResourceMonitorProcess;
const extern Duration MONITORING_TIME_SERIES_WINDOW;
const extern size_t MONITORING_TIME_SERIES_CAPACITY;
-// Number of time series to maintain for completed executors.
-const extern size_t MONITORING_ARCHIVED_TIME_SERIES;
-
-// Provides resource monitoring for containers. Resource usage time
-// series are stored using the Statistics module. Usage information
-// is also exported via a JSON endpoint.
+// Provides resource monitoring for containers. Usage information is
+// also exported via a JSON endpoint.
// TODO(bmahler): Forward usage information to the master.
-// TODO(bmahler): Consider pulling out the resource collection into
-// a Collector abstraction. The monitor can then become a true
-// monitoring abstraction, allowing isolators to subscribe
-// to resource usage events. (e.g. get a future for the executor
-// hitting 75% memory consumption, the future would become ready
-// when this occurs, and the isolator can discard the future
-// when no longer interested).
class ResourceMonitor
{
public:
@@ -76,8 +65,7 @@ public:
// Returns a failure if the container is already being watched.
process::Future<Nothing> start(
const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const Duration& interval);
+ const ExecutorInfo& executorInfo);
// Stops monitoring resources for the given container.
// Returns a failure if the container is unknown to the monitor.
@@ -95,15 +83,13 @@ public:
explicit ResourceMonitorProcess(Containerizer* _containerizer)
: ProcessBase("monitor"),
containerizer(_containerizer),
- limiter(2, Seconds(1)), // 2 permits per second.
- archive(MONITORING_ARCHIVED_TIME_SERIES) {}
+ limiter(2, Seconds(1)) {} // 2 permits per second.
virtual ~ResourceMonitorProcess() {}
process::Future<Nothing> start(
const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const Duration& interval);
+ const ExecutorInfo& executorInfo);
process::Future<Nothing> stop(
const ContainerID& containerId);
@@ -114,21 +100,9 @@ protected:
route("/statistics.json",
STATISTICS_HELP,
&ResourceMonitorProcess::statistics);
-
- // TODO(bmahler): Add a archive.json endpoint that exposes
- // historical information, once we have path parameters for
- // routes.
}
private:
- void collect(
- const ContainerID& containerId,
- const Duration& interval);
- void _collect(
- const process::Future<ResourceStatistics>& statistics,
- const ContainerID& containerId,
- const Duration& interval);
-
// This is a convenience struct for bundling usage information.
struct Usage
{
@@ -159,25 +133,8 @@ private:
// Used to rate limit the statistics.json endpoint.
process::RateLimiter limiter;
- // Monitoring information for an executor.
- struct MonitoringInfo {
- // boost::circular_buffer needs a default constructor.
- MonitoringInfo() {}
-
- MonitoringInfo(const ExecutorInfo& _executorInfo,
- const Duration& window,
- size_t capacity)
- : executorInfo(_executorInfo), statistics(window, capacity) {}
-
- ExecutorInfo executorInfo; // Non-const for assignability.
- process::TimeSeries<ResourceStatistics> statistics;
- };
-
- // The monitoring info is stored for each monitored container.
- hashmap<ContainerID, MonitoringInfo> monitored;
-
- // Fixed-size history of monitoring information.
- boost::circular_buffer<process::Owned<MonitoringInfo>> archive;
+ // The executor info is stored for each monitored container.
+ hashmap<ContainerID, ExecutorInfo> monitored;
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/791b7247/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index bd76a40..39967cd 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2403,8 +2403,7 @@ void Slave::reregisterExecutor(
// update fails.
monitor.start(
executor->containerId,
- executor->info,
- flags.resource_monitoring_interval)
+ executor->info)
.onAny(lambda::bind(_monitor,
lambda::_1,
framework->id(),
@@ -3180,8 +3179,7 @@ void Slave::executorLaunched(
// Start monitoring the container's resources.
monitor.start(
containerId,
- executor->info,
- flags.resource_monitoring_interval)
+ executor->info)
.onAny(lambda::bind(_monitor,
lambda::_1,
frameworkId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/791b7247/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 899af44..ca3b7f4 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -58,115 +58,6 @@ namespace internal {
namespace tests {
-TEST(MonitorTest, Collection)
-{
- FrameworkID frameworkId;
- frameworkId.set_value("framework");
-
- ExecutorID executorId;
- executorId.set_value("executor");
-
- ContainerID containerId;
- containerId.set_value("container");
-
- ExecutorInfo executorInfo;
- executorInfo.mutable_executor_id()->CopyFrom(executorId);
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
- executorInfo.set_name("name");
- executorInfo.set_source("source");
-
- ResourceStatistics statistics1;
- statistics1.set_cpus_nr_periods(100);
- statistics1.set_cpus_nr_throttled(2);
- statistics1.set_cpus_user_time_secs(4);
- statistics1.set_cpus_system_time_secs(1);
- statistics1.set_cpus_throttled_time_secs(0.5);
- statistics1.set_cpus_limit(1.0);
- statistics1.set_mem_rss_bytes(1024);
- statistics1.set_mem_file_bytes(0);
- statistics1.set_mem_anon_bytes(0);
- statistics1.set_mem_mapped_file_bytes(0);
- statistics1.set_mem_limit_bytes(2048);
- statistics1.set_timestamp(0);
-
- ResourceStatistics statistics2;
- statistics2.CopyFrom(statistics1);
- statistics2.set_timestamp(
- statistics2.timestamp() + slave::RESOURCE_MONITORING_INTERVAL.secs());
-
- ResourceStatistics statistics3;
- statistics3.CopyFrom(statistics2);
- statistics3.set_timestamp(
- statistics3.timestamp() + slave::RESOURCE_MONITORING_INTERVAL.secs());
-
- TestContainerizer containerizer;
-
- Future<Nothing> usage1, usage2, usage3;
- EXPECT_CALL(containerizer, usage(containerId))
- .WillOnce(DoAll(FutureSatisfy(&usage1),
- Return(statistics1)))
- .WillOnce(DoAll(FutureSatisfy(&usage2),
- Return(statistics2)))
- .WillOnce(DoAll(FutureSatisfy(&usage3),
- Return(statistics3)));
-
- slave::ResourceMonitor monitor(&containerizer);
-
- // We pause the clock first in order to make sure that we can
- // advance time below to force the 'delay' in
- // ResourceMonitorProcess::start to execute.
- process::Clock::pause();
-
- monitor.start(
- containerId,
- executorInfo,
- slave::RESOURCE_MONITORING_INTERVAL);
-
- // Now wait for ResouorceMonitorProcess::start to finish so we can
- // advance time to cause collection to begin.
- process::Clock::settle();
-
- process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
- process::Clock::settle();
-
- AWAIT_READY(usage1);
-
- // Wait until the containerizer has finished returning the statistics.
- process::Clock::settle();
-
- // Expect a second collection to occur after the interval.
- process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
- process::Clock::settle();
-
- AWAIT_READY(usage2);
-
- // Wait until the containerizer has finished returning the statistics.
- process::Clock::settle();
-
- // Expect a third collection to occur after the interval.
- process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
- process::Clock::settle();
-
- AWAIT_READY(usage3);
-
- // Wait until the containerize has finished returning the statistics.
- process::Clock::settle();
-
- // Ensure the monitor stops polling the isolator.
- monitor.stop(containerId);
-
- // Wait until ResourceMonitorProcess::stop has completed.
- process::Clock::settle();
-
- // This time, Containerizer::usage should not get called.
- EXPECT_CALL(containerizer, usage(containerId))
- .Times(0);
-
- process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
- process::Clock::settle();
-}
-
-
TEST(MonitorTest, Statistics)
{
FrameworkID frameworkId;
@@ -213,8 +104,7 @@ TEST(MonitorTest, Statistics)
monitor.start(
containerId,
- executorInfo,
- slave::RESOURCE_MONITORING_INTERVAL);
+ executorInfo);
// Now wait for ResourceMonitorProcess::start to finish.
process::Clock::settle();