You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/07 02:28:52 UTC

svn commit: r1370080 - in /incubator/mesos/trunk/src: ./ examples/ slave/ tests/ tests/external/CgroupsIsolation/

Author: benh
Date: Tue Aug  7 00:28:51 2012
New Revision: 1370080

URL: http://svn.apache.org/viewvc?rev=1370080&view=rev
Log:
Added the cgroups isolation module (contributed by Jie Yu,
https://reviews.apache.org/r/5509).

Added:
    incubator/mesos/trunk/src/examples/balloon_executor.cpp
    incubator/mesos/trunk/src/examples/balloon_framework.cpp
    incubator/mesos/trunk/src/examples/utils.hpp
      - copied, changed from r1370079, incubator/mesos/trunk/src/slave/isolation_module.cpp
    incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp
    incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp
    incubator/mesos/trunk/src/tests/cgroups_isolation_tests.cpp
      - copied, changed from r1370079, incubator/mesos/trunk/src/slave/isolation_module.cpp
    incubator/mesos/trunk/src/tests/external/CgroupsIsolation/
    incubator/mesos/trunk/src/tests/external/CgroupsIsolation/ROOT_CGROUPS_BalloonFramework.sh
Modified:
    incubator/mesos/trunk/src/Makefile.am
    incubator/mesos/trunk/src/slave/flags.hpp
    incubator/mesos/trunk/src/slave/isolation_module.cpp

Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1370080&r1=1370079&r2=1370080&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Tue Aug  7 00:28:51 2012
@@ -170,11 +170,13 @@ pkginclude_HEADERS = $(top_srcdir)/inclu
 nodist_pkginclude_HEADERS = ../include/mesos/mesos.hpp mesos.pb.h
 
 if OS_LINUX
+  libmesos_no_third_party_la_SOURCES += slave/cgroups_isolation_module.cpp
   libmesos_no_third_party_la_SOURCES += slave/lxc_isolation_module.cpp
   libmesos_no_third_party_la_SOURCES += linux/cgroups.cpp
   libmesos_no_third_party_la_SOURCES += linux/fs.cpp
   libmesos_no_third_party_la_SOURCES += linux/proc.cpp
 else
+  EXTRA_DIST += slave/cgroups_isolation_module.cpp
   EXTRA_DIST += slave/lxc_isolation_module.cpp
   EXTRA_DIST += linux/cgroups.cpp
   EXTRA_DIST += linux/fs.cpp
@@ -200,6 +202,7 @@ libmesos_no_third_party_la_SOURCES += co
 	messages/messages.hpp slave/constants.hpp slave/flags.hpp	\
 	slave/http.hpp slave/isolation_module.hpp			\
 	slave/isolation_module_factory.hpp				\
+	slave/cgroups_isolation_module.hpp				\
 	slave/lxc_isolation_module.hpp					\
 	slave/process_based_isolation_module.hpp slave/reaper.hpp	\
 	slave/slave.hpp slave/solaris_project_isolation_module.hpp	\
@@ -745,6 +748,16 @@ no_executor_framework_SOURCES = examples
 no_executor_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
 no_executor_framework_LDADD = libmesos.la
 
+check_PROGRAMS += balloon-framework
+balloon_framework_SOURCES = examples/balloon_framework.cpp
+balloon_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
+balloon_framework_LDADD = libmesos.la
+
+check_PROGRAMS += balloon-executor
+balloon_executor_SOURCES = examples/balloon_executor.cpp
+balloon_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
+balloon_executor_LDADD = libmesos.la
+
 check_PROGRAMS += mesos-tests
 
 mesos_tests_SOURCES = tests/main.cpp tests/utils.cpp			\
@@ -779,6 +792,7 @@ mesos_tests_LDADD = ../third_party/libgm
 mesos_tests_DEPENDENCIES = # Initialized to allow += below.
 
 if OS_LINUX
+  mesos_tests_SOURCES += tests/cgroups_isolation_tests.cpp
   mesos_tests_SOURCES += tests/cgroups_tests.cpp
   mesos_tests_SOURCES += tests/fs_tests.cpp
   mesos_tests_SOURCES += tests/proc_tests.cpp

Added: incubator/mesos/trunk/src/examples/balloon_executor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/balloon_executor.cpp?rev=1370080&view=auto
==============================================================================
--- incubator/mesos/trunk/src/examples/balloon_executor.cpp (added)
+++ incubator/mesos/trunk/src/examples/balloon_executor.cpp Tue Aug  7 00:28:51 2012
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <iostream>
+#include <string>
+
+#include <mesos/executor.hpp>
+
+#include <stout/numify.hpp>
+
+using namespace mesos;
+
+
+// This function will increase the memory footprint gradually. The parameter
+// limit specifies the upper limit (in MB) of the memory footprint. The
+// parameter step specifies the step size (in MB).
+// Gradually grows the memory footprint of this process.
+//
+// limit: upper limit (in MB) of the memory to allocate.
+// step:  amount (in MB) allocated per iteration. limit / step (integer
+//        division) iterations are run, one per second.
+//
+// The allocated chunks are intentionally never freed: keeping them around is
+// what makes the footprint grow.
+static void balloon(size_t limit, size_t step)
+{
+  // A zero step would divide by zero in the loop bound below.
+  if (step == 0) {
+    std::cerr << "Balloon step must be greater than 0 MB" << std::endl;
+    return;
+  }
+
+  size_t chunk = step * 1024 * 1024;
+  for (size_t i = 0; i < limit / step; i++) {
+    std::cout << "Increasing memory footprint by "
+              << step << " MB" << std::endl;
+
+    // Allocate virtual memory.
+    char* buffer = (char *)malloc(chunk);
+    if (buffer == NULL) {
+      // Writing through a NULL pointer is undefined behavior. Fail loudly
+      // instead so the task can never be reported as finished.
+      std::cerr << "Failed to allocate " << step << " MB" << std::endl;
+      ::abort();
+    }
+
+    // We use memset here so that the memory actually gets paged in. However,
+    // the memory may get paged out again depending on the OS page replacement
+    // algorithm. Therefore, to ensure X MB of memory is actually used, we need
+    // to pass Y (Y > X) to this function.
+    ::memset(buffer, 1, chunk);
+
+    // Try not to increase the memory footprint too fast.
+    ::sleep(1);
+  }
+}
+
+
+// Example executor whose task gradually inflates the memory footprint of the
+// executor process (see balloon()). The balloon step (in MB) is read from the
+// executor data, the balloon limit (in MB) from the task data.
+class BalloonExecutor : public Executor
+{
+public:
+  // A step of 0 means "not configured yet"; registered() sets the real value.
+  BalloonExecutor() : balloonStep(0) {}
+
+  virtual ~BalloonExecutor() {}
+
+  virtual void registered(ExecutorDriver* driver,
+                          const ExecutorInfo& executorInfo,
+                          const FrameworkInfo& frameworkInfo,
+                          const SlaveInfo& slaveInfo)
+  {
+    // Setup balloon step. An unusable value leaves the step at 0, which makes
+    // launchTask() fail the task instead of relying on assert (a no-op when
+    // compiled with NDEBUG).
+    Try<size_t> step = numify<size_t>(executorInfo.data());
+    if (step.isSome() && step.get() > 0) {
+      balloonStep = step.get();
+    } else {
+      std::cerr << "Invalid balloon step: " << executorInfo.data()
+                << std::endl;
+    }
+
+    std::cout << "Registered" << std::endl;
+  }
+
+  virtual void reregistered(ExecutorDriver* driver,
+                            const SlaveInfo& slaveInfo)
+  {
+    std::cout << "Reregistered" << std::endl;
+  }
+
+  virtual void disconnected(ExecutorDriver* driver)
+  {
+    std::cout << "Disconnected" << std::endl;
+  }
+
+  // Reports TASK_RUNNING, inflates the memory footprint up to the limit found
+  // in the task data, then reports TASK_FINISHED. Reports TASK_FAILED without
+  // ballooning when the limit or the step is unusable.
+  virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  {
+    std::cout << "Starting task " << task.task_id().value() << std::endl;
+
+    TaskStatus status;
+    status.mutable_task_id()->MergeFrom(task.task_id());
+
+    // Get the balloon limit (in MB).
+    Try<size_t> limit = numify<size_t>(task.data());
+    if (limit.isError() || balloonStep == 0) {
+      std::cerr << "Cannot run task " << task.task_id().value()
+                << ": invalid balloon limit or step" << std::endl;
+
+      status.set_state(TASK_FAILED);
+      driver->sendStatusUpdate(status);
+      return;
+    }
+
+    status.set_state(TASK_RUNNING);
+    driver->sendStatusUpdate(status);
+
+    // Artificially increase the memory usage gradually. The limit can be
+    // larger than the amount of memory allocated to this executor. In that
+    // case, the isolation module (e.g. cgroups) should be able to detect that
+    // and the task should not be able to reach TASK_FINISHED state.
+    balloon(limit.get(), balloonStep);
+
+    std::cout << "Finishing task " << task.task_id().value() << std::endl;
+
+    status.set_state(TASK_FINISHED);
+    driver->sendStatusUpdate(status);
+  }
+
+  virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  {
+    std::cout << "Kill task " << taskId.value() << std::endl;
+  }
+
+  virtual void frameworkMessage(ExecutorDriver* driver, const std::string& data)
+  {
+    std::cout << "Framework message: " << data << std::endl;
+  }
+
+  virtual void shutdown(ExecutorDriver* driver)
+  {
+    std::cout << "Shutdown" << std::endl;
+  }
+
+  virtual void error(ExecutorDriver* driver, const std::string& message)
+  {
+    std::cout << "Error message: " << message << std::endl;
+  }
+
+private:
+  // The amount of memory in MB each balloon step consumes (0 until set).
+  size_t balloonStep;
+};
+
+
+// Runs the balloon executor until its driver terminates. The exit code is 0
+// only when the driver ended in the DRIVER_STOPPED state, 1 otherwise.
+int main(int argc, char** argv)
+{
+  BalloonExecutor executor;
+  MesosExecutorDriver driver(&executor);
+
+  if (driver.run() == DRIVER_STOPPED) {
+    return 0;
+  }
+
+  return 1;
+}

Added: incubator/mesos/trunk/src/examples/balloon_framework.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/balloon_framework.cpp?rev=1370080&view=auto
==============================================================================
--- incubator/mesos/trunk/src/examples/balloon_framework.cpp (added)
+++ incubator/mesos/trunk/src/examples/balloon_framework.cpp Tue Aug  7 00:28:51 2012
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <libgen.h>
+#include <stdlib.h>
+
+#include <sys/param.h>
+
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include <mesos/scheduler.hpp>
+
+#include <stout/numify.hpp>
+#include <stout/stringify.hpp>
+
+#include "examples/utils.hpp"
+
+using namespace mesos;
+
+
+// The amount of memory in MB the executor itself takes.
+const static size_t EXECUTOR_MEMORY_MB = 64;
+
+// The amount of memory in MB each balloon step consumes.
+const static size_t BALLOON_STEP_MB = 64;
+
+
+// Example scheduler that launches exactly one balloon task on the first offer
+// it receives and then waits for that task to terminate.
+class BalloonScheduler : public Scheduler
+{
+public:
+  // _executor:     executor used to run the balloon task.
+  // _balloonLimit: balloon limit (in MB) handed to the task as its data.
+  BalloonScheduler(const ExecutorInfo& _executor,
+                   size_t _balloonLimit)
+    : executor(_executor),
+      balloonLimit(_balloonLimit),
+      taskLaunched(false) {}
+
+  virtual ~BalloonScheduler() {}
+
+  virtual void registered(SchedulerDriver*,
+                          const FrameworkID&,
+                          const MasterInfo&)
+  {
+    std::cout << "Registered" << std::endl;
+  }
+
+  virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo)
+  {
+    std::cout << "Reregistered" << std::endl;
+  }
+
+  virtual void disconnected(SchedulerDriver* driver)
+  {
+    std::cout << "Disconnected" << std::endl;
+  }
+
+  // Launches the single balloon task on the first offer seen. Aborts the
+  // driver when that offer does not carry more memory than the executor
+  // itself needs.
+  virtual void resourceOffers(SchedulerDriver* driver,
+                              const std::vector<Offer>& offers)
+  {
+    std::cout << "Resource offers received" << std::endl;
+
+    for (size_t i = 0; i < offers.size(); i++) {
+      // We just launch one task.
+      if (taskLaunched) {
+        break;
+      }
+
+      const Offer& offer = offers[i];
+
+      // Checked explicitly (rather than with assert) so that builds with
+      // NDEBUG cannot request a negative amount of memory below.
+      double mem = getScalarResource(offer, "mem");
+      if (mem <= EXECUTOR_MEMORY_MB) {
+        std::cerr << "Offered memory (" << mem << " MB) does not exceed the "
+                  << EXECUTOR_MEMORY_MB << " MB reserved for the executor"
+                  << std::endl;
+        driver->abort();
+        return;
+      }
+
+      std::vector<TaskInfo> tasks;
+      std::cout << "Starting the task" << std::endl;
+
+      TaskInfo task;
+      task.set_name("Balloon Task");
+      task.mutable_task_id()->set_value("1");
+      task.mutable_slave_id()->MergeFrom(offer.slave_id());
+      task.mutable_executor()->MergeFrom(executor);
+      task.set_data(stringify<size_t>(balloonLimit));
+
+      // Give the task all the offered memory except the part that is set
+      // aside for the executor itself.
+      Resource* resource;
+      resource = task.add_resources();
+      resource->set_name("mem");
+      resource->set_type(Value::SCALAR);
+      resource->mutable_scalar()->set_value(mem - EXECUTOR_MEMORY_MB);
+
+      tasks.push_back(task);
+      driver->launchTasks(offer.id(), tasks);
+
+      taskLaunched = true;
+    }
+  }
+
+  virtual void offerRescinded(SchedulerDriver* driver,
+                              const OfferID& offerId)
+  {
+    std::cout << "Offer rescinded" << std::endl;
+  }
+
+  // Stops the driver once the task finished; aborts it when the task failed,
+  // was killed or got lost.
+  virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
+  {
+    std::cout << "Task in state " << status.state() << std::endl;
+
+    if (status.state() == TASK_FINISHED) {
+      driver->stop();
+    } else if (status.state() == TASK_FAILED ||
+               status.state() == TASK_KILLED ||
+               status.state() == TASK_LOST) {
+      driver->abort();
+    }
+  }
+
+  virtual void frameworkMessage(SchedulerDriver* driver,
+                                const ExecutorID& executorId,
+                                const SlaveID& slaveId,
+                                const std::string& data)
+  {
+    std::cout << "Framework message: " << data << std::endl;
+  }
+
+  virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid)
+  {
+    std::cout << "Slave lost" << std::endl;
+  }
+
+  virtual void executorLost(SchedulerDriver* driver,
+                            const ExecutorID& executorID,
+                            const SlaveID& slaveID,
+                            int status)
+  {
+    std::cout << "Executor lost" << std::endl;
+  }
+
+  virtual void error(SchedulerDriver* driver, const std::string& message)
+  {
+    std::cout << "Error message: " << message << std::endl;
+  }
+
+private:
+  const ExecutorInfo executor;
+  const size_t balloonLimit;
+  bool taskLaunched;
+};
+
+
+// Runs the balloon framework against the given master.
+//
+// Usage: <program> <master> <balloon limit in MB>
+//
+// Returns 0 when the driver ended in the DRIVER_STOPPED state, 1 when it
+// ended in any other state, and -1 when the arguments are invalid or the
+// executor cannot be located.
+int main(int argc, char** argv)
+{
+  if (argc != 3) {
+    std::cerr << "Usage: " << argv[0]
+              << " <master> <balloon limit in MB>" << std::endl;
+    return -1;
+  }
+
+  // Verify the balloon limit.
+  Try<size_t> limit = numify<size_t>(argv[2]);
+  if (limit.isError()) {
+    std::cerr << "Balloon limit is not a valid number" << std::endl;
+    return -1;
+  }
+
+  if (limit.get() < EXECUTOR_MEMORY_MB) {
+    std::cerr << "Please use a balloon limit bigger than "
+              << EXECUTOR_MEMORY_MB << " MB" << std::endl;
+    return -1;
+  }
+
+  // Locate the executor: under MESOS_BUILD_DIR when that variable is set,
+  // otherwise next to this executable.
+  std::string uri;
+  const char* buildDir = ::getenv("MESOS_BUILD_DIR");
+  if (buildDir != NULL) {
+    uri = std::string(buildDir) + "/src/balloon-executor";
+  } else {
+    char buf[MAXPATHLEN];
+    // realpath returns NULL on failure and leaves buf unspecified.
+    if (::realpath(::dirname(argv[0]), buf) == NULL) {
+      std::cerr << "Failed to resolve this executable's directory"
+                << std::endl;
+      return -1;
+    }
+    uri = std::string(buf) + "/balloon-executor";
+  }
+
+  ExecutorInfo executor;
+  executor.mutable_executor_id()->set_value("default");
+  executor.mutable_command()->set_value(uri);
+  executor.set_data(stringify<size_t>(BALLOON_STEP_MB));
+
+  Resource* mem = executor.add_resources();
+  mem->set_name("mem");
+  mem->set_type(Value::SCALAR);
+  mem->mutable_scalar()->set_value(EXECUTOR_MEMORY_MB);
+
+  BalloonScheduler scheduler(executor, limit.get());
+
+  FrameworkInfo framework;
+  framework.set_user(""); // Have Mesos fill in the current user.
+  framework.set_name("Balloon Framework (C++)");
+
+  MesosSchedulerDriver driver(&scheduler, framework, argv[1]);
+
+  if (driver.run() == DRIVER_STOPPED) {
+    return 0;
+  } else {
+    // We stop the driver here so that we don't run into deadlock when the
+    // deallocator of the driver is called.
+    driver.stop();
+    return 1;
+  }
+}

Copied: incubator/mesos/trunk/src/examples/utils.hpp (from r1370079, incubator/mesos/trunk/src/slave/isolation_module.cpp)
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/utils.hpp?p2=incubator/mesos/trunk/src/examples/utils.hpp&p1=incubator/mesos/trunk/src/slave/isolation_module.cpp&r1=1370079&r2=1370080&rev=1370080&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/examples/utils.hpp Tue Aug  7 00:28:51 2012
@@ -16,38 +16,30 @@
  * limitations under the License.
  */
 
-#include "isolation_module.hpp"
-#include "process_based_isolation_module.hpp"
-#ifdef __sun__
-#include "solaris_project_isolation_module.hpp"
-#elif __linux__
-#include "lxc_isolation_module.hpp"
-#endif
+#ifndef __EXAMPLES_UTILS_HPP__
+#define __EXAMPLES_UTILS_HPP__
 
+#include <string>
 
-namespace mesos { namespace internal { namespace slave {
-
-IsolationModule* IsolationModule::create(const std::string &type)
-{
-  if (type == "process")
-    return new ProcessBasedIsolationModule();
-#ifdef __sun__
-  else if (type == "project")
-    return new SolarisProjectIsolationModule();
-#elif __linux__
-  else if (type == "lxc")
-    return new LxcIsolationModule();
-#endif
-
-  return NULL;
-}
+#include <mesos/mesos.hpp>
 
+namespace mesos {
 
-void IsolationModule::destroy(IsolationModule* module)
+// Returns the value of the scalar resource called 'name' in 'offer', or 0.0
+// when the offer contains no scalar resource with that name. When several
+// resources match, the value of the last one is returned.
+inline double getScalarResource(const Offer& offer, const std::string& name)
 {
-  if (module != NULL) {
-    delete module;
+  double value = 0.0;
+
+  for (int i = 0; i < offer.resources_size(); i++) {
+    const Resource& resource = offer.resources(i);
+    if (resource.name() == name &&
+        resource.type() == Value::SCALAR) {
+      value = resource.scalar().value();
+    }
   }
+
+  return value;
 }
 
-}}} // namespace mesos { namespace internal { namespace slave {
+} // namespace mesos
+
+#endif // __EXAMPLES_UTILS_HPP__

Added: incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp?rev=1370080&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp (added)
+++ incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp Tue Aug  7 00:28:51 2012
@@ -0,0 +1,674 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <signal.h>
+#include <unistd.h>
+
+#include <sys/types.h>
+
+#include <sstream>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/stringify.hpp>
+#include <stout/strings.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/units.hpp"
+
+#include "linux/cgroups.hpp"
+
+#include "slave/cgroups_isolation_module.hpp"
+
+using namespace process;
+
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Value written to cpu.shares per CPU, and the smallest value ever written.
+const size_t CPU_SHARES_PER_CPU = 1024;
+const size_t MIN_CPU_SHARES = 10;
+
+// Smallest memory limit, in MB. memChanged() compares this against the "mem"
+// resource value and only afterwards multiplies by 1024 * 1024, so this has
+// to be a plain MB count and not a byte count.
+const size_t MIN_MEMORY_MB = 32;
+
+
+// Creates the module in the uninitialized state (initialize() must be called
+// before executors can be launched) and starts a reaper process, asking it to
+// add this module as a process-exited listener.
+CgroupsIsolationModule::CgroupsIsolationModule()
+  : ProcessBase(ID::generate("cgroups-isolation-module")),
+    initialized(false)
+{
+  // Spawn the reaper, note that it might send us a message before we
+  // actually get spawned ourselves, but that's okay, the message will
+  // just get dropped.
+  reaper = new Reaper();
+  spawn(reaper);
+  dispatch(reaper, &Reaper::addProcessExitedListener, this);
+}
+
+
+// Shuts down the reaper created in the constructor: terminates it, waits for
+// it to finish, and only then frees it.
+CgroupsIsolationModule::~CgroupsIsolationModule()
+{
+  CHECK(reaper != NULL);
+  terminate(reaper);
+  process::wait(reaper); // Necessary for disambiguation.
+  delete reaper;
+}
+
+
+// Prepares the module for use: stores the given configuration, verifies the
+// environment (root user, cgroups enabled, required subsystems available),
+// makes sure a cgroups hierarchy root exists at flags.cgroups_hierarchy_root
+// (creating it if needed) and registers the per-resource handlers.
+//
+// _flags: slave flags; stored, and the source of the hierarchy root path.
+// _local: stored for later use when creating executor launchers.
+// _slave: PID of the slave that is told about executor events.
+//
+// Every failure is reported through LOG(FATAL).
+void CgroupsIsolationModule::initialize(
+    const Flags& _flags,
+    bool _local,
+    const PID<Slave>& _slave)
+{
+  flags = _flags;
+  local = _local;
+  slave = _slave;
+
+  // Make sure that we have root permission.
+  if (os::user() != "root") {
+    LOG(FATAL) << "Cgroups isolation module needs root permission";
+  }
+
+  // Make sure that cgroups is enabled by the kernel.
+  if (!cgroups::enabled()) {
+    LOG(FATAL) << "Cgroups is not supported by the kernel";
+  }
+
+  // Configure cgroups hierarchy root path.
+  hierarchy = flags.cgroups_hierarchy_root;
+
+  // Configure required/optional subsystems.
+  hashset<std::string> requiredSubsystems;
+  hashset<std::string> optionalSubsystems;
+
+  requiredSubsystems.insert("cpu");
+  requiredSubsystems.insert("memory");
+  requiredSubsystems.insert("freezer");
+
+  optionalSubsystems.insert("blkio");
+
+  // Probe cgroups subsystems.
+  hashset<std::string> enabledSubsystems;
+  hashset<std::string> busySubsystems;
+
+  Try<std::set<std::string> > enabled = cgroups::subsystems();
+  if (enabled.isError()) {
+    LOG(FATAL) << "Failed to probe cgroups subsystems: " << enabled.error();
+  } else {
+    foreach (const std::string& name, enabled.get()) {
+      enabledSubsystems.insert(name);
+
+      Try<bool> busy = cgroups::busy(name);
+      if (busy.isError()) {
+        LOG(FATAL) << "Failed to probe cgroups subsystems: " << busy.error();
+      }
+
+      if (busy.get()) {
+        busySubsystems.insert(name);
+      }
+    }
+  }
+
+  // Make sure that all the required subsystems are enabled by the kernel.
+  foreach (const std::string& name, requiredSubsystems) {
+    if (!enabledSubsystems.contains(name)) {
+      LOG(FATAL) << "Required subsystem " << name
+                 << " is not enabled by the kernel";
+    }
+  }
+
+  // Prepare the cgroups hierarchy root.
+  Try<bool> check = cgroups::checkHierarchy(hierarchy);
+  if (check.isError()) {
+    // The given hierarchy is not a cgroups hierarchy root. We will try to
+    // create a cgroups hierarchy root there.
+    if (os::exists(hierarchy)) {
+      // The path specified by the given hierarchy already exists in the file
+      // system. We try to remove it if it is an empty directory. This helps
+      // us better deal with slave reboots as we don't need to manually
+      // remove the residue directory after a slave reboot.
+      if (::rmdir(hierarchy.c_str()) < 0) {
+        LOG(FATAL) << "Cannot create cgroups hierarchy root at " << hierarchy
+                   << ". Consider removing it.";
+      }
+    }
+
+    // The comma-separated subsystem names which will be passed to
+    // cgroups::createHierarchy to create the hierarchy root.
+    std::string subsystems;
+
+    // Make sure that all the required subsystems are not busy so that we can
+    // activate them in the given cgroups hierarchy root.
+    foreach (const std::string& name, requiredSubsystems) {
+      if (busySubsystems.contains(name)) {
+        LOG(FATAL) << "Required subsystem " << name << " is busy";
+      }
+
+      subsystems.append(name + ",");
+    }
+
+    // Also activate those optional subsystems that are not busy.
+    foreach (const std::string& name, optionalSubsystems) {
+      if (enabledSubsystems.contains(name) && !busySubsystems.contains(name)) {
+        subsystems.append(name + ",");
+      }
+    }
+
+    // Create the cgroups hierarchy root.
+    Try<bool> create = cgroups::createHierarchy(hierarchy,
+                                                strings::trim(subsystems, ","));
+    if (create.isError()) {
+      LOG(FATAL) << "Failed to create cgroups hierarchy root at " << hierarchy
+                 << ": " << create.error();
+    }
+  }
+
+  // Probe activated subsystems in the cgroups hierarchy root. The result has
+  // to be checked before it is used, like every other probe above.
+  Try<std::set<std::string> > activated = cgroups::subsystems(hierarchy);
+  if (activated.isError()) {
+    LOG(FATAL) << "Failed to probe activated subsystems at " << hierarchy
+               << ": " << activated.error();
+  }
+
+  foreach (const std::string& name, activated.get()) {
+    activatedSubsystems.insert(name);
+  }
+
+  // Make sure that all the required subsystems are activated.
+  foreach (const std::string& name, requiredSubsystems) {
+    if (!activatedSubsystems.contains(name)) {
+      LOG(FATAL) << "Required subsystem " << name
+                 << " is not activated in hierarchy " << hierarchy;
+    }
+  }
+
+  // Configure resource subsystem mapping.
+  resourceSubsystemMap["cpus"] = "cpu";
+  resourceSubsystemMap["mem"] = "memory";
+
+  // Configure resource changed handlers.
+  resourceChangedHandlers["cpus"] = &CgroupsIsolationModule::cpusChanged;
+  resourceChangedHandlers["mem"] = &CgroupsIsolationModule::memChanged;
+
+  initialized = true;
+}
+
+
+// Launches an executor inside its own cgroup.
+//
+// Registers the cgroup info, creates the cgroup, applies the initial resource
+// limits and starts listening for OOM events, then forks. The parent records
+// the child's pid and tells the slave that the executor has started. The
+// child puts itself into the cgroup and runs the executor launcher.
+//
+// Requires initialize() to have been called. Failing to create the cgroup, to
+// fork, or to assign the child to the cgroup is reported via LOG(FATAL).
+void CgroupsIsolationModule::launchExecutor(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo,
+    const ExecutorInfo& executorInfo,
+    const std::string& directory,
+    const Resources& resources)
+{
+  CHECK(initialized) << "Cannot launch executors before initialization";
+
+  const ExecutorID& executorId = executorInfo.executor_id();
+
+  LOG(INFO) << "Launching " << executorId
+            << " (" << executorInfo.command().value() << ")"
+            << " in " << directory
+            << " with resources " << resources
+            << " for framework " << frameworkId;
+
+  // Register the cgroup information.
+  registerCgroupInfo(frameworkId, executorId);
+
+  // Create a new cgroup for the executor.
+  Try<bool> create =
+    cgroups::createCgroup(hierarchy, getCgroupName(frameworkId, executorId));
+  if (create.isError()) {
+    LOG(FATAL) << "Failed to create cgroup for executor " << executorId
+               << " of framework " << frameworkId
+               << ": " << create.error();
+  }
+
+  // Setup the initial resource constraints.
+  resourcesChanged(frameworkId, executorId, resources);
+
+  // Start listening on OOM events.
+  oomListen(frameworkId, executorId);
+
+  // Launch the executor using fork-exec.
+  pid_t pid;
+  if ((pid = ::fork()) == -1) {
+    LOG(FATAL) << "Failed to fork to launch new executor";
+  }
+
+  if (pid) {
+    // In parent process.
+    LOG(INFO) << "Forked executor at = " << pid;
+
+    // Store the pid of the leading process of the executor.
+    CgroupInfo* info = findCgroupInfo(frameworkId, executorId);
+    CHECK(info != NULL) << "Cannot find cgroup info";
+    info->pid = pid;
+
+    // Tell the slave this executor has started.
+    dispatch(slave,
+             &Slave::executorStarted,
+             frameworkId,
+             executorId,
+             pid);
+  } else {
+    // In child process.
+    // Put self into the newly created cgroup.
+    Try<bool> assign =
+      cgroups::assignTask(hierarchy,
+                          getCgroupName(frameworkId, executorId),
+                          ::getpid());
+    if (assign.isError()) {
+      LOG(FATAL) << "Failed to assign for executor " << executorId
+                 << " of framework " << frameworkId
+                 << ": " << assign.error();
+    }
+
+    launcher::ExecutorLauncher* launcher =
+      createExecutorLauncher(frameworkId,
+                             frameworkInfo,
+                             executorInfo,
+                             directory);
+    // NOTE(review): the result of run() is ignored. Presumably run() only
+    // returns when launching failed, in which case the child would carry on
+    // executing the caller's code -- confirm and handle if so.
+    launcher->run();
+  }
+}
+
+
+// Kills an executor by destroying the cgroup that is associated with it.
+//
+// Does nothing except logging an error when the executor is unknown or has
+// already been marked as killed. Otherwise discards a pending OOM
+// notification, starts the destruction of the cgroup and marks the executor
+// as killed. The destruction is not awaited here: destroyWaited is invoked
+// once its result is available.
+//
+// Requires initialize() to have been called.
+void CgroupsIsolationModule::killExecutor(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  CHECK(initialized) << "Cannot kill executors before initialization";
+
+  CgroupInfo* info = findCgroupInfo(frameworkId, executorId);
+  if (info == NULL || info->killed) {
+    LOG(ERROR) << "Asked to kill an unknown/killed executor!";
+    return;
+  }
+
+  LOG(INFO) << "Killing executor " << executorId
+            << " of framework " << frameworkId;
+
+  // Stop the OOM listener if needed.
+  if (info->oomNotifier.isPending()) {
+    info->oomNotifier.discard();
+  }
+
+  // Destroy the cgroup that is associated with the executor. Here, we don't
+  // wait for it to succeed as we don't want to block the isolation module.
+  // Instead, we register a callback which will be invoked when its result is
+  // ready.
+  Future<bool> future =
+    cgroups::destroyCgroup(hierarchy,
+                           getCgroupName(frameworkId, executorId));
+  future.onAny(
+      defer(PID<CgroupsIsolationModule>(this),
+            &CgroupsIsolationModule::destroyWaited,
+            frameworkId,
+            executorId,
+            info->tag,
+            future));
+
+  // We do not unregister the cgroup info here, instead, we ask the process
+  // exit handler to unregister the cgroup info.
+  info->killed = true;
+}
+
+
+// Applies new resource limits to the cgroup of an executor by invoking the
+// registered handler of every resource in 'resources'. Requests for unknown
+// or already killed executors are logged and ignored; handler errors are
+// logged and do not stop the remaining handlers from running.
+void CgroupsIsolationModule::resourcesChanged(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const Resources& resources)
+{
+  CHECK(initialized) << "Cannot change resources before initialization";
+
+  CgroupInfo* info = findCgroupInfo(frameworkId, executorId);
+  if (info == NULL || info->killed) {
+    LOG(INFO) << "Asked to update resources for an unknown/killed executor";
+    return;
+  }
+
+  LOG(INFO) << "Changing cgroup controls for executor " << executorId
+            << " of framework " << frameworkId
+            << " with resources " << resources;
+
+  Resources::const_iterator current = resources.begin();
+  for (; current != resources.end(); ++current) {
+    const std::string& name = current->name();
+
+    // Resources without a handler are not managed by this module.
+    if (!resourceChangedHandlers.contains(name)) {
+      continue;
+    }
+
+    // A resource that depends on a subsystem is only handled when that
+    // subsystem is active in the hierarchy.
+    if (resourceSubsystemMap.contains(name) &&
+        !activatedSubsystems.contains(resourceSubsystemMap[name])) {
+      continue;
+    }
+
+    Try<bool> result =
+      (this->*resourceChangedHandlers[name])(frameworkId,
+                                             executorId,
+                                             resources);
+    if (result.isError()) {
+      LOG(ERROR) << result.error();
+    }
+  }
+}
+
+
+// Handles the exit of a process. Exits of pids that do not belong to a known
+// executor are ignored. Otherwise the slave is told that the executor exited,
+// the executor is killed unless that already happened, and its cgroup info is
+// unregistered.
+void CgroupsIsolationModule::processExited(pid_t pid, int status)
+{
+  CgroupInfo* info = findCgroupInfo(pid);
+  if (info == NULL) {
+    return;
+  }
+
+  // Work on copies of the ids: they are still needed for the unregister call
+  // at the end.
+  FrameworkID frameworkId = info->frameworkId;
+  ExecutorID executorId = info->executorId;
+
+  LOG(INFO) << "Telling slave of lost executor " << executorId
+            << " of framework " << frameworkId;
+
+  dispatch(slave, &Slave::executorExited, frameworkId, executorId, status);
+
+  const bool alreadyKilled = info->killed;
+  if (!alreadyKilled) {
+    killExecutor(frameworkId, executorId);
+  }
+
+  unregisterCgroupInfo(frameworkId, executorId);
+}
+
+
+// Builds the launcher used by the forked child to start an executor. The
+// launcher is configured from the given framework/executor description plus
+// the slave PID and flags stored by initialize().
+//
+// The launcher is allocated with new and returned as a raw pointer; nothing
+// in this function frees it.
+launcher::ExecutorLauncher* CgroupsIsolationModule::createExecutorLauncher(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo,
+    const ExecutorInfo& executorInfo,
+    const std::string& directory)
+{
+  // NOTE(review): the meaning of the positional arguments below ('!local',
+  // the trailing empty string) is defined by ExecutorLauncher's constructor,
+  // which is not visible here -- confirm against its declaration.
+  return new launcher::ExecutorLauncher(
+      frameworkId,
+      executorInfo.executor_id(),
+      executorInfo.command(),
+      frameworkInfo.user(),
+      directory,
+      slave,
+      flags.frameworks_home,
+      flags.hadoop_home,
+      !local,
+      flags.switch_user,
+      "");
+}
+
+
+// Handler for the "cpus" resource: writes the matching cpu.shares value to
+// the cgroup of the given executor.
+//
+// The value is CPU_SHARES_PER_CPU per CPU, but never less than
+// MIN_CPU_SHARES. Returns an error when the control cannot be written and
+// true otherwise, including the case where 'resources' has no scalar "cpus"
+// resource (which only logs a warning).
+Try<bool> CgroupsIsolationModule::cpusChanged(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const Resources& resources)
+{
+  Resource r;
+  r.set_name("cpus");
+  r.set_type(Value::SCALAR);
+
+  Option<Resource> cpusResource = resources.get(r);
+  if (cpusResource.isNone()) {
+    LOG(WARNING) << "Resource cpus cannot be retrieved for executor "
+                 << executorId << " of framework " << frameworkId;
+  } else {
+    double cpus = cpusResource.get().scalar().value();
+
+    // Multiply before truncating so that fractional CPUs are honored: 0.5
+    // CPUs yields 512 shares and 1.5 CPUs yields 1536 shares, instead of the
+    // fraction being dropped first.
+    size_t cpuShares =
+      std::max((size_t)(CPU_SHARES_PER_CPU * cpus), MIN_CPU_SHARES);
+
+    Try<bool> set =
+      cgroups::writeControl(hierarchy,
+                            getCgroupName(frameworkId, executorId),
+                            "cpu.shares",
+                            stringify(cpuShares));
+    if (set.isError()) {
+      return Try<bool>::error(set.error());
+    }
+
+    LOG(INFO) << "Write cpu.shares = " << cpuShares
+              << " for executor " << executorId
+              << " of framework " << frameworkId;
+  }
+
+  return true;
+}
+
+
+// Applies the "mem" scalar found in 'resources' to the executor's cgroup by
+// writing the 'memory.limit_in_bytes' control.
+// @param   frameworkId   The id of the given framework.
+// @param   executorId    The id of the given executor.
+// @param   resources     The resources to read the "mem" scalar from.
+// @return  true when the control was written, or when 'resources' carries no
+//          "mem" scalar (a warning is logged and nothing is written); an
+//          error when writing the control fails.
+Try<bool> CgroupsIsolationModule::memChanged(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const Resources& resources)
+{
+  Resource r;
+  r.set_name("mem");
+  r.set_type(Value::SCALAR);
+
+  Option<Resource> memResource = resources.get(r);
+  if (memResource.isNone()) {
+    LOG(WARNING) << "Resource mem cannot be retrieved for executor "
+                 << executorId << " of framework " << frameworkId;
+    return true;
+  }
+
+  // The scalar is expressed in megabytes.
+  double mem = memResource.get().scalar().value();
+
+  // Convert to bytes first and truncate afterwards, so that a fractional
+  // megabyte is not silently dropped. The limit never goes below
+  // MIN_MEMORY_MB megabytes.
+  size_t limitInBytes = std::max(
+      static_cast<size_t>(mem * 1024 * 1024),
+      static_cast<size_t>(MIN_MEMORY_MB * 1024LL * 1024LL));
+
+  Try<bool> set =
+    cgroups::writeControl(hierarchy,
+                          getCgroupName(frameworkId, executorId),
+                          "memory.limit_in_bytes",
+                          stringify(limitInBytes));
+  if (set.isError()) {
+    return Try<bool>::error(set.error());
+  }
+
+  LOG(INFO) << "Write memory.limit_in_bytes = " << limitInBytes
+            << " for executor " << executorId
+            << " of framework " << frameworkId;
+
+  return true;
+}
+
+
+// Starts listening on the 'memory.oom_control' event of the executor's cgroup
+// and arranges for oomWaited() to run once the listening has a result. The
+// executor must already be registered; an immediate failure is fatal.
+void CgroupsIsolationModule::oomListen(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  CgroupInfo* info = findCgroupInfo(frameworkId, executorId);
+  CHECK(info != NULL) << "Cgroup info is not registered";
+
+  const std::string cgroup = getCgroupName(frameworkId, executorId);
+
+  // The future is kept in the cgroup info so that it can be cancelled later.
+  info->oomNotifier =
+    cgroups::listenEvent(hierarchy, cgroup, "memory.oom_control");
+
+  // A listener that has already failed at this point indicates that
+  // something is seriously broken, hence the fatal error.
+  if (info->oomNotifier.isFailed()) {
+    LOG(FATAL) << "Failed to listen for OOM events for executor " << executorId
+               << " of framework " << frameworkId
+               << ": "<< info->oomNotifier.failure();
+  }
+
+  LOG(INFO) << "Start listening for OOM events for executor " << executorId
+            << " of framework " << frameworkId;
+
+  // The tag travels with the callback so that oom() can tell apart events
+  // that belong to an earlier launch of the same executor.
+  info->oomNotifier.onAny(
+      defer(PID<CgroupsIsolationModule>(this),
+            &CgroupsIsolationModule::oomWaited,
+            frameworkId,
+            executorId,
+            info->tag,
+            info->oomNotifier));
+}
+
+
+// Runs once the OOM listening future has a result. Discarded and failed
+// futures are only logged; any other outcome is treated as an out-of-memory
+// event and handed to oom().
+void CgroupsIsolationModule::oomWaited(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const std::string& tag,
+    const Future<uint64_t>& future)
+{
+  LOG(INFO) << "OOM notifier is triggered for executor "
+            << executorId << " of framework " << frameworkId
+            << " with tag " << tag;
+
+  if (future.isDiscarded()) {
+    LOG(INFO) << "Discarded OOM notifier for executor "
+              << executorId << " of framework " << frameworkId
+              << " with tag " << tag;
+    return;
+  }
+
+  if (future.isFailed()) {
+    LOG(ERROR) << "Listening on OOM events failed for executor "
+               << executorId << " of framework " << frameworkId
+               << " with tag " << tag << ": " << future.failure();
+    return;
+  }
+
+  // Neither discarded nor failed: an out-of-memory event was delivered.
+  oom(frameworkId, executorId, tag);
+}
+
+
+// Handles an out-of-memory event for the launch of an executor identified by
+// 'tag'. Events for executors that are no longer registered, or that carry a
+// tag other than the registered one, are ignored; otherwise the executor is
+// killed.
+void CgroupsIsolationModule::oom(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const std::string& tag)
+{
+  LOG(INFO) << "OOM detected in executor " << executorId
+            << " of framework " << frameworkId
+            << " with tag " << tag;
+
+  CgroupInfo* current = findCgroupInfo(frameworkId, executorId);
+
+  // processExited may well have run before this function (for instance when
+  // the kill and the OOM event coincide and the exit notification arrives
+  // first), so a missing registration is not treated as a fatal error.
+  if (current == NULL) {
+    LOG(INFO) << "OOM detected for an exited executor";
+    return;
+  }
+
+  // A different tag means the event stems from a previous launch of the same
+  // executor (same frameworkId and executorId); it is safe to ignore.
+  if (current->tag != tag) {
+    LOG(INFO) << "OOM detected for the previous launch of the same executor";
+    return;
+  }
+
+  // Once killed is set the OOM notifier is expected to be discarded in
+  // oomWaited, so this point should be unreachable for a killed executor.
+  CHECK(!current->killed) << "OOM detected for a killed executor";
+
+  // TODO(jieyu): Have a mechanism to use a different policy (e.g. freeze the
+  // executor) when OOM happens.
+  killExecutor(frameworkId, executorId);
+}
+
+
+// Runs once destroying the executor's cgroup has a result. Success is logged;
+// any other outcome is fatal.
+// @param   frameworkId   The id of the given framework.
+// @param   executorId    The id of the given executor.
+// @param   tag           The uuid tag.
+// @param   future        The future describing the destroy process.
+void CgroupsIsolationModule::destroyWaited(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const std::string& tag,
+    const Future<bool>& future)
+{
+  if (future.isReady()) {
+    LOG(INFO) << "Successfully destroyed the cgroup for executor "
+              << executorId << " of framework " << frameworkId
+              << " with tag " << tag;
+  } else if (future.isFailed()) {
+    LOG(FATAL) << "Failed to destroy the cgroup for executor "
+               << executorId << " of framework " << frameworkId
+               << " with tag " << tag << ": " << future.failure();
+  } else {
+    // Only a failed future carries a failure message, so a future that is
+    // neither ready nor failed is reported without asking for one.
+    LOG(FATAL) << "Failed to destroy the cgroup for executor "
+               << executorId << " of framework " << frameworkId
+               << " with tag " << tag << ": the destroy was discarded";
+  }
+}
+
+
+// Creates the cgroup info for one launch of an executor, stores it under
+// (frameworkId, executorId) and returns it. Each call gets a fresh random
+// UUID tag; pid starts out as -1 and killed as false.
+CgroupsIsolationModule::CgroupInfo* CgroupsIsolationModule::registerCgroupInfo(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  CgroupInfo* created = new CgroupInfo;
+
+  // Identity of this particular launch.
+  created->tag = UUID::random().toString();
+  created->frameworkId = frameworkId;
+  created->executorId = executorId;
+
+  // Initial state.
+  created->pid = -1;
+  created->killed = false;
+
+  infos[frameworkId][executorId] = created;
+
+  return created;
+}
+
+
+// Deletes and removes the cgroup info stored under (frameworkId, executorId),
+// dropping the framework's entry as well once it has no executors left. Does
+// nothing when no such info is registered.
+void CgroupsIsolationModule::unregisterCgroupInfo(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  // The framework is tested first so that the lookup below cannot insert an
+  // empty entry for an unknown framework.
+  if (!infos.contains(frameworkId) ||
+      !infos[frameworkId].contains(executorId)) {
+    return;
+  }
+
+  delete infos[frameworkId][executorId];
+  infos[frameworkId].erase(executorId);
+
+  if (infos[frameworkId].empty()) {
+    infos.erase(frameworkId);
+  }
+}
+
+
+// Looks through the cgroup infos of every registered framework for the one
+// whose pid equals 'pid'. Returns the first match, or NULL when there is none.
+CgroupsIsolationModule::CgroupInfo* CgroupsIsolationModule::findCgroupInfo(
+    pid_t pid)
+{
+  foreachkey (const FrameworkID& frameworkId, infos) {
+    foreachvalue (CgroupInfo* candidate, infos[frameworkId]) {
+      if (pid == candidate->pid) {
+        return candidate;
+      }
+    }
+  }
+
+  // No registered executor has this pid.
+  return NULL;
+}
+
+
+// Returns the cgroup info stored under (frameworkId, executorId), or NULL
+// when no such info is registered.
+CgroupsIsolationModule::CgroupInfo* CgroupsIsolationModule::findCgroupInfo(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  // The framework is tested first so that the lookup below cannot insert an
+  // empty entry for an unknown framework.
+  if (!infos.contains(frameworkId) ||
+      !infos[frameworkId].contains(executorId)) {
+    return NULL;
+  }
+
+  return infos[frameworkId][executorId];
+}
+
+
+// Builds the cgroup name of a registered executor from the framework id, the
+// executor id and the tag of the current launch. The executor must be
+// registered.
+std::string CgroupsIsolationModule::getCgroupName(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  CgroupInfo* info = findCgroupInfo(frameworkId, executorId);
+  CHECK(info != NULL) << "Cgroup info is not registered";
+
+  std::ostringstream name;
+  name << "mesos_cgroup_framework_" << frameworkId;
+  name << "_executor_" << executorId;
+  name << "_tag_" << info->tag;
+
+  return name.str();
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

Added: incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp?rev=1370080&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp (added)
+++ incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp Tue Aug  7 00:28:51 2012
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CGROUPS_ISOLATION_MODULE_HPP__
+#define __CGROUPS_ISOLATION_MODULE_HPP__
+
+#include <string>
+
+#include <process/future.hpp>
+#include <process/pid.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
+
+#include "launcher/launcher.hpp"
+
+#include "slave/flags.hpp"
+#include "slave/isolation_module.hpp"
+#include "slave/reaper.hpp"
+#include "slave/slave.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class CgroupsIsolationModule
+  : public IsolationModule,
+    public ProcessExitedListener
+{
+public:
+  CgroupsIsolationModule();
+
+  virtual ~CgroupsIsolationModule();
+
+  virtual void initialize(const Flags& flags,
+                          bool local,
+                          const process::PID<Slave>& slave);
+
+  virtual void launchExecutor(const FrameworkID& frameworkId,
+                              const FrameworkInfo& frameworkInfo,
+                              const ExecutorInfo& executorInfo,
+                              const std::string& directory,
+                              const Resources& resources);
+
+  virtual void killExecutor(const FrameworkID& frameworkId,
+                            const ExecutorID& executorId);
+
+  virtual void resourcesChanged(const FrameworkID& frameworkId,
+                                const ExecutorID& executorId,
+                                const Resources& resources);
+
+  virtual void processExited(pid_t pid, int status);
+
+private:
+  // No copying, no assigning.
+  CgroupsIsolationModule(const CgroupsIsolationModule&);
+  CgroupsIsolationModule& operator = (const CgroupsIsolationModule&);
+
+  // The cgroup information for each live executor.
+  struct CgroupInfo
+  {
+    FrameworkID frameworkId;
+    ExecutorID executorId;
+
+    // The UUID tag to distinguish between different launches of the same
+    // executor (which have the same frameworkId and executorId).
+    std::string tag;
+
+    // PID of the leading process of the executor.
+    pid_t pid;
+
+    // Whether the executor has been killed.
+    bool killed;
+
+    // Used to cancel the OOM listening.
+    process::Future<uint64_t> oomNotifier;
+  };
+
+  // Main method executed after a fork() to create a Launcher for launching an
+  // executor's process. The Launcher will chdir() to the child's working
+  // directory, fetch the executor, set environment variables, switch user,
+  // etc, and finally exec() the executor process.
+  launcher::ExecutorLauncher* createExecutorLauncher(
+      const FrameworkID& frameworkId,
+      const FrameworkInfo& frameworkInfo,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory);
+
+  // The callback which will be invoked when "cpus" resource has changed.
+  // @param   frameworkId   The id of the given framework.
+  // @param   executorId    The id of the given executor.
+  // @param   resources     The handle for the resources.
+  // @return  Whether the operation succeeds.
+  Try<bool> cpusChanged(const FrameworkID& frameworkId,
+                        const ExecutorID& executorId,
+                        const Resources& resources);
+
+  // The callback which will be invoked when "mem" resource has changed.
+  // @param   frameworkId   The id of the given framework.
+  // @param   executorId    The id of the given executor.
+  // @param   resources     The handle for the resources.
+  // @return  Whether the operation succeeds.
+  Try<bool> memChanged(const FrameworkID& frameworkId,
+                       const ExecutorID& executorId,
+                       const Resources& resources);
+
+  // Start listening on OOM events. This function will create an eventfd and
+  // start polling on it.
+  // @param   frameworkId   The id of the given framework.
+  // @param   executorId    The id of the given executor.
+  void oomListen(const FrameworkID& frameworkId,
+                 const ExecutorID& executorId);
+
+  // This function is invoked when the polling on eventfd has a result.
+  // @param   frameworkId   The id of the given framework.
+  // @param   executorId    The id of the given executor.
+  // @param   tag           The uuid tag.
+  // @param   future        The future describing the OOM listening.
+  void oomWaited(const FrameworkID& frameworkId,
+                 const ExecutorID& executorId,
+                 const std::string& tag,
+                 const process::Future<uint64_t>& future);
+
+  // This function is invoked when the OOM event happens.
+  // @param   frameworkId   The id of the given framework.
+  // @param   executorId    The id of the given executor.
+  // @param   tag           The uuid tag.
+  void oom(const FrameworkID& frameworkId,
+           const ExecutorID& executorId,
+           const std::string& tag);
+
+  // This callback is invoked when destroying the cgroup has a result.
+  // @param   frameworkId   The id of the given framework.
+  // @param   executorId    The id of the given executor.
+  // @param   tag           The uuid tag.
+  // @param   future        The future describing the destroy process.
+  void destroyWaited(const FrameworkID& frameworkId,
+                     const ExecutorID& executorId,
+                     const std::string& tag,
+                     const process::Future<bool>& future);
+
+  // Register a cgroup in the isolation module.
+  // @param   frameworkId   The id of the given framework.
+  // @param   executorId    The id of the given executor.
+  // @return  A pointer to the cgroup info registered.
+  CgroupInfo* registerCgroupInfo(const FrameworkID& frameworkId,
+                                 const ExecutorID& executorId);
+
+  // Unregister a cgroup in the isolation module.
+  // @param   frameworkId   The id of the given framework.
+  // @param   executorId    The id of the given executor.
+  void unregisterCgroupInfo(const FrameworkID& frameworkId,
+                            const ExecutorID& executorId);
+
+  // Find a registered cgroup by the PID of the leading process.
+  // @param   pid           The PID of the leading process in the cgroup.
+  // @return  A pointer to the cgroup info if found, NULL otherwise.
+  CgroupInfo* findCgroupInfo(pid_t pid);
+
+  // Find a registered cgroup by the frameworkId and the executorId.
+  // @param   frameworkId   The id of the given framework.
+  // @param   executorId    The id of the given executor.
+  // @return  A pointer to the cgroup info if found, NULL otherwise.
+  CgroupInfo* findCgroupInfo(const FrameworkID& frameworkId,
+                             const ExecutorID& executorId);
+
+  // Return the canonicalized name of the cgroup used by a given executor.
+  // @param   frameworkId   The id of the given framework.
+  // @param   executorId    The id of the given executor.
+  // @return  The canonicalized name of the cgroup.
+  std::string getCgroupName(const FrameworkID& frameworkId,
+                            const ExecutorID& executorId);
+
+  Flags flags;
+  bool local;
+  process::PID<Slave> slave;
+  bool initialized;
+  Reaper* reaper;
+
+  // The cgroup information for each live executor.
+  hashmap<FrameworkID, hashmap<ExecutorID, CgroupInfo*> > infos;
+
+  // The path to the cgroups hierarchy root.
+  std::string hierarchy;
+
+  // The activated cgroups subsystems that can be used by the module.
+  hashset<std::string> activatedSubsystems;
+
+  // The mapping between resource name and corresponding cgroups subsystem.
+  hashmap<std::string, std::string> resourceSubsystemMap;
+
+  // Mapping from resource name to the corresponding resource changed
+  // handler function.
+  hashmap<std::string,
+          Try<bool>(CgroupsIsolationModule::*)(
+              const FrameworkID&,
+              const ExecutorID&,
+              const Resources&)> resourceChangedHandlers;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __CGROUPS_ISOLATION_MODULE_HPP__

Modified: incubator/mesos/trunk/src/slave/flags.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/flags.hpp?rev=1370080&r1=1370079&r2=1370080&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/flags.hpp (original)
+++ incubator/mesos/trunk/src/slave/flags.hpp Tue Aug  7 00:28:51 2012
@@ -96,6 +96,13 @@ public:
         "Amount of time (in hours) to wait before\n"
         "cleaning up executor directories",
         GC_TIMEOUT_HOURS);
+
+#ifdef __linux__
+    add(&Flags::cgroups_hierarchy_root,
+        "cgroups_hierarchy_root",
+        "The path to the cgroups hierarchy root\n",
+        "/cgroups");
+#endif
   }
 
   Option<std::string> resources;
@@ -109,6 +116,9 @@ public:
   std::string frameworks_home;  // TODO(benh): Make an Option.
   double executor_shutdown_timeout_seconds;
   double gc_timeout_hours;
+#ifdef __linux__
+  std::string cgroups_hierarchy_root;
+#endif
 };
 
 } // namespace mesos {

Modified: incubator/mesos/trunk/src/slave/isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/isolation_module.cpp?rev=1370080&r1=1370079&r2=1370080&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/isolation_module.cpp Tue Aug  7 00:28:51 2012
@@ -21,6 +21,7 @@
 #ifdef __sun__
 #include "solaris_project_isolation_module.hpp"
 #elif __linux__
+#include "cgroups_isolation_module.hpp"
 #include "lxc_isolation_module.hpp"
 #endif
 
@@ -35,6 +36,8 @@ IsolationModule* IsolationModule::create
   else if (type == "project")
     return new SolarisProjectIsolationModule();
 #elif __linux__
+  else if (type == "cgroups")
+    return new CgroupsIsolationModule();
   else if (type == "lxc")
     return new LxcIsolationModule();
 #endif

Copied: incubator/mesos/trunk/src/tests/cgroups_isolation_tests.cpp (from r1370079, incubator/mesos/trunk/src/slave/isolation_module.cpp)
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/cgroups_isolation_tests.cpp?p2=incubator/mesos/trunk/src/tests/cgroups_isolation_tests.cpp&p1=incubator/mesos/trunk/src/slave/isolation_module.cpp&r1=1370079&r2=1370080&rev=1370080&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/tests/cgroups_isolation_tests.cpp Tue Aug  7 00:28:51 2012
@@ -16,38 +16,9 @@
  * limitations under the License.
  */
 
-#include "isolation_module.hpp"
-#include "process_based_isolation_module.hpp"
-#ifdef __sun__
-#include "solaris_project_isolation_module.hpp"
-#elif __linux__
-#include "lxc_isolation_module.hpp"
-#endif
+#include <gtest/gtest.h>
 
+#include "tests/external_test.hpp"
 
-namespace mesos { namespace internal { namespace slave {
-
-IsolationModule* IsolationModule::create(const std::string &type)
-{
-  if (type == "process")
-    return new ProcessBasedIsolationModule();
-#ifdef __sun__
-  else if (type == "project")
-    return new SolarisProjectIsolationModule();
-#elif __linux__
-  else if (type == "lxc")
-    return new LxcIsolationModule();
-#endif
-
-  return NULL;
-}
-
-
-void IsolationModule::destroy(IsolationModule* module)
-{
-  if (module != NULL) {
-    delete module;
-  }
-}
-
-}}} // namespace mesos { namespace internal { namespace slave {
+// Run the balloon framework under cgroups isolation.
+TEST_EXTERNAL(CgroupsIsolation, ROOT_CGROUPS_BalloonFramework)

Added: incubator/mesos/trunk/src/tests/external/CgroupsIsolation/ROOT_CGROUPS_BalloonFramework.sh
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/external/CgroupsIsolation/ROOT_CGROUPS_BalloonFramework.sh?rev=1370080&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/external/CgroupsIsolation/ROOT_CGROUPS_BalloonFramework.sh (added)
+++ incubator/mesos/trunk/src/tests/external/CgroupsIsolation/ROOT_CGROUPS_BalloonFramework.sh Tue Aug  7 00:28:51 2012
@@ -0,0 +1,66 @@
+#!/bin/bash
+
+# This script runs the balloon framework on a cluster using cgroups isolation.
+
+# Check that we're running as root
+if [[ $EUID -ne 0 ]]; then
+  echo "This test must be run as root." >&2
+  exit 2
+fi
+
+# Launch master
+"$MESOS_BUILD_DIR"/src/mesos-master --port=5432 > master.log 2>&1 &
+MASTER_PID=$!
+echo "Launched master, PID = $MASTER_PID"
+sleep 2
+
+# Check if it's still running after 2 seconds
+if ! kill -0 "$MASTER_PID"; then
+  echo "Master crashed; failing test"
+  exit 2
+fi
+
+# Launch slave
+"$MESOS_BUILD_DIR"/src/mesos-slave \
+    --master=localhost:5432 \
+    --isolation=cgroups \
+    --resources="cpus:1;mem:96" \
+    > slave.log 2>&1 &
+SLAVE_PID=$!
+echo "Launched slave, PID = $SLAVE_PID"
+sleep 2
+
+# Check if it's still running after 2 seconds
+if ! kill -0 "$SLAVE_PID"; then
+  echo "Slave crashed; failing test"
+  kill "$MASTER_PID"
+  exit 2
+fi
+
+# Launch balloon framework
+echo "Running balloon framework"
+"$MESOS_BUILD_DIR"/src/balloon-framework localhost:5432 \
+  1024 > balloon.log 2>&1
+EXIT_CODE=$?
+# Report the saved status: at this point "$?" would already be the status of
+# the assignment above (always 0), not that of the balloon framework.
+echo "Balloon framework exit code: $EXIT_CODE"
+sleep 2
+
+# Kill everything once balloon framework exited
+echo "Killing slave: $SLAVE_PID"
+kill "$SLAVE_PID"
+sleep 2
+echo "Killing master: $MASTER_PID"
+kill "$MASTER_PID"
+sleep 2
+
+echo "Exiting"
+# Check whether balloon framework returned the right code: the test passes
+# only when the framework exited with status 1.
+if [[ $EXIT_CODE -eq 1 ]]; then
+  exit 0
+else
+  exit 1
+fi