You are viewing a plain text version of this content; the canonical HTML version is available from the Apache mailing list archive (the original link was not preserved in this extraction).
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/04/01 02:48:44 UTC
mesos git commit: Exposed container memory pressures in the cgroups
memory isolator.
Repository: mesos
Updated Branches:
refs/heads/master 1d5cc168b -> 95499fd9b
Exposed container memory pressures in the cgroups memory isolator.
Review: https://reviews.apache.org/r/30546
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/95499fd9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/95499fd9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/95499fd9
Branch: refs/heads/master
Commit: 95499fd9b001c7797f18281ddb1149bc446582b5
Parents: 1d5cc16
Author: Chi Zhang <ch...@gmail.com>
Authored: Tue Mar 31 15:43:41 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Mar 31 17:47:52 2015 -0700
----------------------------------------------------------------------
include/mesos/mesos.proto | 9 +
src/Makefile.am | 1 +
.../containerizer/isolators/cgroups/mem.cpp | 108 +++++++
.../containerizer/isolators/cgroups/mem.hpp | 21 +-
src/tests/memory_pressure_tests.cpp | 294 +++++++++++++++++++
5 files changed, 432 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 3c592d5..0cbee3b 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -463,6 +463,15 @@ message ResourceStatistics {
optional uint64 mem_anon_bytes = 11;
optional uint64 mem_mapped_file_bytes = 12;
+ // Number of occurrences of different levels of memory pressure
+ // events reported by memory cgroup. Pressure listening (re)starts
+ // with these values set to 0 when slave (re)starts. See
+ // https://www.kernel.org/doc/Documentation/cgroups/memory.txt for
+ // more details.
+ optional uint64 mem_low_pressure_counter = 32;
+ optional uint64 mem_medium_pressure_counter = 33;
+ optional uint64 mem_critical_pressure_counter = 34;
+
// Disk Usage Information for executor working directory.
optional uint64 disk_limit_bytes = 26;
optional uint64 disk_used_bytes = 27;
http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 56ed9d9..9c01f5d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1417,6 +1417,7 @@ if OS_LINUX
mesos_tests_SOURCES += tests/cgroups_isolator_tests.cpp
mesos_tests_SOURCES += tests/cgroups_tests.cpp
mesos_tests_SOURCES += tests/fs_tests.cpp
+ mesos_tests_SOURCES += tests/memory_pressure_tests.cpp
mesos_tests_SOURCES += tests/ns_tests.cpp
mesos_tests_SOURCES += tests/perf_tests.cpp
mesos_tests_SOURCES += tests/setns_test_helper.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/src/slave/containerizer/isolators/cgroups/mem.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/cgroups/mem.cpp b/src/slave/containerizer/isolators/cgroups/mem.cpp
index eaeb301..a7a83ef 100644
--- a/src/slave/containerizer/isolators/cgroups/mem.cpp
+++ b/src/slave/containerizer/isolators/cgroups/mem.cpp
@@ -18,6 +18,7 @@
#include <stdint.h>
+#include <list>
#include <vector>
#include <mesos/resources.hpp>
@@ -45,6 +46,9 @@
using namespace process;
+using cgroups::memory::pressure::Level;
+using cgroups::memory::pressure::Counter;
+
using std::list;
using std::ostringstream;
using std::set;
@@ -64,6 +68,13 @@ using mesos::slave::Limitation;
template<class T>
static Future<Option<T>> none() { return None(); }
+
+// The memory pressure levels this isolator listens on. The order here
+// fixes the iteration order used in create() (capability probing) and
+// pressureListen() (per-container counter creation).
+static const vector<Level> levels()
+{
+ return {Level::LOW, Level::MEDIUM, Level::CRITICAL};
+}
+
+
CgroupsMemIsolatorProcess::CgroupsMemIsolatorProcess(
const Flags& _flags,
const string& _hierarchy,
@@ -110,6 +121,22 @@ Try<Isolator*> CgroupsMemIsolatorProcess::create(const Flags& flags)
return Error(enable.error());
}
+ // Test if memory pressure listening is enabled. We test that on the
+ // root cgroup. We rely on 'Counter::create' to test if memory
+ // pressure listening is enabled or not. The created counters will
+ // be destroyed immediately.
+ foreach (Level level, levels()) {
+ Try<Owned<Counter>> counter = Counter::create(
+ hierarchy.get(),
+ flags.cgroups_root,
+ level);
+
+ if (counter.isError()) {
+ return Error("Failed to listen on " + stringify(level) +
+ " memory events: " + counter.error());
+ }
+ }
+
// Determine whether to limit swap or not.
bool limitSwap = false;
@@ -167,6 +194,7 @@ Future<Nothing> CgroupsMemIsolatorProcess::recover(
cgroups.insert(cgroup);
oomListen(containerId);
+ pressureListen(containerId);
}
Try<vector<string>> orphans = cgroups::get(
@@ -245,6 +273,7 @@ Future<Option<CommandInfo>> CgroupsMemIsolatorProcess::prepare(
}
oomListen(containerId);
+ pressureListen(containerId);
return update(containerId, executorInfo.resources())
.then(lambda::bind(none<CommandInfo>));
@@ -417,6 +446,59 @@ Future<ResourceStatistics> CgroupsMemIsolatorProcess::usage(
result.set_mem_mapped_file_bytes(total_mapped_file.get());
}
+ // Get pressure counter readings.
+ list<Level> levels;
+ list<Future<uint64_t>> values;
+ foreachpair (Level level,
+ const Owned<Counter>& counter,
+ info->pressureCounters) {
+ levels.push_back(level);
+ values.push_back(counter->value());
+ }
+
+ return await(values)
+ .then(defer(PID<CgroupsMemIsolatorProcess>(this),
+ &CgroupsMemIsolatorProcess::_usage,
+ containerId,
+ result,
+ levels,
+ lambda::_1));
+}
+
+
+// Continuation of usage(): folds the awaited pressure counter readings
+// into the ResourceStatistics built so far. 'levels' and 'values' are
+// parallel lists — values[i] is the counter future gathered for
+// levels[i] in usage().
+Future<ResourceStatistics> CgroupsMemIsolatorProcess::_usage(
+ const ContainerID& containerId,
+ ResourceStatistics result,
+ const list<Level>& levels,
+ const list<Future<uint64_t>>& values)
+{
+ // The container may have been cleaned up while the counters were
+ // being awaited.
+ if (!infos.contains(containerId)) {
+ return Failure("Unknown container");
+ }
+
+ // Walk the two parallel lists in lockstep.
+ list<Level>::const_iterator iterator = levels.begin();
+ foreach (const Future<uint64_t>& value, values) {
+ if (value.isReady()) {
+ // Map each pressure level to its ResourceStatistics field.
+ switch (*iterator) {
+ case Level::LOW:
+ result.set_mem_low_pressure_counter(value.get());
+ break;
+ case Level::MEDIUM:
+ result.set_mem_medium_pressure_counter(value.get());
+ break;
+ case Level::CRITICAL:
+ result.set_mem_critical_pressure_counter(value.get());
+ break;
+ }
+ } else {
+ // A failed/discarded reading is logged but non-fatal: the other
+ // levels' counters are still reported.
+ // NOTE(review): message says "Failed to listen" but this is a
+ // failure to *read* an already-created counter — consider
+ // rewording in a follow-up.
+ LOG(ERROR) << "Failed to listen on " << stringify(*iterator)
+ << " pressure events for container " << containerId << ": "
+ << (value.isFailed() ? value.failure() : "discarded");
+ }
+
+ ++iterator;
+ }
+
 return result;
}
@@ -580,6 +662,32 @@ void CgroupsMemIsolatorProcess::oom(const ContainerID& containerId)
info->limitation.set(Limitation(mem, message.str()));
}
+
+// Starts listening on memory pressure events for the given container:
+// creates one Counter per level (low/medium/critical) on the
+// container's memory cgroup and stores it in the container's Info so
+// usage() can read it later. Called from both prepare() and recover().
+void CgroupsMemIsolatorProcess::pressureListen(
+ const ContainerID& containerId)
+{
+ CHECK(infos.contains(containerId));
+ Info* info = CHECK_NOTNULL(infos[containerId]);
+
+ foreach (Level level, levels()) {
+ Try<Owned<Counter>> counter = Counter::create(
+ hierarchy,
+ info->cgroup,
+ level);
+
+ // Failure for one level is logged but non-fatal: counters for the
+ // remaining levels are still created, and usage() simply omits the
+ // missing level's reading.
+ if (counter.isError()) {
+ LOG(ERROR) << "Failed to listen on " << level << " memory pressure "
+ << "events for container " << containerId << ": "
+ << counter.error();
+ } else {
+ info->pressureCounters[level] = counter.get();
+
+ LOG(INFO) << "Started listening on " << level << " memory pressure "
+ << "events for container " << containerId;
+ }
+ }
+}
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/src/slave/containerizer/isolators/cgroups/mem.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/cgroups/mem.hpp b/src/slave/containerizer/isolators/cgroups/mem.hpp
index a00f723..d510bc0 100644
--- a/src/slave/containerizer/isolators/cgroups/mem.hpp
+++ b/src/slave/containerizer/isolators/cgroups/mem.hpp
@@ -19,10 +19,16 @@
#ifndef __MEM_ISOLATOR_HPP__
#define __MEM_ISOLATOR_HPP__
+#include <list>
+
#include <mesos/slave/isolator.hpp>
+#include <process/owned.hpp>
+
#include <stout/hashmap.hpp>
+#include "linux/cgroups.hpp"
+
#include "slave/flags.hpp"
#include "slave/containerizer/isolators/cgroups/constants.hpp"
@@ -70,7 +76,13 @@ private:
const std::string& hierarchy,
bool limitSwap);
- virtual process::Future<Nothing> _cleanup(
+ process::Future<ResourceStatistics> _usage(
+ const ContainerID& containerId,
+ ResourceStatistics result,
+ const std::list<cgroups::memory::pressure::Level>& levels,
+ const std::list<process::Future<uint64_t>>& values);
+
+ process::Future<Nothing> _cleanup(
const ContainerID& containerId,
const process::Future<Nothing>& future);
@@ -87,6 +99,10 @@ private:
// Used to cancel the OOM listening.
process::Future<Nothing> oomNotifier;
+
+ hashmap<cgroups::memory::pressure::Level,
+ process::Owned<cgroups::memory::pressure::Counter>>
+ pressureCounters;
};
// Start listening on OOM events. This function will create an
@@ -102,6 +118,9 @@ private:
// This function is invoked when the OOM event happens.
void oom(const ContainerID& containerId);
+ // Start listening on memory pressure events.
+ void pressureListen(const ContainerID& containerId);
+
const Flags flags;
// The path to the cgroups subsystem hierarchy root.
http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/src/tests/memory_pressure_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/memory_pressure_tests.cpp b/src/tests/memory_pressure_tests.cpp
new file mode 100644
index 0000000..e0b33ae
--- /dev/null
+++ b/src/tests/memory_pressure_tests.cpp
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <vector>
+
+#include <mesos/resources.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/gtest.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/os.hpp>
+
+#include "master/master.hpp"
+
+#include "slave/slave.hpp"
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
+#include "messages/messages.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace process;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::MesosContainerizer;
+using mesos::internal::slave::Slave;
+
+using std::vector;
+
+using testing::_;
+using testing::Eq;
+using testing::Return;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// Fixture for the memory pressure tests below. The tests generate
+// pressure by running 'dd' in a task, so the fixture checks once per
+// test case that a compatible 'dd' exists on this system.
+class MemoryPressureMesosTest : public ContainerizerTest<MesosContainerizer>
+{
+public:
+ static void SetUpTestCase()
+ {
+ // Verify that the dd command and its flags used in a bit are valid
+ // on this system.
+ ASSERT_EQ(0, os::system("dd count=1 bs=1M if=/dev/zero of=/dev/null"))
+ << "Cannot find a compatible 'dd' command";
+ }
+};
+
+
+// Tests that the memory pressure counters added to ResourceStatistics
+// are exposed through the containerizer's usage() for a running task
+// that generates memory pressure, and that the counters are ordered
+// (low >= medium >= critical occurrences).
+TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_Statistics)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ // We only care about memory cgroup for this test.
+ flags.isolation = "cgroups/mem";
+ flags.slave_subsystems = None();
+
+ Fetcher fetcher;
+
+ Try<MesosContainerizer*> containerizer =
+ MesosContainerizer::create(flags, true, &fetcher);
+
+ ASSERT_SOME(containerizer);
+
+ Try<PID<Slave>> slave = StartSlave(containerizer.get(), flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ Offer offer = offers.get()[0];
+
+ // Run a task that triggers memory pressure event. We request 1G
+ // disk because we are going to write a 512 MB file repeatedly.
+ TaskInfo task = createTask(
+ offer.slave_id(),
+ Resources::parse("cpus:1;mem:256;disk:1024").get(),
+ "while true; do dd count=512 bs=1M if=/dev/zero of=./temp; done");
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ driver.launchTasks(offer.id(), {task});
+
+ AWAIT_READY(status);
+ EXPECT_EQ(task.task_id(), status.get().task_id());
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ Future<hashset<ContainerID>> containers = containerizer.get()->containers();
+ AWAIT_READY(containers);
+ ASSERT_EQ(1u, containers.get().size());
+
+ ContainerID containerId = *(containers.get().begin());
+
+ // Poll usage() for up to 5 seconds until a low pressure event has
+ // been counted; the stricter levels must never exceed the looser
+ // ones.
+ Duration waited = Duration::zero();
+ do {
+ Future<ResourceStatistics> usage = containerizer.get()->usage(containerId);
+ AWAIT_READY(usage);
+
+ if (usage.get().mem_low_pressure_counter() > 0) {
+ EXPECT_GE(usage.get().mem_low_pressure_counter(),
+ usage.get().mem_medium_pressure_counter());
+ EXPECT_GE(usage.get().mem_medium_pressure_counter(),
+ usage.get().mem_critical_pressure_counter());
+ break;
+ }
+
+ os::sleep(Milliseconds(100));
+ waited += Milliseconds(100);
+ } while (waited < Seconds(5));
+
+ // Fails if the loop above timed out without observing any pressure.
+ EXPECT_LE(waited, Seconds(5));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+ delete containerizer.get();
+}
+
+
+// Test that memory pressure listening is restarted after recovery.
+// Mirrors CGROUPS_ROOT_Statistics, but restarts the slave (with
+// framework checkpointing enabled) mid-task and verifies that the
+// recovered container's pressure counters are reported again.
+TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_SlaveRecovery)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ // We only care about memory cgroup for this test.
+ flags.isolation = "cgroups/mem";
+ flags.slave_subsystems = None();
+
+ Fetcher fetcher;
+
+ Try<MesosContainerizer*> containerizer1 =
+ MesosContainerizer::create(flags, true, &fetcher);
+
+ ASSERT_SOME(containerizer1);
+
+ Try<PID<Slave>> slave = StartSlave(containerizer1.get(), flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+
+ // Enable checkpointing for the framework.
+ FrameworkInfo frameworkInfo;
+ frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ frameworkInfo.set_checkpoint(true);
+
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ Offer offer = offers.get()[0];
+
+ // Run a task that triggers memory pressure event. We request 1G
+ // disk because we are going to write a 512 MB file repeatedly.
+ TaskInfo task = createTask(
+ offer.slave_id(),
+ Resources::parse("cpus:1;mem:256;disk:1024").get(),
+ "while true; do dd count=512 bs=1M if=/dev/zero of=./temp; done");
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(status);
+ EXPECT_EQ(task.task_id(), status.get().task_id());
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ // We restart the slave to let it recover.
+ Stop(slave.get());
+ delete containerizer1.get();
+
+ Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ // Use the same flags.
+ Try<MesosContainerizer*> containerizer2 =
+ MesosContainerizer::create(flags, true, &fetcher);
+
+ ASSERT_SOME(containerizer2);
+
+ slave = StartSlave(containerizer2.get(), flags);
+ ASSERT_SOME(slave);
+
+ Clock::pause();
+
+ AWAIT_READY(_recover);
+
+ // Wait for slave to schedule reregister timeout.
+ Clock::settle();
+
+ // Ensure the slave considers itself recovered.
+ Clock::advance(slave::EXECUTOR_REREGISTER_TIMEOUT);
+
+ Clock::resume();
+
+ // Wait for the slave to re-register.
+ AWAIT_READY(slaveReregisteredMessage);
+
+ // The recovered container must still be tracked by the new
+ // containerizer.
+ Future<hashset<ContainerID>> containers = containerizer2.get()->containers();
+ AWAIT_READY(containers);
+ ASSERT_EQ(1u, containers.get().size());
+
+ ContainerID containerId = *(containers.get().begin());
+
+ // Poll usage() for up to 5 seconds until a low pressure event has
+ // been counted after recovery; the stricter levels must never
+ // exceed the looser ones.
+ Duration waited = Duration::zero();
+ do {
+ Future<ResourceStatistics> usage = containerizer2.get()->usage(containerId);
+ AWAIT_READY(usage);
+
+ if (usage.get().mem_low_pressure_counter() > 0) {
+ EXPECT_GE(usage.get().mem_low_pressure_counter(),
+ usage.get().mem_medium_pressure_counter());
+ EXPECT_GE(usage.get().mem_medium_pressure_counter(),
+ usage.get().mem_critical_pressure_counter());
+ break;
+ }
+
+ os::sleep(Milliseconds(100));
+ waited += Milliseconds(100);
+ } while (waited < Seconds(5));
+
+ // Fails if the loop above timed out without observing any pressure.
+ EXPECT_LE(waited, Seconds(5));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+ delete containerizer2.get();
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {