You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/03/27 20:58:19 UTC

[1/2] mesos git commit: Added support for listening on cgroups memory pressures.

Repository: mesos
Updated Branches:
  refs/heads/master 04f8302c0 -> dfb2794d8


Added support for listening on cgroups memory pressures.

Review: https://reviews.apache.org/r/30545


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc5826de
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc5826de
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc5826de

Branch: refs/heads/master
Commit: cc5826defbebb46bc66e8bbc8900d66b8f27893c
Parents: 04f8302
Author: Chi Zhang <ch...@gmail.com>
Authored: Fri Mar 27 10:04:23 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Mar 27 10:18:44 2015 -0700

----------------------------------------------------------------------
 src/linux/cgroups.cpp | 134 +++++++++++++++++++++++++++++++++++++++++++--
 src/linux/cgroups.hpp |  51 +++++++++++++++++
 2 files changed, 181 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cc5826de/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index a533b31..df3211a 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -55,6 +55,7 @@
 #include <stout/proc.hpp>
 #include <stout/stringify.hpp>
 #include <stout/strings.hpp>
+#include <stout/unreachable.hpp>
 
 #include "linux/cgroups.hpp"
 #include "linux/fs.hpp"
@@ -71,6 +72,7 @@ using std::istringstream;
 using std::list;
 using std::map;
 using std::ofstream;
+using std::ostream;
 using std::ostringstream;
 using std::set;
 using std::string;
@@ -2149,11 +2151,8 @@ Try<Bytes> max_usage_in_bytes(const string& hierarchy, const string& cgroup)
 
 namespace oom {
 
-namespace {
+static Nothing _nothing() { return Nothing(); } // File-local; always Nothing().
 
-Nothing _nothing() { return Nothing(); }
-
-} // namespace {
 
 Future<Nothing> listen(const string& hierarchy, const string& cgroup)
 {
@@ -2239,6 +2238,133 @@ Try<Nothing> disable(const string& hierarchy, const string& cgroup)
 
 } // namespace oom {
 
+
+namespace pressure {
+
+ostream& operator << (ostream& stream, Level level)
+{
+  // Emit the lowercase name; CounterProcess passes this string as the
+  // argument of its 'memory.pressure_level' event listener.
+  switch (level) {
+    case LOW:
+      return stream << "low";
+    case MEDIUM:
+      return stream << "medium";
+    case CRITICAL:
+      return stream << "critical";
+    default:
+      UNREACHABLE();
+  }
+
+  // Not reached: each case returns, and 'default' is UNREACHABLE.
+  return stream;
+}
+
+
+// Drives an event::Listener on 'memory.pressure_level' for one Level and
+// accumulates the event counts it reports; the first failure is sticky.
+class CounterProcess : public Process<CounterProcess>
+{
+public:
+  CounterProcess(const string& hierarchy,
+                 const string& cgroup,
+                 Level level)
+    : value_(0),
+      error(None()),
+      process(new event::Listener(
+          hierarchy,
+          cgroup,
+          "memory.pressure_level",
+          stringify(level))) {} // I.e., "low", "medium" or "critical".
+
+  virtual ~CounterProcess() {}
+
+  Future<uint64_t> value()
+  {
+    if (error.isSome()) {
+      return Failure(error.get()); // Sticky: every later call fails too.
+    }
+
+    return value_;
+  }
+
+protected:
+  virtual void initialize()
+  {
+    spawn(CHECK_NOTNULL(process.get())); // Start the listener first,
+    listen();                            // then issue the first read.
+  }
+
+  virtual void finalize()
+  {
+    terminate(process.get()); // The listener is stopped along with us.
+    wait(process.get());
+  }
+
+private:
+  void listen()
+  {
+    dispatch(process.get(), &event::Listener::listen) // One read at a time.
+      .onAny(defer(self(), &CounterProcess::_listen, lambda::_1));
+  }
+
+  void _listen(const process::Future<uint64_t>& future)
+  {
+    CHECK(error.isNone()); // No read is re-issued once an error is set.
+
+    if (future.isReady()) {
+      value_ += future.get();
+      listen(); // Re-arm for the next batch of events.
+    } else if (future.isFailed()) {
+      error = Error(future.failure());
+    } else if (future.isDiscarded()) {
+      error = Error("Listening stopped unexpectedly");
+    }
+  }
+
+  uint64_t value_; // Sum of the event counts reported so far.
+  Option<Error> error; // First listening error; makes 'value' fail.
+  process::Owned<event::Listener> process;
+};
+
+
+Try<Owned<Counter>> Counter::create(
+    const string& hierarchy,
+    const string& cgroup,
+    Level level)
+{
+  // Refuse to build a counter for a cgroup that does not pass 'verify'.
+  const Option<Error> invalid = verify(hierarchy, cgroup);
+  if (invalid.isNone()) {
+    return Owned<Counter>(new Counter(hierarchy, cgroup, level));
+  }
+  return Error(invalid.get());
+}
+
+
+Counter::Counter(const string& hierarchy,
+                 const string& cgroup,
+                 Level level)
+  : process(new CounterProcess(hierarchy, cgroup, level))
+{
+  spawn(CHECK_NOTNULL(process.get())); // Starts listening right away.
+}
+
+
+Counter::~Counter()
+{
+  terminate(process.get(), true);
+  wait(process.get()); // Block until the CounterProcess has exited.
+}
+
+
+Future<uint64_t> Counter::value() const
+{
+  return dispatch(process.get(), &CounterProcess::value); // In the process.
+}
+
+} // namespace pressure {
+
 } // namespace memory {
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc5826de/src/linux/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp
index e07772f..f3a6c50 100644
--- a/src/linux/cgroups.hpp
+++ b/src/linux/cgroups.hpp
@@ -528,6 +528,57 @@ Try<Nothing> disable(
 
 } // namespace oom {
 
+
+// Memory pressure counters (built on the 'memory.pressure_level' file).
+namespace pressure {
+
+enum Level { // Printed as "low", "medium" and "critical" by operator <<.
+  LOW,
+  MEDIUM,
+  CRITICAL
+};
+
+
+std::ostream& operator << (std::ostream& stream, Level level);
+
+
+// Forward declaration; the process is defined in the .cpp file.
+class CounterProcess;
+
+
+// Counter is a primitive to listen on events of a given memory
+// pressure level for a cgroup and keep track of the number of
+// occurrences of that event. Use the public 'create' function to
+// create a new counter; see 'value' for how to use.
+class Counter
+{
+public:
+  // Creates a memory pressure counter for the given cgroup on the
+  // specified level. Returns an Error if the cgroup fails verification.
+  static Try<process::Owned<Counter>> create(
+      const std::string& hierarchy,
+      const std::string& cgroup,
+      Level level);
+
+  virtual ~Counter(); // Terminates and waits for the listening process.
+
+  // Returns the current accumulated number of occurrences of the
+  // pressure event. Returns a failure if any error occurs while
+  // monitoring the pressure events, and any subsequent calls to
+  // 'value' will return the same failure. In such case, the user
+  // should consider creating a new Counter.
+  process::Future<uint64_t> value() const;
+
+private:
+  Counter(const std::string& hierarchy,
+          const std::string& cgroup,
+          Level level);
+
+  process::Owned<CounterProcess> process; // Spawned by the constructor.
+};
+
+} // namespace pressure {
+
 } // namespace memory {
 
 


[2/2] mesos git commit: Added cgroups memory pressure counter tests.

Posted by ji...@apache.org.
Added cgroups memory pressure counter tests.

Review: https://reviews.apache.org/r/31276


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dfb2794d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dfb2794d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dfb2794d

Branch: refs/heads/master
Commit: dfb2794d8b0fb5d41c725d3944395606d24fbf26
Parents: cc5826d
Author: Chi Zhang <ch...@gmail.com>
Authored: Fri Mar 27 10:20:46 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Mar 27 12:56:24 2015 -0700

----------------------------------------------------------------------
 src/Makefile.am                       |  11 +
 src/tests/cgroups_tests.cpp           | 197 +++++++++++++++++-
 src/tests/memory_test_helper.cpp      | 320 +++++++++++++++++++++++++++++
 src/tests/memory_test_helper.hpp      |  89 ++++++++
 src/tests/memory_test_helper_main.cpp |  32 +++
 5 files changed, 647 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 7a06c70..84a62d4 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -562,6 +562,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 	tests/isolator.hpp						\
 	tests/launcher.hpp						\
 	tests/limiter.hpp						\
+	tests/memory_test_helper.hpp					\
 	tests/mesos.hpp							\
 	tests/module.hpp						\
 	tests/script.hpp						\
@@ -1277,8 +1278,17 @@ if OS_LINUX
     tests/setns_test_helper.cpp
   setns_test_helper_CPPFLAGS = $(MESOS_CPPFLAGS)
   setns_test_helper_LDADD = libmesos.la $(LDADD)
+
 endif
 
+check_PROGRAMS += memory-test-helper
+memory_test_helper_SOURCES =					\
+  tests/flags.cpp						\
+  tests/memory_test_helper_main.cpp				\
+  tests/memory_test_helper.cpp
+memory_test_helper_CPPFLAGS = $(mesos_tests_CPPFLAGS)
+memory_test_helper_LDADD = libmesos.la $(LDADD)
+
 check_PROGRAMS += active-user-test-helper
 active_user_test_helper_SOURCES = tests/active_user_test_helper.cpp
 active_user_test_helper_CPPFLAGS = $(MESOS_CPPFLAGS)
@@ -1363,6 +1373,7 @@ mesos_tests_SOURCES =				\
   tests/master_slave_reconciliation_tests.cpp	\
   tests/master_tests.cpp			\
   tests/master_validation_tests.cpp		\
+  tests/memory_test_helper.cpp			\
   tests/mesos.cpp				\
   tests/metrics_tests.cpp			\
   tests/module.cpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/tests/cgroups_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cgroups_tests.cpp b/src/tests/cgroups_tests.cpp
index 75c61aa..8483738 100644
--- a/src/tests/cgroups_tests.cpp
+++ b/src/tests/cgroups_tests.cpp
@@ -21,7 +21,6 @@
 #include <signal.h>
 #include <stdint.h>
 #include <stdio.h>
-#include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
 
@@ -37,6 +36,7 @@
 #include <gmock/gmock.h>
 
 #include <process/gtest.hpp>
+#include <process/owned.hpp>
 
 #include <stout/gtest.hpp>
 #include <stout/hashmap.hpp>
@@ -51,10 +51,15 @@
 #include "linux/cgroups.hpp"
 #include "linux/perf.hpp"
 
+#include "tests/memory_test_helper.hpp"
 #include "tests/mesos.hpp" // For TEST_CGROUPS_(HIERARCHY|ROOT).
+#include "tests/utils.hpp"
 
 using namespace process;
 
+using cgroups::memory::pressure::Level;
+using cgroups::memory::pressure::Counter;
+
 using std::set;
 
 namespace mesos {
@@ -62,7 +67,7 @@ namespace internal {
 namespace tests {
 
 
-class CgroupsTest : public ::testing::Test
+class CgroupsTest : public TemporaryDirectoryTest
 {
 public:
   static void SetUpTestCase()
@@ -117,6 +122,8 @@ public:
 protected:
   virtual void SetUp()
   {
+    CgroupsTest::SetUp();
+
     foreach (const std::string& subsystem, strings::tokenize(subsystems, ",")) {
       // Establish the base hierarchy if this is the first subsystem checked.
       if (baseHierarchy.empty()) {
@@ -182,6 +189,8 @@ protected:
         }
       }
     }
+
+    CgroupsTest::TearDown();
   }
 
   const std::string subsystems; // Subsystems required to run tests.
@@ -1033,6 +1042,190 @@ TEST_F(CgroupsAnyHierarchyWithPerfEventTest, ROOT_CGROUPS_Perf)
   AWAIT_READY(destroy);
 }
 
+
+class CgroupsAnyHierarchyMemoryPressureTest
+  : public CgroupsAnyHierarchyTest
+{
+public:
+  CgroupsAnyHierarchyMemoryPressureTest()
+    : CgroupsAnyHierarchyTest("memory"),
+      cgroup(TEST_CGROUPS_ROOT) {}
+
+protected:
+  virtual void SetUp()
+  {
+    CgroupsAnyHierarchyTest::SetUp();
+
+    hierarchy = path::join(baseHierarchy, "memory");
+
+    ASSERT_SOME(cgroups::create(hierarchy, cgroup));
+  }
+
+  // Fills 'counters' with one Counter per Level (fatal on failure).
+  void listen()
+  {
+    const std::vector<Level> levels = {
+      Level::LOW,
+      Level::MEDIUM,
+      Level::CRITICAL
+    };
+
+    foreach (Level level, levels) {
+      Try<Owned<Counter>> counter = Counter::create(hierarchy, cgroup, level);
+      ASSERT_SOME(counter); // Fatal, since 'get()' aborts on an Error.
+      counters[level] = counter.get();
+    }
+  }
+
+  std::string hierarchy;
+  const std::string cgroup;
+
+  hashmap<Level, Owned<Counter>> counters;
+};
+
+
+TEST_F(CgroupsAnyHierarchyMemoryPressureTest, ROOT_IncreaseUnlockedRSS)
+{
+  MemoryTestHelper helper;
+  ASSERT_SOME(helper.spawn());
+  ASSERT_SOME(helper.pid());
+
+  const Bytes limit = Megabytes(16);
+
+  // Move the memory test helper into a cgroup and set the limit.
+  EXPECT_SOME(cgroups::memory::limit_in_bytes(hierarchy, cgroup, limit));
+  EXPECT_SOME(cgroups::assign(hierarchy, cgroup, helper.pid().get()));
+
+  ASSERT_NO_FATAL_FAILURE(listen());
+
+  // Counter readings from the last iteration; no events seen yet.
+  uint64_t previousLow = 0;
+  uint64_t previousMedium = 0;
+  uint64_t previousCritical = 0;
+
+  // Used to save the counter readings from this iteration.
+  uint64_t low = 0;
+  uint64_t medium = 0;
+  uint64_t critical = 0;
+
+  // Use a guard to error out if it's been too long.
+  // TODO(chzhcn): Use a better way to set testing time limit.
+  uint64_t iterationLimit = limit.bytes() / getpagesize() * 10;
+
+  for (uint64_t i = 0; i < iterationLimit; i++) {
+    // Stop at once if the helper fails (e.g., it got OOM-killed).
+    ASSERT_SOME(helper.increaseRSS(getpagesize()));
+
+    Future<uint64_t> _low = counters[Level::LOW]->value();
+    Future<uint64_t> _medium = counters[Level::MEDIUM]->value();
+    Future<uint64_t> _critical = counters[Level::CRITICAL]->value();
+
+    AWAIT_READY(_low);
+    AWAIT_READY(_medium);
+    AWAIT_READY(_critical);
+
+    low = _low.get();
+    medium = _medium.get();
+    critical = _critical.get();
+
+    // We need to know the readings are the same as last time to be
+    // sure they are stable, because the reading is not atomic. For
+    // example, the medium could turn positive after we read low to be
+    // 0, but this should be fixed by the next read immediately.
+    if (low == previousLow && medium == previousMedium &&
+        critical == previousCritical) {
+      if (low != 0) {
+        EXPECT_LE(medium, low);
+        EXPECT_LE(critical, medium);
+
+        // When child's RSS is full, it will be OOM-kill'ed if we
+        // don't stop it right away.
+        break;
+      } else {
+        EXPECT_EQ(0u, medium);
+        EXPECT_EQ(0u, critical);
+      }
+    }
+
+    previousLow = low;
+    previousMedium = medium;
+    previousCritical = critical;
+  }
+}
+
+
+TEST_F(CgroupsAnyHierarchyMemoryPressureTest, ROOT_IncreasePageCache)
+{
+  MemoryTestHelper helper;
+  ASSERT_SOME(helper.spawn());
+  ASSERT_SOME(helper.pid());
+
+  const Bytes limit = Megabytes(16);
+
+  // Move the memory test helper into a cgroup and set the limit.
+  EXPECT_SOME(cgroups::memory::limit_in_bytes(hierarchy, cgroup, limit));
+  EXPECT_SOME(cgroups::assign(hierarchy, cgroup, helper.pid().get()));
+
+  ASSERT_NO_FATAL_FAILURE(listen());
+
+  // Counter readings from the last iteration; no events seen yet.
+  uint64_t previousLow = 0;
+  uint64_t previousMedium = 0;
+  uint64_t previousCritical = 0;
+
+  // Counter readings from this iteration ('low' is checked at the end).
+  uint64_t low = 0;
+  uint64_t medium = 0;
+  uint64_t critical = 0;
+
+  // Use a guard to error out if it's been too long.
+  // TODO(chzhcn): Use a better way to set testing time limit.
+  uint64_t iterationLimit = limit.bytes() / Megabytes(1).bytes() * 2;
+
+  for (uint64_t i = 0; i < iterationLimit; i++) {
+    // Stop at once if the helper fails instead of looping on errors.
+    ASSERT_SOME(helper.increasePageCache(Megabytes(1)));
+
+    Future<uint64_t> _low = counters[Level::LOW]->value();
+    Future<uint64_t> _medium = counters[Level::MEDIUM]->value();
+    Future<uint64_t> _critical = counters[Level::CRITICAL]->value();
+
+    AWAIT_READY(_low);
+    AWAIT_READY(_medium);
+    AWAIT_READY(_critical);
+
+    low = _low.get();
+    medium = _medium.get();
+    critical = _critical.get();
+
+    // We need to know the readings are the same as last time to be
+    // sure they are stable, because the reading is not atomic. For
+    // example, the medium could turn positive after we read low to be
+    // 0, but this should be fixed by the next read immediately.
+    if (low == previousLow && medium == previousMedium &&
+        critical == previousCritical) {
+      if (low != 0) {
+        EXPECT_LE(medium, low);
+        EXPECT_LE(critical, medium);
+
+        // Unlike the RSS test, the child consumes page cache only
+        // slowly, and page cache is evictable and reclaimable, so we
+        // could stay in this state forever. The iteration guard lets
+        // us out shortly.
+      } else {
+        EXPECT_EQ(0u, medium);
+        EXPECT_EQ(0u, critical);
+      }
+    }
+
+    previousLow = low;
+    previousMedium = medium;
+    previousCritical = critical;
+  }
+
+  EXPECT_LT(0u, low);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/tests/memory_test_helper.cpp
----------------------------------------------------------------------
diff --git a/src/tests/memory_test_helper.cpp b/src/tests/memory_test_helper.cpp
new file mode 100644
index 0000000..cdf769b
--- /dev/null
+++ b/src/tests/memory_test_helper.cpp
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <signal.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <sys/mman.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <string>
+#include <vector>
+
+#include <stout/bytes.hpp>
+#include <stout/error.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/lambda.hpp>
+#include <stout/os.hpp>
+#include <stout/stringify.hpp>
+#include <stout/strings.hpp>
+#include <stout/try.hpp>
+
+#include "tests/flags.hpp"
+#include "tests/memory_test_helper.hpp"
+
+using process::Subprocess;
+
+using std::cerr;
+using std::cin;
+using std::cout;
+using std::endl;
+using std::flush;
+using std::getline;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// Constants used to sync MemoryTestHelper and its subprocess: the parent
+// writes "<COMMAND> <size>\n" lines to the child's stdin; the child replies.
+// Written once by the subprocess to its stdout after it has started.
+const char STARTED = 'S';
+
+// Written by the subprocess to its stdout after each completed request.
+const char DONE = 'D';
+
+// Command name of an increaseRSS request ("INCREASE_RSS <size>").
+const char INCREASE_RSS[] = "INCREASE_RSS";
+
+// Command name of an increasePageCache request.
+const char INCREASE_PAGE_CACHE[] = "INCREASE_PAGE_CACHE";
+
+
+// Allocates (and, if 'lock', mlocks) page-aligned anonymous memory (RSS).
+static Try<void*> allocateRSS(const Bytes& size, bool lock = true)
+{
+  // NOTE: posix_memalign returns an error number; it does not set errno.
+  void* rss = NULL;
+  const int result = posix_memalign(&rss, getpagesize(), size.bytes());
+  if (result != 0) {
+    return Error("Failed to increase RSS memory, posix_memalign: " +
+                 string(strerror(result)));
+  }
+
+  // Locking a page makes it unevictable in the kernel.
+  if (lock && mlock(rss, size.bytes()) != 0) {
+    const ErrnoError error("Failed to lock memory, mlock"); // Reads errno.
+    free(rss); // Don't leak the allocation; errno was captured above.
+    return error;
+  }
+  memset(rss, 1, size.bytes()); // Touch every page so it is mapped.
+  return rss;
+}
+
+
+static Try<Nothing> increaseRSS(const vector<string>& tokens)
+{
+  // Expected request: "INCREASE_RSS <size>". The memory is never freed.
+  if (tokens.size() < 2) {
+    return Error("Expect at least one argument");
+  }
+
+  const Try<Bytes> bytes = Bytes::parse(tokens[1]);
+  if (bytes.isError()) {
+    return Error("The first argument '" + tokens[1] + "' is not a byte size");
+  }
+
+  const Try<void*> allocated = allocateRSS(bytes.get());
+  if (!allocated.isError()) {
+    return Nothing();
+  }
+  return Error("Failed to allocate RSS memory: " + allocated.error());
+}
+
+
+static Try<Nothing> increasePageCache(const vector<string>& tokens)
+{
+  const Bytes UNIT = Megabytes(1);
+
+  if (tokens.size() < 2) {
+    return Error("Expect at least one argument");
+  }
+
+  Try<Bytes> size = Bytes::parse(tokens[1]);
+  if (size.isError()) {
+    return Error("The first argument '" + tokens[1] + "' is not a byte size");
+  }
+
+  // TODO(chzhcn): Currently, we assume the current working directory
+  // is a temporary directory and will be cleaned up when the test
+  // finishes. Since the child process will inherit the current
+  // working directory from the parent process, that means the test
+  // that uses this helper probably needs to inherit from
+  // TemporaryDirectoryTest. Consider relaxing this constraint.
+  Try<string> file = os::mktemp(path::join(os::getcwd(), "XXXXXX"));
+  if (file.isError()) {
+    return Error("Failed to create a temporary file: " + file.error());
+  }
+
+  Try<int> fd = os::open(file.get(), O_WRONLY);
+  if (fd.isError()) {
+    return Error("Failed to open file: " + fd.error());
+  }
+
+  // Write UNIT size to disk at a time; the content isn't important.
+  // NOTE: We round down when calculating the number of writes to do.
+  const string buffer(UNIT.bytes(), 'a');
+  for (uint64_t i = 0, n = size.get().bytes() / UNIT.bytes(); i < n; i++) {
+    Try<Nothing> write = os::write(fd.get(), buffer);
+    if (write.isError()) {
+      os::close(fd.get());
+      return Error("Failed to write file: " + write.error());
+    }
+
+    // Use fsync to make sure data is written to disk.
+    if (fsync(fd.get()) == -1) {
+      // Save the error message because os::close below might
+      // overwrite the errno.
+      const string message = strerror(errno);
+
+      os::close(fd.get());
+      return Error("Failed to fsync: " + message);
+    }
+  }
+
+  os::close(fd.get());
+  return Nothing();
+}
+
+
+MemoryTestHelper::~MemoryTestHelper()
+{
+  cleanup(); // Kill and reap the subprocess, if one was spawned.
+}
+
+
+Try<Nothing> MemoryTestHelper::spawn()
+{
+  if (s.isSome()) {
+    return Error("A subprocess has been spawned already");
+  }
+
+  vector<string> argv;
+  argv.push_back("memory-test-helper");
+  argv.push_back(MemoryTestHelperMain::NAME); // Selects the subcommand.
+
+  Try<Subprocess> process = subprocess(
+      path::join(flags.build_dir,
+                 "src",
+                 "memory-test-helper"),
+      argv,
+      Subprocess::PIPE(),  // stdin: request lines from us.
+      Subprocess::PIPE(),  // stdout: STARTED/DONE replies to us.
+      Subprocess::FD(STDERR_FILENO)); // stderr: shared with us.
+
+  if (process.isError()) {
+    return Error("Failed to spawn a subprocess: " + process.error());
+  }
+
+  s = process.get();
+
+  // Wait for the child to inform us that it has started before
+  // returning. Otherwise, the user might set the memory limit too
+  // early and get the child OOM-killed, because 'ld' could use a lot
+  // of memory.
+  Result<string> read = os::read(s.get().out().get(), sizeof(STARTED));
+  if (!read.isSome() || read.get() != string(sizeof(STARTED), STARTED)) {
+    cleanup();
+    return Error("Failed to sync with the subprocess");
+  }
+
+  return Nothing();
+}
+
+
+void MemoryTestHelper::cleanup()
+{
+  if (s.isNone()) {
+    return; // Nothing was spawned, or it has already been reaped.
+  }
+  // Kill in case it's stuck; ignore its status (the child logs errors).
+  const pid_t child = s.get().pid();
+  ::kill(child, SIGKILL);
+  ::waitpid(child, NULL, 0);
+  s = None();
+}
+
+
+Try<pid_t> MemoryTestHelper::pid()
+{
+  if (s.isSome()) {
+    return s.get().pid();
+  }
+
+  return Error("The subprocess has not been spawned yet");
+}
+
+
+// Sends 'request' as one line to the subprocess' stdin and waits for
+// the DONE reply on its stdout; on any failure the subprocess is killed.
+Try<Nothing> MemoryTestHelper::requestAndWait(const string& request)
+{
+  if (s.isNone()) {
+    return Error("The subprocess has not been spawned yet");
+  }
+
+  Try<Nothing> write = os::write(s.get().in().get(), request + "\n");
+  if (write.isError()) {
+    cleanup();
+    return Error("Failed to sync with the subprocess: " + write.error());
+  }
+
+  Result<string> read = os::read(s.get().out().get(), sizeof(DONE));
+  if (!read.isSome() || read.get() != string(sizeof(DONE), DONE)) {
+    cleanup();
+    return Error("Failed to sync with the subprocess");
+  }
+
+  return Nothing();
+}
+
+
+Try<Nothing> MemoryTestHelper::increaseRSS(const Bytes& size)
+{
+  return requestAndWait(INCREASE_RSS + (" " + stringify(size)));
+}
+
+
+Try<Nothing> MemoryTestHelper::increasePageCache(const Bytes& size)
+{
+  return requestAndWait(INCREASE_PAGE_CACHE + (" " + stringify(size)));
+}
+
+// Subcommand name; 'spawn' passes it to the helper binary as argv[1].
+const char MemoryTestHelperMain::NAME[] = "MemoryTestHelperMain";
+
+
+int MemoryTestHelperMain::execute()
+{
+  hashmap<string, Try<Nothing>(*)(const vector<string>&)> commands;
+  commands[INCREASE_RSS] = &increaseRSS;
+  commands[INCREASE_PAGE_CACHE] = &increasePageCache;
+
+  // Tell the parent that child has started.
+  cout << STARTED << flush;
+
+  string line;
+  while (getline(cin, line)) { // One "<COMMAND> <size>" request per line.
+    vector<string> tokens = strings::tokenize(line, " ");
+
+    if (tokens.empty()) {
+      cerr << "No command from the parent" << endl;
+      return 1;
+    }
+
+    if (!commands.contains(tokens[0])) {
+      cerr << "Unknown command from the parent '" << tokens[0] << "'" << endl;
+      return 1;
+    }
+
+    Try<Nothing> result = commands[tokens[0]](tokens);
+    if (result.isError()) {
+      cerr << result.error() << endl;
+      return 1;
+    }
+
+    cout << DONE << flush;
+  }
+
+  // EOF means the parent closed our stdin; anything else is an error.
+  if (!cin.eof()) {
+    cerr << "Failed to sync with the parent" << endl;
+    return 1;
+  }
+
+  return 0;
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/tests/memory_test_helper.hpp
----------------------------------------------------------------------
diff --git a/src/tests/memory_test_helper.hpp b/src/tests/memory_test_helper.hpp
new file mode 100644
index 0000000..11712d7
--- /dev/null
+++ b/src/tests/memory_test_helper.hpp
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MEMORY_TEST_HELPER_HPP__
+#define __MEMORY_TEST_HELPER_HPP__
+
+#include <process/subprocess.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/option.hpp>
+#include <stout/subcommand.hpp>
+#include <stout/try.hpp>
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// The abstraction for controlling the memory usage of a subprocess.
+// TODO(chzhcn): This helper is only supposed to be used by one thread;
+// consider making it thread safe, and returning futures over blocking.
+class MemoryTestHelper
+{
+public:
+  MemoryTestHelper() {}
+  ~MemoryTestHelper();
+
+  // Spawns a subprocess and waits until it reports that it has started.
+  Try<Nothing> spawn();
+
+  // Kills and reaps the subprocess, if it exists. Blocks until reaped.
+  void cleanup();
+
+  // Returns the pid of the subprocess.
+  Try<pid_t> pid();
+
+  // Allocate and lock specified page-aligned anonymous memory (RSS)
+  // in the subprocess. It uses mlock and memset to make sure
+  // allocated memory is mapped. Blocks until the subprocess is done.
+  Try<Nothing> increaseRSS(const Bytes& size);
+
+  // Attempts to generate the requested size of page cache in the
+  // subprocess by writing a 1MB buffer to disk repeatedly; the size
+  // is rounded down to whole megabytes. Blocks until done.
+  Try<Nothing> increasePageCache(const Bytes& size = Megabytes(1));
+
+private:
+  // Not copyable: the destructor SIGKILLs and reaps the subprocess.
+  MemoryTestHelper(const MemoryTestHelper&) = delete;
+  MemoryTestHelper& operator=(const MemoryTestHelper&) = delete;
+
+  Try<Nothing> requestAndWait(const std::string& request);
+
+  Option<process::Subprocess> s;
+};
+
+
+// The program that runs inside the MemoryTestHelper subprocess: it loops,
+// reading commands from stdin and executing them.
+class MemoryTestHelperMain : public Subcommand
+{
+public:
+  static const char NAME[]; // Subcommand name, defined in the .cpp file.
+
+  MemoryTestHelperMain() : Subcommand(NAME) {}
+
+protected:
+  virtual int execute(); // Nonzero on error; see the .cpp file.
+};
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MEMORY_TEST_HELPER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/tests/memory_test_helper_main.cpp
----------------------------------------------------------------------
diff --git a/src/tests/memory_test_helper_main.cpp b/src/tests/memory_test_helper_main.cpp
new file mode 100644
index 0000000..362535f
--- /dev/null
+++ b/src/tests/memory_test_helper_main.cpp
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stout/subcommand.hpp>
+
+#include "tests/memory_test_helper.hpp"
+
+using mesos::internal::tests::MemoryTestHelperMain;
+
+int main(int argc, char** argv)
+{
+  // MemoryTestHelper::spawn runs this binary with the subcommand's NAME
+  // as argv[1], which selects MemoryTestHelperMain.
+  MemoryTestHelperMain* helper = new MemoryTestHelperMain();
+
+  return Subcommand::dispatch(None(), argc, argv, helper);
+}