You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gi...@apache.org on 2018/05/11 07:55:03 UTC

[3/3] mesos git commit: Introduced wrapper for cgroups subsystem access.

Introduced wrapper for cgroups subsystem access.

Different cgroups subsystems are modelled as actors. In this patch we
introduce wrapper classes which `dispatch` to the processes. This
removes, e.g., races caused by mixing naked and `dispatch`'ed method
calls.

Review: https://reviews.apache.org/r/66635/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fc25a22c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fc25a22c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fc25a22c

Branch: refs/heads/master
Commit: fc25a22cbb18d8a044473cc2dc6a0bb6c8d2dbf2
Parents: 1168de4
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Fri May 11 00:54:32 2018 -0700
Committer: Gilbert Song <so...@gmail.com>
Committed: Fri May 11 00:54:32 2018 -0700

----------------------------------------------------------------------
 .../mesos/isolators/cgroups/cgroups.cpp         |  46 ++-----
 .../mesos/isolators/cgroups/cgroups.hpp         |   9 +-
 .../mesos/isolators/cgroups/subsystem.cpp       | 126 ++++++++++++++++++-
 .../mesos/isolators/cgroups/subsystem.hpp       |  79 ++++++++++--
 4 files changed, 207 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fc25a22c/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
index a03e0ad..6d663a5 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
@@ -57,7 +57,7 @@ namespace slave {
 CgroupsIsolatorProcess::CgroupsIsolatorProcess(
     const Flags& _flags,
     const hashmap<string, string>& _hierarchies,
-    const multihashmap<string, Owned<SubsystemProcess>>& _subsystems)
+    const multihashmap<string, Owned<Subsystem>>& _subsystems)
   : ProcessBase(process::ID::generate("cgroups-isolator")),
     flags(_flags),
     hierarchies(_hierarchies),
@@ -73,7 +73,7 @@ Try<Isolator*> CgroupsIsolatorProcess::create(const Flags& flags)
   hashmap<string, string> hierarchies;
 
   // Hierarchy path -> subsystem object.
-  multihashmap<string, Owned<SubsystemProcess>> subsystems;
+  multihashmap<string, Owned<Subsystem>> subsystems;
 
   // Multimap: isolator name -> subsystem name.
   multihashmap<string, string> isolatorMap = {
@@ -124,8 +124,8 @@ Try<Isolator*> CgroupsIsolatorProcess::create(const Flags& flags)
       }
 
       // Create and load the subsystem.
-      Try<Owned<SubsystemProcess>> subsystem =
-        SubsystemProcess::create(flags, subsystemName, hierarchy.get());
+      Try<Owned<Subsystem>> subsystem =
+        Subsystem::create(flags, subsystemName, hierarchy.get());
 
       if (subsystem.isError()) {
         return Error(
@@ -157,23 +157,6 @@ bool CgroupsIsolatorProcess::supportsStandalone()
 }
 
 
-void CgroupsIsolatorProcess::initialize()
-{
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
-    spawn(subsystem.get());
-  }
-}
-
-
-void CgroupsIsolatorProcess::finalize()
-{
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
-    terminate(subsystem.get());
-    wait(subsystem.get());
-  }
-}
-
-
 Future<Nothing> CgroupsIsolatorProcess::recover(
     const list<ContainerState>& states,
     const hashset<ContainerID>& orphans)
@@ -338,8 +321,7 @@ Future<Nothing> CgroupsIsolatorProcess::___recover(
       continue;
     }
 
-    foreach (
-        const Owned<SubsystemProcess>& subsystem, subsystems.get(hierarchy)) {
+    foreach (const Owned<Subsystem>& subsystem, subsystems.get(hierarchy)) {
       recoveredSubsystems.insert(subsystem->name());
       recovers.push_back(subsystem->recover(containerId, cgroup));
     }
@@ -439,8 +421,7 @@ Future<Option<ContainerLaunchInfo>> CgroupsIsolatorProcess::prepare(
           "'" + path + "': " + create.error());
     }
 
-    foreach (
-        const Owned<SubsystemProcess>& subsystem, subsystems.get(hierarchy)) {
+    foreach (const Owned<Subsystem>& subsystem, subsystems.get(hierarchy)) {
       infos[containerId]->subsystems.insert(subsystem->name());
       prepares.push_back(subsystem->prepare(
           containerId,
@@ -578,7 +559,7 @@ Future<Nothing> CgroupsIsolatorProcess::isolate(
   }
 
   list<Future<Nothing>> isolates;
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
     isolates.push_back(subsystem->isolate(
         containerId,
         infos[containerId]->cgroup,
@@ -629,7 +610,7 @@ Future<ContainerLimitation> CgroupsIsolatorProcess::watch(
     return Failure("Unknown container");
   }
 
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
     if (infos[containerId]->subsystems.contains(subsystem->name())) {
       subsystem->watch(containerId, infos[containerId]->cgroup)
         .onAny(defer(
@@ -671,7 +652,7 @@ Future<Nothing> CgroupsIsolatorProcess::update(
   }
 
   list<Future<Nothing>> updates;
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
     if (infos[containerId]->subsystems.contains(subsystem->name())) {
       updates.push_back(subsystem->update(
           containerId,
@@ -722,7 +703,7 @@ Future<ResourceStatistics> CgroupsIsolatorProcess::usage(
   }
 
   list<Future<ResourceStatistics>> usages;
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
     if (infos[containerId]->subsystems.contains(subsystem->name())) {
       usages.push_back(subsystem->usage(
           containerId,
@@ -765,7 +746,7 @@ Future<ContainerStatus> CgroupsIsolatorProcess::status(
   }
 
   list<Future<ContainerStatus>> statuses;
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
     if (infos[containerId]->subsystems.contains(subsystem->name())) {
       statuses.push_back(subsystem->status(
           containerId,
@@ -808,7 +789,7 @@ Future<Nothing> CgroupsIsolatorProcess::cleanup(
   }
 
   list<Future<Nothing>> cleanups;
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
     if (infos[containerId]->subsystems.contains(subsystem->name())) {
       cleanups.push_back(subsystem->cleanup(
           containerId,
@@ -850,8 +831,7 @@ Future<Nothing> CgroupsIsolatorProcess::_cleanup(
 
   // TODO(haosdent): Use foreachkey once MESOS-5037 is resolved.
   foreach (const string& hierarchy, subsystems.keys()) {
-    foreach (
-        const Owned<SubsystemProcess>& subsystem, subsystems.get(hierarchy)) {
+    foreach (const Owned<Subsystem>& subsystem, subsystems.get(hierarchy)) {
       if (infos[containerId]->subsystems.contains(subsystem->name())) {
         destroys.push_back(cgroups::destroy(
             hierarchy,

http://git-wip-us.apache.org/repos/asf/mesos/blob/fc25a22c/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
index 9dcd5da..f47b16e 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
@@ -81,11 +81,6 @@ public:
   virtual process::Future<Nothing> cleanup(
       const ContainerID& containerId);
 
-protected:
-  virtual void initialize();
-
-  virtual void finalize();
-
 private:
   struct Info
   {
@@ -107,7 +102,7 @@ private:
   CgroupsIsolatorProcess(
       const Flags& _flags,
       const hashmap<std::string, std::string>& _hierarchies,
-      const multihashmap<std::string, process::Owned<SubsystemProcess>>&
+      const multihashmap<std::string, process::Owned<Subsystem>>&
         _subsystems);
 
   process::Future<Nothing> _recover(
@@ -162,7 +157,7 @@ private:
   //   /cgroup/memory      -> memory
   // As we see, subsystem 'cpu' and 'cpuacct' are co-mounted at
   // '/cgroup/cpu,cpuacct'.
-  multihashmap<std::string, process::Owned<SubsystemProcess>> subsystems;
+  multihashmap<std::string, process::Owned<Subsystem>> subsystems;
 
   // Store cgroups associated information for containers.
   hashmap<ContainerID, process::Owned<Info>> infos;

http://git-wip-us.apache.org/repos/asf/mesos/blob/fc25a22c/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
index 1ba6ec4..dc6c7aa 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
@@ -14,6 +14,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <utility>
+
 #include <stout/error.hpp>
 #include <stout/hashmap.hpp>
 
@@ -43,7 +45,27 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
-Try<Owned<SubsystemProcess>> SubsystemProcess::create(
+Subsystem::Subsystem(Owned<SubsystemProcess> _process)
+  : process(std::move(_process))
+{
+  process::spawn(process.get());
+}
+
+
+Subsystem::~Subsystem()
+{
+  process::terminate(process.get());
+  process::wait(process.get());
+}
+
+
+string Subsystem::name() const
+{
+  return process->name();
+}
+
+
+Try<Owned<Subsystem>> Subsystem::create(
     const Flags& flags,
     const string& name,
     const string& hierarchy)
@@ -76,7 +98,107 @@ Try<Owned<SubsystemProcess>> SubsystemProcess::create(
         subsystemProcess.error());
   }
 
-  return subsystemProcess.get();
+  return Owned<Subsystem>(new Subsystem(subsystemProcess.get()));
+}
+
+
+Future<Nothing> Subsystem::recover(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::recover,
+      containerId,
+      cgroup);
+}
+
+
+Future<Nothing> Subsystem::prepare(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::prepare,
+      containerId,
+      cgroup);
+}
+
+
+Future<Nothing> Subsystem::isolate(
+    const ContainerID& containerId,
+    const string& cgroup,
+    pid_t pid)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::isolate,
+      containerId,
+      cgroup,
+      pid);
+}
+
+
+Future<mesos::slave::ContainerLimitation> Subsystem::watch(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::watch,
+      containerId,
+      cgroup);
+}
+
+
+Future<Nothing> Subsystem::update(
+    const ContainerID& containerId,
+    const string& cgroup,
+    const Resources& resources)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::update,
+      containerId,
+      cgroup,
+      resources);
+}
+
+
+Future<ResourceStatistics> Subsystem::usage(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::usage,
+      containerId,
+      cgroup);
+}
+
+
+Future<ContainerStatus> Subsystem::status(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::status,
+      containerId,
+      cgroup);
+}
+
+
+Future<Nothing> Subsystem::cleanup(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::cleanup,
+      containerId,
+      cgroup);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/fc25a22c/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
index c99a00c..5ae8253 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
@@ -36,10 +36,12 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
+class SubsystemProcess;
+
 /**
  * An abstraction for cgroups subsystem.
  */
-class SubsystemProcess : public process::Process<SubsystemProcess>
+class Subsystem
 {
 public:
   /**
@@ -51,19 +53,26 @@ public:
    * @param hierarchy The hierarchy path of cgroups subsystem.
    * @return A specific `Subsystem` object or an error if `create` fails.
    */
-  static Try<process::Owned<SubsystemProcess>> create(
+  static Try<process::Owned<Subsystem>> create(
       const Flags& flags,
       const std::string& name,
       const std::string& hierarchy);
 
-  virtual ~SubsystemProcess() {}
+  // We have unique ownership of the wrapped process and
+  // enforce that objects of this class cannot be copied.
+  //
+  // TODO(bbannier): Remove this once MESOS-5122 is resolved.
+  Subsystem(const Subsystem&) = delete;
+  Subsystem& operator=(const Subsystem&) = delete;
+
+  ~Subsystem();
 
   /**
    * The cgroups subsystem name of this `Subsystem` object.
    *
    * @return The cgroups subsystem name.
    */
-  virtual std::string name() const = 0;
+  std::string name() const;
 
   /**
    * Recover the cgroups subsystem for the associated container.
@@ -72,7 +81,7 @@ public:
    * @param cgroup The target cgroup.
    * @return Nothing or an error if `recover` fails.
    */
-  virtual process::Future<Nothing> recover(
+  process::Future<Nothing> recover(
       const ContainerID& containerId,
       const std::string& cgroup);
 
@@ -83,7 +92,7 @@ public:
    * @param cgroup The target cgroup.
    * @return Nothing or an error if `prepare` fails.
    */
-  virtual process::Future<Nothing> prepare(
+  process::Future<Nothing> prepare(
       const ContainerID& containerId,
       const std::string& cgroup);
 
@@ -95,7 +104,7 @@ public:
    * @param pid The process id of container.
    * @return Nothing or an error if `isolate` fails.
    */
-  virtual process::Future<Nothing> isolate(
+  process::Future<Nothing> isolate(
       const ContainerID& containerId,
       const std::string& cgroup,
       pid_t pid);
@@ -108,7 +117,7 @@ public:
    * @return The resource limitation that impacts the container or an
    *     error if `watch` fails.
    */
-  virtual process::Future<mesos::slave::ContainerLimitation> watch(
+  process::Future<mesos::slave::ContainerLimitation> watch(
       const ContainerID& containerId,
       const std::string& cgroup);
 
@@ -121,7 +130,7 @@ public:
    * @param resources The resources need to update.
    * @return Nothing or an error if `update` fails.
    */
-  virtual process::Future<Nothing> update(
+  process::Future<Nothing> update(
       const ContainerID& containerId,
       const std::string& cgroup,
       const Resources& resources);
@@ -135,7 +144,7 @@ public:
    * @return The resource usage statistics or an error if gather statistics
    *     fails.
    */
-  virtual process::Future<ResourceStatistics> usage(
+  process::Future<ResourceStatistics> usage(
       const ContainerID& containerId,
       const std::string& cgroup);
 
@@ -147,7 +156,7 @@ public:
    * @param cgroup The target cgroup.
    * @return The container status or an error if get fails.
    */
-  virtual process::Future<ContainerStatus> status(
+  process::Future<ContainerStatus> status(
       const ContainerID& containerId,
       const std::string& cgroup);
 
@@ -163,6 +172,54 @@ public:
    * @param cgroup The target cgroup.
    * @return Nothing or an error if `cleanup` fails.
    */
+  process::Future<Nothing> cleanup(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
+private:
+  explicit Subsystem(process::Owned<SubsystemProcess> process);
+
+  process::Owned<SubsystemProcess> process;
+};
+
+
+class SubsystemProcess : public process::Process<SubsystemProcess>
+{
+public:
+  virtual ~SubsystemProcess() {}
+
+  virtual std::string name() const = 0;
+
+  virtual process::Future<Nothing> recover(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
+  virtual process::Future<Nothing> prepare(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
+  virtual process::Future<Nothing> isolate(
+      const ContainerID& containerId,
+      const std::string& cgroup,
+      pid_t pid);
+
+  virtual process::Future<mesos::slave::ContainerLimitation> watch(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
+  virtual process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const std::string& cgroup,
+      const Resources& resources);
+
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
+  virtual process::Future<ContainerStatus> status(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
   virtual process::Future<Nothing> cleanup(
       const ContainerID& containerId,
       const std::string& cgroup);