You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/07 02:28:52 UTC
svn commit: r1370080 - in /incubator/mesos/trunk/src: ./ examples/ slave/
tests/ tests/external/CgroupsIsolation/
Author: benh
Date: Tue Aug 7 00:28:51 2012
New Revision: 1370080
URL: http://svn.apache.org/viewvc?rev=1370080&view=rev
Log:
Added the cgroups isolation module (contributed by Jie Yu,
https://reviews.apache.org/r/5509).
Added:
incubator/mesos/trunk/src/examples/balloon_executor.cpp
incubator/mesos/trunk/src/examples/balloon_framework.cpp
incubator/mesos/trunk/src/examples/utils.hpp
- copied, changed from r1370079, incubator/mesos/trunk/src/slave/isolation_module.cpp
incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp
incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp
incubator/mesos/trunk/src/tests/cgroups_isolation_tests.cpp
- copied, changed from r1370079, incubator/mesos/trunk/src/slave/isolation_module.cpp
incubator/mesos/trunk/src/tests/external/CgroupsIsolation/
incubator/mesos/trunk/src/tests/external/CgroupsIsolation/ROOT_CGROUPS_BalloonFramework.sh
Modified:
incubator/mesos/trunk/src/Makefile.am
incubator/mesos/trunk/src/slave/flags.hpp
incubator/mesos/trunk/src/slave/isolation_module.cpp
Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1370080&r1=1370079&r2=1370080&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Tue Aug 7 00:28:51 2012
@@ -170,11 +170,13 @@ pkginclude_HEADERS = $(top_srcdir)/inclu
nodist_pkginclude_HEADERS = ../include/mesos/mesos.hpp mesos.pb.h
if OS_LINUX
+ libmesos_no_third_party_la_SOURCES += slave/cgroups_isolation_module.cpp
libmesos_no_third_party_la_SOURCES += slave/lxc_isolation_module.cpp
libmesos_no_third_party_la_SOURCES += linux/cgroups.cpp
libmesos_no_third_party_la_SOURCES += linux/fs.cpp
libmesos_no_third_party_la_SOURCES += linux/proc.cpp
else
+ EXTRA_DIST += slave/cgroups_isolation_module.cpp
EXTRA_DIST += slave/lxc_isolation_module.cpp
EXTRA_DIST += linux/cgroups.cpp
EXTRA_DIST += linux/fs.cpp
@@ -200,6 +202,7 @@ libmesos_no_third_party_la_SOURCES += co
messages/messages.hpp slave/constants.hpp slave/flags.hpp \
slave/http.hpp slave/isolation_module.hpp \
slave/isolation_module_factory.hpp \
+ slave/cgroups_isolation_module.hpp \
slave/lxc_isolation_module.hpp \
slave/process_based_isolation_module.hpp slave/reaper.hpp \
slave/slave.hpp slave/solaris_project_isolation_module.hpp \
@@ -745,6 +748,16 @@ no_executor_framework_SOURCES = examples
no_executor_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
no_executor_framework_LDADD = libmesos.la
+check_PROGRAMS += balloon-framework
+balloon_framework_SOURCES = examples/balloon_framework.cpp
+balloon_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
+balloon_framework_LDADD = libmesos.la
+
+check_PROGRAMS += balloon-executor
+balloon_executor_SOURCES = examples/balloon_executor.cpp
+balloon_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
+balloon_executor_LDADD = libmesos.la
+
check_PROGRAMS += mesos-tests
mesos_tests_SOURCES = tests/main.cpp tests/utils.cpp \
@@ -779,6 +792,7 @@ mesos_tests_LDADD = ../third_party/libgm
mesos_tests_DEPENDENCIES = # Initialized to allow += below.
if OS_LINUX
+ mesos_tests_SOURCES += tests/cgroups_isolation_tests.cpp
mesos_tests_SOURCES += tests/cgroups_tests.cpp
mesos_tests_SOURCES += tests/fs_tests.cpp
mesos_tests_SOURCES += tests/proc_tests.cpp
Added: incubator/mesos/trunk/src/examples/balloon_executor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/balloon_executor.cpp?rev=1370080&view=auto
==============================================================================
--- incubator/mesos/trunk/src/examples/balloon_executor.cpp (added)
+++ incubator/mesos/trunk/src/examples/balloon_executor.cpp Tue Aug 7 00:28:51 2012
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <iostream>
+#include <string>
+
+#include <mesos/executor.hpp>
+
+#include <stout/numify.hpp>
+
+using namespace mesos;
+
+
+// Gradually increases the memory footprint of this process. The parameter
+// limit specifies the upper limit (in MB) of the memory footprint and the
+// parameter step specifies the step size (in MB). One chunk of 'step' MB is
+// allocated and touched per second, limit / step times in total. The chunks
+// are deliberately never freed.
+static void balloon(size_t limit, size_t step)
+{
+  // A zero step would make 'limit / step' below divide by zero.
+  if (step == 0) {
+    std::cerr << "Balloon step must be greater than 0 MB" << std::endl;
+    ::abort();
+  }
+
+  const size_t chunk = step * 1024 * 1024;
+  for (size_t i = 0; i < limit / step; i++) {
+    std::cout << "Increasing memory footprint by "
+              << step << " MB" << std::endl;
+
+    // Allocate virtual memory.
+    char* buffer = static_cast<char*>(::malloc(chunk));
+    if (buffer == NULL) {
+      // Writing through a NULL buffer would crash without any diagnostics,
+      // and simply returning would let the caller report the task as
+      // finished although the limit was never reached.
+      std::cerr << "Failed to allocate " << step << " MB" << std::endl;
+      ::abort();
+    }
+
+    // We use memset here so that the memory actually gets paged in. However,
+    // the memory may get paged out again depending on the OS page replacement
+    // algorithm. Therefore, to ensure X MB of memory is actually used, we need
+    // to pass Y (Y > X) to this function.
+    ::memset(buffer, 1, chunk);
+
+    // Try not to increase the memory footprint too fast.
+    ::sleep(1);
+  }
+}
+
+
+// Executor used by the balloon framework. Each task it launches grows the
+// memory footprint of the executor process step by step up to the limit
+// carried in the task's data.
+class BalloonExecutor : public Executor
+{
+public:
+  virtual ~BalloonExecutor() {}
+
+  // Reads the balloon step size (in MB) from the executor info's data.
+  virtual void registered(ExecutorDriver* driver,
+                          const ExecutorInfo& executorInfo,
+                          const FrameworkInfo& frameworkInfo,
+                          const SlaveInfo& slaveInfo)
+  {
+    Try<size_t> parsedStep = numify<size_t>(executorInfo.data());
+    assert(parsedStep.isSome());
+    balloonStep = parsedStep.get();
+
+    std::cout << "Registered" << std::endl;
+  }
+
+  virtual void reregistered(ExecutorDriver* driver,
+                            const SlaveInfo& slaveInfo)
+  {
+    std::cout << "Reregistered" << std::endl;
+  }
+
+  virtual void disconnected(ExecutorDriver* driver)
+  {
+    std::cout << "Disconnected" << std::endl;
+  }
+
+  // Reports TASK_RUNNING, inflates the balloon, then reports TASK_FINISHED.
+  virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  {
+    const TaskID& taskId = task.task_id();
+
+    std::cout << "Starting task " << taskId.value() << std::endl;
+
+    TaskStatus running;
+    running.mutable_task_id()->MergeFrom(taskId);
+    running.set_state(TASK_RUNNING);
+    driver->sendStatusUpdate(running);
+
+    // The task data carries the balloon limit (in MB).
+    Try<size_t> parsedLimit = numify<size_t>(task.data());
+    assert(parsedLimit.isSome());
+
+    // Artificially increase the memory usage gradually up to the limit. The
+    // limit may exceed the memory allocated to this executor; in that case
+    // the isolation module (e.g. cgroups) is expected to step in, so that
+    // the task never reaches the TASK_FINISHED state.
+    balloon(parsedLimit.get(), balloonStep);
+
+    std::cout << "Finishing task " << taskId.value() << std::endl;
+
+    TaskStatus finished;
+    finished.mutable_task_id()->MergeFrom(taskId);
+    finished.set_state(TASK_FINISHED);
+    driver->sendStatusUpdate(finished);
+  }
+
+  virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  {
+    std::cout << "Kill task " << taskId.value() << std::endl;
+  }
+
+  virtual void frameworkMessage(ExecutorDriver* driver, const std::string& data)
+  {
+    std::cout << "Framework message: " << data << std::endl;
+  }
+
+  virtual void shutdown(ExecutorDriver* driver)
+  {
+    std::cout << "Shutdown" << std::endl;
+  }
+
+  virtual void error(ExecutorDriver* driver, const std::string& message)
+  {
+    std::cout << "Error message: " << message << std::endl;
+  }
+
+private:
+  // Size (in MB) of every balloon step; set in registered().
+  size_t balloonStep;
+};
+
+
+// Runs the balloon executor until its driver terminates. The exit status is
+// 0 only when the driver reports DRIVER_STOPPED.
+int main(int argc, char** argv)
+{
+  BalloonExecutor executor;
+  MesosExecutorDriver driver(&executor);
+
+  if (driver.run() == DRIVER_STOPPED) {
+    return 0;
+  }
+
+  return 1;
+}
Added: incubator/mesos/trunk/src/examples/balloon_framework.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/balloon_framework.cpp?rev=1370080&view=auto
==============================================================================
--- incubator/mesos/trunk/src/examples/balloon_framework.cpp (added)
+++ incubator/mesos/trunk/src/examples/balloon_framework.cpp Tue Aug 7 00:28:51 2012
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <libgen.h>
+#include <stdlib.h>
+
+#include <sys/param.h>
+
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include <mesos/scheduler.hpp>
+
+#include <stout/numify.hpp>
+#include <stout/stringify.hpp>
+
+#include "examples/utils.hpp"
+
+using namespace mesos;
+
+
+// The amount of memory in MB the executor itself takes.
+const static size_t EXECUTOR_MEMORY_MB = 64;
+
+// The amount of memory in MB each balloon step consumes.
+const static size_t BALLOON_STEP_MB = 64;
+
+
+// Scheduler that launches exactly one "balloon" task on the first offer it
+// receives. The task is handed the balloon limit (in MB) through its data
+// field. The driver is stopped once the task finishes and aborted if the
+// task fails, is killed or is lost.
+class BalloonScheduler : public Scheduler
+{
+public:
+  // _executor: the executor the task is launched with.
+  // _balloonLimit: the balloon limit (in MB) passed on to the task.
+  BalloonScheduler(const ExecutorInfo& _executor,
+                   size_t _balloonLimit)
+    : executor(_executor),
+      balloonLimit(_balloonLimit),
+      taskLaunched(false) {}
+
+  virtual ~BalloonScheduler() {}
+
+  virtual void registered(SchedulerDriver*,
+                          const FrameworkID&,
+                          const MasterInfo&)
+  {
+    std::cout << "Registered" << std::endl;
+  }
+
+  virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo)
+  {
+    std::cout << "Reregistered" << std::endl;
+  }
+
+  virtual void disconnected(SchedulerDriver* driver)
+  {
+    std::cout << "Disconnected" << std::endl;
+  }
+
+  // Launches the single balloon task on the first offer seen; later offers
+  // are only iterated over.
+  virtual void resourceOffers(SchedulerDriver* driver,
+                              const std::vector<Offer>& offers)
+  {
+    std::cout << "Resource offers received" << std::endl;
+
+    // Use an unsigned index so that it matches the type of offers.size().
+    for (size_t i = 0; i < offers.size(); i++) {
+      const Offer& offer = offers[i];
+
+      // We just launch one task.
+      if (!taskLaunched) {
+        double mem = getScalarResource(offer, "mem");
+        assert(mem > EXECUTOR_MEMORY_MB);
+
+        std::vector<TaskInfo> tasks;
+        std::cout << "Starting the task" << std::endl;
+
+        TaskInfo task;
+        task.set_name("Balloon Task");
+        task.mutable_task_id()->set_value("1");
+        task.mutable_slave_id()->MergeFrom(offer.slave_id());
+        task.mutable_executor()->MergeFrom(executor);
+        task.set_data(stringify<size_t>(balloonLimit));
+
+        // Use up all the memory from the offer (minus what is set aside
+        // for the executor itself).
+        Resource* resource;
+        resource = task.add_resources();
+        resource->set_name("mem");
+        resource->set_type(Value::SCALAR);
+        resource->mutable_scalar()->set_value(mem - EXECUTOR_MEMORY_MB);
+
+        tasks.push_back(task);
+        driver->launchTasks(offer.id(), tasks);
+
+        taskLaunched = true;
+      }
+    }
+  }
+
+  virtual void offerRescinded(SchedulerDriver* driver,
+                              const OfferID& offerId)
+  {
+    std::cout << "Offer rescinded" << std::endl;
+  }
+
+  // Stops the driver when the task finishes; aborts it when the task fails,
+  // is killed or is lost.
+  virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
+  {
+    std::cout << "Task in state " << status.state() << std::endl;
+
+    if (status.state() == TASK_FINISHED) {
+      driver->stop();
+    } else if (status.state() == TASK_FAILED ||
+               status.state() == TASK_KILLED ||
+               status.state() == TASK_LOST) {
+      driver->abort();
+    }
+  }
+
+  virtual void frameworkMessage(SchedulerDriver* driver,
+                                const ExecutorID& executorId,
+                                const SlaveID& slaveId,
+                                const std::string& data)
+  {
+    std::cout << "Framework message: " << data << std::endl;
+  }
+
+  virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid)
+  {
+    std::cout << "Slave lost" << std::endl;
+  }
+
+  virtual void executorLost(SchedulerDriver* driver,
+                            const ExecutorID& executorID,
+                            const SlaveID& slaveID,
+                            int status)
+  {
+    std::cout << "Executor lost" << std::endl;
+  }
+
+  virtual void error(SchedulerDriver* driver, const std::string& message)
+  {
+    std::cout << "Error message: " << message << std::endl;
+  }
+
+private:
+  const ExecutorInfo executor;
+  const size_t balloonLimit;
+  // Set once the single balloon task has been launched.
+  bool taskLaunched;
+};
+
+
+// Entry point of the balloon framework.
+//
+// Usage: <program> <master> <balloon limit in MB>
+//
+// Returns 0 if the scheduler driver stopped cleanly, 1 if it terminated in
+// any other way, and -1 on invalid arguments or when the executor cannot be
+// located.
+int main(int argc, char** argv)
+{
+  if (argc != 3) {
+    std::cerr << "Usage: " << argv[0]
+              << " <master> <balloon limit in MB>" << std::endl;
+    return -1;
+  }
+
+  // Verify the balloon limit.
+  Try<size_t> limit = numify<size_t>(argv[2]);
+  if (limit.isError()) {
+    std::cerr << "Balloon limit is not a valid number" << std::endl;
+    return -1;
+  }
+
+  if (limit.get() < EXECUTOR_MEMORY_MB) {
+    std::cerr << "Please use a balloon limit bigger than "
+              << EXECUTOR_MEMORY_MB << " MB" << std::endl;
+    // Bail out instead of carrying on with a limit we just rejected.
+    return -1;
+  }
+
+  // Locate the executor: MESOS_BUILD_DIR takes precedence, otherwise it is
+  // expected next to this executable.
+  std::string uri;
+  const char* buildDir = ::getenv("MESOS_BUILD_DIR");
+  if (buildDir != NULL) {
+    uri = std::string(buildDir) + "/src/balloon-executor";
+  } else {
+    char buf[MAXPATHLEN];
+    // realpath() returns NULL on failure and leaves buf unspecified, so the
+    // result must be checked before buf is read.
+    if (::realpath(::dirname(argv[0]), buf) == NULL) {
+      std::cerr << "Failed to determine the directory of this executable"
+                << std::endl;
+      return -1;
+    }
+    uri = std::string(buf) + "/balloon-executor";
+  }
+
+  ExecutorInfo executor;
+  executor.mutable_executor_id()->set_value("default");
+  executor.mutable_command()->set_value(uri);
+  // The executor reads its balloon step size (in MB) from the data field.
+  executor.set_data(stringify<size_t>(BALLOON_STEP_MB));
+
+  Resource* mem = executor.add_resources();
+  mem->set_name("mem");
+  mem->set_type(Value::SCALAR);
+  mem->mutable_scalar()->set_value(EXECUTOR_MEMORY_MB);
+
+  BalloonScheduler scheduler(executor, limit.get());
+
+  FrameworkInfo framework;
+  framework.set_user(""); // Have Mesos fill in the current user.
+  framework.set_name("Balloon Framework (C++)");
+
+  MesosSchedulerDriver driver(&scheduler, framework, argv[1]);
+
+  if (driver.run() == DRIVER_STOPPED) {
+    return 0;
+  } else {
+    // We stop the driver here so that we don't run into deadlock when the
+    // deallocator of the driver is called.
+    driver.stop();
+    return 1;
+  }
+}
Copied: incubator/mesos/trunk/src/examples/utils.hpp (from r1370079, incubator/mesos/trunk/src/slave/isolation_module.cpp)
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/utils.hpp?p2=incubator/mesos/trunk/src/examples/utils.hpp&p1=incubator/mesos/trunk/src/slave/isolation_module.cpp&r1=1370079&r2=1370080&rev=1370080&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/examples/utils.hpp Tue Aug 7 00:28:51 2012
@@ -16,38 +16,30 @@
* limitations under the License.
*/
-#include "isolation_module.hpp"
-#include "process_based_isolation_module.hpp"
-#ifdef __sun__
-#include "solaris_project_isolation_module.hpp"
-#elif __linux__
-#include "lxc_isolation_module.hpp"
-#endif
+#ifndef __EXAMPLES_UTILS_HPP__
+#define __EXAMPLES_UTILS_HPP__
+#include <string>
-namespace mesos { namespace internal { namespace slave {
-
-IsolationModule* IsolationModule::create(const std::string &type)
-{
- if (type == "process")
- return new ProcessBasedIsolationModule();
-#ifdef __sun__
- else if (type == "project")
- return new SolarisProjectIsolationModule();
-#elif __linux__
- else if (type == "lxc")
- return new LxcIsolationModule();
-#endif
-
- return NULL;
-}
+#include <mesos/mesos.hpp>
+namespace mesos {
-void IsolationModule::destroy(IsolationModule* module)
+// Returns the value of the scalar resource called 'name' in 'offer'. If
+// several resources match, the last one wins; 0.0 is returned when none
+// matches.
+inline double getScalarResource(const Offer& offer, const std::string& name)
{
- if (module != NULL) {
- delete module;
+  double found = 0.0;
+
+  for (int index = 0; index < offer.resources_size(); index++) {
+    const Resource& candidate = offer.resources(index);
+    if (candidate.type() != Value::SCALAR || candidate.name() != name) {
+      continue;
+    }
+
+    found = candidate.scalar().value();
}
+
+  return found;
}
-}}} // namespace mesos { namespace internal { namespace slave {
+} // namespace mesos
+
+#endif // __EXAMPLES_UTILS_HPP__
Added: incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp?rev=1370080&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp (added)
+++ incubator/mesos/trunk/src/slave/cgroups_isolation_module.cpp Tue Aug 7 00:28:51 2012
@@ -0,0 +1,674 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <signal.h>
+#include <unistd.h>
+
+#include <sys/types.h>
+
+#include <sstream>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/stringify.hpp>
+#include <stout/strings.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/units.hpp"
+
+#include "linux/cgroups.hpp"
+
+#include "slave/cgroups_isolation_module.hpp"
+
+using namespace process;
+
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Factor applied to an executor's "cpus" value to obtain the number written
+// to cpu.shares (see cpusChanged).
+const size_t CPU_SHARES_PER_CPU = 1024;
+// Lower bound on the number written to cpu.shares.
+const size_t MIN_CPU_SHARES = 10;
+// Lower bound applied to an executor's "mem" value before it is multiplied
+// by 1024 * 1024 and written to memory.limit_in_bytes (see memChanged).
+// NOTE(review): the name says MB, yet the value is scaled by Megabyte here
+// and by 1024 * 1024 again in memChanged. If Megabyte is the number of bytes
+// in a MB, the effective floor is far larger than 32 MB -- confirm against
+// common/units.hpp.
+const size_t MIN_MEMORY_MB = 32 * Megabyte;
+
+
+// Constructs an uninitialized module (initialize() must be called before
+// executors can be launched) and starts the reaper it listens to.
+CgroupsIsolationModule::CgroupsIsolationModule()
+ : ProcessBase(ID::generate("cgroups-isolation-module")),
+ initialized(false)
+{
+ // Spawn the reaper, note that it might send us a message before we
+ // actually get spawned ourselves, but that's okay, the message will
+ // just get dropped.
+ reaper = new Reaper();
+ spawn(reaper);
+ // Ask the reaper to add this module as a process-exited listener.
+ dispatch(reaper, &Reaper::addProcessExitedListener, this);
+}
+
+
+// Terminates the reaper created in the constructor, waits for it and then
+// deletes it.
+CgroupsIsolationModule::~CgroupsIsolationModule()
+{
+ CHECK(reaper != NULL);
+ terminate(reaper);
+ process::wait(reaper); // Necessary for disambiguation.
+ delete reaper;
+}
+
+
+// Initializes the isolation module: verifies that we run as root on a kernel
+// with cgroups enabled, prepares the cgroups hierarchy root configured by
+// flags.cgroups_hierarchy_root (creating it if necessary) with the required
+// subsystems (cpu, memory, freezer) plus any available optional ones
+// (blkio), and installs the per-resource handlers. Any failure is fatal.
+void CgroupsIsolationModule::initialize(
+    const Flags& _flags,
+    bool _local,
+    const PID<Slave>& _slave)
+{
+  flags = _flags;
+  local = _local;
+  slave = _slave;
+
+  // Make sure that we have root permission.
+  if (os::user() != "root") {
+    LOG(FATAL) << "Cgroups isolation module needs root permission";
+  }
+
+  // Make sure that cgroups is enabled by the kernel.
+  if (!cgroups::enabled()) {
+    LOG(FATAL) << "Cgroups is not supported by the kernel";
+  }
+
+  // Configure cgroups hierarchy root path.
+  hierarchy = flags.cgroups_hierarchy_root;
+
+  // Configure required/optional subsystems.
+  hashset<std::string> requiredSubsystems;
+  hashset<std::string> optionalSubsystems;
+
+  requiredSubsystems.insert("cpu");
+  requiredSubsystems.insert("memory");
+  requiredSubsystems.insert("freezer");
+
+  optionalSubsystems.insert("blkio");
+
+  // Probe cgroups subsystems.
+  hashset<std::string> enabledSubsystems;
+  hashset<std::string> busySubsystems;
+
+  Try<std::set<std::string> > enabled = cgroups::subsystems();
+  if (enabled.isError()) {
+    LOG(FATAL) << "Failed to probe cgroups subsystems: " << enabled.error();
+  } else {
+    foreach (const std::string& name, enabled.get()) {
+      enabledSubsystems.insert(name);
+
+      Try<bool> busy = cgroups::busy(name);
+      if (busy.isError()) {
+        LOG(FATAL) << "Failed to probe cgroups subsystems: " << busy.error();
+      }
+
+      if (busy.get()) {
+        busySubsystems.insert(name);
+      }
+    }
+  }
+
+  // Make sure that all the required subsystems are enabled by the kernel.
+  foreach (const std::string& name, requiredSubsystems) {
+    if (!enabledSubsystems.contains(name)) {
+      LOG(FATAL) << "Required subsystem " << name
+                 << " is not enabled by the kernel";
+    }
+  }
+
+  // Prepare the cgroups hierarchy root.
+  Try<bool> check = cgroups::checkHierarchy(hierarchy);
+  if (check.isError()) {
+    // The given hierarchy is not a cgroups hierarchy root. We will try to
+    // create a cgroups hierarchy root there.
+    if (os::exists(hierarchy)) {
+      // The path specified by the given hierarchy already exists in the file
+      // system. We try to remove it if it is an empty directory. This helps
+      // us deal better with slave reboots, as we don't need to manually
+      // remove the residue directory after a slave reboot.
+      if (::rmdir(hierarchy.c_str()) < 0) {
+        LOG(FATAL) << "Cannot create cgroups hierarchy root at " << hierarchy
+                   << ". Consider removing it.";
+      }
+    }
+
+    // The comma-separated subsystem names which will be passed to
+    // cgroups::createHierarchy to create the hierarchy root.
+    std::string subsystems;
+
+    // Make sure that all the required subsystems are not busy so that we can
+    // activate them in the given cgroups hierarchy root.
+    foreach (const std::string& name, requiredSubsystems) {
+      if (busySubsystems.contains(name)) {
+        LOG(FATAL) << "Required subsystem " << name << " is busy";
+      }
+
+      subsystems.append(name + ",");
+    }
+
+    // Also activate those optional subsystems that are not busy.
+    foreach (const std::string& name, optionalSubsystems) {
+      if (enabledSubsystems.contains(name) && !busySubsystems.contains(name)) {
+        subsystems.append(name + ",");
+      }
+    }
+
+    // Create the cgroups hierarchy root.
+    Try<bool> create = cgroups::createHierarchy(hierarchy,
+                                                strings::trim(subsystems, ","));
+    if (create.isError()) {
+      LOG(FATAL) << "Failed to create cgroups hierarchy root at " << hierarchy
+                 << ": " << create.error();
+    }
+  }
+
+  // Probe activated subsystems in the cgroups hierarchy root. The result
+  // must be checked before it is dereferenced, like every other probe above.
+  Try<std::set<std::string> > activated = cgroups::subsystems(hierarchy);
+  if (activated.isError()) {
+    LOG(FATAL) << "Failed to probe activated subsystems in hierarchy "
+               << hierarchy << ": " << activated.error();
+  }
+
+  foreach (const std::string& name, activated.get()) {
+    activatedSubsystems.insert(name);
+  }
+
+  // Make sure that all the required subsystems are activated.
+  foreach (const std::string& name, requiredSubsystems) {
+    if (!activatedSubsystems.contains(name)) {
+      LOG(FATAL) << "Required subsystem " << name
+                 << " is not activated in hierarchy " << hierarchy;
+    }
+  }
+
+  // Configure resource subsystem mapping.
+  resourceSubsystemMap["cpus"] = "cpu";
+  resourceSubsystemMap["mem"] = "memory";
+
+  // Configure resource changed handlers.
+  resourceChangedHandlers["cpus"] = &CgroupsIsolationModule::cpusChanged;
+  resourceChangedHandlers["mem"] = &CgroupsIsolationModule::memChanged;
+
+  initialized = true;
+}
+
+
+// Launches an executor inside its own cgroup. In order: registers the cgroup
+// info, creates the cgroup, applies the initial resource limits, starts
+// listening for OOM events and finally forks. The parent records the child's
+// pid and notifies the slave; the child moves itself into the new cgroup and
+// runs the executor launcher. Failing to create the cgroup, to fork or to
+// assign the child to the cgroup is fatal.
+void CgroupsIsolationModule::launchExecutor(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Resources& resources)
+{
+ CHECK(initialized) << "Cannot launch executors before initialization";
+
+ const ExecutorID& executorId = executorInfo.executor_id();
+
+ LOG(INFO) << "Launching " << executorId
+ << " (" << executorInfo.command().value() << ")"
+ << " in " << directory
+ << " with resources " << resources
+ << " for framework " << frameworkId;
+
+ // Register the cgroup information.
+ registerCgroupInfo(frameworkId, executorId);
+
+ // Create a new cgroup for the executor.
+ Try<bool> create =
+ cgroups::createCgroup(hierarchy, getCgroupName(frameworkId, executorId));
+ if (create.isError()) {
+ LOG(FATAL) << "Failed to create cgroup for executor " << executorId
+ << " of framework " << frameworkId
+ << ": " << create.error();
+ }
+
+ // Setup the initial resource constraints.
+ resourcesChanged(frameworkId, executorId, resources);
+
+ // Start listening on OOM events.
+ oomListen(frameworkId, executorId);
+
+ // Launch the executor using fork-exec.
+ pid_t pid;
+ if ((pid = ::fork()) == -1) {
+ LOG(FATAL) << "Failed to fork to launch new executor";
+ }
+
+ if (pid) {
+ // In parent process.
+ LOG(INFO) << "Forked executor at = " << pid;
+
+ // Store the pid of the leading process of the executor.
+ CgroupInfo* info = findCgroupInfo(frameworkId, executorId);
+ CHECK(info != NULL) << "Cannot find cgroup info";
+ info->pid = pid;
+
+ // Tell the slave this executor has started.
+ dispatch(slave,
+ &Slave::executorStarted,
+ frameworkId,
+ executorId,
+ pid);
+ } else {
+ // In child process.
+ // Put self into the newly created cgroup.
+ Try<bool> assign =
+ cgroups::assignTask(hierarchy,
+ getCgroupName(frameworkId, executorId),
+ ::getpid());
+ if (assign.isError()) {
+ LOG(FATAL) << "Failed to assign for executor " << executorId
+ << " of framework " << frameworkId
+ << ": " << assign.error();
+ }
+
+ launcher::ExecutorLauncher* launcher =
+ createExecutorLauncher(frameworkId,
+ frameworkInfo,
+ executorInfo,
+ directory);
+ // NOTE(review): nothing follows run() in the child; presumably it never
+ // returns here -- confirm against launcher::ExecutorLauncher.
+ launcher->run();
+ }
+}
+
+
+// Kills an executor by destroying its cgroup. The destruction runs
+// asynchronously; destroyWaited is invoked with its outcome. The cgroup info
+// is only marked as killed here, it stays registered.
+void CgroupsIsolationModule::killExecutor(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  CHECK(initialized) << "Cannot kill executors before initialization";
+
+  CgroupInfo* info = findCgroupInfo(frameworkId, executorId);
+
+  // Nothing to do for executors we do not know or have already killed.
+  if (info == NULL || info->killed) {
+    LOG(ERROR) << "Asked to kill an unknown/killed executor!";
+    return;
+  }
+
+  LOG(INFO) << "Killing executor " << executorId
+            << " of framework " << frameworkId;
+
+  // A still pending OOM listener is no longer needed.
+  if (info->oomNotifier.isPending()) {
+    info->oomNotifier.discard();
+  }
+
+  // Start destroying the cgroup of the executor. We deliberately do not wait
+  // for the result, as that would block the isolation module; a callback is
+  // registered instead and runs once the result is ready.
+  Future<bool> destroyed =
+    cgroups::destroyCgroup(hierarchy,
+                           getCgroupName(frameworkId, executorId));
+
+  destroyed.onAny(
+      defer(PID<CgroupsIsolationModule>(this),
+            &CgroupsIsolationModule::destroyWaited,
+            frameworkId,
+            executorId,
+            info->tag,
+            destroyed));
+
+  // The cgroup info is not unregistered here; the process exit handler is
+  // in charge of that.
+  info->killed = true;
+}
+
+
+// Applies the given resources to the cgroup of an executor by invoking the
+// registered handler of every resource. Handler errors are logged and do not
+// stop the remaining resources from being processed.
+void CgroupsIsolationModule::resourcesChanged(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const Resources& resources)
+{
+  CHECK(initialized) << "Cannot change resources before initialization";
+
+  CgroupInfo* info = findCgroupInfo(frameworkId, executorId);
+  if (info == NULL || info->killed) {
+    LOG(INFO) << "Asked to update resources for an unknown/killed executor";
+    return;
+  }
+
+  LOG(INFO) << "Changing cgroup controls for executor " << executorId
+            << " of framework " << frameworkId
+            << " with resources " << resources;
+
+  Resources::const_iterator iter = resources.begin();
+  for (; iter != resources.end(); ++iter) {
+    const Resource& current = *iter;
+    const std::string& name = current.name();
+
+    // Resources without a handler are ignored.
+    if (!resourceChangedHandlers.contains(name)) {
+      continue;
+    }
+
+    // A handler is skipped when its resource depends on a subsystem that is
+    // not active; resources that depend on no subsystem are always handled.
+    if (resourceSubsystemMap.contains(name) &&
+        !activatedSubsystems.contains(resourceSubsystemMap[name])) {
+      continue;
+    }
+
+    Try<bool> outcome =
+      (this->*resourceChangedHandlers[name])(frameworkId,
+                                             executorId,
+                                             resources);
+    if (outcome.isError()) {
+      LOG(ERROR) << outcome.error();
+    }
+  }
+}
+
+
+// Handles the exit of a process. If the pid belongs to a registered
+// executor, the slave is told that the executor exited, the executor is
+// killed unless that already happened, and its cgroup info is unregistered.
+// Exits of any other process are ignored.
+void CgroupsIsolationModule::processExited(pid_t pid, int status)
+{
+  CgroupInfo* info = findCgroupInfo(pid);
+  if (info == NULL) {
+    return;
+  }
+
+  // Work on copies of the ids rather than on the fields of the info, which
+  // is unregistered at the end of this function.
+  FrameworkID frameworkId = info->frameworkId;
+  ExecutorID executorId = info->executorId;
+
+  LOG(INFO) << "Telling slave of lost executor " << executorId
+            << " of framework " << frameworkId;
+
+  dispatch(slave,
+           &Slave::executorExited,
+           frameworkId,
+           executorId,
+           status);
+
+  const bool alreadyKilled = info->killed;
+  if (!alreadyKilled) {
+    killExecutor(frameworkId, executorId);
+  }
+
+  unregisterCgroupInfo(frameworkId, executorId);
+}
+
+
+// Builds the launcher used to start the given executor in 'directory'. The
+// launcher is allocated with new and handed to the caller.
+launcher::ExecutorLauncher* CgroupsIsolationModule::createExecutorLauncher(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo,
+    const ExecutorInfo& executorInfo,
+    const std::string& directory)
+{
+  launcher::ExecutorLauncher* executorLauncher =
+    new launcher::ExecutorLauncher(
+        frameworkId,
+        executorInfo.executor_id(),
+        executorInfo.command(),
+        frameworkInfo.user(),
+        directory,
+        slave,
+        flags.frameworks_home,
+        flags.hadoop_home,
+        !local,
+        flags.switch_user,
+        "");
+
+  return executorLauncher;
+}
+
+
+// Handler for the "cpus" resource: translates the executor's cpus value into
+// a cpu.shares value (CPU_SHARES_PER_CPU per CPU, at least MIN_CPU_SHARES)
+// and writes it to the executor's cgroup. Returns an error if the control
+// cannot be written and true otherwise, including when the resources carry
+// no cpus (in which case only a warning is logged).
+Try<bool> CgroupsIsolationModule::cpusChanged(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const Resources& resources)
+{
+  Resource r;
+  r.set_name("cpus");
+  r.set_type(Value::SCALAR);
+
+  Option<Resource> cpusResource = resources.get(r);
+  if (cpusResource.isNone()) {
+    LOG(WARNING) << "Resource cpus cannot be retrieved for executor "
+                 << executorId << " of framework " << frameworkId;
+    return true;
+  }
+
+  double cpus = cpusResource.get().scalar().value();
+
+  // Scale first and truncate afterwards so that fractional CPUs are honored:
+  // truncating 'cpus' before the multiplication would turn 1.5 CPUs into
+  // 1024 shares and 0.5 CPUs into the minimum.
+  size_t cpuShares =
+    std::max(static_cast<size_t>(CPU_SHARES_PER_CPU * cpus), MIN_CPU_SHARES);
+
+  Try<bool> set =
+    cgroups::writeControl(hierarchy,
+                          getCgroupName(frameworkId, executorId),
+                          "cpu.shares",
+                          stringify(cpuShares));
+  if (set.isError()) {
+    return Try<bool>::error(set.error());
+  }
+
+  LOG(INFO) << "Write cpu.shares = " << cpuShares
+            << " for executor " << executorId
+            << " of framework " << frameworkId;
+
+  return true;
+}
+
+
+// Handler for the "mem" resource: derives memory.limit_in_bytes from the
+// executor's mem value and writes it to the executor's cgroup. Returns an
+// error if the control cannot be written and true otherwise, including when
+// the resources carry no mem (in which case only a warning is logged).
+Try<bool> CgroupsIsolationModule::memChanged(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const Resources& resources)
+{
+  Resource query;
+  query.set_name("mem");
+  query.set_type(Value::SCALAR);
+
+  Option<Resource> memResource = resources.get(query);
+  if (memResource.isNone()) {
+    LOG(WARNING) << "Resource mem cannot be retrieved for executor "
+                 << executorId << " of framework " << frameworkId;
+    return true;
+  }
+
+  double mem = memResource.get().scalar().value();
+
+  // NOTE(review): the floor and 'mem' are assumed to share one unit (MB)
+  // here -- confirm against the definition of MIN_MEMORY_MB.
+  size_t floored = std::max((size_t)mem, MIN_MEMORY_MB);
+  size_t limitInBytes = floored * 1024LL * 1024LL;
+
+  Try<bool> written =
+    cgroups::writeControl(hierarchy,
+                          getCgroupName(frameworkId, executorId),
+                          "memory.limit_in_bytes",
+                          stringify(limitInBytes));
+  if (written.isError()) {
+    return Try<bool>::error(written.error());
+  }
+
+  LOG(INFO) << "Write memory.limit_in_bytes = " << limitInBytes
+            << " for executor " << executorId
+            << " of framework " << frameworkId;
+
+  return true;
+}
+
+
+// Starts listening for OOM events on the cgroup of the given executor via
+// its memory.oom_control file. The resulting future is stored in the cgroup
+// info and oomWaited is invoked once it is no longer pending. The cgroup
+// info must already be registered; a listener that fails right away is
+// fatal.
+void CgroupsIsolationModule::oomListen(
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId)
+{
+ CgroupInfo* info = findCgroupInfo(frameworkId, executorId);
+ CHECK(info != NULL) << "Cgroup info is not registered";
+
+ info->oomNotifier =
+ cgroups::listenEvent(hierarchy,
+ getCgroupName(frameworkId, executorId),
+ "memory.oom_control");
+
+ // If the listening fails immediately, something very wrong happened.
+ // Therefore, we report a fatal error here.
+ if (info->oomNotifier.isFailed()) {
+ LOG(FATAL) << "Failed to listen for OOM events for executor " << executorId
+ << " of framework " << frameworkId
+ << ": "<< info->oomNotifier.failure();
+ }
+
+ LOG(INFO) << "Start listening for OOM events for executor " << executorId
+ << " of framework " << frameworkId;
+
+ // The tag is passed along so that the handler can tell apart events that
+ // belong to a previous launch of the same executor (see oom).
+ info->oomNotifier.onAny(
+ defer(PID<CgroupsIsolationModule>(this),
+ &CgroupsIsolationModule::oomWaited,
+ frameworkId,
+ executorId,
+ info->tag,
+ info->oomNotifier));
+}
+
+
+// Callback run once the OOM notifier of an executor is no longer pending. A
+// discarded or failed notifier is only logged; otherwise an OOM event was
+// delivered and oom() is called to handle it.
+void CgroupsIsolationModule::oomWaited(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const std::string& tag,
+    const Future<uint64_t>& future)
+{
+  LOG(INFO) << "OOM notifier is triggered for executor "
+            << executorId << " of framework " << frameworkId
+            << " with tag " << tag;
+
+  if (future.isDiscarded()) {
+    LOG(INFO) << "Discarded OOM notifier for executor "
+              << executorId << " of framework " << frameworkId
+              << " with tag " << tag;
+    return;
+  }
+
+  if (future.isFailed()) {
+    LOG(ERROR) << "Listening on OOM events failed for executor "
+               << executorId << " of framework " << frameworkId
+               << " with tag " << tag << ": " << future.failure();
+    return;
+  }
+
+  // Neither discarded nor failed: an out-of-memory event happened.
+  oom(frameworkId, executorId, tag);
+}
+
+
+// Reacts to an OOM event reported for an executor.
+// The event is ignored when the executor is no longer registered or when the
+// tag does not match the currently registered launch; otherwise the executor
+// is killed.
+// @param frameworkId The id of the given framework.
+// @param executorId The id of the given executor.
+// @param tag The uuid tag of the launch the event belongs to.
+void CgroupsIsolationModule::oom(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const std::string& tag)
+{
+  LOG(INFO) << "OOM detected in executor " << executorId
+            << " of framework " << frameworkId
+            << " with tag " << tag;
+
+  CgroupInfo* cgroup = findCgroupInfo(frameworkId, executorId);
+
+  // The executor may already be gone: e.g. a kill and an OOM happened at about
+  // the same time and the process exit was handled first. This is not an
+  // error, so just report it.
+  if (cgroup == NULL) {
+    LOG(INFO) << "OOM detected for an exited executor";
+    return;
+  }
+
+  // A different tag means the event belongs to an earlier launch of an
+  // executor with the same frameworkId and executorId; ignore it.
+  if (tag != cgroup->tag) {
+    LOG(INFO) << "OOM detected for the previous launch of the same executor";
+    return;
+  }
+
+  // Once killed is set the OOM notifier is expected to be discarded, in which
+  // case oomWaited never calls this function.
+  CHECK(!cgroup->killed) << "OOM detected for a killed executor";
+
+  // TODO(jieyu): Have a mechanism to use a different policy (e.g. freeze the
+  // executor) when OOM happens.
+  killExecutor(frameworkId, executorId);
+}
+
+
+// Callback run once destroying the cgroup of an executor has a result.
+// Success is logged; any other outcome (failed or discarded) is fatal.
+// @param frameworkId The id of the given framework.
+// @param executorId The id of the given executor.
+// @param tag The uuid tag of the launch whose cgroup was destroyed.
+// @param future The future describing the destroy process.
+void CgroupsIsolationModule::destroyWaited(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const std::string& tag,
+    const Future<bool>& future)
+{
+  if (future.isReady()) {
+    LOG(INFO) << "Successfully destroyed the cgroup for executor "
+              << executorId << " of framework " << frameworkId
+              << " with tag " << tag;
+    return;
+  }
+
+  // This branch is reached for both failed and discarded futures.
+  // NOTE(review): failure() is presumably only valid on a failed future (a
+  // discarded one carries no message) -- confirm against libprocess. Only
+  // query it when the future actually failed so the fatal message itself
+  // cannot fault.
+  const std::string reason =
+    future.isFailed() ? future.failure() : std::string("future was discarded");
+
+  LOG(FATAL) << "Failed to destroy the cgroup for executor "
+             << executorId << " of framework " << frameworkId
+             << " with tag " << tag << ": " << reason;
+}
+
+
+// Creates a CgroupInfo for the given executor and stores it in 'infos'.
+// The new entry gets a fresh random UUID tag, pid -1 and killed == false.
+// Any pointer previously stored for the same (frameworkId, executorId) pair
+// is overwritten without being deleted.
+// @param frameworkId The id of the given framework.
+// @param executorId The id of the given executor.
+// @return A pointer to the newly registered cgroup info.
+CgroupsIsolationModule::CgroupInfo* CgroupsIsolationModule::registerCgroupInfo(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  CgroupInfo* created = new CgroupInfo;
+
+  created->killed = false;
+  created->pid = -1;
+  created->tag = UUID::random().toString();
+  created->executorId = executorId;
+  created->frameworkId = frameworkId;
+
+  infos[frameworkId][executorId] = created;
+
+  return created;
+}
+
+
+// Removes and deletes the CgroupInfo registered for the given executor.
+// Does nothing if no such entry exists. When the framework has no remaining
+// executors its (now empty) inner map is removed as well.
+// @param frameworkId The id of the given framework.
+// @param executorId The id of the given executor.
+void CgroupsIsolationModule::unregisterCgroupInfo(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  if (!infos.contains(frameworkId)) {
+    return;
+  }
+
+  hashmap<ExecutorID, CgroupInfo*>& executors = infos[frameworkId];
+  if (!executors.contains(executorId)) {
+    return;
+  }
+
+  delete executors[executorId];
+  executors.erase(executorId);
+
+  // 'executors' must not be used after this erase: it refers into 'infos'.
+  if (executors.empty()) {
+    infos.erase(frameworkId);
+  }
+}
+
+
+// Looks up a registered cgroup by the pid stored in its CgroupInfo.
+// Scans the entries of every framework and returns the first match.
+// @param pid The PID to search for.
+// @return A pointer to the matching cgroup info, or NULL if there is none.
+CgroupsIsolationModule::CgroupInfo* CgroupsIsolationModule::findCgroupInfo(
+    pid_t pid)
+{
+  // The typedef keeps the comma of the template out of the foreach macro.
+  typedef hashmap<ExecutorID, CgroupInfo*> ExecutorInfos;
+
+  foreachvalue (const ExecutorInfos& executors, infos) {
+    foreachvalue (CgroupInfo* candidate, executors) {
+      if (candidate->pid == pid) {
+        return candidate;
+      }
+    }
+  }
+
+  return NULL;
+}
+
+
+// Looks up a registered cgroup by framework id and executor id.
+// @param frameworkId The id of the given framework.
+// @param executorId The id of the given executor.
+// @return A pointer to the cgroup info if registered, NULL otherwise.
+CgroupsIsolationModule::CgroupInfo* CgroupsIsolationModule::findCgroupInfo(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  // Check membership first so that the lookups below never insert entries.
+  if (!infos.contains(frameworkId)) {
+    return NULL;
+  }
+
+  if (!infos[frameworkId].contains(executorId)) {
+    return NULL;
+  }
+
+  return infos[frameworkId][executorId];
+}
+
+
+// Builds the name of the cgroup used by the given executor, of the form
+// "mesos_cgroup_framework_<frameworkId>_executor_<executorId>_tag_<tag>".
+// The executor must already be registered, since the tag is taken from its
+// CgroupInfo.
+// @param frameworkId The id of the given framework.
+// @param executorId The id of the given executor.
+// @return The name of the cgroup.
+std::string CgroupsIsolationModule::getCgroupName(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  CgroupInfo* cgroup = findCgroupInfo(frameworkId, executorId);
+  CHECK(cgroup != NULL) << "Cgroup info is not registered";
+
+  std::ostringstream name;
+  name << "mesos_cgroup_framework_" << frameworkId;
+  name << "_executor_" << executorId;
+  name << "_tag_" << cgroup->tag;
+
+  return name.str();
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
Added: incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp?rev=1370080&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp (added)
+++ incubator/mesos/trunk/src/slave/cgroups_isolation_module.hpp Tue Aug 7 00:28:51 2012
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CGROUPS_ISOLATION_MODULE_HPP__
+#define __CGROUPS_ISOLATION_MODULE_HPP__
+
+#include <string>
+
+#include <process/future.hpp>
+#include <process/pid.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
+
+#include "launcher/launcher.hpp"
+
+#include "slave/flags.hpp"
+#include "slave/isolation_module.hpp"
+#include "slave/reaper.hpp"
+#include "slave/slave.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class CgroupsIsolationModule
+ : public IsolationModule,
+ public ProcessExitedListener
+{
+public:
+ CgroupsIsolationModule();
+
+ virtual ~CgroupsIsolationModule();
+
+ virtual void initialize(const Flags& flags,
+ bool local,
+ const process::PID<Slave>& slave);
+
+ virtual void launchExecutor(const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Resources& resources);
+
+ virtual void killExecutor(const FrameworkID& frameworkId,
+ const ExecutorID& executorId);
+
+ virtual void resourcesChanged(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const Resources& resources);
+
+ virtual void processExited(pid_t pid, int status);
+
+private:
+ // No copying, no assigning.
+ CgroupsIsolationModule(const CgroupsIsolationModule&);
+ CgroupsIsolationModule& operator = (const CgroupsIsolationModule&);
+
+ // The cgroup information for each live executor.
+ struct CgroupInfo
+ {
+ FrameworkID frameworkId;
+ ExecutorID executorId;
+
+ // The UUID tag to distinguish between different launches of the same
+ // executor (which have the same frameworkId and executorId).
+ std::string tag;
+
+ // PID of the leading process of the executor.
+ pid_t pid;
+
+ // Whether the executor has been killed.
+ bool killed;
+
+ // Used to cancel the OOM listening.
+ process::Future<uint64_t> oomNotifier;
+ };
+
+ // Main method executed after a fork() to create a Launcher for launching an
+ // executor's process. The Launcher will chdir() to the child's working
+ // directory, fetch the executor, set environment variables, switch user,
+ // etc., and finally exec() the executor process.
+ launcher::ExecutorLauncher* createExecutorLauncher(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory);
+
+ // The callback which will be invoked when "cpus" resource has changed.
+ // @param frameworkId The id of the given framework.
+ // @param executorId The id of the given executor.
+ // @param resources The handle for the resources.
+ // @return Whether the operation succeeds.
+ Try<bool> cpusChanged(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const Resources& resources);
+
+ // The callback which will be invoked when "mem" resource has changed.
+ // @param frameworkId The id of the given framework.
+ // @param executorId The id of the given executor.
+ // @param resources The handle for the resources.
+ // @return Whether the operation succeeds.
+ Try<bool> memChanged(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const Resources& resources);
+
+ // Start listening on OOM events. This function will create an eventfd and
+ // start polling on it.
+ // @param frameworkId The id of the given framework.
+ // @param executorId The id of the given executor.
+ void oomListen(const FrameworkID& frameworkId,
+ const ExecutorID& executorId);
+
+ // Invoked when polling on the eventfd has a result; 'future' holds it.
+ // @param frameworkId The id of the given framework.
+ // @param executorId The id of the given executor.
+ // @param tag The uuid tag.
+ void oomWaited(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const std::string& tag,
+ const process::Future<uint64_t>& future);
+
+ // Handles an OOM event: kills the executor unless it exited or tag is stale.
+ // @param frameworkId The id of the given framework.
+ // @param executorId The id of the given executor.
+ // @param tag The uuid tag.
+ void oom(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const std::string& tag);
+
+ // This callback is invoked when destroying the cgroup has a result.
+ // @param frameworkId The id of the given framework.
+ // @param executorId The id of the given executor.
+ // @param tag The uuid tag.
+ // @param future The future describing the destroy process.
+ void destroyWaited(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const std::string& tag,
+ const process::Future<bool>& future);
+
+ // Register a cgroup in the isolation module.
+ // @param frameworkId The id of the given framework.
+ // @param executorId The id of the given executor.
+ // @return A pointer to the cgroup info registered.
+ CgroupInfo* registerCgroupInfo(const FrameworkID& frameworkId,
+ const ExecutorID& executorId);
+
+ // Unregister a cgroup in the isolation module.
+ // @param frameworkId The id of the given framework.
+ // @param executorId The id of the given executor.
+ void unregisterCgroupInfo(const FrameworkID& frameworkId,
+ const ExecutorID& executorId);
+
+ // Find a registered cgroup by the PID of the leading process.
+ // @param pid The PID of the leading process in the cgroup.
+ // @return A pointer to the cgroup info if found, NULL otherwise.
+ CgroupInfo* findCgroupInfo(pid_t pid);
+
+ // Find a registered cgroup by the frameworkId and the executorId.
+ // @param frameworkId The id of the given framework.
+ // @param executorId The id of the given executor.
+ // @return A pointer to the cgroup info if found, NULL otherwise.
+ CgroupInfo* findCgroupInfo(const FrameworkID& frameworkId,
+ const ExecutorID& executorId);
+
+ // Return the canonicalized name of the cgroup used by a given executor in a
+ // given framework.
+ // @param frameworkId The id of the given framework.
+ // @param executorId The id of the given executor.
+ // @return The canonicalized name of the cgroup.
+ std::string getCgroupName(const FrameworkID& frameworkId,
+ const ExecutorID& executorId);
+
+ Flags flags;
+ bool local;
+ process::PID<Slave> slave;
+ bool initialized;
+ Reaper* reaper;
+
+ // The cgroup information for each live executor.
+ hashmap<FrameworkID, hashmap<ExecutorID, CgroupInfo*> > infos;
+
+ // The path to the cgroups hierarchy root.
+ std::string hierarchy;
+
+ // The activated cgroups subsystems that can be used by the module.
+ hashset<std::string> activatedSubsystems;
+
+ // The mapping between resource name and corresponding cgroups subsystem.
+ hashmap<std::string, std::string> resourceSubsystemMap;
+
+ // Mapping from a resource name to the member function that handles a
+ // change of that resource.
+ hashmap<std::string,
+ Try<bool>(CgroupsIsolationModule::*)(
+ const FrameworkID&,
+ const ExecutorID&,
+ const Resources&)> resourceChangedHandlers;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __CGROUPS_ISOLATION_MODULE_HPP__
Modified: incubator/mesos/trunk/src/slave/flags.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/flags.hpp?rev=1370080&r1=1370079&r2=1370080&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/flags.hpp (original)
+++ incubator/mesos/trunk/src/slave/flags.hpp Tue Aug 7 00:28:51 2012
@@ -96,6 +96,13 @@ public:
"Amount of time (in hours) to wait before\n"
"cleaning up executor directories",
GC_TIMEOUT_HOURS);
+
+#ifdef __linux__
+ add(&Flags::cgroups_hierarchy_root,
+ "cgroups_hierarchy_root",
+ "The path to the cgroups hierarchy root\n",
+ "/cgroups");
+#endif
}
Option<std::string> resources;
@@ -109,6 +116,9 @@ public:
std::string frameworks_home; // TODO(benh): Make an Option.
double executor_shutdown_timeout_seconds;
double gc_timeout_hours;
+#ifdef __linux__
+ std::string cgroups_hierarchy_root;
+#endif
};
} // namespace mesos {
Modified: incubator/mesos/trunk/src/slave/isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/isolation_module.cpp?rev=1370080&r1=1370079&r2=1370080&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/isolation_module.cpp Tue Aug 7 00:28:51 2012
@@ -21,6 +21,7 @@
#ifdef __sun__
#include "solaris_project_isolation_module.hpp"
#elif __linux__
+#include "cgroups_isolation_module.hpp"
#include "lxc_isolation_module.hpp"
#endif
@@ -35,6 +36,8 @@ IsolationModule* IsolationModule::create
else if (type == "project")
return new SolarisProjectIsolationModule();
#elif __linux__
+ else if (type == "cgroups")
+ return new CgroupsIsolationModule();
else if (type == "lxc")
return new LxcIsolationModule();
#endif
Copied: incubator/mesos/trunk/src/tests/cgroups_isolation_tests.cpp (from r1370079, incubator/mesos/trunk/src/slave/isolation_module.cpp)
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/cgroups_isolation_tests.cpp?p2=incubator/mesos/trunk/src/tests/cgroups_isolation_tests.cpp&p1=incubator/mesos/trunk/src/slave/isolation_module.cpp&r1=1370079&r2=1370080&rev=1370080&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/tests/cgroups_isolation_tests.cpp Tue Aug 7 00:28:51 2012
@@ -16,38 +16,9 @@
* limitations under the License.
*/
-#include "isolation_module.hpp"
-#include "process_based_isolation_module.hpp"
-#ifdef __sun__
-#include "solaris_project_isolation_module.hpp"
-#elif __linux__
-#include "lxc_isolation_module.hpp"
-#endif
+#include <gtest/gtest.h>
+#include "tests/external_test.hpp"
-namespace mesos { namespace internal { namespace slave {
-
-IsolationModule* IsolationModule::create(const std::string &type)
-{
- if (type == "process")
- return new ProcessBasedIsolationModule();
-#ifdef __sun__
- else if (type == "project")
- return new SolarisProjectIsolationModule();
-#elif __linux__
- else if (type == "lxc")
- return new LxcIsolationModule();
-#endif
-
- return NULL;
-}
-
-
-void IsolationModule::destroy(IsolationModule* module)
-{
- if (module != NULL) {
- delete module;
- }
-}
-
-}}} // namespace mesos { namespace internal { namespace slave {
+// Run the balloon framework under cgroups isolation.
+TEST_EXTERNAL(CgroupsIsolation, ROOT_CGROUPS_BalloonFramework)
Added: incubator/mesos/trunk/src/tests/external/CgroupsIsolation/ROOT_CGROUPS_BalloonFramework.sh
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/external/CgroupsIsolation/ROOT_CGROUPS_BalloonFramework.sh?rev=1370080&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/external/CgroupsIsolation/ROOT_CGROUPS_BalloonFramework.sh (added)
+++ incubator/mesos/trunk/src/tests/external/CgroupsIsolation/ROOT_CGROUPS_BalloonFramework.sh Tue Aug 7 00:28:51 2012
@@ -0,0 +1,66 @@
+#!/bin/bash
+
+# This script runs the balloon framework on a cluster using cgroups isolation.
+
+# Check that we're running as root
+if [[ $EUID -ne 0 ]]; then
+  echo "This test must be run as root." >&2
+  exit 2
+fi
+
+# Launch master
+"$MESOS_BUILD_DIR/src/mesos-master" --port=5432 > master.log 2>&1 &
+MASTER_PID=$!
+echo "Launched master, PID = $MASTER_PID"
+sleep 2
+
+# Check if it's still running after 2 seconds
+kill -0 $MASTER_PID
+KILL_EXIT_CODE=$?
+if [[ $KILL_EXIT_CODE -ne 0 ]]; then
+  echo "Master crashed; failing test"
+  exit 2
+fi
+
+# Launch slave
+"$MESOS_BUILD_DIR/src/mesos-slave" \
+  --master=localhost:5432 \
+  --isolation=cgroups \
+  --resources="cpus:1;mem:96" \
+  > slave.log 2>&1 &
+SLAVE_PID=$!
+echo "Launched slave, PID = $SLAVE_PID"
+sleep 2
+
+# Check if it's still running after 2 seconds
+kill -0 $SLAVE_PID
+KILL_EXIT_CODE=$?
+if [[ $KILL_EXIT_CODE -ne 0 ]]; then
+  echo "Slave crashed; failing test"
+  kill $MASTER_PID
+  exit 2
+fi
+
+# Launch balloon framework; save its exit code at once, later commands reset $?
+echo "Running balloon framework"
+"$MESOS_BUILD_DIR/src/balloon-framework" localhost:5432 \
+  1024 > balloon.log 2>&1
+EXIT_CODE=$?
+echo "Balloon framework exit code: $EXIT_CODE"
+sleep 2
+
+# Kill everything once balloon framework exited
+echo "Killing slave: $SLAVE_PID"
+kill $SLAVE_PID
+sleep 2
+echo "Killing master: $MASTER_PID"
+kill $MASTER_PID
+sleep 2
+
+echo "Exiting"
+# The test passes only if the balloon framework exited with code 1
+if [[ $EXIT_CODE -eq 1 ]]; then
+  exit 0
+else
+  exit 1
+fi