You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2016/08/22 03:40:59 UTC
mesos git commit: Implemented `MemorySubsystem`.
Repository: mesos
Updated Branches:
refs/heads/master a43924a2d -> 178c1fa22
Implemented `MemorySubsystem`.
Review: https://reviews.apache.org/r/49851/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/178c1fa2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/178c1fa2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/178c1fa2
Branch: refs/heads/master
Commit: 178c1fa227b928c5a533d28f04ea0b6f924606aa
Parents: a43924a
Author: haosdent huang <ha...@gmail.com>
Authored: Sun Aug 21 10:58:05 2016 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Sun Aug 21 20:40:50 2016 -0700
----------------------------------------------------------------------
src/CMakeLists.txt | 1 +
src/Makefile.am | 2 +
.../mesos/isolators/cgroups/cgroups.cpp | 30 +
.../mesos/isolators/cgroups/cgroups.hpp | 4 +
.../mesos/isolators/cgroups/constants.hpp | 1 +
.../mesos/isolators/cgroups/subsystem.cpp | 10 +
.../mesos/isolators/cgroups/subsystem.hpp | 11 +
.../isolators/cgroups/subsystems/memory.cpp | 571 +++++++++++++++++++
.../isolators/cgroups/subsystems/memory.hpp | 116 ++++
9 files changed, 746 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ff51705..7ffe377 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -170,6 +170,7 @@ set(LINUX_SRC
slave/containerizer/mesos/isolators/cgroups/subsystems/cpu.cpp
slave/containerizer/mesos/isolators/cgroups/subsystems/cpuacct.cpp
slave/containerizer/mesos/isolators/cgroups/subsystems/devices.cpp
+ slave/containerizer/mesos/isolators/cgroups/subsystems/memory.cpp
slave/containerizer/mesos/isolators/docker/runtime.cpp
slave/containerizer/mesos/isolators/docker/volume/driver.cpp
slave/containerizer/mesos/isolators/docker/volume/isolator.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 61c941f..83db882 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1054,6 +1054,7 @@ MESOS_LINUX_FILES = \
slave/containerizer/mesos/isolators/cgroups/subsystems/cpu.cpp \
slave/containerizer/mesos/isolators/cgroups/subsystems/cpuacct.cpp \
slave/containerizer/mesos/isolators/cgroups/subsystems/devices.cpp \
+ slave/containerizer/mesos/isolators/cgroups/subsystems/memory.cpp \
slave/containerizer/mesos/isolators/docker/runtime.cpp \
slave/containerizer/mesos/isolators/docker/volume/isolator.cpp \
slave/containerizer/mesos/isolators/filesystem/linux.cpp \
@@ -1090,6 +1091,7 @@ MESOS_LINUX_FILES += \
slave/containerizer/mesos/isolators/cgroups/subsystems/cpu.hpp \
slave/containerizer/mesos/isolators/cgroups/subsystems/cpuacct.hpp \
slave/containerizer/mesos/isolators/cgroups/subsystems/devices.hpp \
+ slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp \
slave/containerizer/mesos/isolators/docker/runtime.hpp \
slave/containerizer/mesos/isolators/docker/volume/isolator.hpp \
slave/containerizer/mesos/isolators/filesystem/linux.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
index 4c0ec5c..ed9cff2 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
@@ -78,6 +78,7 @@ Try<Isolator*> CgroupsIsolatorProcess::create(const Flags& flags)
{"cpu", CGROUP_SUBSYSTEM_CPU_NAME},
{"cpu", CGROUP_SUBSYSTEM_CPUACCT_NAME},
{"devices", CGROUP_SUBSYSTEM_DEVICES_NAME},
+ {"mem", CGROUP_SUBSYSTEM_MEMORY_NAME},
};
foreach (string isolator, strings::tokenize(flags.isolation, ",")) {
@@ -522,10 +523,39 @@ Future<ContainerLimitation> CgroupsIsolatorProcess::watch(
return Failure("Unknown container");
}
+ foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
+ subsystem->watch(containerId)
+ .onAny(defer(
+ PID<CgroupsIsolatorProcess>(this),
+ &CgroupsIsolatorProcess::_watch,
+ containerId,
+ lambda::_1));
+ }
+
return infos[containerId]->limitation.future();
}
+// Continuation of `watch`: relays the outcome of one subsystem's
+// `watch` future into the container-wide limitation promise.
+void CgroupsIsolatorProcess::_watch(
+    const ContainerID& containerId,
+    const Future<ContainerLimitation>& future)
+{
+  // The container may already have been cleaned up; nothing to relay.
+  if (!infos.contains(containerId)) {
+    return;
+  }
+
+  CHECK(!future.isPending());
+
+  auto& limitation = infos[containerId]->limitation;
+
+  if (future.isReady()) {
+    limitation.set(future.get());
+    return;
+  }
+
+  if (future.isFailed()) {
+    limitation.fail(future.failure());
+    return;
+  }
+
+  // Anything else (i.e. a discarded future) discards the promise.
+  limitation.discard();
+}
+
+
Future<Nothing> CgroupsIsolatorProcess::update(
const ContainerID& containerId,
const Resources& resources)
http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
index 9b2d33e..38d1428 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
@@ -125,6 +125,10 @@ private:
process::Future<Nothing> _isolate(
const std::list<process::Future<Nothing>>& futures);
+ // Continuation of `watch`: forwards the outcome of a subsystem's
+ // `watch` future (value, failure or discard) to the container's
+ // `limitation` promise, if the container is still known.
+ void _watch(
+ const ContainerID& containerId,
+ const process::Future<mesos::slave::ContainerLimitation>& future);
+
process::Future<Nothing> _update(
const std::list<process::Future<Nothing>>& futures);
http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp b/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
index 4a7b556..58550ea 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
@@ -42,6 +42,7 @@ const Bytes MIN_MEMORY = Megabytes(32);
const std::string CGROUP_SUBSYSTEM_CPU_NAME = "cpu";
const std::string CGROUP_SUBSYSTEM_CPUACCT_NAME = "cpuacct";
const std::string CGROUP_SUBSYSTEM_DEVICES_NAME = "devices";
+const std::string CGROUP_SUBSYSTEM_MEMORY_NAME = "memory";
} // namespace slave {
} // namespace internal {
http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
index df701d2..ba31b93 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
@@ -23,6 +23,9 @@
#include "slave/containerizer/mesos/isolators/cgroups/subsystems/cpu.hpp"
#include "slave/containerizer/mesos/isolators/cgroups/subsystems/cpuacct.hpp"
#include "slave/containerizer/mesos/isolators/cgroups/subsystems/devices.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp"
+
+using mesos::slave::ContainerLimitation;
using process::Future;
using process::Owned;
@@ -43,6 +46,7 @@ Try<Owned<Subsystem>> Subsystem::create(
{CGROUP_SUBSYSTEM_CPU_NAME, &CpuSubsystem::create},
{CGROUP_SUBSYSTEM_CPUACCT_NAME, &CpuacctSubsystem::create},
{CGROUP_SUBSYSTEM_DEVICES_NAME, &DevicesSubsystem::create},
+ {CGROUP_SUBSYSTEM_MEMORY_NAME, &MemorySubsystem::create},
};
if (!creators.contains(name)) {
@@ -85,6 +89,12 @@ Future<Nothing> Subsystem::isolate(const ContainerID& containerId, pid_t pid)
}
+// Base implementation: this subsystem has no limitation to report, so
+// a default-constructed future is handed back (in libprocess that
+// future stays pending -- confirm if relying on it).
+Future<ContainerLimitation> Subsystem::watch(const ContainerID& containerId)
+{
+  Future<ContainerLimitation> noLimitation;
+  return noLimitation;
+}
+
+
Future<Nothing> Subsystem::update(
const ContainerID& containerId,
const Resources& resources)
http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
index 18ba445..6dca38c 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
@@ -21,6 +21,8 @@
#include <mesos/resources.hpp>
+#include <mesos/slave/isolator.hpp>
+
#include <process/future.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
@@ -91,6 +93,15 @@ public:
pid_t pid);
/**
+ * Watch the container and report if any resource constraint impacts it.
+ *
+ * @param containerId The target containerId.
+ * @return A future that is satisfied with a `ContainerLimitation`
+ * when this subsystem detects that a resource limit has impacted
+ * the container; it may instead be failed on error (for example,
+ * for an unknown container). The base implementation returns a
+ * default-constructed future and so never reports a limitation.
+ */
+ virtual process::Future<mesos::slave::ContainerLimitation> watch(
+ const ContainerID& containerId);
+
+ /**
* Update resources allocated to the associated container in this
* cgroups subsystem.
*
http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.cpp b/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.cpp
new file mode 100644
index 0000000..4c03aef
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.cpp
@@ -0,0 +1,571 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <sys/user.h>
+
+#include <climits>
+#include <sstream>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/id.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/error.hpp>
+#include <stout/option.hpp>
+#include <stout/result.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp"
+
+using cgroups::memory::pressure::Counter;
+using cgroups::memory::pressure::Level;
+
+using mesos::slave::ContainerLimitation;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::PID;
+
+using std::list;
+using std::ostringstream;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// The memory pressure levels that are probed on the root cgroup in
+// `create` and listened on for every container in `pressureListen`.
+//
+// NOTE: Returned by plain (non-const) value: a `const` by-value return
+// type adds nothing and only prevents callers from moving the result.
+static vector<Level> levels()
+{
+  return {Level::LOW, Level::MEDIUM, Level::CRITICAL};
+}
+
+
+// Factory for the memory subsystem. Before constructing it, verifies
+// that the host can support it: the kernel OOM killer can be enabled,
+// memory pressure events can be listened on, and (only when swap
+// limiting is requested) 'memory.memsw.limit_in_bytes' is readable.
+Try<Owned<Subsystem>> MemorySubsystem::create(
+    const Flags& flags,
+    const string& hierarchy)
+{
+  // The kernel OOM-killer has to stay enabled: the Mesos OOM handler,
+  // as implemented, cannot safely resolve an OOM condition on its own
+  // given the limitations Linux imposes on that code path.
+  Try<Nothing> oomKiller = cgroups::memory::oom::killer::enable(
+      hierarchy,
+      flags.cgroups_root);
+
+  if (oomKiller.isError()) {
+    return Error("Failed to enable kernel OOM killer: " + oomKiller.error());
+  }
+
+  // Probe memory pressure support on the root cgroup by creating one
+  // counter per level. Each probe counter goes out of scope (and is
+  // destroyed) at the end of its loop iteration.
+  foreach (const Level& level, levels()) {
+    Try<Owned<Counter>> probe =
+      Counter::create(hierarchy, flags.cgroups_root, level);
+
+    if (probe.isError()) {
+      return Error(
+          "Failed to listen on '" + stringify(level) + "' "
+          "memory events: " + probe.error());
+    }
+  }
+
+  // Limiting swap requires the memsw control file to be present.
+  if (flags.cgroups_limit_swap) {
+    Result<Bytes> memswLimit = cgroups::memory::memsw_limit_in_bytes(
+        hierarchy,
+        flags.cgroups_root);
+
+    if (memswLimit.isError()) {
+      return Error(
+          "Failed to read 'memory.memsw.limit_in_bytes'"
+          ": " + memswLimit.error());
+    }
+
+    if (memswLimit.isNone()) {
+      return Error("'memory.memsw.limit_in_bytes' is not available");
+    }
+  }
+
+  return Owned<Subsystem>(new MemorySubsystem(flags, hierarchy));
+}
+
+
+// Names the underlying libprocess process with the
+// "cgroups-memory-subsystem" prefix and forwards the agent flags and
+// the hierarchy path to the `Subsystem` base.
+MemorySubsystem::MemorySubsystem(
+    const Flags& _flags,
+    const string& _hierarchy)
+  : ProcessBase(process::ID::generate("cgroups-memory-subsystem")),
+    Subsystem(_flags, _hierarchy)
+{
+}
+
+
+// Starts tracking a container found during agent recovery and re-arms
+// its OOM and memory pressure listeners. A container may only be
+// recovered once.
+Future<Nothing> MemorySubsystem::recover(const ContainerID& containerId)
+{
+  if (!infos.contains(containerId)) {
+    infos.put(containerId, Owned<Info>(new Info));
+
+    oomListen(containerId);
+    pressureListen(containerId);
+
+    return Nothing();
+  }
+
+  return Failure("The subsystem '" + name() + "' has already been recovered");
+}
+
+
+// Starts tracking a newly launched container and begins listening for
+// its OOM and memory pressure events. Preparing the same container
+// twice is reported as a failure.
+Future<Nothing> MemorySubsystem::prepare(const ContainerID& containerId)
+{
+  const bool alreadyPrepared = infos.contains(containerId);
+
+  if (alreadyPrepared) {
+    return Failure("The subsystem '" + name() + "' has already been prepared");
+  }
+
+  infos.put(containerId, Owned<Info>(new Info));
+
+  // Both listeners look the container up in `infos`, so it must be
+  // registered first.
+  oomListen(containerId);
+  pressureListen(containerId);
+
+  return Nothing();
+}
+
+
+// Returns the future of the container's limitation promise; it is set
+// by `oomWaited` when an OOM is detected for the container.
+Future<ContainerLimitation> MemorySubsystem::watch(
+    const ContainerID& containerId)
+{
+  if (infos.contains(containerId)) {
+    return infos[containerId]->limitation.future();
+  }
+
+  return Failure(
+      "Failed to watch subsystem '" + name() + "'"
+      ": Unknown container");
+}
+
+
+// Applies the container's memory allocation to its cgroup.
+//
+// The soft limit is always written. The hard limit (and, with
+// `cgroups_limit_swap`, the memsw limit) is written only the first
+// time or when the limit is being raised; see below for why.
+//
+// Returns a failure for an unknown container, when `resources` holds
+// no memory, or when reading or writing a cgroup control file fails.
+Future<Nothing> MemorySubsystem::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  if (!infos.contains(containerId)) {
+    return Failure(
+        "Failed to update subsystem '" + name() + "'"
+        ": Unknown container");
+  }
+
+  if (resources.mem().isNone()) {
+    return Failure(
+        "Failed to update subsystem '" + name() + "'"
+        ": No memory resource given");
+  }
+
+  const string cgroup = path::join(flags.cgroups_root, containerId.value());
+
+  // New limit, never below `MIN_MEMORY`.
+  const Bytes mem = resources.mem().get();
+  const Bytes limit = std::max(mem, MIN_MEMORY);
+
+  // Always set the soft limit.
+  Try<Nothing> softLimit =
+    cgroups::memory::soft_limit_in_bytes(hierarchy, cgroup, limit);
+
+  if (softLimit.isError()) {
+    return Failure(
+        "Failed to set 'memory.soft_limit_in_bytes'"
+        ": " + softLimit.error());
+  }
+
+  LOG(INFO) << "Updated 'memory.soft_limit_in_bytes' to "
+            << limit << " for container " << containerId;
+
+  // Read the existing limit.
+  // NOTE: If `flags.cgroups_limit_swap` is (has been) used then both
+  // 'limit_in_bytes' and 'memsw.limit_in_bytes' will always be set to
+  // the same value.
+  Try<Bytes> currentLimit = cgroups::memory::limit_in_bytes(hierarchy, cgroup);
+
+  if (currentLimit.isError()) {
+    return Failure(
+        "Failed to read 'memory.limit_in_bytes'"
+        ": " + currentLimit.error());
+  }
+
+  // Determine whether to set the hard limit. We only update the hard
+  // limit if this is the first time or when we're raising the
+  // existing limit, then we can update the hard limit safely.
+  // Otherwise, if we need to decrease 'memory.limit_in_bytes' we may
+  // induce an OOM if too much memory is in use. As a result, we only
+  // update the soft limit when the memory reservation is being
+  // reduced. This is probably okay if the machine has available
+  // resources.
+  //
+  // TODO(benh): Introduce a MemoryWatcherProcess which monitors the
+  // discrepancy between usage and soft limit and introduces a "manual
+  // oom" if necessary.
+  //
+  // If this is the first time, 'memory.limit_in_bytes' is the initial
+  // value which may be one of following possible values:
+  //   * LONG_MAX (Linux Kernel Version < 3.12)
+  //   * ULONG_MAX (3.12 <= Linux Kernel Version < 3.19)
+  //   * LONG_MAX / PAGE_SIZE * PAGE_SIZE (Linux Kernel Version >= 3.19)
+  // Thus, if 'currentLimit' is greater or equals to 'initialLimit'
+  // below, we know it's the first time.
+  const Bytes initialLimit(
+      static_cast<uint64_t>(LONG_MAX / PAGE_SIZE * PAGE_SIZE));
+
+  const bool firstTime = currentLimit.get() >= initialLimit;
+
+  if (!firstTime && limit <= currentLimit.get()) {
+    // Not raising the hard limit: only the soft limit was updated.
+    return Nothing();
+  }
+
+  // Writes 'memory.limit_in_bytes' and logs the new value.
+  auto setMemoryLimit = [&]() -> Try<Nothing> {
+    Try<Nothing> write =
+      cgroups::memory::limit_in_bytes(hierarchy, cgroup, limit);
+
+    if (write.isError()) {
+      return Error(
+          "Failed to set 'memory.limit_in_bytes'"
+          ": " + write.error());
+    }
+
+    LOG(INFO) << "Updated 'memory.limit_in_bytes' to " << limit
+              << " for container " << containerId;
+
+    return Nothing();
+  };
+
+  // Writes 'memory.memsw.limit_in_bytes' and logs the new value.
+  auto setMemswLimit = [&]() -> Try<Nothing> {
+    Try<bool> write =
+      cgroups::memory::memsw_limit_in_bytes(hierarchy, cgroup, limit);
+
+    if (write.isError()) {
+      return Error(
+          "Failed to set 'memory.memsw.limit_in_bytes'"
+          ": " + write.error());
+    }
+
+    LOG(INFO) << "Updated 'memory.memsw.limit_in_bytes' to " << limit
+              << " for container " << containerId;
+
+    return Nothing();
+  };
+
+  // The (cgroup v1) kernel requires 'memory.limit_in_bytes' to never
+  // exceed 'memory.memsw.limit_in_bytes' and rejects any write that
+  // would break this with EINVAL. The first time, memsw is still at
+  // its initial (effectively unlimited) value, so 'limit_in_bytes' is
+  // written first. When raising an existing limit, memsw must be
+  // raised first, otherwise the 'limit_in_bytes' write is rejected.
+  const bool raiseMemswFirst = flags.cgroups_limit_swap && !firstTime;
+
+  if (raiseMemswFirst) {
+    Try<Nothing> memsw = setMemswLimit();
+    if (memsw.isError()) {
+      return Failure(memsw.error());
+    }
+  }
+
+  Try<Nothing> memory = setMemoryLimit();
+  if (memory.isError()) {
+    return Failure(memory.error());
+  }
+
+  if (flags.cgroups_limit_swap && !raiseMemswFirst) {
+    Try<Nothing> memsw = setMemswLimit();
+    if (memsw.isError()) {
+      return Failure(memsw.error());
+    }
+  }
+
+  return Nothing();
+}
+
+
+// Collects memory statistics for a container. Usage figures come from
+// the cgroup control files; pressure counter readings are collected
+// asynchronously and merged in by `_usage`.
+Future<ResourceStatistics> MemorySubsystem::usage(
+    const ContainerID& containerId)
+{
+  if (!infos.contains(containerId)) {
+    return Failure(
+        "Failed to get usage for subsystem '" + name() + "'"
+        ": Unknown container");
+  }
+
+  const string cgroup = path::join(flags.cgroups_root, containerId.value());
+
+  ResourceStatistics result;
+
+  // 'memory.usage_in_bytes' is used rather than the rss in
+  // 'memory.stat', which is wrong in two dimensions: it does not
+  // include child cgroups, nor any file backed pages.
+  Try<Bytes> totalUsage = cgroups::memory::usage_in_bytes(hierarchy, cgroup);
+  if (totalUsage.isError()) {
+    return Failure(
+        "Failed to parse 'memory.usage_in_bytes': " + totalUsage.error());
+  }
+
+  result.set_mem_total_bytes(totalUsage.get().bytes());
+
+  if (flags.cgroups_limit_swap) {
+    Try<Bytes> memswUsage =
+      cgroups::memory::memsw_usage_in_bytes(hierarchy, cgroup);
+
+    if (memswUsage.isError()) {
+      return Failure(
+          "Failed to parse 'memory.memsw.usage_in_bytes': " +
+          memswUsage.error());
+    }
+
+    result.set_mem_total_memsw_bytes(memswUsage.get().bytes());
+  }
+
+  // TODO(bmahler): Add namespacing to cgroups to enforce the expected
+  // structure, e.g, cgroups::memory::stat.
+  Try<hashmap<string, uint64_t>> stat =
+    cgroups::stat(hierarchy, cgroup, "memory.stat");
+
+  if (stat.isError()) {
+    return Failure("Failed to read 'memory.stat': " + stat.error());
+  }
+
+  const hashmap<string, uint64_t>& stats = stat.get();
+
+  // TODO(chzhcn): mem_file_bytes and mem_anon_bytes are deprecated in
+  // 0.23.0 and will be removed in 0.24.0.
+  Option<uint64_t> cache = stats.get("total_cache");
+  if (cache.isSome()) {
+    result.set_mem_file_bytes(cache.get());
+    result.set_mem_cache_bytes(cache.get());
+  }
+
+  Option<uint64_t> rss = stats.get("total_rss");
+  if (rss.isSome()) {
+    result.set_mem_anon_bytes(rss.get());
+    result.set_mem_rss_bytes(rss.get());
+  }
+
+  Option<uint64_t> mappedFile = stats.get("total_mapped_file");
+  if (mappedFile.isSome()) {
+    result.set_mem_mapped_file_bytes(mappedFile.get());
+  }
+
+  Option<uint64_t> swap = stats.get("total_swap");
+  if (swap.isSome()) {
+    result.set_mem_swap_bytes(swap.get());
+  }
+
+  Option<uint64_t> unevictable = stats.get("total_unevictable");
+  if (unevictable.isSome()) {
+    result.set_mem_unevictable_bytes(unevictable.get());
+  }
+
+  // Request a reading from every pressure counter, remembering which
+  // level each pending value belongs to (the two lists stay aligned).
+  list<Level> pressureLevels;
+  list<Future<uint64_t>> pressureValues;
+  foreachpair (Level level,
+               const Owned<Counter>& counter,
+               infos[containerId]->pressureCounters) {
+    pressureLevels.push_back(level);
+    pressureValues.push_back(counter->value());
+  }
+
+  return await(pressureValues)
+    .then(defer(PID<MemorySubsystem>(this),
+                &MemorySubsystem::_usage,
+                containerId,
+                result,
+                pressureLevels,
+                lambda::_1));
+}
+
+
+// Continuation of `usage`: copies each successfully read pressure
+// counter into `result`. `levels[i]` names the level of `values[i]`.
+// Readings that are not ready are logged and skipped.
+Future<ResourceStatistics> MemorySubsystem::_usage(
+    const ContainerID& containerId,
+    ResourceStatistics result,
+    const list<Level>& levels,
+    const list<Future<uint64_t>>& values)
+{
+  if (!infos.contains(containerId)) {
+    return Failure(
+        "Failed to get usage for subsystem '" + name() + "'"
+        ": Unknown container");
+  }
+
+  list<Level>::const_iterator level = levels.begin();
+  for (list<Future<uint64_t>>::const_iterator value = values.begin();
+       value != values.end();
+       ++value, ++level) {
+    if (!value->isReady()) {
+      LOG(ERROR) << "Failed to listen on '" << stringify(*level)
+                 << "' pressure events for container " << containerId << ": "
+                 << (value->isFailed() ? value->failure() : "discarded");
+      continue;
+    }
+
+    switch (*level) {
+      case Level::LOW:
+        result.set_mem_low_pressure_counter(value->get());
+        break;
+      case Level::MEDIUM:
+        result.set_mem_medium_pressure_counter(value->get());
+        break;
+      case Level::CRITICAL:
+        result.set_mem_critical_pressure_counter(value->get());
+        break;
+    }
+  }
+
+  return result;
+}
+
+
+// Stops tracking a container: a still-pending OOM listener is
+// discarded, then the container's bookkeeping (including its pressure
+// counters) is dropped. Unknown containers are ignored.
+Future<Nothing> MemorySubsystem::cleanup(const ContainerID& containerId)
+{
+  if (infos.contains(containerId)) {
+    Future<Nothing>& oomNotifier = infos[containerId]->oomNotifier;
+
+    if (oomNotifier.isPending()) {
+      oomNotifier.discard();
+    }
+
+    infos.erase(containerId);
+
+    return Nothing();
+  }
+
+  VLOG(1) << "Ignoring cleanup subsystem '" << name() << "' "
+          << "request for unknown container " << containerId;
+
+  return Nothing();
+}
+
+
+// Starts listening for OOM events on the container's cgroup and routes
+// the listener's outcome to `oomWaited`. The container must already be
+// registered in `infos`.
+void MemorySubsystem::oomListen(const ContainerID& containerId)
+{
+  CHECK(infos.contains(containerId));
+
+  const Owned<Info>& info = infos[containerId];
+
+  info->oomNotifier = cgroups::memory::oom::listen(
+      hierarchy,
+      path::join(flags.cgroups_root, containerId.value()));
+
+  // If the listening fails immediately, something very wrong
+  // happened. Therefore, we report a fatal error here.
+  if (info->oomNotifier.isFailed()) {
+    LOG(FATAL) << "Failed to listen for OOM events for container "
+               << containerId << ": "
+               << info->oomNotifier.failure();
+  }
+
+  LOG(INFO) << "Started listening for OOM events for container "
+            << containerId;
+
+  // NOTE: `onAny` rather than `onReady`. `oomWaited` has explicit
+  // handling for a discarded or failed listener, but `onReady` only
+  // fires on success, so those branches were unreachable and a
+  // listener that failed later went completely unreported.
+  info->oomNotifier.onAny(
+      defer(PID<MemorySubsystem>(this),
+            &MemorySubsystem::oomWaited,
+            containerId,
+            lambda::_1));
+}
+
+
+// Handles the outcome of a container's OOM listener. On a real OOM it
+// builds a diagnostic message (limit, peak usage and 'memory.stat'),
+// logs it, and sets the container's limitation promise.
+void MemorySubsystem::oomWaited(
+    const ContainerID& containerId,
+    const Future<Nothing>& future)
+{
+  if (future.isDiscarded()) {
+    LOG(INFO) << "Discarded OOM notifier for container " << containerId;
+    return;
+  }
+
+  if (future.isFailed()) {
+    LOG(ERROR) << "Listening on OOM events failed for container "
+               << containerId << ": " << future.failure();
+    return;
+  }
+
+  if (!infos.contains(containerId)) {
+    // The container is already gone. This can happen when the kill and
+    // the OOM event occur at the same time and the process exit is
+    // handled first, so it is not treated as a fatal error.
+    LOG(INFO) << "OOM detected for the terminated container " << containerId;
+    return;
+  }
+
+  LOG(INFO) << "OOM detected for container " << containerId;
+
+  const string cgroup = path::join(flags.cgroups_root, containerId.value());
+
+  // Explain why the container's cgroup is being reported as limited,
+  // to assist debugging.
+  ostringstream message;
+  message << "Memory limit exceeded: ";
+
+  // The requested hard limit.
+  // NOTE: If 'flags.cgroups_limit_swap' is (has been) used, then both
+  // 'limit_in_bytes' and 'memsw.limit_in_bytes' will always be set to
+  // the same value.
+  Try<Bytes> limit = cgroups::memory::limit_in_bytes(hierarchy, cgroup);
+  if (limit.isSome()) {
+    message << "Requested: " << limit.get() << " ";
+  } else {
+    LOG(ERROR) << "Failed to read 'memory.limit_in_bytes': " << limit.error();
+  }
+
+  // The peak usage recorded for the cgroup.
+  Try<Bytes> usage = cgroups::memory::max_usage_in_bytes(hierarchy, cgroup);
+  if (usage.isSome()) {
+    message << "Maximum Used: " << usage.get() << "\n";
+  } else {
+    LOG(ERROR) << "Failed to read 'memory.max_usage_in_bytes': "
+               << usage.error();
+  }
+
+  // The raw 'memory.stat' contents.
+  // NOTE: With kernel OOM-killer enabled these stats may not reflect
+  // memory state at time of OOM.
+  Try<string> memoryStat = cgroups::read(hierarchy, cgroup, "memory.stat");
+  if (memoryStat.isSome()) {
+    message << "\nMEMORY STATISTICS: \n" << memoryStat.get() << "\n";
+  } else {
+    LOG(ERROR) << "Failed to read 'memory.stat': " << memoryStat.error();
+  }
+
+  // Log the message without its trailing newline.
+  LOG(INFO) << strings::trim(message.str());
+
+  // TODO(jieyu): This is not accurate if the memory resource is from
+  // a non-star role or spans roles (e.g., "*" and "role"). Ideally,
+  // we should save the resources passed in and report it here.
+  Resources mem = Resources::parse(
+      "mem",
+      stringify(usage.isSome() ? usage.get().megabytes() : 0),
+      "*").get();
+
+  infos[containerId]->limitation.set(
+      protobuf::slave::createContainerLimitation(
+          mem,
+          message.str(),
+          TaskStatus::REASON_CONTAINER_LIMITATION_MEMORY));
+}
+
+
+// Creates one memory pressure counter per level for the container's
+// cgroup. A level whose counter cannot be created is logged and
+// skipped; the other levels are still attached. The container must
+// already be registered in `infos`.
+void MemorySubsystem::pressureListen(const ContainerID& containerId)
+{
+  CHECK(infos.contains(containerId));
+
+  const string cgroup = path::join(flags.cgroups_root, containerId.value());
+
+  foreach (const Level& level, levels()) {
+    Try<Owned<Counter>> counter = Counter::create(hierarchy, cgroup, level);
+
+    if (counter.isError()) {
+      LOG(ERROR) << "Failed to listen on '" << level << "' memory pressure "
+                 << "events for container " << containerId << ": "
+                 << counter.error();
+      continue;
+    }
+
+    infos[containerId]->pressureCounters[level] = counter.get();
+
+    LOG(INFO) << "Started listening on '" << level << "' memory pressure "
+              << "events for container " << containerId;
+  }
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp b/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp
new file mode 100644
index 0000000..a1c87ce
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __CGROUPS_ISOLATOR_SUBSYSTEMS_MEMORY_HPP__
+#define __CGROUPS_ISOLATOR_SUBSYSTEMS_MEMORY_HPP__
+
+#include <list>
+#include <string>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/nothing.hpp>
+#include <stout/try.hpp>
+
+#include "linux/cgroups.hpp"
+
+#include "slave/flags.hpp"
+
+#include "slave/containerizer/mesos/isolators/cgroups/constants.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups/subsystem.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+/**
+ * Represent cgroups memory subsystem.
+ *
+ * Sets soft/hard (and optionally swap) limits per container, reports
+ * memory usage and pressure counters, and reports a limitation when
+ * an OOM event is detected for a container.
+ */
+class MemorySubsystem : public Subsystem
+{
+public:
+  /**
+   * Creates the memory subsystem for `hierarchy`. Fails if the kernel
+   * OOM killer cannot be enabled, memory pressure events cannot be
+   * listened on, or swap limiting is requested but
+   * 'memory.memsw.limit_in_bytes' is not available.
+   */
+  static Try<process::Owned<Subsystem>> create(
+      const Flags& flags,
+      const std::string& hierarchy);
+
+  virtual ~MemorySubsystem() {}
+
+  // NOTE: The overrides below are marked `override` so that any drift
+  // from the `Subsystem` interface becomes a compile error instead of
+  // silently introducing a new, never-called virtual function.
+
+  std::string name() const override
+  {
+    return CGROUP_SUBSYSTEM_MEMORY_NAME;
+  }
+
+  process::Future<Nothing> prepare(const ContainerID& containerId) override;
+
+  process::Future<Nothing> recover(const ContainerID& containerId) override;
+
+  process::Future<mesos::slave::ContainerLimitation> watch(
+      const ContainerID& containerId) override;
+
+  process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources) override;
+
+  process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId) override;
+
+  process::Future<Nothing> cleanup(const ContainerID& containerId) override;
+
+private:
+  struct Info
+  {
+    // Used to cancel the OOM listening.
+    process::Future<Nothing> oomNotifier;
+
+    // One counter per memory pressure level being listened on.
+    hashmap<
+        cgroups::memory::pressure::Level,
+        process::Owned<cgroups::memory::pressure::Counter>> pressureCounters;
+
+    // Set when an OOM is detected for the container.
+    process::Promise<mesos::slave::ContainerLimitation> limitation;
+  };
+
+  MemorySubsystem(const Flags& flags, const std::string& hierarchy);
+
+  // Continuation of `usage`: merges pressure counter readings, where
+  // `levels` and `values` are aligned element by element.
+  process::Future<ResourceStatistics> _usage(
+      const ContainerID& containerId,
+      ResourceStatistics result,
+      const std::list<cgroups::memory::pressure::Level>& levels,
+      const std::list<process::Future<uint64_t>>& values);
+
+  // Start listening on OOM events. This function will create an
+  // eventfd and start polling on it.
+  void oomListen(const ContainerID& containerId);
+
+  // This function is invoked when the polling on eventfd has a
+  // result.
+  void oomWaited(
+      const ContainerID& containerId,
+      const process::Future<Nothing>& future);
+
+  // Start listening on memory pressure events.
+  void pressureListen(const ContainerID& containerId);
+
+  // Stores cgroups associated information for container.
+  hashmap<ContainerID, process::Owned<Info>> infos;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __CGROUPS_ISOLATOR_SUBSYSTEMS_MEMORY_HPP__