You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/04/01 02:48:44 UTC

mesos git commit: Exposed container memory pressures in the cgroups memory isolator.

Repository: mesos
Updated Branches:
  refs/heads/master 1d5cc168b -> 95499fd9b


Exposed container memory pressures in the cgroups memory isolator.

Review: https://reviews.apache.org/r/30546


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/95499fd9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/95499fd9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/95499fd9

Branch: refs/heads/master
Commit: 95499fd9b001c7797f18281ddb1149bc446582b5
Parents: 1d5cc16
Author: Chi Zhang <ch...@gmail.com>
Authored: Tue Mar 31 15:43:41 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Mar 31 17:47:52 2015 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto                       |   9 +
 src/Makefile.am                                 |   1 +
 .../containerizer/isolators/cgroups/mem.cpp     | 108 +++++++
 .../containerizer/isolators/cgroups/mem.hpp     |  21 +-
 src/tests/memory_pressure_tests.cpp             | 294 +++++++++++++++++++
 5 files changed, 432 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 3c592d5..0cbee3b 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -463,6 +463,15 @@ message ResourceStatistics {
   optional uint64 mem_anon_bytes = 11;
   optional uint64 mem_mapped_file_bytes = 12;
 
+  // Number of occurrences of different levels of memory pressure
+  // events reported by memory cgroup. Pressure listening (re)starts
+  // with these values set to 0 when the slave (re)starts. See
+  // https://www.kernel.org/doc/Documentation/cgroups/memory.txt for
+  // more details.
+  optional uint64 mem_low_pressure_counter = 32;
+  optional uint64 mem_medium_pressure_counter = 33;
+  optional uint64 mem_critical_pressure_counter = 34;
+
   // Disk Usage Information for executor working directory.
   optional uint64 disk_limit_bytes = 26;
   optional uint64 disk_used_bytes = 27;

http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 56ed9d9..9c01f5d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1417,6 +1417,7 @@ if OS_LINUX
   mesos_tests_SOURCES += tests/cgroups_isolator_tests.cpp
   mesos_tests_SOURCES += tests/cgroups_tests.cpp
   mesos_tests_SOURCES += tests/fs_tests.cpp
+  mesos_tests_SOURCES += tests/memory_pressure_tests.cpp
   mesos_tests_SOURCES += tests/ns_tests.cpp
   mesos_tests_SOURCES += tests/perf_tests.cpp
   mesos_tests_SOURCES += tests/setns_test_helper.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/src/slave/containerizer/isolators/cgroups/mem.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/cgroups/mem.cpp b/src/slave/containerizer/isolators/cgroups/mem.cpp
index eaeb301..a7a83ef 100644
--- a/src/slave/containerizer/isolators/cgroups/mem.cpp
+++ b/src/slave/containerizer/isolators/cgroups/mem.cpp
@@ -18,6 +18,7 @@
 
 #include <stdint.h>
 
+#include <list>
 #include <vector>
 
 #include <mesos/resources.hpp>
@@ -45,6 +46,9 @@
 
 using namespace process;
 
+using cgroups::memory::pressure::Level;
+using cgroups::memory::pressure::Counter;
+
 using std::list;
 using std::ostringstream;
 using std::set;
@@ -64,6 +68,13 @@ using mesos::slave::Limitation;
 template<class T>
 static Future<Option<T>> none() { return None(); }
 
+
+static const vector<Level> levels()
+{
+  return {Level::LOW, Level::MEDIUM, Level::CRITICAL};
+}
+
+
 CgroupsMemIsolatorProcess::CgroupsMemIsolatorProcess(
     const Flags& _flags,
     const string& _hierarchy,
@@ -110,6 +121,22 @@ Try<Isolator*> CgroupsMemIsolatorProcess::create(const Flags& flags)
     return Error(enable.error());
   }
 
+  // Test if memory pressure listening is enabled. We test that on the
+  // root cgroup. We rely on 'Counter::create' to test if memory
+  // pressure listening is enabled or not. The created counters will
+  // be destroyed immediately.
+  foreach (Level level, levels()) {
+    Try<Owned<Counter>> counter = Counter::create(
+        hierarchy.get(),
+        flags.cgroups_root,
+        level);
+
+    if (counter.isError()) {
+      return Error("Failed to listen on " + stringify(level) +
+                   " memory events: " + counter.error());
+    }
+  }
+
   // Determine whether to limit swap or not.
   bool limitSwap = false;
 
@@ -167,6 +194,7 @@ Future<Nothing> CgroupsMemIsolatorProcess::recover(
     cgroups.insert(cgroup);
 
     oomListen(containerId);
+    pressureListen(containerId);
   }
 
   Try<vector<string>> orphans = cgroups::get(
@@ -245,6 +273,7 @@ Future<Option<CommandInfo>> CgroupsMemIsolatorProcess::prepare(
   }
 
   oomListen(containerId);
+  pressureListen(containerId);
 
   return update(containerId, executorInfo.resources())
     .then(lambda::bind(none<CommandInfo>));
@@ -417,6 +446,59 @@ Future<ResourceStatistics> CgroupsMemIsolatorProcess::usage(
     result.set_mem_mapped_file_bytes(total_mapped_file.get());
   }
 
+  // Get pressure counter readings.
+  list<Level> levels;
+  list<Future<uint64_t>> values;
+  foreachpair (Level level,
+               const Owned<Counter>& counter,
+               info->pressureCounters) {
+    levels.push_back(level);
+    values.push_back(counter->value());
+  }
+
+  return await(values)
+    .then(defer(PID<CgroupsMemIsolatorProcess>(this),
+                &CgroupsMemIsolatorProcess::_usage,
+                containerId,
+                result,
+                levels,
+                lambda::_1));
+}
+
+
+Future<ResourceStatistics> CgroupsMemIsolatorProcess::_usage(
+    const ContainerID& containerId,
+    ResourceStatistics result,
+    const list<Level>& levels,
+    const list<Future<uint64_t>>& values)
+{
+  if (!infos.contains(containerId)) {
+    return Failure("Unknown container");
+  }
+
+  list<Level>::const_iterator iterator = levels.begin();
+  foreach (const Future<uint64_t>& value, values) {
+    if (value.isReady()) {
+      switch (*iterator) {
+        case Level::LOW:
+          result.set_mem_low_pressure_counter(value.get());
+          break;
+        case Level::MEDIUM:
+          result.set_mem_medium_pressure_counter(value.get());
+          break;
+        case Level::CRITICAL:
+          result.set_mem_critical_pressure_counter(value.get());
+          break;
+      }
+    } else {
+      LOG(ERROR) << "Failed to listen on " << stringify(*iterator)
+                 << " pressure events for container " << containerId << ": "
+                 << (value.isFailed() ? value.failure() : "discarded");
+    }
+
+    ++iterator;
+  }
+
   return result;
 }
 
@@ -580,6 +662,32 @@ void CgroupsMemIsolatorProcess::oom(const ContainerID& containerId)
   info->limitation.set(Limitation(mem, message.str()));
 }
 
+
+void CgroupsMemIsolatorProcess::pressureListen(
+    const ContainerID& containerId)
+{
+  CHECK(infos.contains(containerId));
+  Info* info = CHECK_NOTNULL(infos[containerId]);
+
+  foreach (Level level, levels()) {
+    Try<Owned<Counter>> counter = Counter::create(
+        hierarchy,
+        info->cgroup,
+        level);
+
+    if (counter.isError()) {
+      LOG(ERROR) << "Failed to listen on " << level << " memory pressure "
+                 << "events for container " << containerId << ": "
+                 << counter.error();
+    } else {
+      info->pressureCounters[level] = counter.get();
+
+      LOG(INFO) << "Started listening on " << level << " memory pressure "
+                << "events for container " << containerId;
+    }
+  }
+}
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/src/slave/containerizer/isolators/cgroups/mem.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/cgroups/mem.hpp b/src/slave/containerizer/isolators/cgroups/mem.hpp
index a00f723..d510bc0 100644
--- a/src/slave/containerizer/isolators/cgroups/mem.hpp
+++ b/src/slave/containerizer/isolators/cgroups/mem.hpp
@@ -19,10 +19,16 @@
 #ifndef __MEM_ISOLATOR_HPP__
 #define __MEM_ISOLATOR_HPP__
 
+#include <list>
+
 #include <mesos/slave/isolator.hpp>
 
+#include <process/owned.hpp>
+
 #include <stout/hashmap.hpp>
 
+#include "linux/cgroups.hpp"
+
 #include "slave/flags.hpp"
 
 #include "slave/containerizer/isolators/cgroups/constants.hpp"
@@ -70,7 +76,13 @@ private:
       const std::string& hierarchy,
       bool limitSwap);
 
-  virtual process::Future<Nothing> _cleanup(
+  process::Future<ResourceStatistics> _usage(
+      const ContainerID& containerId,
+      ResourceStatistics result,
+      const std::list<cgroups::memory::pressure::Level>& levels,
+      const std::list<process::Future<uint64_t>>& values);
+
+  process::Future<Nothing> _cleanup(
       const ContainerID& containerId,
       const process::Future<Nothing>& future);
 
@@ -87,6 +99,10 @@ private:
 
     // Used to cancel the OOM listening.
     process::Future<Nothing> oomNotifier;
+
+    hashmap<cgroups::memory::pressure::Level,
+            process::Owned<cgroups::memory::pressure::Counter>>
+      pressureCounters;
   };
 
   // Start listening on OOM events. This function will create an
@@ -102,6 +118,9 @@ private:
   // This function is invoked when the OOM event happens.
   void oom(const ContainerID& containerId);
 
+  // Start listening on memory pressure events.
+  void pressureListen(const ContainerID& containerId);
+
   const Flags flags;
 
   // The path to the cgroups subsystem hierarchy root.

http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/src/tests/memory_pressure_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/memory_pressure_tests.cpp b/src/tests/memory_pressure_tests.cpp
new file mode 100644
index 0000000..e0b33ae
--- /dev/null
+++ b/src/tests/memory_pressure_tests.cpp
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <vector>
+
+#include <mesos/resources.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/gtest.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/os.hpp>
+
+#include "master/master.hpp"
+
+#include "slave/slave.hpp"
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
+#include "messages/messages.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace process;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::MesosContainerizer;
+using mesos::internal::slave::Slave;
+
+using std::vector;
+
+using testing::_;
+using testing::Eq;
+using testing::Return;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class MemoryPressureMesosTest : public ContainerizerTest<MesosContainerizer>
+{
+public:
+  static void SetUpTestCase()
+  {
+    // Verify that the 'dd' command and the flags used below are
+    // valid on this system.
+    ASSERT_EQ(0, os::system("dd count=1 bs=1M if=/dev/zero of=/dev/null"))
+      << "Cannot find a compatible 'dd' command";
+  }
+};
+
+
+TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_Statistics)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  // We only care about memory cgroup for this test.
+  flags.isolation = "cgroups/mem";
+  flags.slave_subsystems = None();
+
+  Fetcher fetcher;
+
+  Try<MesosContainerizer*> containerizer =
+    MesosContainerizer::create(flags, true, &fetcher);
+
+  ASSERT_SOME(containerizer);
+
+  Try<PID<Slave>> slave = StartSlave(containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());      // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  Offer offer = offers.get()[0];
+
+  // Run a task that triggers memory pressure event. We request 1G
+  // disk because we are going to write a 512 MB file repeatedly.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:256;disk:1024").get(),
+      "while true; do dd count=512 bs=1M if=/dev/zero of=./temp; done");
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  Future<hashset<ContainerID>> containers = containerizer.get()->containers();
+  AWAIT_READY(containers);
+  ASSERT_EQ(1u, containers.get().size());
+
+  ContainerID containerId = *(containers.get().begin());
+
+  Duration waited = Duration::zero();
+  do {
+    Future<ResourceStatistics> usage = containerizer.get()->usage(containerId);
+    AWAIT_READY(usage);
+
+    if (usage.get().mem_low_pressure_counter() > 0) {
+      EXPECT_GE(usage.get().mem_low_pressure_counter(),
+                usage.get().mem_medium_pressure_counter());
+      EXPECT_GE(usage.get().mem_medium_pressure_counter(),
+                usage.get().mem_critical_pressure_counter());
+      break;
+    }
+
+    os::sleep(Milliseconds(100));
+    waited += Milliseconds(100);
+  } while (waited < Seconds(5));
+
+  EXPECT_LE(waited, Seconds(5));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+  delete containerizer.get();
+}
+
+
+// Test that memory pressure listening is restarted after recovery.
+TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_SlaveRecovery)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  // We only care about memory cgroup for this test.
+  flags.isolation = "cgroups/mem";
+  flags.slave_subsystems = None();
+
+  Fetcher fetcher;
+
+  Try<MesosContainerizer*> containerizer1 =
+    MesosContainerizer::create(flags, true, &fetcher);
+
+  ASSERT_SOME(containerizer1);
+
+  Try<PID<Slave>> slave = StartSlave(containerizer1.get(), flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());      // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  Offer offer = offers.get()[0];
+
+  // Run a task that triggers memory pressure event. We request 1G
+  // disk because we are going to write a 512 MB file repeatedly.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:256;disk:1024").get(),
+      "while true; do dd count=512 bs=1M if=/dev/zero of=./temp; done");
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // We restart the slave to let it recover.
+  Stop(slave.get());
+  delete containerizer1.get();
+
+  Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Use the same flags.
+  Try<MesosContainerizer*> containerizer2 =
+    MesosContainerizer::create(flags, true, &fetcher);
+
+  ASSERT_SOME(containerizer2);
+
+  slave = StartSlave(containerizer2.get(), flags);
+  ASSERT_SOME(slave);
+
+  Clock::pause();
+
+  AWAIT_READY(_recover);
+
+  // Wait for slave to schedule reregister timeout.
+  Clock::settle();
+
+  // Ensure the slave considers itself recovered.
+  Clock::advance(slave::EXECUTOR_REREGISTER_TIMEOUT);
+
+  Clock::resume();
+
+  // Wait for the slave to re-register.
+  AWAIT_READY(slaveReregisteredMessage);
+
+  Future<hashset<ContainerID>> containers = containerizer2.get()->containers();
+  AWAIT_READY(containers);
+  ASSERT_EQ(1u, containers.get().size());
+
+  ContainerID containerId = *(containers.get().begin());
+
+  Duration waited = Duration::zero();
+  do {
+    Future<ResourceStatistics> usage = containerizer2.get()->usage(containerId);
+    AWAIT_READY(usage);
+
+    if (usage.get().mem_low_pressure_counter() > 0) {
+      EXPECT_GE(usage.get().mem_low_pressure_counter(),
+                usage.get().mem_medium_pressure_counter());
+      EXPECT_GE(usage.get().mem_medium_pressure_counter(),
+                usage.get().mem_critical_pressure_counter());
+      break;
+    }
+
+    os::sleep(Milliseconds(100));
+    waited += Milliseconds(100);
+  } while (waited < Seconds(5));
+
+  EXPECT_LE(waited, Seconds(5));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+  delete containerizer2.get();
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {