You are viewing a plain-text version of this content; the link to the canonical version is not preserved in plain text.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/10/31 17:39:29 UTC

svn commit: r1404240 - in /incubator/mesos/trunk/src/slave: cgroups_isolation_module.cpp cgroups_isolation_module.hpp

Author: benh
Date: Wed Oct 31 16:39:28 2012
New Revision: 1404240

URL: http://svn.apache.org/viewvc?rev=1404240&view=rev
Log:
Updated the cgroups isolation module to use
'memory.soft_limit_in_bytes' when decreasing the memory limit. Also
made some small simplifications to clean up the code base (some of
which come from https://reviews.apache.org/r/6534, which has been
discarded for now).

Modified:
    incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp
    incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp

Modified: incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp?rev=1404240&r1=1404239&r2=1404240&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp Wed Oct 31 16:39:28 2012
@@ -32,6 +32,7 @@
 #include <stout/fatal.hpp>
 #include <stout/foreach.hpp>
 #include <stout/lambda.hpp>
+#include <stout/numify.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/stringify.hpp>
@@ -257,13 +258,15 @@ void CgroupsIsolationModule::initialize(
   CHECK(disable.isSome())
     << "Failed to disable OOM killer: " << disable.error();
 
-  // Configure resource subsystem mapping.
-  resourceSubsystemMap["cpus"] = "cpu";
-  resourceSubsystemMap["mem"] = "memory";
-
-  // Configure resource changed handlers.
-  resourceChangedHandlers["cpus"] = &CgroupsIsolationModule::cpusChanged;
-  resourceChangedHandlers["mem"] = &CgroupsIsolationModule::memChanged;
+  // Configure resource changed handlers. We only add handlers for
+  // resources that have the appropriate subsystem activated.
+  if (activatedSubsystems.contains("cpu")) {
+    handlers["cpus"] = &CgroupsIsolationModule::cpusChanged;
+  }
+
+  if (activatedSubsystems.contains("memory")) {
+    handlers["mem"] = &CgroupsIsolationModule::memChanged;
+  }
 
   initialized = true;
 }
@@ -347,8 +350,6 @@ void CgroupsIsolationModule::launchExecu
     LOG(INFO) << "Forked executor at = " << pid;
 
     // Store the pid of the leading process of the executor.
-    CgroupInfo* info = findCgroupInfo(frameworkId, executorId);
-    CHECK(info != NULL) << "Cannot find cgroup info";
     info->pid = pid;
 
     // Tell the slave this executor has started.
@@ -428,21 +429,11 @@ void CgroupsIsolationModule::resourcesCh
             << " with resources " << resources;
 
   // For each resource, invoke the corresponding handler.
-  for (Resources::const_iterator it = resources.begin();
-       it != resources.end(); ++it) {
-    const Resource& resource = *it;
-    const string& name = resource.name();
-
-    if (resourceChangedHandlers.contains(name)) {
-      // We only call the resource changed handler either if the resource does
-      // not depend on any subsystem, or the dependent subsystem is active.
-      if (!resourceSubsystemMap.contains(name) ||
-          activatedSubsystems.contains(resourceSubsystemMap[name])) {
-        Try<Nothing> result =
-          (this->*resourceChangedHandlers[name])(info, resources);
-        if (result.isError()) {
-          LOG(ERROR) << result.error();
-        }
+  foreach (const Resource& resource, resources) {
+    if (handlers.contains(resource.name())) {
+      Try<Nothing> result = (this->*handlers[resource.name()])(info, resource);
+      if (result.isError()) {
+        LOG(ERROR) << result.error();
       }
     }
   }
@@ -476,70 +467,89 @@ void CgroupsIsolationModule::processExit
 
 Try<Nothing> CgroupsIsolationModule::cpusChanged(
     const CgroupInfo* info,
-    const Resources& resources)
+    const Resource& resource)
 {
-  Resource r;
-  r.set_name("cpus");
-  r.set_type(Value::SCALAR);
-
-  Option<Resource> cpusResource = resources.get(r);
-  if (cpusResource.isNone()) {
-    LOG(WARNING) << "Resource cpus cannot be retrieved for executor "
-                 << info->executorId << " of framework " << info->frameworkId;
-  } else {
-    double cpus = cpusResource.get().scalar().value();
-    size_t cpuShares =
-      std::max((size_t)(CPU_SHARES_PER_CPU * cpus), MIN_CPU_SHARES);
-
-    Try<Nothing> set =
-      cgroups::writeControl(hierarchy,
-                            info->name(),
-                            "cpu.shares",
-                            stringify(cpuShares));
-    if (set.isError()) {
-      return set;
-    }
-
-    LOG(INFO) << "Write cpu.shares = " << cpuShares
-              << " for executor " << info->executorId
-              << " of framework " << info->frameworkId;
+  CHECK(resource.name() == "cpus");
+
+  if (resource.type() != Value::SCALAR) {
+    return Try<Nothing>::error("Expecting resource 'cpus' to be a scalar");
   }
 
+  double cpus = resource.scalar().value();
+  size_t cpuShares =
+    std::max((size_t)(CPU_SHARES_PER_CPU * cpus), MIN_CPU_SHARES);
+
+  Try<Nothing> write = cgroups::writeControl(
+      hierarchy, info->name(), "cpu.shares", stringify(cpuShares));
+
+  if (write.isError()) {
+    return Try<Nothing>::error(
+        "Failed to update 'cpu.shares': " + write.error());
+  }
+
+  LOG(INFO) << "Updated 'cpu.shares' to " << cpuShares
+            << " for executor " << info->executorId
+            << " of framework " << info->frameworkId;
+
   return Nothing();
 }
 
 
 Try<Nothing> CgroupsIsolationModule::memChanged(
     const CgroupInfo* info,
-    const Resources& resources)
+    const Resource& resource)
 {
-  Resource r;
-  r.set_name("mem");
-  r.set_type(Value::SCALAR);
-
-  Option<Resource> memResource = resources.get(r);
-  if (memResource.isNone()) {
-    LOG(WARNING) << "Resource mem cannot be retrieved for executor "
-                 << info->executorId << " of framework " << info->frameworkId;
-  } else {
-    double mem = memResource.get().scalar().value();
-    size_t limitInBytes =
-      std::max((size_t)mem, MIN_MEMORY_MB) * 1024LL * 1024LL;
-
-    Try<Nothing> set =
-      cgroups::writeControl(hierarchy,
-                            info->name(),
-                            "memory.limit_in_bytes",
-                            stringify(limitInBytes));
-    if (set.isError()) {
-      return set;
-    }
-
-    LOG(INFO) << "Write memory.limit_in_bytes = " << limitInBytes
-              << " for executor " << info->executorId
-              << " of framework " << info->frameworkId;
+  CHECK(resource.name() == "mem");
+
+  if (resource.type() != Value::SCALAR) {
+    return Try<Nothing>::error("Expecting resource 'mem' to be a scalar");
   }
 
+  double mem = resource.scalar().value();
+  size_t limitInBytes =
+    std::max((size_t) mem, MIN_MEMORY_MB) * 1024LL * 1024LL;
+
+  // Determine which control to set. If this is the first time we're
+  // setting the limit, use 'memory.limit_in_bytes'. The "first time"
+  // is determined by checking whether or not we've forked a process
+  // in the cgroup yet (i.e., 'info->pid != -1'). If this is not the
+  // first time we're setting the limit AND we're decreasing the
+  // limit, use 'memory.soft_limit_in_bytes'. We do this because we
+  // might not be able to decrease 'memory.limit_in_bytes' if too much
+  // memory is being used. This is probably okay if the machine has
+  // available resources; TODO(benh): Introduce a MemoryWatcherProcess
+  // which monitors the discrepancy between usage and soft limit and
+  // introduces a "manual oom" if necessary.
+  string control = "memory.limit_in_bytes";
+
+  if (info->pid != -1) {
+    Try<string> read = cgroups::readControl(
+        hierarchy, info->name(), "memory.limit_in_bytes");
+    if (read.isError()) {
+      return Try<Nothing>::error(
+          "Failed to read 'memory.limit_in_bytes': " + read.error());
+    }
+
+    Try<size_t> currentLimitInBytes = numify<size_t>(strings::trim(read.get()));
+    CHECK(currentLimitInBytes.isSome()) << currentLimitInBytes.error();
+
+    if (limitInBytes <= currentLimitInBytes.get()) {
+      control = "memory.soft_limit_in_bytes";
+    }
+  }
+
+  Try<Nothing> write = cgroups::writeControl(
+      hierarchy, info->name(), control, stringify(limitInBytes));
+
+  if (write.isError()) {
+    return Try<Nothing>::error(
+        "Failed to update '" + control + "': " + write.error());
+  }
+
+  LOG(INFO) << "Updated '" << control << "' to " << limitInBytes
+            << " for executor " << info->executorId
+            << " of framework " << info->frameworkId;
+
   return Nothing();
 }
 
@@ -636,6 +646,12 @@ void CgroupsIsolationModule::oom(
     LOG(INFO) << "MEMORY LIMIT: " << strings::trim(read.get()) << " bytes";
   }
 
+  // Output 'memory.usage_in_bytes'.
+  read = cgroups::readControl(hierarchy, info->name(), "memory.usage_in_bytes");
+  if (read.isSome()) {
+    LOG(INFO) << "MEMORY USAGE: " << strings::trim(read.get()) << " bytes";
+  }
+
   // Output 'memory.stat' of the cgroup to help with debugging.
   read = cgroups::readControl(hierarchy, info->name(), "memory.stat");
   if (read.isSome()) {

Modified: incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp?rev=1404240&r1=1404239&r2=1404240&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp Wed Oct 31 16:39:28 2012
@@ -98,7 +98,7 @@ private:
     // executor (which have the same frameworkId and executorId).
     std::string tag;
 
-    // PID of the leading process of the executor.
+    // PID of the forked process of the executor.
     pid_t pid;
 
     // Whether the executor has been killed.
@@ -115,7 +115,7 @@ private:
   // @return  Whether the operation successes.
   Try<Nothing> cpusChanged(
       const CgroupInfo* info,
-      const Resources& resources);
+      const Resource& resource);
 
   // The callback which will be invoked when "mem" resource has changed.
   // @param   frameworkId   The id of the given framework.
@@ -124,7 +124,7 @@ private:
   // @return  Whether the operation successes.
   Try<Nothing> memChanged(
       const CgroupInfo* info,
-      const Resources& resources);
+      const Resource& resource);
 
   // Start listening on OOM events. This function will create an eventfd and
   // start polling on it.
@@ -203,15 +203,11 @@ private:
   // The activated cgroups subsystems that can be used by the module.
   hashset<std::string> activatedSubsystems;
 
-  // The mapping between resource name and corresponding cgroups subsystem.
-  hashmap<std::string, std::string> resourceSubsystemMap;
-
-  // Mapping between resource name to the corresponding resource changed
-  // handler function.
+  // Handlers for each resource name, used for resource changes.
   hashmap<std::string,
           Try<Nothing>(CgroupsIsolationModule::*)(
               const CgroupInfo*,
-              const Resources&)> resourceChangedHandlers;
+              const Resource&)> handlers;
 };
 
 } // namespace mesos {