You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by nn...@apache.org on 2015/05/14 01:43:58 UTC

[2/2] mesos git commit: Removed unused collect cycle in resource monitor.

Removed unused collect cycle in resource monitor.

Review: https://reviews.apache.org/r/33875


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/791b7247
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/791b7247
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/791b7247

Branch: refs/heads/master
Commit: 791b7247f3f973f87575bede0c9ffdafa1ed4967
Parents: 2c6d487
Author: Niklas Nielsen <ni...@qni.dk>
Authored: Wed May 13 16:07:30 2015 -0700
Committer: Niklas Q. Nielsen <ni...@qni.dk>
Committed: Wed May 13 16:42:26 2015 -0700

----------------------------------------------------------------------
 src/slave/flags.hpp         |   3 ++
 src/slave/monitor.cpp       |  98 +++-------------------------------
 src/slave/monitor.hpp       |  57 +++-----------------
 src/slave/slave.cpp         |   6 +--
 src/tests/monitor_tests.cpp | 112 +--------------------------------------
 5 files changed, 19 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/791b7247/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index d3b1ce1..4c50be3 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -55,6 +55,9 @@ public:
   Duration gc_delay;
   double gc_disk_headroom;
   Duration disk_watch_interval;
+
+  // TODO(nnielsen): Deprecate resource_monitoring_interval flag after
+  // Mesos 0.23.0; the resource monitor no longer takes an interval.
   Duration resource_monitoring_interval;
 
   std::string recover;

http://git-wip-us.apache.org/repos/asf/mesos/blob/791b7247/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index 398af01..ba31206 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -53,29 +53,15 @@ namespace slave {
 using process::wait; // Necessary on some OS's to disambiguate.
 
 
-// TODO(bmahler): Consider exposing these as flags should the
-// need arise. These are conservative for the initial version.
-const Duration MONITORING_TIME_SERIES_WINDOW = Weeks(2);
-const size_t MONITORING_TIME_SERIES_CAPACITY = 1000;
-const size_t MONITORING_ARCHIVED_TIME_SERIES = 25;
-
-
 Future<Nothing> ResourceMonitorProcess::start(
     const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const Duration& interval)
+    const ExecutorInfo& executorInfo)
 {
   if (monitored.contains(containerId)) {
     return Failure("Already monitored");
   }
 
-  monitored[containerId] =
-      MonitoringInfo(executorInfo,
-                     MONITORING_TIME_SERIES_WINDOW,
-                     MONITORING_TIME_SERIES_CAPACITY);
-
-  // Schedule the resource collection.
-  delay(interval, self(), &Self::collect, containerId, interval);
+  monitored[containerId] = executorInfo;
 
   return Nothing();
 }
@@ -88,82 +74,12 @@ Future<Nothing> ResourceMonitorProcess::stop(
     return Failure("Not monitored");
   }
 
-  // Add the monitoring information to the archive.
-  archive.push_back(
-      process::Owned<MonitoringInfo>(
-          new MonitoringInfo(monitored[containerId])));
   monitored.erase(containerId);
 
   return Nothing();
 }
 
 
-void ResourceMonitorProcess::collect(
-    const ContainerID& containerId,
-    const Duration& interval)
-{
-  // Has monitoring stopped?
-  if (!monitored.contains(containerId)) {
-    return;
-  }
-
-  containerizer->usage(containerId)
-    .onAny(defer(self(),
-                 &Self::_collect,
-                 lambda::_1,
-                 containerId,
-                 interval));
-}
-
-
-void ResourceMonitorProcess::_collect(
-    const Future<ResourceStatistics>& statistics,
-    const ContainerID& containerId,
-    const Duration& interval)
-{
-  // Has monitoring been stopped?
-  if (!monitored.contains(containerId)) {
-    return;
-  }
-
-  const ExecutorID& executorId =
-    monitored[containerId].executorInfo.executor_id();
-  const FrameworkID& frameworkId =
-    monitored[containerId].executorInfo.framework_id();
-
-  if (statistics.isDiscarded()) {
-    VLOG(1) << "Ignoring discarded future collecting resource usage for"
-            << " container '" << containerId
-            << "' for executor '" << executorId
-            << "' of framework '" << frameworkId << "'";
-  } else if (statistics.isFailed()) {
-    // TODO(bmahler): Have the Containerizer discard the result when the
-    // executor was killed or completed.
-    VLOG(1)
-      << "Failed to collect resource usage for"
-      << " container '" << containerId
-      << "' for executor '" << executorId
-      << "' of framework '" << frameworkId << "': " << statistics.failure();
-  } else {
-    Try<Time> time = Time::create(statistics.get().timestamp());
-
-    if (time.isError()) {
-      LOG(ERROR) << "Invalid timestamp " << statistics.get().timestamp()
-                 << " for container '" << containerId
-                 << "' for executor '" << executorId
-                 << "' of framework '" << frameworkId << ": " << time.error();
-    } else {
-      // Add the statistics to the time series.
-      monitored[containerId].statistics.set(
-          statistics.get(), time.get());
-    }
-  }
-
-  // Schedule the next collection.
-  delay(interval, self(), &Self::collect, containerId, interval);
-}
-
-
 ResourceMonitorProcess::Usage ResourceMonitorProcess::usage(
     const ContainerID& containerId,
     const ExecutorInfo& executorInfo)
@@ -192,10 +108,10 @@ Future<http::Response> ResourceMonitorProcess::_statistics(
   list<Future<ResourceStatistics> > futures;
 
   foreachpair (const ContainerID& containerId,
-               const MonitoringInfo& info,
+               const ExecutorInfo& info,
                monitored) {
     // TODO(bmahler): Consider a batch usage API on the Containerizer.
-    usages.push_back(usage(containerId, info.executorInfo));
+    usages.push_back(usage(containerId, info));
     futures.push_back(usages.back().statistics);
   }
 
@@ -289,15 +205,13 @@ ResourceMonitor::~ResourceMonitor()
 
 Future<Nothing> ResourceMonitor::start(
     const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const Duration& interval)
+    const ExecutorInfo& executorInfo)
 {
   return dispatch(
       process,
       &ResourceMonitorProcess::start,
       containerId,
-      executorInfo,
-      interval);
+      executorInfo);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/791b7247/src/slave/monitor.hpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.hpp b/src/slave/monitor.hpp
index 69c60a1..0d09274 100644
--- a/src/slave/monitor.hpp
+++ b/src/slave/monitor.hpp
@@ -51,21 +51,10 @@ class ResourceMonitorProcess;
 const extern Duration MONITORING_TIME_SERIES_WINDOW;
 const extern size_t MONITORING_TIME_SERIES_CAPACITY;
 
-// Number of time series to maintain for completed executors.
-const extern size_t MONITORING_ARCHIVED_TIME_SERIES;
 
-
-// Provides resource monitoring for containers. Resource usage time
-// series are stored using the Statistics module. Usage information
-// is also exported via a JSON endpoint.
+// Provides resource monitoring for containers. Usage information is
+// exported via a JSON endpoint.
 // TODO(bmahler): Forward usage information to the master.
-// TODO(bmahler): Consider pulling out the resource collection into
-// a Collector abstraction. The monitor can then become a true
-// monitoring abstraction, allowing isolators to subscribe
-// to resource usage events. (e.g. get a future for the executor
-// hitting 75% memory consumption, the future would become ready
-// when this occurs, and the isolator can discard the future
-// when no longer interested).
 class ResourceMonitor
 {
 public:
@@ -76,8 +65,7 @@ public:
   // Returns a failure if the container is already being watched.
   process::Future<Nothing> start(
       const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const Duration& interval);
+      const ExecutorInfo& executorInfo);
 
   // Stops monitoring resources for the given container.
   // Returns a failure if the container is unknown to the monitor.
@@ -95,15 +83,13 @@ public:
   explicit ResourceMonitorProcess(Containerizer* _containerizer)
     : ProcessBase("monitor"),
       containerizer(_containerizer),
-      limiter(2, Seconds(1)), // 2 permits per second.
-      archive(MONITORING_ARCHIVED_TIME_SERIES) {}
+      limiter(2, Seconds(1)) {} // 2 permits per second.
 
   virtual ~ResourceMonitorProcess() {}
 
   process::Future<Nothing> start(
       const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const Duration& interval);
+      const ExecutorInfo& executorInfo);
 
   process::Future<Nothing> stop(
       const ContainerID& containerId);
@@ -114,21 +100,9 @@ protected:
     route("/statistics.json",
           STATISTICS_HELP,
           &ResourceMonitorProcess::statistics);
-
-    // TODO(bmahler): Add a archive.json endpoint that exposes
-    // historical information, once we have path parameters for
-    // routes.
   }
 
 private:
-  void collect(
-      const ContainerID& containerId,
-      const Duration& interval);
-  void _collect(
-      const process::Future<ResourceStatistics>& statistics,
-      const ContainerID& containerId,
-      const Duration& interval);
-
   // This is a convenience struct for bundling usage information.
   struct Usage
   {
@@ -159,25 +133,8 @@ private:
   // Used to rate limit the statistics.json endpoint.
   process::RateLimiter limiter;
 
-  // Monitoring information for an executor.
-  struct MonitoringInfo {
-    // boost::circular_buffer needs a default constructor.
-    MonitoringInfo() {}
-
-    MonitoringInfo(const ExecutorInfo& _executorInfo,
-                   const Duration& window,
-                   size_t capacity)
-      : executorInfo(_executorInfo), statistics(window, capacity) {}
-
-    ExecutorInfo executorInfo;   // Non-const for assignability.
-    process::TimeSeries<ResourceStatistics> statistics;
-  };
-
-  // The monitoring info is stored for each monitored container.
-  hashmap<ContainerID, MonitoringInfo> monitored;
-
-  // Fixed-size history of monitoring information.
-  boost::circular_buffer<process::Owned<MonitoringInfo>> archive;
+  // The executor info is stored for each monitored container.
+  hashmap<ContainerID, ExecutorInfo> monitored;
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/791b7247/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index bd76a40..39967cd 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2403,8 +2403,7 @@ void Slave::reregisterExecutor(
       // update fails.
       monitor.start(
           executor->containerId,
-          executor->info,
-          flags.resource_monitoring_interval)
+          executor->info)
         .onAny(lambda::bind(_monitor,
                             lambda::_1,
                             framework->id(),
@@ -3180,8 +3179,7 @@ void Slave::executorLaunched(
       // Start monitoring the container's resources.
       monitor.start(
           containerId,
-          executor->info,
-          flags.resource_monitoring_interval)
+          executor->info)
         .onAny(lambda::bind(_monitor,
                             lambda::_1,
                             frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/791b7247/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 899af44..ca3b7f4 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -58,115 +58,6 @@ namespace internal {
 namespace tests {
 
 
-TEST(MonitorTest, Collection)
-{
-  FrameworkID frameworkId;
-  frameworkId.set_value("framework");
-
-  ExecutorID executorId;
-  executorId.set_value("executor");
-
-  ContainerID containerId;
-  containerId.set_value("container");
-
-  ExecutorInfo executorInfo;
-  executorInfo.mutable_executor_id()->CopyFrom(executorId);
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
-  executorInfo.set_name("name");
-  executorInfo.set_source("source");
-
-  ResourceStatistics statistics1;
-  statistics1.set_cpus_nr_periods(100);
-  statistics1.set_cpus_nr_throttled(2);
-  statistics1.set_cpus_user_time_secs(4);
-  statistics1.set_cpus_system_time_secs(1);
-  statistics1.set_cpus_throttled_time_secs(0.5);
-  statistics1.set_cpus_limit(1.0);
-  statistics1.set_mem_rss_bytes(1024);
-  statistics1.set_mem_file_bytes(0);
-  statistics1.set_mem_anon_bytes(0);
-  statistics1.set_mem_mapped_file_bytes(0);
-  statistics1.set_mem_limit_bytes(2048);
-  statistics1.set_timestamp(0);
-
-  ResourceStatistics statistics2;
-  statistics2.CopyFrom(statistics1);
-  statistics2.set_timestamp(
-      statistics2.timestamp() + slave::RESOURCE_MONITORING_INTERVAL.secs());
-
-  ResourceStatistics statistics3;
-  statistics3.CopyFrom(statistics2);
-  statistics3.set_timestamp(
-      statistics3.timestamp() + slave::RESOURCE_MONITORING_INTERVAL.secs());
-
-  TestContainerizer containerizer;
-
-  Future<Nothing> usage1, usage2, usage3;
-  EXPECT_CALL(containerizer, usage(containerId))
-    .WillOnce(DoAll(FutureSatisfy(&usage1),
-                    Return(statistics1)))
-    .WillOnce(DoAll(FutureSatisfy(&usage2),
-                    Return(statistics2)))
-    .WillOnce(DoAll(FutureSatisfy(&usage3),
-                    Return(statistics3)));
-
-  slave::ResourceMonitor monitor(&containerizer);
-
-  // We pause the clock first in order to make sure that we can
-  // advance time below to force the 'delay' in
-  // ResourceMonitorProcess::start to execute.
-  process::Clock::pause();
-
-  monitor.start(
-      containerId,
-      executorInfo,
-      slave::RESOURCE_MONITORING_INTERVAL);
-
-  // Now wait for ResouorceMonitorProcess::start to finish so we can
-  // advance time to cause collection to begin.
-  process::Clock::settle();
-
-  process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
-  process::Clock::settle();
-
-  AWAIT_READY(usage1);
-
-  // Wait until the containerizer has finished returning the statistics.
-  process::Clock::settle();
-
-  // Expect a second collection to occur after the interval.
-  process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
-  process::Clock::settle();
-
-  AWAIT_READY(usage2);
-
-  // Wait until the containerizer has finished returning the statistics.
-  process::Clock::settle();
-
-  // Expect a third collection to occur after the interval.
-  process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
-  process::Clock::settle();
-
-  AWAIT_READY(usage3);
-
-  // Wait until the containerize has finished returning the statistics.
-  process::Clock::settle();
-
-  // Ensure the monitor stops polling the isolator.
-  monitor.stop(containerId);
-
-  // Wait until ResourceMonitorProcess::stop has completed.
-  process::Clock::settle();
-
-  // This time, Containerizer::usage should not get called.
-  EXPECT_CALL(containerizer, usage(containerId))
-    .Times(0);
-
-  process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
-  process::Clock::settle();
-}
-
-
 TEST(MonitorTest, Statistics)
 {
   FrameworkID frameworkId;
@@ -213,8 +104,7 @@ TEST(MonitorTest, Statistics)
 
   monitor.start(
       containerId,
-      executorInfo,
-      slave::RESOURCE_MONITORING_INTERVAL);
+      executorInfo);
 
   // Now wait for ResouorceMonitorProcess::watch to finish.
   process::Clock::settle();