You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/03/27 20:58:19 UTC
[1/2] mesos git commit: Added support for listening on cgroups memory
pressures.
Repository: mesos
Updated Branches:
refs/heads/master 04f8302c0 -> dfb2794d8
Added support for listening on cgroups memory pressures.
Review: https://reviews.apache.org/r/30545
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc5826de
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc5826de
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc5826de
Branch: refs/heads/master
Commit: cc5826defbebb46bc66e8bbc8900d66b8f27893c
Parents: 04f8302
Author: Chi Zhang <ch...@gmail.com>
Authored: Fri Mar 27 10:04:23 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Mar 27 10:18:44 2015 -0700
----------------------------------------------------------------------
src/linux/cgroups.cpp | 134 +++++++++++++++++++++++++++++++++++++++++++--
src/linux/cgroups.hpp | 51 +++++++++++++++++
2 files changed, 181 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc5826de/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index a533b31..df3211a 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -55,6 +55,7 @@
#include <stout/proc.hpp>
#include <stout/stringify.hpp>
#include <stout/strings.hpp>
+#include <stout/unreachable.hpp>
#include "linux/cgroups.hpp"
#include "linux/fs.hpp"
@@ -71,6 +72,7 @@ using std::istringstream;
using std::list;
using std::map;
using std::ofstream;
+using std::ostream;
using std::ostringstream;
using std::set;
using std::string;
@@ -2149,11 +2151,8 @@ Try<Bytes> max_usage_in_bytes(const string& hierarchy, const string& cgroup)
namespace oom {
-namespace {
+// File-local (static) helper that takes no arguments and returns Nothing().
+static Nothing _nothing() { return Nothing(); }
-Nothing _nothing() { return Nothing(); }
-
-} // namespace {
Future<Nothing> listen(const string& hierarchy, const string& cgroup)
{
@@ -2239,6 +2238,133 @@ Try<Nothing> disable(const string& hierarchy, const string& cgroup)
} // namespace oom {
+
+namespace pressure {
+
+// Writes the lower-case name of a memory pressure level ("low",
+// "medium" or "critical") to 'stream' and returns the stream. Any
+// other value of 'level' hits UNREACHABLE().
+ostream& operator << (ostream& stream, Level level)
+{
+  switch (level) {
+    case LOW:      return stream << "low";
+    case MEDIUM:   return stream << "medium";
+    case CRITICAL: return stream << "critical";
+    default:       UNREACHABLE();
+  }
+
+  return stream;
+}
+
+
+// The process drives the event::Listener to keep listening on cgroups
+// memory pressure counters. It adds every value reported by the
+// listener to a running total and records an error as soon as a
+// listen attempt fails or is discarded.
+class CounterProcess : public Process<CounterProcess>
+{
+public:
+  // Creates (but does not spawn) an event::Listener for the
+  // "memory.pressure_level" control of 'cgroup' in 'hierarchy', passing
+  // the stringified 'level' as the listener's last constructor argument.
+  CounterProcess(const string& hierarchy,
+                 const string& cgroup,
+                 Level level)
+    : value_(0),
+      error(None()),
+      process(new event::Listener(
+          hierarchy,
+          cgroup,
+          "memory.pressure_level",
+          stringify(level))) {}
+
+  virtual ~CounterProcess() {}
+
+  // Returns the accumulated total, or a Failure carrying the recorded
+  // error once one has been set by '_listen'.
+  Future<uint64_t> value()
+  {
+    if (error.isSome()) {
+      return Failure(error.get());
+    }
+
+    return value_;
+  }
+
+protected:
+  // Spawns the listener and issues the first listen request.
+  virtual void initialize()
+  {
+    spawn(CHECK_NOTNULL(process.get()));
+    listen();
+  }
+
+  // Terminates the listener and waits for it to exit.
+  virtual void finalize()
+  {
+    terminate(process.get());
+    wait(process.get());
+  }
+
+private:
+  // Dispatches a 'listen' call to the listener; whatever the outcome,
+  // the resulting future is handed to '_listen' on this process.
+  void listen()
+  {
+    dispatch(process.get(), &event::Listener::listen)
+      .onAny(defer(self(), &CounterProcess::_listen, lambda::_1));
+  }
+
+  // Handles the outcome of one listen request: a ready future adds its
+  // value to the total and re-arms the listener; a failed or discarded
+  // future records an error and does not listen again.
+  void _listen(const process::Future<uint64_t>& future)
+  {
+    // No further listen request is issued after an error is recorded,
+    // so no callback is expected to arrive with 'error' already set.
+    CHECK(error.isNone());
+
+    if (future.isReady()) {
+      value_ += future.get();
+      listen();
+    } else if (future.isFailed()) {
+      error = Error(future.failure());
+    } else if (future.isDiscarded()) {
+      error = Error("Listening stopped unexpectedly");
+    }
+  }
+
+  // Running total of the values reported by the listener.
+  uint64_t value_;
+
+  // Set once listening has failed or been discarded; None() otherwise.
+  Option<Error> error;
+
+  // The event listener owned and driven by this process.
+  process::Owned<event::Listener> process;
+};
+
+
+// Verifies the hierarchy/cgroup pair and, when verification reports
+// no error, returns a newly constructed Counter for the given level.
+// Otherwise the verification error is returned.
+Try<Owned<Counter>> Counter::create(
+    const string& hierarchy,
+    const string& cgroup,
+    Level level)
+{
+  const Option<Error> invalid = verify(hierarchy, cgroup);
+
+  if (invalid.isNone()) {
+    return Owned<Counter>(new Counter(hierarchy, cgroup, level));
+  }
+
+  return Error(invalid.get());
+}
+
+
+// Creates the backing CounterProcess for the given cgroup and level
+// and spawns it right away.
+Counter::Counter(const string& hierarchy,
+                 const string& cgroup,
+                 Level level)
+  : process(new CounterProcess(hierarchy, cgroup, level))
+{
+  spawn(CHECK_NOTNULL(process.get()));
+}
+
+
+// Terminates the backing CounterProcess and blocks until it has
+// exited.
+Counter::~Counter()
+{
+  // NOTE(review): the second argument 'true' is presumably libprocess'
+  // 'inject' flag -- confirm against process::terminate.
+  terminate(process.get(), true);
+  wait(process.get());
+}
+
+
+// Forwards the query to the backing process by dispatching
+// CounterProcess::value and returns the resulting future.
+Future<uint64_t> Counter::value() const
+{
+  return dispatch(process.get(), &CounterProcess::value);
+}
+
+} // namespace pressure {
+
} // namespace memory {
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc5826de/src/linux/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp
index e07772f..f3a6c50 100644
--- a/src/linux/cgroups.hpp
+++ b/src/linux/cgroups.hpp
@@ -528,6 +528,57 @@ Try<Nothing> disable(
} // namespace oom {
+
+// Memory pressure counters.
+namespace pressure {
+
+// The memory pressure levels that a Counter can be created for.
+enum Level {
+  LOW,
+  MEDIUM,
+  CRITICAL
+};
+
+
+std::ostream& operator << (std::ostream& stream, Level level);
+
+
+// Forward declaration.
+class CounterProcess;
+
+
+// Counter is a primitive to listen on events of a given memory
+// pressure level for a cgroup and keep track of the number of
+// occurrences of that event. Use the public 'create' function to
+// create a new counter; see 'value' for how to use.
+class Counter
+{
+public:
+  // Create a memory pressure counter for the given cgroup on the
+  // specified level.
+  static Try<process::Owned<Counter>> create(
+      const std::string& hierarchy,
+      const std::string& cgroup,
+      Level level);
+
+  virtual ~Counter();
+
+  // Returns the current accumulated number of occurrences of the
+  // pressure event. Returns a failure if any error occurs while
+  // monitoring the pressure events, and any subsequent calls to
+  // 'value' will return the same failure. In such case, the user
+  // should consider creating a new Counter.
+  process::Future<uint64_t> value() const;
+
+private:
+  // Private so that instances can only be obtained through 'create'.
+  Counter(const std::string& hierarchy,
+          const std::string& cgroup,
+          Level level);
+
+  // The process that 'value' queries for the accumulated count.
+  process::Owned<CounterProcess> process;
+};
+
+} // namespace pressure {
+
} // namespace memory {
[2/2] mesos git commit: Added cgroups memory pressure counter tests.
Posted by ji...@apache.org.
Added cgroups memory pressure counter tests.
Review: https://reviews.apache.org/r/31276
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dfb2794d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dfb2794d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dfb2794d
Branch: refs/heads/master
Commit: dfb2794d8b0fb5d41c725d3944395606d24fbf26
Parents: cc5826d
Author: Chi Zhang <ch...@gmail.com>
Authored: Fri Mar 27 10:20:46 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Mar 27 12:56:24 2015 -0700
----------------------------------------------------------------------
src/Makefile.am | 11 +
src/tests/cgroups_tests.cpp | 197 +++++++++++++++++-
src/tests/memory_test_helper.cpp | 320 +++++++++++++++++++++++++++++
src/tests/memory_test_helper.hpp | 89 ++++++++
src/tests/memory_test_helper_main.cpp | 32 +++
5 files changed, 647 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 7a06c70..84a62d4 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -562,6 +562,7 @@ libmesos_no_3rdparty_la_SOURCES += \
tests/isolator.hpp \
tests/launcher.hpp \
tests/limiter.hpp \
+ tests/memory_test_helper.hpp \
tests/mesos.hpp \
tests/module.hpp \
tests/script.hpp \
@@ -1277,8 +1278,17 @@ if OS_LINUX
tests/setns_test_helper.cpp
setns_test_helper_CPPFLAGS = $(MESOS_CPPFLAGS)
setns_test_helper_LDADD = libmesos.la $(LDADD)
+
endif
+# Helper executable built for 'make check'. MemoryTestHelper::spawn()
+# launches it from <build_dir>/src/memory-test-helper.
+check_PROGRAMS += memory-test-helper
+memory_test_helper_SOURCES = \
+  tests/flags.cpp \
+  tests/memory_test_helper_main.cpp \
+  tests/memory_test_helper.cpp
+memory_test_helper_CPPFLAGS = $(mesos_tests_CPPFLAGS)
+memory_test_helper_LDADD = libmesos.la $(LDADD)
+
check_PROGRAMS += active-user-test-helper
active_user_test_helper_SOURCES = tests/active_user_test_helper.cpp
active_user_test_helper_CPPFLAGS = $(MESOS_CPPFLAGS)
@@ -1363,6 +1373,7 @@ mesos_tests_SOURCES = \
tests/master_slave_reconciliation_tests.cpp \
tests/master_tests.cpp \
tests/master_validation_tests.cpp \
+ tests/memory_test_helper.cpp \
tests/mesos.cpp \
tests/metrics_tests.cpp \
tests/module.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/tests/cgroups_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cgroups_tests.cpp b/src/tests/cgroups_tests.cpp
index 75c61aa..8483738 100644
--- a/src/tests/cgroups_tests.cpp
+++ b/src/tests/cgroups_tests.cpp
@@ -21,7 +21,6 @@
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
-#include <stdlib.h>
#include <string.h>
#include <unistd.h>
@@ -37,6 +36,7 @@
#include <gmock/gmock.h>
#include <process/gtest.hpp>
+#include <process/owned.hpp>
#include <stout/gtest.hpp>
#include <stout/hashmap.hpp>
@@ -51,10 +51,15 @@
#include "linux/cgroups.hpp"
#include "linux/perf.hpp"
+#include "tests/memory_test_helper.hpp"
#include "tests/mesos.hpp" // For TEST_CGROUPS_(HIERARCHY|ROOT).
+#include "tests/utils.hpp"
using namespace process;
+using cgroups::memory::pressure::Level;
+using cgroups::memory::pressure::Counter;
+
using std::set;
namespace mesos {
@@ -62,7 +67,7 @@ namespace internal {
namespace tests {
-class CgroupsTest : public ::testing::Test
+class CgroupsTest : public TemporaryDirectoryTest
{
public:
static void SetUpTestCase()
@@ -117,6 +122,8 @@ public:
protected:
virtual void SetUp()
{
+ CgroupsTest::SetUp();
+
foreach (const std::string& subsystem, strings::tokenize(subsystems, ",")) {
// Establish the base hierarchy if this is the first subsystem checked.
if (baseHierarchy.empty()) {
@@ -182,6 +189,8 @@ protected:
}
}
}
+
+ CgroupsTest::TearDown();
}
const std::string subsystems; // Subsystems required to run tests.
@@ -1033,6 +1042,190 @@ TEST_F(CgroupsAnyHierarchyWithPerfEventTest, ROOT_CGROUPS_Perf)
AWAIT_READY(destroy);
}
+
+// Fixture for the memory pressure tests. It requires the "memory"
+// subsystem and creates the TEST_CGROUPS_ROOT cgroup under the memory
+// hierarchy before each test.
+class CgroupsAnyHierarchyMemoryPressureTest
+  : public CgroupsAnyHierarchyTest
+{
+public:
+  CgroupsAnyHierarchyMemoryPressureTest()
+    : CgroupsAnyHierarchyTest("memory"),
+      cgroup(TEST_CGROUPS_ROOT) {}
+
+protected:
+  // Runs the base fixture set-up, then resolves the memory hierarchy
+  // path and creates the test cgroup in it.
+  virtual void SetUp()
+  {
+    CgroupsAnyHierarchyTest::SetUp();
+
+    hierarchy = path::join(baseHierarchy, "memory");
+
+    ASSERT_SOME(cgroups::create(hierarchy, cgroup));
+  }
+
+  // Creates one Counter per pressure level for the test cgroup and
+  // stores it in 'counters', keyed by level.
+  void listen()
+  {
+    const std::vector<Level> levels = {
+      Level::LOW,
+      Level::MEDIUM,
+      Level::CRITICAL
+    };
+
+    foreach (Level level, levels) {
+      Try<Owned<Counter>> counter = Counter::create(hierarchy, cgroup, level);
+      EXPECT_SOME(counter);
+
+      counters[level] = counter.get();
+    }
+  }
+
+  // Path of the memory hierarchy; set in SetUp().
+  std::string hierarchy;
+
+  // Name of the cgroup used by the tests.
+  const std::string cgroup;
+
+  // Counters created by listen(), one per pressure level.
+  hashmap<Level, Owned<Counter>> counters;
+};
+
+
+// Grows the helper's RSS one page at a time inside a cgroup limited
+// to 16MB and checks the pressure counters each time two consecutive
+// readings agree: while 'low' is still zero the other two must be
+// zero as well, and once 'low' is positive the counts must satisfy
+// critical <= medium <= low, at which point the test stops.
+TEST_F(CgroupsAnyHierarchyMemoryPressureTest, ROOT_IncreaseUnlockedRSS)
+{
+  MemoryTestHelper helper;
+  ASSERT_SOME(helper.spawn());
+  ASSERT_SOME(helper.pid());
+
+  const Bytes limit = Megabytes(16);
+
+  // Set the limit and move the memory test helper into the cgroup.
+  EXPECT_SOME(cgroups::memory::limit_in_bytes(hierarchy, cgroup, limit));
+  EXPECT_SOME(cgroups::assign(hierarchy, cgroup, helper.pid().get()));
+
+  listen();
+
+  // Used to save the counter readings from last iteration. They are
+  // initialized to zero so that the comparison in the first iteration
+  // does not read indeterminate values.
+  uint64_t previousLow = 0;
+  uint64_t previousMedium = 0;
+  uint64_t previousCritical = 0;
+
+  // Used to save the counter readings from this iteration.
+  uint64_t low = 0;
+  uint64_t medium = 0;
+  uint64_t critical = 0;
+
+  // Use a guard to error out if it's been too long.
+  // TODO(chzhcn): Use a better way to set testing time limit.
+  uint64_t iterationLimit = limit.bytes() / getpagesize() * 10;
+
+  for (uint64_t i = 0; i < iterationLimit; i++) {
+    EXPECT_SOME(helper.increaseRSS(getpagesize()));
+
+    Future<uint64_t> _low = counters[Level::LOW]->value();
+    Future<uint64_t> _medium = counters[Level::MEDIUM]->value();
+    Future<uint64_t> _critical = counters[Level::CRITICAL]->value();
+
+    AWAIT_READY(_low);
+    AWAIT_READY(_medium);
+    AWAIT_READY(_critical);
+
+    low = _low.get();
+    medium = _medium.get();
+    critical = _critical.get();
+
+    // We need to know the readings are the same as last time to be
+    // sure they are stable, because the reading is not atomic. For
+    // example, the medium could turn positive after we read low to be
+    // 0, but this should be fixed by the next read immediately.
+    if ((low == previousLow &&
+         medium == previousMedium &&
+         critical == previousCritical)) {
+      if (low != 0) {
+        EXPECT_LE(medium, low);
+        EXPECT_LE(critical, medium);
+
+        // When child's RSS is full, it will be OOM-kill'ed if we
+        // don't stop it right away.
+        break;
+      } else {
+        EXPECT_EQ(0u, medium);
+        EXPECT_EQ(0u, critical);
+      }
+    }
+
+    previousLow = low;
+    previousMedium = medium;
+    previousCritical = critical;
+  }
+}
+
+
+// Makes the helper write 1MB of file data per iteration inside a
+// cgroup limited to 16MB and checks the pressure counters each time
+// two consecutive readings agree. The loop always runs to its
+// iteration limit; afterwards the 'low' counter must be positive.
+TEST_F(CgroupsAnyHierarchyMemoryPressureTest, ROOT_IncreasePageCache)
+{
+  MemoryTestHelper helper;
+  ASSERT_SOME(helper.spawn());
+  ASSERT_SOME(helper.pid());
+
+  const Bytes limit = Megabytes(16);
+
+  // Set the limit and move the memory test helper into the cgroup.
+  EXPECT_SOME(cgroups::memory::limit_in_bytes(hierarchy, cgroup, limit));
+  EXPECT_SOME(cgroups::assign(hierarchy, cgroup, helper.pid().get()));
+
+  listen();
+
+  // Used to save the counter readings from last iteration. They are
+  // initialized to zero so that the comparison in the first iteration
+  // does not read indeterminate values.
+  uint64_t previousLow = 0;
+  uint64_t previousMedium = 0;
+  uint64_t previousCritical = 0;
+
+  // Used to save the counter readings from this iteration. 'low' is
+  // also checked after the loop, so it must have a defined value even
+  // if the loop body never runs.
+  uint64_t low = 0;
+  uint64_t medium = 0;
+  uint64_t critical = 0;
+
+  // Use a guard to error out if it's been too long.
+  // TODO(chzhcn): Use a better way to set testing time limit.
+  uint64_t iterationLimit = limit.bytes() / Megabytes(1).bytes() * 2;
+
+  for (uint64_t i = 0; i < iterationLimit; i++) {
+    EXPECT_SOME(helper.increasePageCache(Megabytes(1)));
+
+    Future<uint64_t> _low = counters[Level::LOW]->value();
+    Future<uint64_t> _medium = counters[Level::MEDIUM]->value();
+    Future<uint64_t> _critical = counters[Level::CRITICAL]->value();
+
+    AWAIT_READY(_low);
+    AWAIT_READY(_medium);
+    AWAIT_READY(_critical);
+
+    low = _low.get();
+    medium = _medium.get();
+    critical = _critical.get();
+
+    // We need to know the readings are the same as last time to be
+    // sure they are stable, because the reading is not atomic. For
+    // example, the medium could turn positive after we read low to be
+    // 0, but this should be fixed by the next read immediately.
+    if ((low == previousLow &&
+         medium == previousMedium &&
+         critical == previousCritical)) {
+      if (low != 0) {
+        EXPECT_LE(medium, low);
+        EXPECT_LE(critical, medium);
+
+        // Different from the RSS test, since the child is only
+        // consuming at a slow rate the page cache, which is evictable
+        // and reclaimable, we could therefore be in this state
+        // forever. Our guard will let us out shortly.
+      } else {
+        EXPECT_EQ(0u, medium);
+        EXPECT_EQ(0u, critical);
+      }
+    }
+
+    previousLow = low;
+    previousMedium = medium;
+    previousCritical = critical;
+  }
+
+  EXPECT_LT(0u, low);
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/tests/memory_test_helper.cpp
----------------------------------------------------------------------
diff --git a/src/tests/memory_test_helper.cpp b/src/tests/memory_test_helper.cpp
new file mode 100644
index 0000000..cdf769b
--- /dev/null
+++ b/src/tests/memory_test_helper.cpp
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <signal.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <sys/mman.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <string>
+#include <vector>
+
+#include <stout/bytes.hpp>
+#include <stout/error.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/lambda.hpp>
+#include <stout/os.hpp>
+#include <stout/stringify.hpp>
+#include <stout/strings.hpp>
+#include <stout/try.hpp>
+
+#include "tests/flags.hpp"
+#include "tests/memory_test_helper.hpp"
+
+using process::Subprocess;
+
+using std::cerr;
+using std::cin;
+using std::cout;
+using std::endl;
+using std::flush;
+using std::getline;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// Constants used to sync MemoryTestHelper and its subprocess.
+
+// Written by the subprocess to its stdout to inform the parent that
+// it has started.
+const char STARTED = 'S';
+
+// Written by the subprocess to its stdout after each request to
+// inform the parent that the work requested is done.
+const char DONE = 'D';
+
+// Command name of an increaseRSS request; the parent sends it
+// followed by a space and the size.
+const char INCREASE_RSS[] = "INCREASE_RSS";
+
+// Command name of an increasePageCache request; the parent sends it
+// followed by a space and the size.
+const char INCREASE_PAGE_CACHE[] = "INCREASE_PAGE_CACHE";
+
+
+// This helper allocates 'size' bytes of page-aligned anonymous memory
+// (RSS) and, when 'lock' is true, locks it with mlock. The region is
+// then memset so that it is actually paged in. Returns the allocated
+// pointer, or an error if the allocation or the locking fails; on a
+// locking failure the allocation is released before returning.
+static Try<void*> allocateRSS(const Bytes& size, bool lock = true)
+{
+  void* rss = NULL;
+
+  // posix_memalign reports failure through its return value and does
+  // not set errno, so the message is built from the returned code.
+  const int code = posix_memalign(&rss, getpagesize(), size.bytes());
+  if (code != 0) {
+    return Error(
+        "Failed to increase RSS memory, posix_memalign: " +
+        string(strerror(code)));
+  }
+
+  // Locking a page makes it unevictable in the kernel.
+  if (lock && mlock(rss, size.bytes()) != 0) {
+    // Capture the error before calling free, which might overwrite
+    // the errno.
+    const ErrnoError error("Failed to lock memory, mlock");
+
+    free(rss);
+    return error;
+  }
+
+  // Use memset to actually page in the memory in the kernel.
+  memset(rss, 1, size.bytes());
+
+  return rss;
+}
+
+
+// Handler for the INCREASE_RSS command. 'tokens[0]' is the command
+// name and 'tokens[1]' the byte size to allocate via allocateRSS. The
+// allocated memory is deliberately not released here.
+static Try<Nothing> increaseRSS(const vector<string>& tokens)
+{
+  if (tokens.size() < 2) {
+    return Error("Expect at least one argument");
+  }
+
+  const string& argument = tokens[1];
+
+  Try<Bytes> bytes = Bytes::parse(argument);
+  if (bytes.isError()) {
+    return Error("The first argument '" + argument + "' is not a byte size");
+  }
+
+  Try<void*> allocated = allocateRSS(bytes.get());
+  if (allocated.isError()) {
+    return Error("Failed to allocate RSS memory: " + allocated.error());
+  }
+
+  return Nothing();
+}
+
+
+// Handler for the INCREASE_PAGE_CACHE command. 'tokens[0]' is the
+// command name and 'tokens[1]' the byte size to generate. It creates
+// a temporary file in the current working directory and writes the
+// requested size to it in 1MB units (rounded down), calling fsync
+// after every write.
+static Try<Nothing> increasePageCache(const vector<string>& tokens)
+{
+  const Bytes UNIT = Megabytes(1);
+
+  if (tokens.size() < 2) {
+    return Error("Expect at least one argument");
+  }
+
+  Try<Bytes> size = Bytes::parse(tokens[1]);
+  if (size.isError()) {
+    return Error("The first argument '" + tokens[1] + "' is not a byte size");
+  }
+
+  // TODO(chzhcn): Currently, we assume the current working directory
+  // is a temporary directory and will be cleaned up when the test
+  // finishes. Since the child process will inherit the current
+  // working directory from the parent process, that means the test
+  // that uses this helper probably needs to inherit from
+  // TemporaryDirectoryTest. Consider relaxing this constraint.
+  Try<string> path = os::mktemp(path::join(os::getcwd(), "XXXXXX"));
+  if (path.isError()) {
+    return Error("Failed to create a temporary file: " + path.error());
+  }
+
+  Try<int> fd = os::open(path.get(), O_WRONLY);
+  if (fd.isError()) {
+    return Error("Failed to open file: " + fd.error());
+  }
+
+  // The content isn't important, so the UNIT-sized buffer is built
+  // once and reused for every write.
+  const string buffer(UNIT.bytes(), 'a');
+
+  // NOTE: We are doing round-down here to calculate the number of
+  // writes to do.
+  const uint64_t writes = size.get().bytes() / UNIT.bytes();
+
+  for (uint64_t i = 0; i < writes; i++) {
+    // Write UNIT size to disk at a time.
+    Try<Nothing> write = os::write(fd.get(), buffer);
+    if (write.isError()) {
+      os::close(fd.get());
+      return Error("Failed to write file: " + write.error());
+    }
+
+    // Use fsync to make sure data is written to disk.
+    if (fsync(fd.get()) == -1) {
+      // Save the error message because os::close below might
+      // overwrite the errno.
+      const string message = strerror(errno);
+
+      os::close(fd.get());
+      return Error("Failed to fsync: " + message);
+    }
+  }
+
+  os::close(fd.get());
+  return Nothing();
+}
+
+
+// Kills and reaps the subprocess, if one is still around, when the
+// helper goes out of scope.
+MemoryTestHelper::~MemoryTestHelper()
+{
+  cleanup();
+}
+
+
+// Launches the 'memory-test-helper' executable from the build
+// directory with piped stdin/stdout and blocks until the child writes
+// the STARTED marker. Fails if a subprocess already exists, if the
+// launch fails, or if the marker is not received (in which case the
+// child is cleaned up).
+Try<Nothing> MemoryTestHelper::spawn()
+{
+  if (s.isSome()) {
+    return Error("A subprocess has been spawned already");
+  }
+
+  const string executable =
+    path::join(flags.build_dir, "src", "memory-test-helper");
+
+  vector<string> argv(2);
+  argv[0] = "memory-test-helper";
+  argv[1] = MemoryTestHelperMain::NAME;
+
+  Try<Subprocess> child = subprocess(
+      executable,
+      argv,
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::FD(STDERR_FILENO));
+
+  if (child.isError()) {
+    return Error("Failed to spawn a subprocess: " + child.error());
+  }
+
+  s = child.get();
+
+  // Wait for the child to inform it has started before returning.
+  // Otherwise, the user might set the memory limit too early and get
+  // the child oom-killed, because 'ld' could use a lot of memory.
+  const string expected(sizeof(STARTED), STARTED);
+
+  Result<string> handshake = os::read(s.get().out().get(), sizeof(STARTED));
+  if (handshake.isSome() && handshake.get() == expected) {
+    return Nothing();
+  }
+
+  cleanup();
+  return Error("Failed to sync with the subprocess");
+}
+
+
+// Sends SIGKILL to the subprocess, reaps it and forgets about it.
+// Does nothing when no subprocess exists.
+void MemoryTestHelper::cleanup()
+{
+  if (s.isNone()) {
+    return;
+  }
+
+  // We just want to make sure the subprocess is terminated in case
+  // it's stuck, but we don't care about its status. Any error
+  // should have been logged in the subprocess directly.
+  const pid_t child = s.get().pid();
+
+  ::kill(child, SIGKILL);
+  ::waitpid(child, NULL, 0);
+
+  s = None();
+}
+
+
+// Returns the pid of the subprocess, or an error if no subprocess is
+// currently held (not spawned yet, or already cleaned up).
+Try<pid_t> MemoryTestHelper::pid()
+{
+  if (s.isNone()) {
+    return Error("The subprocess has not been spawned yet");
+  }
+
+  return s.get().pid();
+}
+
+
+// Writes 'request' followed by a newline to the subprocess' stdin and
+// then blocks until the DONE marker is read back from its stdout. If
+// either step fails the subprocess is cleaned up and an error is
+// returned.
+Try<Nothing> MemoryTestHelper::requestAndWait(const string& request)
+{
+  if (s.isNone()) {
+    return Error("The subprocess has not been spawned yet");
+  }
+
+  const string message = request + "\n";
+
+  Try<Nothing> sent = os::write(s.get().in().get(), message);
+  if (sent.isError()) {
+    cleanup();
+    return Error("Fail to sync with the subprocess: " + sent.error());
+  }
+
+  const string expected(sizeof(DONE), DONE);
+
+  Result<string> reply = os::read(s.get().out().get(), sizeof(DONE));
+  if (reply.isSome() && reply.get() == expected) {
+    return Nothing();
+  }
+
+  cleanup();
+  return Error("Failed to sync with the subprocess");
+}
+
+
+// Sends "INCREASE_RSS <size>" to the subprocess and waits until it
+// reports that the work is done.
+Try<Nothing> MemoryTestHelper::increaseRSS(const Bytes& size)
+{
+  return requestAndWait(string(INCREASE_RSS) + " " + stringify(size));
+}
+
+
+// Sends "INCREASE_PAGE_CACHE <size>" to the subprocess and waits
+// until it reports that the work is done.
+Try<Nothing> MemoryTestHelper::increasePageCache(const Bytes& size)
+{
+  return requestAndWait(string(INCREASE_PAGE_CACHE) + " " + stringify(size));
+}
+
+
+const char MemoryTestHelperMain::NAME[] = "MemoryTestHelperMain";
+
+
+// Entry point of the subprocess. It announces itself with STARTED and
+// then serves one command per input line: the first token selects
+// the handler, which receives all tokens, and DONE is written after
+// each successful command. Returns 1 on an empty or unknown command,
+// on a failed command, or when reading from the parent fails;
+// returns 0 only if the input ends right after a final command that
+// has no trailing newline.
+int MemoryTestHelperMain::execute()
+{
+  hashmap<string, Try<Nothing>(*)(const vector<string>&)> commands;
+  commands[INCREASE_RSS] = &increaseRSS;
+  commands[INCREASE_PAGE_CACHE] = &increasePageCache;
+
+  // Tell the parent that child has started.
+  cout << STARTED << flush;
+
+  string line;
+  while (cin.good()) {
+    // Stop as soon as reading fails (e.g. the parent closed the pipe)
+    // instead of treating the failed read as an empty command; the
+    // check after the loop reports it.
+    if (!getline(cin, line)) {
+      break;
+    }
+
+    vector<string> tokens = strings::tokenize(line, " ");
+
+    if (tokens.empty()) {
+      cerr << "No command from the parent" << endl;
+      return 1;
+    }
+
+    if (!commands.contains(tokens[0])) {
+      cerr << "Unknown command from the parent '" << tokens[0] << "'" << endl;
+      return 1;
+    }
+
+    Try<Nothing> result = commands[tokens[0]](tokens);
+    if (result.isError()) {
+      cerr << result.error() << endl;
+      return 1;
+    }
+
+    cout << DONE << flush;
+  }
+
+  if (!cin) {
+    cerr << "Failed to sync with the parent" << endl;
+    return 1;
+  }
+
+  return 0;
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/tests/memory_test_helper.hpp
----------------------------------------------------------------------
diff --git a/src/tests/memory_test_helper.hpp b/src/tests/memory_test_helper.hpp
new file mode 100644
index 0000000..11712d7
--- /dev/null
+++ b/src/tests/memory_test_helper.hpp
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MEMORY_TEST_HELPER_HPP__
+#define __MEMORY_TEST_HELPER_HPP__
+
+#include <process/subprocess.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/option.hpp>
+#include <stout/subcommand.hpp>
+#include <stout/try.hpp>
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// The abstraction for controlling the memory usage of a subprocess.
+// Requests are sent to the subprocess one at a time and every call
+// blocks until the subprocess acknowledges it.
+// TODO(chzhcn): Currently, this helper is only supposed to be used by
+// one thread. Consider making it thread safe.
+class MemoryTestHelper
+{
+public:
+  MemoryTestHelper() {};
+
+  // Kills and reaps the subprocess if it still exists.
+  ~MemoryTestHelper();
+
+  // Spawns a subprocess.
+  // TODO(chzhcn): Consider returning a future instead of blocking.
+  Try<Nothing> spawn();
+
+  // Kill and reap the subprocess if exists.
+  // TODO(chzhcn): Consider returning a future instead of blocking.
+  void cleanup();
+
+  // Returns the pid of the subprocess.
+  Try<pid_t> pid();
+
+  // Allocate and lock specified page-aligned anonymous memory (RSS)
+  // in the subprocess. It uses mlock and memset to make sure
+  // allocated memory is mapped.
+  // TODO(chzhcn): Consider returning a future instead of blocking.
+  Try<Nothing> increaseRSS(const Bytes& size);
+
+  // This function attempts to generate requested size of page cache
+  // in the subprocess by using a small buffer and writing it to disk
+  // multiple times.
+  // TODO(chzhcn): Consider returning a future instead of blocking.
+  Try<Nothing> increasePageCache(const Bytes& size = Megabytes(1));
+
+private:
+  // Sends one request line to the subprocess and waits for its
+  // acknowledgement.
+  Try<Nothing> requestAndWait(const std::string& request);
+
+  // The spawned subprocess; None() when no subprocess is held.
+  Option<process::Subprocess> s;
+};
+
+
+// The actual subprocess behind MemoryTestHelper. It runs in a loop
+// and executes commands passed from stdin.
+class MemoryTestHelperMain : public Subcommand
+{
+public:
+  // Name of this subcommand; the parent passes it as the second
+  // element of the subprocess' argv.
+  static const char NAME[];
+
+  MemoryTestHelperMain() : Subcommand(NAME) {};
+
+protected:
+  // Runs the command loop; the return value is used as the exit
+  // status by the helper's main().
+  virtual int execute();
+};
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MEMORY_TEST_HELPER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/tests/memory_test_helper_main.cpp
----------------------------------------------------------------------
diff --git a/src/tests/memory_test_helper_main.cpp b/src/tests/memory_test_helper_main.cpp
new file mode 100644
index 0000000..362535f
--- /dev/null
+++ b/src/tests/memory_test_helper_main.cpp
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stout/subcommand.hpp>
+
+#include "tests/memory_test_helper.hpp"
+
+using mesos::internal::tests::MemoryTestHelperMain;
+
+// Entry point of the 'memory-test-helper' executable. Hands the
+// command line over to Subcommand::dispatch with MemoryTestHelperMain
+// as the only subcommand and returns whatever dispatch returns.
+// NOTE(review): presumably dispatch selects the subcommand by the name
+// found in argv -- confirm against stout's Subcommand.
+int main(int argc, char** argv)
+{
+  return Subcommand::dispatch(
+      None(),
+      argc,
+      argv,
+      new MemoryTestHelperMain());
+}