You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2016/08/22 03:40:59 UTC

mesos git commit: Implemented `MemorySubsystem`.

Repository: mesos
Updated Branches:
  refs/heads/master a43924a2d -> 178c1fa22


Implemented `MemorySubsystem`.

Review: https://reviews.apache.org/r/49851/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/178c1fa2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/178c1fa2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/178c1fa2

Branch: refs/heads/master
Commit: 178c1fa227b928c5a533d28f04ea0b6f924606aa
Parents: a43924a
Author: haosdent huang <ha...@gmail.com>
Authored: Sun Aug 21 10:58:05 2016 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Sun Aug 21 20:40:50 2016 -0700

----------------------------------------------------------------------
 src/CMakeLists.txt                              |   1 +
 src/Makefile.am                                 |   2 +
 .../mesos/isolators/cgroups/cgroups.cpp         |  30 +
 .../mesos/isolators/cgroups/cgroups.hpp         |   4 +
 .../mesos/isolators/cgroups/constants.hpp       |   1 +
 .../mesos/isolators/cgroups/subsystem.cpp       |  10 +
 .../mesos/isolators/cgroups/subsystem.hpp       |  11 +
 .../isolators/cgroups/subsystems/memory.cpp     | 571 +++++++++++++++++++
 .../isolators/cgroups/subsystems/memory.hpp     | 116 ++++
 9 files changed, 746 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ff51705..7ffe377 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -170,6 +170,7 @@ set(LINUX_SRC
   slave/containerizer/mesos/isolators/cgroups/subsystems/cpu.cpp
   slave/containerizer/mesos/isolators/cgroups/subsystems/cpuacct.cpp
   slave/containerizer/mesos/isolators/cgroups/subsystems/devices.cpp
+  slave/containerizer/mesos/isolators/cgroups/subsystems/memory.cpp
   slave/containerizer/mesos/isolators/docker/runtime.cpp
   slave/containerizer/mesos/isolators/docker/volume/driver.cpp
   slave/containerizer/mesos/isolators/docker/volume/isolator.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 61c941f..83db882 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1054,6 +1054,7 @@ MESOS_LINUX_FILES =							\
   slave/containerizer/mesos/isolators/cgroups/subsystems/cpu.cpp	\
   slave/containerizer/mesos/isolators/cgroups/subsystems/cpuacct.cpp	\
   slave/containerizer/mesos/isolators/cgroups/subsystems/devices.cpp	\
+  slave/containerizer/mesos/isolators/cgroups/subsystems/memory.cpp	\
   slave/containerizer/mesos/isolators/docker/runtime.cpp		\
   slave/containerizer/mesos/isolators/docker/volume/isolator.cpp	\
   slave/containerizer/mesos/isolators/filesystem/linux.cpp		\
@@ -1090,6 +1091,7 @@ MESOS_LINUX_FILES +=							\
   slave/containerizer/mesos/isolators/cgroups/subsystems/cpu.hpp	\
   slave/containerizer/mesos/isolators/cgroups/subsystems/cpuacct.hpp	\
   slave/containerizer/mesos/isolators/cgroups/subsystems/devices.hpp	\
+  slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp	\
   slave/containerizer/mesos/isolators/docker/runtime.hpp		\
   slave/containerizer/mesos/isolators/docker/volume/isolator.hpp	\
   slave/containerizer/mesos/isolators/filesystem/linux.hpp		\

http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
index 4c0ec5c..ed9cff2 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
@@ -78,6 +78,7 @@ Try<Isolator*> CgroupsIsolatorProcess::create(const Flags& flags)
     {"cpu", CGROUP_SUBSYSTEM_CPU_NAME},
     {"cpu", CGROUP_SUBSYSTEM_CPUACCT_NAME},
     {"devices", CGROUP_SUBSYSTEM_DEVICES_NAME},
+    {"mem", CGROUP_SUBSYSTEM_MEMORY_NAME},
   };
 
   foreach (string isolator, strings::tokenize(flags.isolation, ",")) {
@@ -522,10 +523,39 @@ Future<ContainerLimitation> CgroupsIsolatorProcess::watch(
     return Failure("Unknown container");
   }
 
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
+    subsystem->watch(containerId)
+      .onAny(defer(
+          PID<CgroupsIsolatorProcess>(this),
+          &CgroupsIsolatorProcess::_watch,
+          containerId,
+          lambda::_1));
+  }
+
   return infos[containerId]->limitation.future();
 }
 
 
+// Forwards the completed `watch` future of one subsystem to the
+// container-wide limitation promise. Results for containers that are
+// no longer tracked are dropped.
+void CgroupsIsolatorProcess::_watch(
+    const ContainerID& containerId,
+    const Future<ContainerLimitation>& future)
+{
+  if (infos.contains(containerId)) {
+    CHECK(!future.isPending());
+
+    auto& limitation = infos[containerId]->limitation;
+
+    if (future.isFailed()) {
+      limitation.fail(future.failure());
+    } else if (future.isDiscarded()) {
+      limitation.discard();
+    } else {
+      limitation.set(future.get());
+    }
+  }
+}
+
+
 Future<Nothing> CgroupsIsolatorProcess::update(
     const ContainerID& containerId,
     const Resources& resources)

http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
index 9b2d33e..38d1428 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
@@ -125,6 +125,10 @@ private:
   process::Future<Nothing> _isolate(
       const std::list<process::Future<Nothing>>& futures);
 
+  // Continuation invoked when a subsystem's `watch` future completes;
+  // propagates that result (ready, failed or discarded) to the
+  // container's limitation promise.
+  void _watch(
+      const ContainerID& containerId,
+      const process::Future<mesos::slave::ContainerLimitation>& future);
+
   process::Future<Nothing> _update(
       const std::list<process::Future<Nothing>>& futures);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp b/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
index 4a7b556..58550ea 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/constants.hpp
@@ -42,6 +42,7 @@ const Bytes MIN_MEMORY = Megabytes(32);
 const std::string CGROUP_SUBSYSTEM_CPU_NAME = "cpu";
 const std::string CGROUP_SUBSYSTEM_CPUACCT_NAME = "cpuacct";
 const std::string CGROUP_SUBSYSTEM_DEVICES_NAME = "devices";
+const std::string CGROUP_SUBSYSTEM_MEMORY_NAME = "memory";
 
 } // namespace slave {
 } // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
index df701d2..ba31b93 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
@@ -23,6 +23,9 @@
 #include "slave/containerizer/mesos/isolators/cgroups/subsystems/cpu.hpp"
 #include "slave/containerizer/mesos/isolators/cgroups/subsystems/cpuacct.hpp"
 #include "slave/containerizer/mesos/isolators/cgroups/subsystems/devices.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp"
+
+using mesos::slave::ContainerLimitation;
 
 using process::Future;
 using process::Owned;
@@ -43,6 +46,7 @@ Try<Owned<Subsystem>> Subsystem::create(
     {CGROUP_SUBSYSTEM_CPU_NAME, &CpuSubsystem::create},
     {CGROUP_SUBSYSTEM_CPUACCT_NAME, &CpuacctSubsystem::create},
     {CGROUP_SUBSYSTEM_DEVICES_NAME, &DevicesSubsystem::create},
+    {CGROUP_SUBSYSTEM_MEMORY_NAME, &MemorySubsystem::create},
   };
 
   if (!creators.contains(name)) {
@@ -85,6 +89,12 @@ Future<Nothing> Subsystem::isolate(const ContainerID& containerId, pid_t pid)
 }
 
 
+// Default for subsystems that never impose a resource limitation: the
+// returned future is default-constructed and tied to no promise, so it
+// never becomes ready.
+Future<ContainerLimitation> Subsystem::watch(const ContainerID& containerId)
+{
+  Future<ContainerLimitation> never;
+  return never;
+}
+
+
 Future<Nothing> Subsystem::update(
     const ContainerID& containerId,
     const Resources& resources)

http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
index 18ba445..6dca38c 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
@@ -21,6 +21,8 @@
 
 #include <mesos/resources.hpp>
 
+#include <mesos/slave/isolator.hpp>
+
 #include <process/future.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
@@ -91,6 +93,15 @@ public:
       pid_t pid);
 
   /**
+   * Watch the container and report if any resource constraint impacts it.
+   *
+   * @param containerId The target containerId.
+   * @return A future that is satisfied with a `ContainerLimitation`
+   *     once a resource constraint impacts the container, or failed if
+   *     the container cannot be watched. The default implementation
+   *     returns a future that never completes.
+   */
+  virtual process::Future<mesos::slave::ContainerLimitation> watch(
+      const ContainerID& containerId);
+
+  /**
    * Update resources allocated to the associated container in this
    * cgroups subsystem.
    *

http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.cpp b/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.cpp
new file mode 100644
index 0000000..4c03aef
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.cpp
@@ -0,0 +1,571 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <sys/user.h>
+
+#include <climits>
+#include <sstream>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/id.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/error.hpp>
+#include <stout/option.hpp>
+#include <stout/result.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp"
+
+using cgroups::memory::pressure::Counter;
+using cgroups::memory::pressure::Level;
+
+using mesos::slave::ContainerLimitation;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::PID;
+
+using std::list;
+using std::ostringstream;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// The memory pressure levels this subsystem listens on. Returned by
+// (non-const) value so callers can move from the result.
+static vector<Level> levels()
+{
+  return {Level::LOW, Level::MEDIUM, Level::CRITICAL};
+}
+
+
+// Validates that the host supports everything this subsystem relies on
+// (kernel OOM-killer, memory pressure events and, when requested, swap
+// limiting) before constructing it.
+Try<Owned<Subsystem>> MemorySubsystem::create(
+    const Flags& flags,
+    const string& hierarchy)
+{
+  // The kernel OOM-killer must stay enabled: the Mesos OOM handler, as
+  // implemented, cannot safely handle the oom condition by itself given
+  // the limitations Linux imposes on this code path.
+  Try<Nothing> oomKiller = cgroups::memory::oom::killer::enable(
+      hierarchy,
+      flags.cgroups_root);
+
+  if (oomKiller.isError()) {
+    return Error("Failed to enable kernel OOM killer: " + oomKiller.error());
+  }
+
+  // Probe memory pressure support on the root cgroup by creating a
+  // `Counter` for each level; each probe is dropped at the end of its
+  // iteration.
+  foreach (const Level& level, levels()) {
+    Try<Owned<Counter>> probe = Counter::create(
+        hierarchy,
+        flags.cgroups_root,
+        level);
+
+    if (probe.isError()) {
+      return Error(
+          "Failed to listen on '" + stringify(level) + "' "
+          "memory events: " + probe.error());
+    }
+  }
+
+  if (!flags.cgroups_limit_swap) {
+    return Owned<Subsystem>(new MemorySubsystem(flags, hierarchy));
+  }
+
+  // Swap limiting was requested, so 'memory.memsw.limit_in_bytes' must
+  // be readable.
+  Result<Bytes> memsw = cgroups::memory::memsw_limit_in_bytes(
+      hierarchy,
+      flags.cgroups_root);
+
+  if (memsw.isError()) {
+    return Error(
+        "Failed to read 'memory.memsw.limit_in_bytes'"
+        ": " + memsw.error());
+  }
+
+  if (memsw.isNone()) {
+    return Error("'memory.memsw.limit_in_bytes' is not available");
+  }
+
+  return Owned<Subsystem>(new MemorySubsystem(flags, hierarchy));
+}
+
+
+// The libprocess `ProcessBase` receives an ID generated from the
+// "cgroups-memory-subsystem" prefix; everything else is handled by the
+// `Subsystem` base.
+MemorySubsystem::MemorySubsystem(const Flags& _flags, const string& _hierarchy)
+  : ProcessBase(process::ID::generate("cgroups-memory-subsystem")),
+    Subsystem(_flags, _hierarchy)
+{
+}
+
+
+// Registers state for a recovered container and starts its OOM and
+// memory pressure listeners. Recovering the same container twice fails.
+Future<Nothing> MemorySubsystem::recover(const ContainerID& containerId)
+{
+  if (!infos.contains(containerId)) {
+    infos.put(containerId, Owned<Info>(new Info));
+
+    oomListen(containerId);
+    pressureListen(containerId);
+
+    return Nothing();
+  }
+
+  return Failure("The subsystem '" + name() + "' has already been recovered");
+}
+
+
+// Registers state for a new container and starts its OOM and memory
+// pressure listeners. Preparing the same container twice fails.
+Future<Nothing> MemorySubsystem::prepare(const ContainerID& containerId)
+{
+  const bool alreadyPrepared = infos.contains(containerId);
+  if (alreadyPrepared) {
+    return Failure("The subsystem '" + name() + "' has already been prepared");
+  }
+
+  Owned<Info> info(new Info);
+  infos.put(containerId, info);
+
+  oomListen(containerId);
+  pressureListen(containerId);
+
+  return Nothing();
+}
+
+
+// Returns the container's limitation future; it is satisfied by
+// `oomWaited` when an OOM is detected.
+Future<ContainerLimitation> MemorySubsystem::watch(
+    const ContainerID& containerId)
+{
+  if (infos.contains(containerId)) {
+    return infos[containerId]->limitation.future();
+  }
+
+  return Failure(
+      "Failed to watch subsystem '" + name() + "'"
+      ": Unknown container");
+}
+
+
+// Applies the memory resources to the container's cgroup. The soft
+// limit is always written; the hard limit (and, with
+// `cgroups_limit_swap`, the memsw limit) is only written the first time
+// or when it is being raised, to avoid inducing an OOM.
+Future<Nothing> MemorySubsystem::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  if (!infos.contains(containerId)) {
+    return Failure(
+        "Failed to update subsystem '" + name() + "'"
+        ": Unknown container");
+  }
+
+  if (resources.mem().isNone()) {
+    return Failure(
+        "Failed to update subsystem '" + name() + "'"
+        ": No memory resource given");
+  }
+
+  // The container's cgroup (relative to `hierarchy`), computed once and
+  // reused for every control file touched below.
+  const string cgroup = path::join(flags.cgroups_root, containerId.value());
+
+  // New limit, never below MIN_MEMORY.
+  Bytes mem = resources.mem().get();
+  Bytes limit = std::max(mem, MIN_MEMORY);
+
+  // Always set the soft limit.
+  Try<Nothing> softLimitWrite = cgroups::memory::soft_limit_in_bytes(
+      hierarchy, cgroup, limit);
+
+  if (softLimitWrite.isError()) {
+    return Failure(
+        "Failed to set 'memory.soft_limit_in_bytes'"
+        ": " + softLimitWrite.error());
+  }
+
+  LOG(INFO) << "Updated 'memory.soft_limit_in_bytes' to "
+            << limit << " for container " << containerId;
+
+  // Read the existing limit.
+  //
+  // NOTE: If `flags.cgroups_limit_swap` is (has been) used then both
+  // 'limit_in_bytes' and 'memsw.limit_in_bytes' will always be set to
+  // the same value.
+  Try<Bytes> currentLimit = cgroups::memory::limit_in_bytes(
+      hierarchy, cgroup);
+
+  if (currentLimit.isError()) {
+    return Failure(
+        "Failed to read 'memory.limit_in_bytes'"
+        ": " + currentLimit.error());
+  }
+
+  // Determine whether to set the hard limit. We only update the hard
+  // limit if this is the first time or when we're raising the
+  // existing limit, then we can update the hard limit safely.
+  // Otherwise, if we need to decrease 'memory.limit_in_bytes' we may
+  // induce an OOM if too much memory is in use. As a result, we only
+  // update the soft limit when the memory reservation is being
+  // reduced. This is probably okay if the machine has available
+  // resources.
+  //
+  // TODO(benh): Introduce a MemoryWatcherProcess which monitors the
+  // discrepancy between usage and soft limit and introduces a "manual
+  // oom" if necessary.
+  //
+  // If this is the first time, 'memory.limit_in_bytes' is the initial
+  // value which may be one of following possible values:
+  //   * LONG_MAX (Linux Kernel Version < 3.12)
+  //   * ULONG_MAX (3.12 <= Linux Kernel Version < 3.19)
+  //   * LONG_MAX / PAGE_SIZE * PAGE_SIZE (Linux Kernel Version >= 3.19)
+  // Thus, if 'currentLimit' is greater or equals to 'initialLimit'
+  // below, we know it's the first time.
+  Bytes initialLimit(static_cast<uint64_t>(LONG_MAX / PAGE_SIZE * PAGE_SIZE));
+
+  if (currentLimit.get() >= initialLimit || limit > currentLimit.get()) {
+    // We always set limit_in_bytes first and optionally set
+    // memsw.limit_in_bytes if `cgroups_limit_swap` is true.
+    Try<Nothing> hardLimitWrite = cgroups::memory::limit_in_bytes(
+        hierarchy, cgroup, limit);
+
+    if (hardLimitWrite.isError()) {
+      return Failure(
+          "Failed to set 'memory.limit_in_bytes'"
+          ": " + hardLimitWrite.error());
+    }
+
+    LOG(INFO) << "Updated 'memory.limit_in_bytes' to " << limit
+              << " for container " << containerId;
+
+    if (flags.cgroups_limit_swap) {
+      Try<bool> memswLimitWrite = cgroups::memory::memsw_limit_in_bytes(
+          hierarchy, cgroup, limit);
+
+      if (memswLimitWrite.isError()) {
+        return Failure(
+            "Failed to set 'memory.memsw.limit_in_bytes'"
+            ": " + memswLimitWrite.error());
+      }
+
+      LOG(INFO) << "Updated 'memory.memsw.limit_in_bytes' to " << limit
+                << " for container " << containerId;
+    }
+  }
+
+  return Nothing();
+}
+
+
+// Collects memory statistics from the container's cgroup control files,
+// then samples the memory pressure counters and finishes in `_usage`.
+Future<ResourceStatistics> MemorySubsystem::usage(
+    const ContainerID& containerId)
+{
+  if (!infos.contains(containerId)) {
+    return Failure(
+        "Failed to get usage for subsystem '" + name() + "'"
+        ": Unknown container");
+  }
+
+  // All control files below are read from the container's cgroup.
+  const string cgroup = path::join(flags.cgroups_root, containerId.value());
+
+  ResourceStatistics result;
+
+  // Total usage comes from 'memory.usage_in_bytes' because the rss from
+  // memory.stat is wrong in two dimensions:
+  //   1. It does not include child cgroups.
+  //   2. It does not include any file backed pages.
+  Try<Bytes> memoryUsage = cgroups::memory::usage_in_bytes(
+      hierarchy, cgroup);
+
+  if (memoryUsage.isError()) {
+    return Failure(
+        "Failed to parse 'memory.usage_in_bytes': " + memoryUsage.error());
+  }
+
+  result.set_mem_total_bytes(memoryUsage.get().bytes());
+
+  if (flags.cgroups_limit_swap) {
+    Try<Bytes> memswUsage = cgroups::memory::memsw_usage_in_bytes(
+        hierarchy, cgroup);
+
+    if (memswUsage.isError()) {
+      return Failure(
+          "Failed to parse 'memory.memsw.usage_in_bytes': " +
+          memswUsage.error());
+    }
+
+    result.set_mem_total_memsw_bytes(memswUsage.get().bytes());
+  }
+
+  // TODO(bmahler): Add namespacing to cgroups to enforce the expected
+  // structure, e.g, cgroups::memory::stat.
+  Try<hashmap<string, uint64_t>> stat = cgroups::stat(
+      hierarchy, cgroup, "memory.stat");
+
+  if (stat.isError()) {
+    return Failure("Failed to read 'memory.stat': " + stat.error());
+  }
+
+  const hashmap<string, uint64_t>& fields = stat.get();
+
+  // TODO(chzhcn): mem_file_bytes and mem_anon_bytes are deprecated in
+  // 0.23.0 and will be removed in 0.24.0; they mirror the cache and rss
+  // values respectively.
+  Option<uint64_t> cache = fields.get("total_cache");
+  if (cache.isSome()) {
+    result.set_mem_file_bytes(cache.get());
+    result.set_mem_cache_bytes(cache.get());
+  }
+
+  Option<uint64_t> rss = fields.get("total_rss");
+  if (rss.isSome()) {
+    result.set_mem_anon_bytes(rss.get());
+    result.set_mem_rss_bytes(rss.get());
+  }
+
+  Option<uint64_t> mappedFile = fields.get("total_mapped_file");
+  if (mappedFile.isSome()) {
+    result.set_mem_mapped_file_bytes(mappedFile.get());
+  }
+
+  Option<uint64_t> swapBytes = fields.get("total_swap");
+  if (swapBytes.isSome()) {
+    result.set_mem_swap_bytes(swapBytes.get());
+  }
+
+  Option<uint64_t> unevictable = fields.get("total_unevictable");
+  if (unevictable.isSome()) {
+    result.set_mem_unevictable_bytes(unevictable.get());
+  }
+
+  // Sample every pressure counter. The two lists are filled in lockstep
+  // so that `_usage` can pair each reading with its level by position.
+  list<Level> pressureLevels;
+  list<Future<uint64_t>> pressureValues;
+  foreachpair (Level level,
+               const Owned<Counter>& counter,
+               infos[containerId]->pressureCounters) {
+    pressureLevels.push_back(level);
+    pressureValues.push_back(counter->value());
+  }
+
+  return await(pressureValues)
+    .then(defer(PID<MemorySubsystem>(this),
+                &MemorySubsystem::_usage,
+                containerId,
+                result,
+                pressureLevels,
+                lambda::_1));
+}
+
+
+// Continuation of `usage`: copies each ready pressure counter reading
+// into `result` and logs the ones that could not be read. `levels` and
+// `values` are parallel lists.
+Future<ResourceStatistics> MemorySubsystem::_usage(
+    const ContainerID& containerId,
+    ResourceStatistics result,
+    const list<Level>& levels,
+    const list<Future<uint64_t>>& values)
+{
+  if (!infos.contains(containerId)) {
+    return Failure(
+        "Failed to get usage for subsystem '" + name() + "'"
+        ": Unknown container");
+  }
+
+  auto level = levels.cbegin();
+  auto value = values.cbegin();
+
+  for (; value != values.cend(); ++value, ++level) {
+    if (!value->isReady()) {
+      LOG(ERROR) << "Failed to listen on '" << stringify(*level)
+                 << "' pressure events for container " << containerId << ": "
+                 << (value->isFailed() ? value->failure() : "discarded");
+      continue;
+    }
+
+    switch (*level) {
+      case Level::LOW:
+        result.set_mem_low_pressure_counter(value->get());
+        break;
+      case Level::MEDIUM:
+        result.set_mem_medium_pressure_counter(value->get());
+        break;
+      case Level::CRITICAL:
+        result.set_mem_critical_pressure_counter(value->get());
+        break;
+    }
+  }
+
+  return result;
+}
+
+
+// Drops the container's state after requesting that a still-pending OOM
+// notifier be discarded. Unknown containers are ignored.
+Future<Nothing> MemorySubsystem::cleanup(const ContainerID& containerId)
+{
+  if (infos.contains(containerId)) {
+    Future<Nothing>& oomNotifier = infos[containerId]->oomNotifier;
+    if (oomNotifier.isPending()) {
+      oomNotifier.discard();
+    }
+
+    infos.erase(containerId);
+
+    return Nothing();
+  }
+
+  VLOG(1) << "Ignoring cleanup subsystem '" << name() << "' "
+          << "request for unknown container " << containerId;
+
+  return Nothing();
+}
+
+
+// Starts listening for OOM events on the container's cgroup and routes
+// the notifier's outcome to `oomWaited`. An immediate listen failure is
+// fatal.
+void MemorySubsystem::oomListen(const ContainerID& containerId)
+{
+  CHECK(infos.contains(containerId));
+
+  const Owned<Info>& info = infos[containerId];
+
+  info->oomNotifier = cgroups::memory::oom::listen(
+      hierarchy,
+      path::join(flags.cgroups_root, containerId.value()));
+
+  // If the listening fails immediately, something very wrong
+  // happened. Therefore, we report a fatal error here.
+  if (info->oomNotifier.isFailed()) {
+    LOG(FATAL) << "Failed to listen for OOM events for container "
+               << containerId << ": "
+               << info->oomNotifier.failure();
+  }
+
+  LOG(INFO) << "Started listening for OOM events for container "
+            << containerId;
+
+  // Use `onAny` rather than `onReady` so that `oomWaited` also sees a
+  // notifier that later fails or is discarded (e.g. after `cleanup`
+  // requests a discard). With `onReady` those outcomes were silently
+  // dropped and the corresponding branches in `oomWaited` never ran.
+  info->oomNotifier.onAny(
+      defer(PID<MemorySubsystem>(this),
+            &MemorySubsystem::oomWaited,
+            containerId,
+            lambda::_1));
+}
+
+
+// Handles a completed OOM notifier for `containerId`. A discarded or
+// failed notifier is logged and ignored, as is an OOM for a container
+// that is no longer tracked. Otherwise, a human-readable report (hard
+// limit, maximum usage and 'memory.stat') is logged and used to satisfy
+// the container's `limitation` promise with a memory limitation.
+void MemorySubsystem::oomWaited(
+    const ContainerID& containerId,
+    const Future<Nothing>& future)
+{
+  if (future.isDiscarded()) {
+    LOG(INFO) << "Discarded OOM notifier for container " << containerId;
+    return;
+  }
+
+  if (future.isFailed()) {
+    LOG(ERROR) << "Listening on OOM events failed for container "
+               << containerId << ": " << future.failure();
+    return;
+  }
+
+  if (!infos.contains(containerId)) {
+    // The container has likely already been cleaned up before this
+    // callback ran (e.g. the kill and OOM events happen at the same
+    // time, and the process exit event arrives first). Therefore, we
+    // should not report a fatal error here.
+    LOG(INFO) << "OOM detected for the terminated container " << containerId;
+    return;
+  }
+
+  LOG(INFO) << "OOM detected for container " << containerId;
+
+  // Construct a "message" string describing the memory limitation that
+  // is reported for the container (in order to assist debugging).
+  ostringstream message;
+  message << "Memory limit exceeded: ";
+
+  // Output the requested memory limit.
+  // NOTE: If 'flags.cgroups_limit_swap' is (has been) used, then both
+  // 'limit_in_bytes' and 'memsw.limit_in_bytes' will always be set to
+  // the same value.
+  Try<Bytes> limit = cgroups::memory::limit_in_bytes(
+      hierarchy,
+      path::join(flags.cgroups_root, containerId.value()));
+
+  if (limit.isError()) {
+    LOG(ERROR) << "Failed to read 'memory.limit_in_bytes': " << limit.error();
+  } else {
+    message << "Requested: " << limit.get() << " ";
+  }
+
+  // Output the maximum memory usage.
+  Try<Bytes> usage = cgroups::memory::max_usage_in_bytes(
+      hierarchy,
+      path::join(flags.cgroups_root, containerId.value()));
+
+  if (usage.isError()) {
+    LOG(ERROR) << "Failed to read 'memory.max_usage_in_bytes': "
+               << usage.error();
+  } else {
+    message << "Maximum Used: " << usage.get() << "\n";
+  }
+
+  // Output 'memory.stat' of the cgroup to help with debugging.
+  // NOTE: With kernel OOM-killer enabled these stats may not reflect
+  // memory state at time of OOM.
+  Try<string> read = cgroups::read(
+      hierarchy,
+      path::join(flags.cgroups_root, containerId.value()),
+      "memory.stat");
+
+  if (read.isError()) {
+    LOG(ERROR) << "Failed to read 'memory.stat': " << read.error();
+  } else {
+    message << "\nMEMORY STATISTICS: \n" << read.get() << "\n";
+  }
+
+  LOG(INFO) << strings::trim(message.str()); // Trim the extra '\n' at the end.
+
+  // The reported resource is the maximum usage in megabytes, or 0 if
+  // 'memory.max_usage_in_bytes' could not be read.
+  // TODO(jieyu): This is not accurate if the memory resource is from
+  // a non-star role or spans roles (e.g., "*" and "role"). Ideally,
+  // we should save the resources passed in and report it here.
+  Resources mem = Resources::parse(
+      "mem",
+      stringify(usage.isSome() ? usage.get().megabytes() : 0),
+      "*").get();
+
+  // Satisfies the future handed out by `watch`.
+  infos[containerId]->limitation.set(
+      protobuf::slave::createContainerLimitation(
+          mem,
+          message.str(),
+          TaskStatus::REASON_CONTAINER_LIMITATION_MEMORY));
+}
+
+
+// Creates a memory pressure counter for every level on the container's
+// cgroup. A level whose counter cannot be created is logged and skipped;
+// the remaining levels are still watched.
+void MemorySubsystem::pressureListen(const ContainerID& containerId)
+{
+  CHECK(infos.contains(containerId));
+
+  const string cgroup = path::join(flags.cgroups_root, containerId.value());
+  const Owned<Info>& info = infos[containerId];
+
+  foreach (const Level& level, levels()) {
+    Try<Owned<Counter>> counter = Counter::create(hierarchy, cgroup, level);
+
+    if (!counter.isError()) {
+      info->pressureCounters[level] = counter.get();
+
+      LOG(INFO) << "Started listening on '" << level << "' memory pressure "
+                << "events for container " << containerId;
+      continue;
+    }
+
+    LOG(ERROR) << "Failed to listen on '" << level << "' memory pressure "
+               << "events for container " << containerId << ": "
+               << counter.error();
+  }
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/178c1fa2/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp b/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp
new file mode 100644
index 0000000..a1c87ce
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystems/memory.hpp
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __CGROUPS_ISOLATOR_SUBSYSTEMS_MEMORY_HPP__
+#define __CGROUPS_ISOLATOR_SUBSYSTEMS_MEMORY_HPP__
+
+#include <list>
+#include <string>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/nothing.hpp>
+#include <stout/try.hpp>
+
+#include "linux/cgroups.hpp"
+
+#include "slave/flags.hpp"
+
+#include "slave/containerizer/mesos/isolators/cgroups/constants.hpp"
+#include "slave/containerizer/mesos/isolators/cgroups/subsystem.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+/**
+ * Represents the cgroups memory subsystem: it applies memory limits to
+ * a container's cgroup, reports memory usage statistics and pressure
+ * counters, and reports OOM events as container limitations.
+ */
+class MemorySubsystem : public Subsystem
+{
+public:
+  // Fails if the kernel OOM-killer cannot be enabled, memory pressure
+  // events are unsupported, or swap limiting is requested but
+  // 'memory.memsw.limit_in_bytes' is unavailable.
+  static Try<process::Owned<Subsystem>> create(
+      const Flags& flags,
+      const std::string& hierarchy);
+
+  virtual ~MemorySubsystem() {}
+
+  virtual std::string name() const
+  {
+    return CGROUP_SUBSYSTEM_MEMORY_NAME;
+  }
+
+  // Registers the container and starts its OOM and memory pressure
+  // listeners; fails if the container is already known.
+  virtual process::Future<Nothing> prepare(const ContainerID& containerId);
+
+  // Same as `prepare`, for a container found during agent recovery.
+  virtual process::Future<Nothing> recover(const ContainerID& containerId);
+
+  // Returns a future satisfied with a memory limitation when an OOM is
+  // detected; fails for an unknown container.
+  virtual process::Future<mesos::slave::ContainerLimitation> watch(
+      const ContainerID& containerId);
+
+  // Always updates the soft limit; the hard (and, if swap limiting is
+  // enabled, memsw) limit is only written the first time or when it is
+  // being raised.
+  virtual process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  // Reports usage read from the cgroup control files plus the memory
+  // pressure counter readings.
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  // Discards a pending OOM notifier and drops the container's state.
+  // Unknown containers are ignored.
+  virtual process::Future<Nothing> cleanup(const ContainerID& containerId);
+
+private:
+  // Per-container state.
+  struct Info
+  {
+    // Used to cancel the OOM listening.
+    process::Future<Nothing> oomNotifier;
+
+    // One counter per pressure level that was successfully created.
+    hashmap<
+        cgroups::memory::pressure::Level,
+        process::Owned<cgroups::memory::pressure::Counter>> pressureCounters;
+
+    // Satisfied when an OOM is detected; its future is returned by
+    // `watch`.
+    process::Promise<mesos::slave::ContainerLimitation> limitation;
+  };
+
+  MemorySubsystem(const Flags& flags, const std::string& hierarchy);
+
+  // Continuation of `usage`: merges the pressure counter readings into
+  // `result`. `levels` and `values` are parallel lists.
+  process::Future<ResourceStatistics> _usage(
+      const ContainerID& containerId,
+      ResourceStatistics result,
+      const std::list<cgroups::memory::pressure::Level>& levels,
+      const std::list<process::Future<uint64_t>>& values);
+
+  // Start listening on OOM events. This function will create an
+  // eventfd and start polling on it.
+  void oomListen(const ContainerID& containerId);
+
+  // This function is invoked when the polling on eventfd has a
+  // result.
+  void oomWaited(
+      const ContainerID& containerId,
+      const process::Future<Nothing>& future);
+
+  // Start listening on memory pressure events.
+  void pressureListen(const ContainerID& containerId);
+
+  // Stores cgroups associated information for container.
+  hashmap<ContainerID, process::Owned<Info>> infos;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __CGROUPS_ISOLATOR_SUBSYSTEMS_MEMORY_HPP__