You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/08/04 18:28:17 UTC

[1/4] git commit: Added a helper for determining the cgroup of a particular pid.

Repository: mesos
Updated Branches:
  refs/heads/master 8952b5950 -> 46f1aebde


Added a helper for determining the cgroup of a particular pid.

These helpers perform the non-trivial steps required to determine the
cgroup of a particular pid for a particular hierarchy where a
particular subsystem is attached (in this case, we've provided
implementations for the 'cpu' subsystem hierarchy and 'memory'
subsystem hierarchy, which may be the same depending on how cgroups
are mounted).

Review: https://reviews.apache.org/r/23632


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/46f1aebd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/46f1aebd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/46f1aebd

Branch: refs/heads/master
Commit: 46f1aebde0905cd2320754fc8a90244c3ab0daeb
Parents: 29910a6
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Jul 9 11:26:46 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Aug 4 09:15:50 2014 -0700

----------------------------------------------------------------------
 src/linux/cgroups.cpp | 54 ++++++++++++++++++++++++++++++++++++++++++++++
 src/linux/cgroups.hpp | 18 ++++++++++++++++
 2 files changed, 72 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/46f1aebd/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index af6be22..ccb86cf 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -1783,8 +1783,56 @@ Try<hashmap<string, uint64_t> > stat(
 }
 
 
+namespace internal {
+
+// Helper for finding the cgroup of the specified pid for the
+// specified subsystem; None if no listed hierarchy has it attached,
+// Error if /proc/<pid>/cgroup cannot be read or parsed.
+Result<string> cgroup(pid_t pid, const string& subsystem)
+{
+  string path = path::join("/proc", stringify(pid), "cgroup");
+  Try<string> read = os::read(path);
+  if (read.isError()) {
+    return Error("Failed to read " + path + ": " + read.error());
+  }
+
+  // Each line should be of the form "N:subsystems:cgroup" where 'N'
+  // is the hierarchy number, 'subsystems' are the attached subsystems
+  // and 'cgroup' is the path relative to the hierarchy path.
+  Option<string> cgroup = None();
+
+  foreach (const string& line, strings::tokenize(read.get(), "\n")) {
+    vector<string> tokens = strings::tokenize(line, ":");
+
+    // A cgroup v2 line "0::cgroup" has an empty subsystems field and
+    // presumably yields two tokens; it can never match, so skip it.
+    if (tokens.size() == 2) {
+      continue;
+    } else if (tokens.size() != 3) {
+      return Error("Unexpected format in " + path);
+    }
+
+    foreach (const string& token, strings::tokenize(tokens[1], ",")) {
+      if (subsystem == token) {
+        cgroup = tokens[2];
+      }
+    }
+  }
+
+  return cgroup;
+}
+
+} // namespace internal {
+
+
 namespace cpu {
 
+Result<string> cgroup(pid_t pid)
+{
+  return internal::cgroup(pid, "cpu");
+}
+
+
 Try<Nothing> shares(
     const string& hierarchy,
     const string& cgroup,
@@ -1842,6 +1890,12 @@ Try<Nothing> cfs_quota_us(
 
 namespace memory {
 
+Result<string> cgroup(pid_t pid)
+{
+  return internal::cgroup(pid, "memory");
+}
+
+
 Try<Bytes> limit_in_bytes(const string& hierarchy, const string& cgroup)
 {
   Try<string> read = cgroups::read(

http://git-wip-us.apache.org/repos/asf/mesos/blob/46f1aebd/src/linux/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp
index decad9d..c571e91 100644
--- a/src/linux/cgroups.hpp
+++ b/src/linux/cgroups.hpp
@@ -380,6 +380,12 @@ Try<hashmap<std::string, uint64_t> > stat(
 // Cpu controls.
 namespace cpu {
 
+// Returns the cgroup that the specified pid is a member of within the
+// hierarchy where the 'cpu' subsystem is mounted, or None if the
+// subsystem is not mounted or the pid is not a member of a cgroup.
+Result<std::string> cgroup(pid_t pid);
+
+
 // Sets the cpu shares using cpu.shares.
 Try<Nothing> shares(
     const std::string& hierarchy,
@@ -387,6 +393,12 @@ Try<Nothing> shares(
     uint64_t shares);
 
 
+// Returns the cpu shares from cpu.shares.
+Try<uint64_t> shares(
+    const std::string& hierarchy,
+    const std::string& cgroup);
+
+
 // Sets the cfs period using cpu.cfs_period_us.
 Try<Nothing> cfs_period_us(
     const std::string& hierarchy,
@@ -412,6 +424,12 @@ Try<Nothing> cfs_quota_us(
 // Memory controls.
 namespace memory {
 
+// Returns the cgroup that the specified pid is a member of within the
+// hierarchy where the 'memory' subsystem is mounted, or None if the
+// subsystem is not mounted or the pid is not a member of a cgroup.
+Result<std::string> cgroup(pid_t pid);
+
+
 // Returns the memory limit from memory.limit_in_bytes.
 Try<Bytes> limit_in_bytes(
     const std::string& hierarchy,


[2/4] git commit: Added 'bool' return value to Containerizer::launch.

Posted by be...@apache.org.
Added 'bool' return value to Containerizer::launch.

Review: https://reviews.apache.org/r/23461


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1ae6ade3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1ae6ade3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1ae6ade3

Branch: refs/heads/master
Commit: 1ae6ade3d4c5c52794be11422f731959a581ac9d
Parents: 6ddca90
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Jun 22 17:53:22 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Aug 4 09:15:50 2014 -0700

----------------------------------------------------------------------
 src/slave/containerizer/containerizer.hpp       | 12 +++++---
 .../containerizer/external_containerizer.cpp    | 12 ++++----
 .../containerizer/external_containerizer.hpp    | 10 +++----
 src/slave/containerizer/mesos/containerizer.cpp | 30 ++++++++++++--------
 src/slave/containerizer/mesos/containerizer.hpp | 14 ++++-----
 src/slave/slave.cpp                             | 10 +++++--
 src/slave/slave.hpp                             |  2 +-
 src/tests/containerizer.cpp                     |  6 ++--
 src/tests/containerizer.hpp                     |  6 ++--
 src/tests/containerizer_tests.cpp               |  8 +++---
 src/tests/external_containerizer_test.cpp       |  4 +--
 src/tests/slave_recovery_tests.cpp              |  2 +-
 src/tests/slave_tests.cpp                       |  2 +-
 13 files changed, 67 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/slave/containerizer/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.hpp b/src/slave/containerizer/containerizer.hpp
index a9f89fc..02754cd 100644
--- a/src/slave/containerizer/containerizer.hpp
+++ b/src/slave/containerizer/containerizer.hpp
@@ -72,8 +72,10 @@ public:
   virtual process::Future<Nothing> recover(
       const Option<state::SlaveState>& state) = 0;
 
-  // Launch a containerized executor.
-  virtual process::Future<Nothing> launch(
+  // Launch a containerized executor. Returns true if launching this
+  // ExecutorInfo is supported and it has been launched, otherwise
+  // false or a failure if something went wrong.
+  virtual process::Future<bool> launch(
       const ContainerID& containerId,
       const ExecutorInfo& executorInfo,
       const std::string& directory,
@@ -82,10 +84,12 @@ public:
       const process::PID<Slave>& slavePid,
       bool checkpoint) = 0;
 
-  // Launch a containerized task.
+  // Launch a containerized task. Returns true if launching this
+  // TaskInfo/ExecutorInfo is supported and it has been launched,
+  // otherwise false or a failure if something went wrong.
   // TODO(nnielsen): Obsolete the executorInfo argument when the slave
   // doesn't require executors to run standalone tasks.
-  virtual process::Future<Nothing> launch(
+  virtual process::Future<bool> launch(
       const ContainerID& containerId,
       const TaskInfo& taskInfo,
       const ExecutorInfo& executorInfo,

http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/slave/containerizer/external_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp
index 3f28d85..03de1fc 100644
--- a/src/slave/containerizer/external_containerizer.cpp
+++ b/src/slave/containerizer/external_containerizer.cpp
@@ -153,7 +153,7 @@ Future<Nothing> ExternalContainerizer::recover(
 }
 
 
-Future<Nothing> ExternalContainerizer::launch(
+Future<bool> ExternalContainerizer::launch(
     const ContainerID& containerId,
     const ExecutorInfo& executorInfo,
     const string& directory,
@@ -175,7 +175,7 @@ Future<Nothing> ExternalContainerizer::launch(
 }
 
 
-Future<Nothing> ExternalContainerizer::launch(
+Future<bool> ExternalContainerizer::launch(
     const ContainerID& containerId,
     const TaskInfo& taskInfo,
     const ExecutorInfo& executorInfo,
@@ -418,7 +418,7 @@ Future<Nothing> ExternalContainerizerProcess::___recover()
 }
 
 
-Future<Nothing> ExternalContainerizerProcess::launch(
+Future<bool> ExternalContainerizerProcess::launch(
     const ContainerID& containerId,
     const Option<TaskInfo>& taskInfo,
     const ExecutorInfo& executor,
@@ -527,7 +527,7 @@ Future<Nothing> ExternalContainerizerProcess::launch(
 }
 
 
-Future<Nothing> ExternalContainerizerProcess::_launch(
+Future<bool> ExternalContainerizerProcess::_launch(
     const ContainerID& containerId,
     const Future<Option<int> >& future)
 {
@@ -546,13 +546,13 @@ Future<Nothing> ExternalContainerizerProcess::_launch(
   // have gotten chained up.
   actives[containerId]->launched.set(Nothing());
 
-  return Nothing();
+  return true;
 }
 
 
 void ExternalContainerizerProcess::__launch(
     const ContainerID& containerId,
-    const Future<Nothing>& future)
+    const Future<bool>& future)
 {
   VLOG(1) << "Launch confirmation callback triggered on container '"
           << containerId << "'";

http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/slave/containerizer/external_containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.hpp b/src/slave/containerizer/external_containerizer.hpp
index 94dffbb..b32efdd 100644
--- a/src/slave/containerizer/external_containerizer.hpp
+++ b/src/slave/containerizer/external_containerizer.hpp
@@ -81,7 +81,7 @@ public:
   virtual process::Future<Nothing> recover(
       const Option<state::SlaveState>& state);
 
-  virtual process::Future<Nothing> launch(
+  virtual process::Future<bool> launch(
       const ContainerID& containerId,
       const ExecutorInfo& executorInfo,
       const std::string& directory,
@@ -90,7 +90,7 @@ public:
       const process::PID<Slave>& slavePid,
       bool checkpoint);
 
-  virtual process::Future<Nothing> launch(
+  virtual process::Future<bool> launch(
       const ContainerID& containerId,
       const TaskInfo& task,
       const ExecutorInfo& executorInfo,
@@ -130,7 +130,7 @@ public:
   process::Future<Nothing> recover(const Option<state::SlaveState>& state);
 
   // Start the containerized executor.
-  process::Future<Nothing> launch(
+  process::Future<bool> launch(
       const ContainerID& containerId,
       const Option<TaskInfo>& taskInfo,
       const ExecutorInfo& executorInfo,
@@ -219,13 +219,13 @@ private:
 
   process::Future<Nothing> ___recover();
 
-  process::Future<Nothing> _launch(
+  process::Future<bool> _launch(
       const ContainerID& containerId,
       const process::Future<Option<int> >& future);
 
   void __launch(
       const ContainerID& containerId,
-      const process::Future<Nothing>& future);
+      const process::Future<bool>& future);
 
   process::Future<containerizer::Termination> _wait(
       const ContainerID& containerId);

http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index 2c394e2..694c9d1 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -124,7 +124,7 @@ Try<MesosContainerizer*> MesosContainerizer::create(
   LOG(INFO) << "Using isolation: " << isolation;
 
   // Create a MesosContainerizerProcess using isolators and a launcher.
-  hashmap<std::string, Try<Isolator*> (*)(const Flags&)> creators;
+  hashmap<string, Try<Isolator*> (*)(const Flags&)> creators;
 
   creators["posix/cpu"]   = &PosixCpuIsolatorProcess::create;
   creators["posix/mem"]   = &PosixMemIsolatorProcess::create;
@@ -200,7 +200,7 @@ Future<Nothing> MesosContainerizer::recover(
 }
 
 
-Future<Nothing> MesosContainerizer::launch(
+Future<bool> MesosContainerizer::launch(
     const ContainerID& containerId,
     const ExecutorInfo& executorInfo,
     const string& directory,
@@ -221,7 +221,7 @@ Future<Nothing> MesosContainerizer::launch(
 }
 
 
-Future<Nothing> MesosContainerizer::launch(
+Future<bool> MesosContainerizer::launch(
     const ContainerID& containerId,
     const TaskInfo& taskInfo,
     const ExecutorInfo& executorInfo,
@@ -391,7 +391,7 @@ Future<Nothing> MesosContainerizerProcess::__recover(
 // 4. Exec the executor. The forked child is signalled to continue. It will
 //    first execute any preparation commands from isolators and then exec the
 //    executor.
-Future<Nothing> MesosContainerizerProcess::launch(
+Future<bool> MesosContainerizerProcess::launch(
     const ContainerID& containerId,
     const ExecutorInfo& executorInfo,
     const string& directory,
@@ -411,10 +411,10 @@ Future<Nothing> MesosContainerizerProcess::launch(
   // run.
   const CommandInfo& command = executorInfo.command();
   if (command.has_container()) {
-    // We return a Failure as this containerizer does not support
+    // We return false as this containerizer does not support
     // handling ContainerInfo. Users have to be made aware of this
     // lack of support to prevent confusion in the task configuration.
-    return Failure("ContainerInfo is not supported");
+    return false;
   }
 
   Owned<Promise<containerizer::Termination> > promise(
@@ -445,7 +445,7 @@ Future<Nothing> MesosContainerizerProcess::launch(
 }
 
 
-Future<Nothing> MesosContainerizerProcess::launch(
+Future<bool> MesosContainerizerProcess::launch(
     const ContainerID& containerId,
     const TaskInfo&,
     const ExecutorInfo& executorInfo,
@@ -616,7 +616,7 @@ Future<Nothing> MesosContainerizerProcess::fetch(
 }
 
 
-Future<Nothing> MesosContainerizerProcess::_launch(
+Future<bool> MesosContainerizerProcess::_launch(
     const ContainerID& containerId,
     const ExecutorInfo& executorInfo,
     const string& directory,
@@ -735,7 +735,13 @@ Future<Nothing> MesosContainerizerProcess::_launch(
 }
 
 
-Future<Nothing> MesosContainerizerProcess::isolate(
+static Future<bool> _isolate()
+{
+  return true;
+}
+
+
+Future<bool> MesosContainerizerProcess::isolate(
     const ContainerID& containerId,
     pid_t _pid)
 {
@@ -753,11 +759,11 @@ Future<Nothing> MesosContainerizerProcess::isolate(
 
   // Wait for all isolators to complete.
   return collect(futures)
-    .then(lambda::bind(&_nothing));
+    .then(lambda::bind(&_isolate));
 }
 
 
-Future<Nothing> MesosContainerizerProcess::exec(
+Future<bool> MesosContainerizerProcess::exec(
     const ContainerID& containerId,
     int pipeWrite)
 {
@@ -779,7 +785,7 @@ Future<Nothing> MesosContainerizerProcess::exec(
                    string(strerror(errno)));
   }
 
-  return Nothing();
+  return true;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 8746968..bf246ca 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -52,7 +52,7 @@ public:
   virtual process::Future<Nothing> recover(
       const Option<state::SlaveState>& state);
 
-  virtual process::Future<Nothing> launch(
+  virtual process::Future<bool> launch(
       const ContainerID& containerId,
       const ExecutorInfo& executorInfo,
       const std::string& directory,
@@ -61,7 +61,7 @@ public:
       const process::PID<Slave>& slavePid,
       bool checkpoint);
 
-  virtual process::Future<Nothing> launch(
+  virtual process::Future<bool> launch(
       const ContainerID& containerId,
       const TaskInfo& taskInfo,
       const ExecutorInfo& executorInfo,
@@ -109,7 +109,7 @@ public:
   process::Future<Nothing> recover(
       const Option<state::SlaveState>& state);
 
-  process::Future<Nothing> launch(
+  process::Future<bool> launch(
       const ContainerID& containerId,
       const ExecutorInfo& executorInfo,
       const std::string& directory,
@@ -118,7 +118,7 @@ public:
       const process::PID<Slave>& slavePid,
       bool checkpoint);
 
-  process::Future<Nothing> launch(
+  process::Future<bool> launch(
       const ContainerID& containerId,
       const TaskInfo& taskInfo,
       const ExecutorInfo& executorInfo,
@@ -161,7 +161,7 @@ private:
       const std::string& directory,
       const Option<std::string>& user);
 
-  process::Future<Nothing> _launch(
+  process::Future<bool> _launch(
       const ContainerID& containerId,
       const ExecutorInfo& executorInfo,
       const std::string& directory,
@@ -171,11 +171,11 @@ private:
       bool checkpoint,
       const std::list<Option<CommandInfo> >& scripts);
 
-  process::Future<Nothing> isolate(
+  process::Future<bool> isolate(
       const ContainerID& containerId,
       pid_t _pid);
 
-  process::Future<Nothing> exec(
+  process::Future<bool> exec(
       const ContainerID& containerId,
       int pipeWrite);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b4a5a45..df69b75 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2418,7 +2418,7 @@ void Slave::executorLaunched(
     const FrameworkID& frameworkId,
     const ExecutorID& executorId,
     const ContainerID& containerId,
-    const Future<Nothing>& future)
+    const Future<bool>& future)
 {
   // Set up callback for executor termination. Note that we do this
   // regardless of whether or not we have successfully launched the
@@ -2444,6 +2444,12 @@ void Slave::executorLaunched(
                << "' failed to start: "
                << (future.isFailed() ? future.failure() : " future discarded");
     return;
+  } else if (!future.get()) {
+    // A 'false' result means the launch was not supported (see launch()).
+    LOG(ERROR) << "Container '" << containerId << "' for executor '"
+               << executorId << "' of framework '" << frameworkId
+               << "' failed to start: TaskInfo/ExecutorInfo not supported";
+    return;
   }
 
   Framework* framework = getFramework(frameworkId);
@@ -3490,7 +3496,7 @@ Executor* Framework::launchExecutor(
   }
 
   // Launch the container.
-  Future<Nothing> launch;
+  Future<bool> launch;
   if (!executor->commandExecutor) {
     // If the executor is _not_ a command executor, this means that
     // the task will include the executor to run. The actual task to

http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index a896bb6..9ef597e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -199,7 +199,7 @@ public:
       const FrameworkID& frameworkId,
       const ExecutorID& executorId,
       const ContainerID& containerId,
-      const process::Future<Nothing>& future);
+      const process::Future<bool>& future);
 
   void executorTerminated(
       const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/tests/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer.cpp b/src/tests/containerizer.cpp
index 3f11d35..a17e1e0 100644
--- a/src/tests/containerizer.cpp
+++ b/src/tests/containerizer.cpp
@@ -73,7 +73,7 @@ TestContainerizer::~TestContainerizer()
 }
 
 
-Future<Nothing> TestContainerizer::_launch(
+Future<bool> TestContainerizer::_launch(
     const ContainerID& containerId,
     const ExecutorInfo& executorInfo,
     const string& directory,
@@ -137,11 +137,11 @@ Future<Nothing> TestContainerizer::_launch(
       new Promise<containerizer::Termination>());
   promises[containerId] = promise;
 
-  return Nothing();
+  return true;
 }
 
 
-Future<Nothing> TestContainerizer::launch(
+Future<bool> TestContainerizer::launch(
     const ContainerID& containerId,
     const TaskInfo& taskInfo,
     const ExecutorInfo& executorInfo,

http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/tests/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer.hpp b/src/tests/containerizer.hpp
index 9325864..24b014f 100644
--- a/src/tests/containerizer.hpp
+++ b/src/tests/containerizer.hpp
@@ -66,7 +66,7 @@ public:
 
   MOCK_METHOD7(
       launch,
-      process::Future<Nothing>(
+      process::Future<bool>(
           const ContainerID&,
           const ExecutorInfo&,
           const std::string&,
@@ -75,7 +75,7 @@ public:
           const process::PID<slave::Slave>&,
           bool checkpoint));
 
-  virtual process::Future<Nothing> launch(
+  virtual process::Future<bool> launch(
       const ContainerID& containerId,
       const TaskInfo& taskInfo,
       const ExecutorInfo& executorInfo,
@@ -113,7 +113,7 @@ private:
   void setup();
 
   // Default 'launch' implementation.
-  process::Future<Nothing> _launch(
+  process::Future<bool> _launch(
       const ContainerID& containerId,
       const ExecutorInfo& executorInfo,
       const std::string& directory,

http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/tests/containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer_tests.cpp b/src/tests/containerizer_tests.cpp
index 70e1245..2c90d2f 100644
--- a/src/tests/containerizer_tests.cpp
+++ b/src/tests/containerizer_tests.cpp
@@ -312,7 +312,7 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, ScriptSucceeds)
   ContainerID containerId;
   containerId.set_value("test_container");
 
-  process::Future<Nothing> launch = containerizer.get()->launch(
+  process::Future<bool> launch = containerizer.get()->launch(
       containerId,
       CREATE_EXECUTOR_INFO("executor", "exit 0"),
       directory,
@@ -356,7 +356,7 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, ScriptFails)
   ContainerID containerId;
   containerId.set_value("test_container");
 
-  Future<Nothing> launch = containerizer.get()->launch(
+  Future<bool> launch = containerizer.get()->launch(
       containerId,
       CREATE_EXECUTOR_INFO("executor", "exit 0"),
       directory,
@@ -409,7 +409,7 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, MultipleScripts)
   ContainerID containerId;
   containerId.set_value("test_container");
 
-  Future<Nothing> launch = containerizer.get()->launch(
+  Future<bool> launch = containerizer.get()->launch(
       containerId,
       CREATE_EXECUTOR_INFO("executor", "exit 0"),
       directory,
@@ -462,7 +462,7 @@ TEST_F(MesosContainerizerExecuteTest, IoRedirection)
   string command =
     "(echo '" + errMsg + "' 1>&2) && echo '" + outMsg + "'";
 
-  process::Future<Nothing> launch = containerizer.get()->launch(
+  process::Future<bool> launch = containerizer.get()->launch(
       containerId,
       CREATE_EXECUTOR_INFO("executor", command),
       directory,

http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/tests/external_containerizer_test.cpp
----------------------------------------------------------------------
diff --git a/src/tests/external_containerizer_test.cpp b/src/tests/external_containerizer_test.cpp
index c26f3c2..2f6aa50 100644
--- a/src/tests/external_containerizer_test.cpp
+++ b/src/tests/external_containerizer_test.cpp
@@ -77,7 +77,7 @@ class MockExternalContainerizer : public slave::ExternalContainerizer
 public:
   MOCK_METHOD8(
       launch,
-      process::Future<Nothing>(
+      process::Future<bool>(
           const ContainerID&,
           const TaskInfo&,
           const ExecutorInfo&,
@@ -98,7 +98,7 @@ public:
       .WillRepeatedly(Invoke(this, &MockExternalContainerizer::_launch));
   }
 
-  process::Future<Nothing> _launch(
+  process::Future<bool> _launch(
       const ContainerID& containerId,
       const TaskInfo& taskInfo,
       const ExecutorInfo& executorInfo,

http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 0b17a8a..9a14b4c 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -3209,7 +3209,7 @@ TYPED_TEST(SlaveRecoveryTest, RestartBeforeContainerizerLaunch)
   Future<Nothing> launch;
   EXPECT_CALL(*containerizer1, launch(_, _, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&launch),
-                    Return(Future<Nothing>())));
+                    Return(Future<bool>())));
 
   // Ensure that wait doesn't complete so that containerizer doesn't
   // return a failure on 'wait' due to the pending launch.

http://git-wip-us.apache.org/repos/asf/mesos/blob/1ae6ade3/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index e45255a..b4f9f30 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -303,7 +303,7 @@ TEST_F(SlaveTest, MesosExecutorWithOverride)
   Future<Nothing> launch;
   EXPECT_CALL(containerizer, launch(_, _, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&launch),
-                    Return(Future<Nothing>())));
+                    Return(Future<bool>())));
 
   // Expect wait after launch is called. wait() will fail if not
   // intercepted here as the container will never be registered within


[3/4] git commit: Added composing containerizer and --containerizers flag.

Posted by be...@apache.org.
Added composing containerizer and --containerizers flag.

Review: https://reviews.apache.org/r/23462


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/29910a6e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/29910a6e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/29910a6e

Branch: refs/heads/master
Commit: 29910a6e27b8e9374e323c05ffc63659b0180388
Parents: 1ae6ade
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Jun 22 17:54:53 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Aug 4 09:15:50 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                           |   2 +
 src/slave/containerizer/composing.cpp     | 545 +++++++++++++++++++++++++
 src/slave/containerizer/composing.hpp     | 100 +++++
 src/slave/containerizer/containerizer.cpp |  39 +-
 src/slave/flags.hpp                       |  11 +
 5 files changed, 693 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index c7ed168..0d9e3f0 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -261,6 +261,8 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	slave/state.cpp							\
 	slave/slave.cpp							\
 	slave/containerizer/containerizer.cpp				\
+	slave/containerizer/composing.cpp				\
+	slave/containerizer/composing.hpp				\
 	slave/containerizer/external_containerizer.cpp			\
 	slave/containerizer/isolator.cpp				\
 	slave/containerizer/launcher.cpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/slave/containerizer/composing.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/composing.cpp b/src/slave/containerizer/composing.cpp
new file mode 100644
index 0000000..9b36d91
--- /dev/null
+++ b/src/slave/containerizer/composing.cpp
@@ -0,0 +1,545 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <list>
+#include <vector>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
+
+#include "slave/state.hpp"
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/composing.hpp"
+
+using std::list;
+using std::string;
+using std::vector;
+
+using namespace process;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+
+class ComposingContainerizerProcess
+  : public Process<ComposingContainerizerProcess>
+{
+public:
+  ComposingContainerizerProcess(
+      const vector<Containerizer*>& containerizers);
+
+  virtual ~ComposingContainerizerProcess();
+
+  Future<Nothing> recover(
+      const Option<state::SlaveState>& state);
+
+  Future<bool> launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
+      const Option<string>& user,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint);
+
+  Future<bool> launch(
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
+      const Option<string>& user,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint);
+
+  Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  Future<containerizer::Termination> wait(
+      const ContainerID& containerId);
+
+  void destroy(const ContainerID& containerId);
+
+  Future<hashset<ContainerID> > containers();
+
+private:
+  // Continuations.
+  Future<Nothing> _recover();
+  Future<Nothing> __recover(
+      Containerizer* containerizer,
+      const hashset<ContainerID>& containers);
+  static Future<Nothing> ___recover();
+
+  Future<bool> _launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
+      const Option<string>& user,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint,
+      vector<Containerizer*>::iterator containerizer,
+      bool launched);
+
+  Future<bool> _launch(
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
+      const Option<string>& user,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint,
+      vector<Containerizer*>::iterator containerizer,
+      bool launched);
+
+  vector<Containerizer*> containerizers_;
+  hashmap<Containerizer*, hashset<ContainerID> > containers_;
+};
+
+
+Try<ComposingContainerizer*> ComposingContainerizer::create(
+    const vector<Containerizer*>& containerizers)
+{
+  return new ComposingContainerizer(containerizers);
+}
+
+
+ComposingContainerizer::ComposingContainerizer(
+    const vector<Containerizer*>& containerizers)
+{
+  process = new ComposingContainerizerProcess(containerizers);
+  spawn(process);
+}
+
+
+ComposingContainerizer::~ComposingContainerizer()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+Future<Nothing> ComposingContainerizer::recover(
+    const Option<state::SlaveState>& state)
+{
+  return dispatch(process, &ComposingContainerizerProcess::recover, state);
+}
+
+
+Future<bool> ComposingContainerizer::launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return dispatch(process,
+                  &ComposingContainerizerProcess::launch,
+                  containerId,
+                  executorInfo,
+                  directory,
+                  user,
+                  slaveId,
+                  slavePid,
+                  checkpoint);
+}
+
+
+Future<bool> ComposingContainerizer::launch(
+    const ContainerID& containerId,
+    const TaskInfo& taskInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return dispatch(process,
+                  &ComposingContainerizerProcess::launch,
+                  containerId,
+                  taskInfo,
+                  executorInfo,
+                  directory,
+                  user,
+                  slaveId,
+                  slavePid,
+                  checkpoint);
+}
+
+
+Future<Nothing> ComposingContainerizer::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  return dispatch(process,
+                  &ComposingContainerizerProcess::update,
+                  containerId,
+                  resources);
+}
+
+
+Future<ResourceStatistics> ComposingContainerizer::usage(
+    const ContainerID& containerId)
+{
+  return dispatch(process, &ComposingContainerizerProcess::usage, containerId);
+}
+
+
+Future<containerizer::Termination> ComposingContainerizer::wait(
+    const ContainerID& containerId)
+{
+  return dispatch(process, &ComposingContainerizerProcess::wait, containerId);
+}
+
+
+void ComposingContainerizer::destroy(const ContainerID& containerId)
+{
+  dispatch(process, &ComposingContainerizerProcess::destroy, containerId);
+}
+
+
+Future<hashset<ContainerID> > ComposingContainerizer::containers()
+{
+  return dispatch(process, &ComposingContainerizerProcess::containers);
+}
+
+
+ComposingContainerizerProcess::ComposingContainerizerProcess(
+    const vector<Containerizer*>& containerizers)
+  : containerizers_(containerizers)
+{
+  foreach (Containerizer* containerizer, containerizers_) {
+    containers_[containerizer] = hashset<ContainerID>();
+  }
+}
+
+
+ComposingContainerizerProcess::~ComposingContainerizerProcess()
+{
+  foreach (Containerizer* containerizer, containerizers_) {
+    delete containerizer;
+  }
+  containerizers_.clear();
+  containers_.clear();
+}
+
+
+Future<Nothing> ComposingContainerizerProcess::recover(
+    const Option<state::SlaveState>& state)
+{
+  // Recover each containerizer in parallel.
+  list<Future<Nothing> > futures;
+  foreach (Containerizer* containerizer, containerizers_) {
+    futures.push_back(containerizer->recover(state));
+  }
+
+  return collect(futures)
+    .then(defer(self(), &Self::_recover));
+}
+
+
+Future<Nothing> ComposingContainerizerProcess::_recover()
+{
+  // Now collect all the running containers in order to multiplex.
+  list<Future<Nothing> > futures;
+  foreach (Containerizer* containerizer, containerizers_) {
+    Future<Nothing> future = containerizer->containers()
+      .then(defer(self(), &Self::__recover, containerizer, lambda::_1));
+    futures.push_back(future);
+  }
+
+  return collect(futures)
+    .then(lambda::bind(&Self::___recover));
+}
+
+
+Future<Nothing> ComposingContainerizerProcess::__recover(
+    Containerizer* containerizer,
+    const hashset<ContainerID>& containers)
+{
+  containers_[containerizer] = containers;
+  return Nothing();
+}
+
+
+Future<Nothing> ComposingContainerizerProcess::___recover()
+{
+  return Nothing();
+}
+
+
+Future<bool> ComposingContainerizerProcess::launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  // Try each containerizer in order. If none of them handles the
+  // TaskInfo/ExecutorInfo then the result is 'false' (not a Failure).
+  vector<Containerizer*>::iterator containerizer = containerizers_.begin();
+
+  return (*containerizer)->launch(
+      containerId,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint)
+    .then(defer(self(),
+                &Self::_launch,
+                containerId,
+                executorInfo,
+                directory,
+                user,
+                slaveId,
+                slavePid,
+                checkpoint,
+                containerizer,
+                lambda::_1));
+}
+
+
+Future<bool> ComposingContainerizerProcess::_launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint,
+    vector<Containerizer*>::iterator containerizer,
+    bool launched)
+{
+  if (launched) {
+    containers_[*containerizer].insert(containerId);
+    return true;
+  }
+
+  // Try the next containerizer.
+  ++containerizer;
+
+  if (containerizer == containerizers_.end()) {
+    return false;
+  }
+
+  return (*containerizer)->launch(
+      containerId,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint)
+    .then(defer(self(),
+                &Self::_launch,
+                containerId,
+                executorInfo,
+                directory,
+                user,
+                slaveId,
+                slavePid,
+                checkpoint,
+                containerizer,
+                lambda::_1));
+}
+
+
+Future<bool> ComposingContainerizerProcess::launch(
+    const ContainerID& containerId,
+    const TaskInfo& taskInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  // Try each containerizer in order. If none of them handles the
+  // TaskInfo/ExecutorInfo then the result is 'false' (not a Failure).
+  vector<Containerizer*>::iterator containerizer = containerizers_.begin();
+
+  return (*containerizer)->launch(
+      containerId,
+      taskInfo,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint)
+    .then(defer(self(),
+                &Self::_launch,
+                containerId,
+                taskInfo,
+                executorInfo,
+                directory,
+                user,
+                slaveId,
+                slavePid,
+                checkpoint,
+                containerizer,
+                lambda::_1));
+}
+
+Future<bool> ComposingContainerizerProcess::_launch(
+    const ContainerID& containerId,
+    const TaskInfo& taskInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint,
+    vector<Containerizer*>::iterator containerizer,
+    bool launched)
+{
+  if (launched) {
+    containers_[*containerizer].insert(containerId);
+    return true;
+  }
+
+  // Try the next containerizer.
+  ++containerizer;
+
+  if (containerizer == containerizers_.end()) {
+    return false;
+  }
+
+  return (*containerizer)->launch(
+      containerId,
+      taskInfo,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint)
+    .then(defer(self(),
+                &Self::_launch,
+                containerId,
+                taskInfo,
+                executorInfo,
+                directory,
+                user,
+                slaveId,
+                slavePid,
+                checkpoint,
+                containerizer,
+                lambda::_1));
+}
+
+
+Future<Nothing> ComposingContainerizerProcess::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  foreachpair (Containerizer* containerizer,
+               const hashset<ContainerID>& containers,
+               containers_) {
+    if (containers.contains(containerId)) {
+      return containerizer->update(containerId, resources);
+    }
+  }
+
+  return Failure("No container found");
+}
+
+
+Future<ResourceStatistics> ComposingContainerizerProcess::usage(
+    const ContainerID& containerId)
+{
+  foreachpair (Containerizer* containerizer,
+               const hashset<ContainerID>& containers,
+               containers_) {
+    if (containers.contains(containerId)) {
+      return containerizer->usage(containerId);
+    }
+  }
+
+  return Failure("No container found");
+}
+
+
+Future<containerizer::Termination> ComposingContainerizerProcess::wait(
+    const ContainerID& containerId)
+{
+  foreachpair (Containerizer* containerizer,
+               const hashset<ContainerID>& containers,
+               containers_) {
+    if (containers.contains(containerId)) {
+      return containerizer->wait(containerId);
+    }
+  }
+
+  return Failure("No container found");
+}
+
+
+void ComposingContainerizerProcess::destroy(const ContainerID& containerId)
+{
+  foreachpair (Containerizer* containerizer,
+               const hashset<ContainerID>& containers,
+               containers_) {
+    if (containers.contains(containerId)) {
+      containerizer->destroy(containerId);
+      break;
+    }
+  }
+}
+
+
+// TODO(benh): Move into stout/hashset.hpp
+template <typename Elem>
+hashset<Elem> merge(const std::list<hashset<Elem> >& list)
+{
+  hashset<Elem> result;
+  foreach (const hashset<Elem>& set, list) {
+    result.insert(set.begin(), set.end());
+  }
+  return result;
+}
+
+
+Future<hashset<ContainerID> > ComposingContainerizerProcess::containers()
+{
+  return merge(containers_.values());
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/slave/containerizer/composing.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/composing.hpp b/src/slave/containerizer/composing.hpp
new file mode 100644
index 0000000..f1e60b0
--- /dev/null
+++ b/src/slave/containerizer/composing.hpp
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __COMPOSING_CONTAINERIZER_HPP__
+#define __COMPOSING_CONTAINERIZER_HPP__
+
+#include <map>
+
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+
+#include <mesos/containerizer/containerizer.hpp>
+
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declaration.
+class ComposingContainerizerProcess;
+
+class ComposingContainerizer : public Containerizer
+{
+public:
+  static Try<ComposingContainerizer*> create(
+      const std::vector<Containerizer*>& containerizers);
+
+  ComposingContainerizer(
+      const std::vector<Containerizer*>& containerizers);
+
+  virtual ~ComposingContainerizer();
+
+  virtual process::Future<Nothing> recover(
+      const Option<state::SlaveState>& state);
+
+  virtual process::Future<bool> launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  virtual process::Future<bool> launch(
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  virtual process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  virtual process::Future<containerizer::Termination> wait(
+      const ContainerID& containerId);
+
+  virtual void destroy(const ContainerID& containerId);
+
+  virtual process::Future<hashset<ContainerID> > containers();
+
+private:
+  ComposingContainerizerProcess* process;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __COMPOSING_CONTAINERIZER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index 1b71f33..d62d25d 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -34,6 +34,7 @@
 #ifdef __linux__
 #include "slave/containerizer/linux_launcher.hpp"
 #endif // __linux__
+#include "slave/containerizer/composing.hpp"
 #include "slave/containerizer/containerizer.hpp"
 #include "slave/containerizer/isolator.hpp"
 #include "slave/containerizer/launcher.hpp"
@@ -153,12 +154,42 @@ Try<Resources> Containerizer::resources(const Flags& flags)
 
 Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
 {
-  if (flags.isolation == "external") {
-    return new ExternalContainerizer(flags);
+  // TODO(benh): We need to store which containerizer or
+  // containerizers were being used. See MESOS-1663.
+
+  // Create containerizer(s).
+  vector<Containerizer*> containerizers;
+
+  foreach (const string& type, strings::split(flags.containerizers, ",")) {
+    if (type == "mesos") {
+      Try<MesosContainerizer*> containerizer =
+        MesosContainerizer::create(flags, local);
+      if (containerizer.isError()) {
+        return Error("Could not create MesosContainerizer: " +
+                     containerizer.error());
+      } else {
+        containerizers.push_back(containerizer.get());
+      }
+    } else if (type == "external") {
+      Try<Containerizer*> containerizer =
+        ExternalContainerizer::create(flags, local);
+      if (containerizer.isError()) {
+        return Error("Could not create ExternalContainerizer: " +
+                     containerizer.error());
+      } else {
+        containerizers.push_back(containerizer.get());
+      }
+    } else {
+      return Error("Unknown or unsupported containerizer: " + type);
+    }
+  }
+
+  if (containerizers.size() == 1) {
+    return containerizers.front();
   }
 
-  Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, local);
+  Try<ComposingContainerizer*> containerizer =
+    ComposingContainerizer::create(containerizers);
 
   if (containerizer.isError()) {
     return Error(containerizer.error());

http://git-wip-us.apache.org/repos/asf/mesos/blob/29910a6e/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 146c401..5ea2f38 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -262,6 +262,16 @@ public:
         "The path to the external containerizer executable used when\n"
         "external isolation is activated (--isolation=external).\n");
 
+    add(&Flags::containerizers,
+        "containerizers",
+        "Comma separated list of containerizer implementations\n"
+        "to compose in order to provide containerization.\n"
+        "Available options are 'mesos', 'external', and\n"
+        "'docker' (on Linux). The order the containerizers\n"
+        "are specified is the order they are tried\n"
+        "(--containerizers=mesos).\n",
+        "mesos");
+
     add(&Flags::default_container_image,
         "default_container_image",
         "The default container image to use if not specified by a task,\n"
@@ -330,6 +340,7 @@ public:
 #endif
   Option<std::string> credential;
   Option<std::string> containerizer_path;
+  std::string containerizers;
   Option<std::string> default_container_image;
 #ifdef WITH_NETWORK_ISOLATOR
   uint16_t ephemeral_ports_per_container;


[4/4] git commit: Serialize Containerizer::launch,wait invocations.

Posted by be...@apache.org.
Serialize Containerizer::launch,wait invocations.

Review: https://reviews.apache.org/r/23463


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6ddca900
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6ddca900
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6ddca900

Branch: refs/heads/master
Commit: 6ddca900a60a422c2bcba3015e304bf768ea3663
Parents: 8952b59
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Jul 9 11:02:10 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Aug 4 09:15:50 2014 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 23 +++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6ddca900/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8e40c5b..b4a5a45 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2420,6 +2420,21 @@ void Slave::executorLaunched(
     const ContainerID& containerId,
     const Future<Nothing>& future)
 {
+  // Set up callback for executor termination. Note that we do this
+  // regardless of whether or not we have successfully launched the
+  // executor because even if we failed to launch the executor the
+  // result of calling 'wait' will make sure everything gets properly
+  // cleaned up. Note that we do this here instead of where we do
+  // Containerizer::launch because we want to guarantee the contract
+  // with the Containerizer that we won't call 'wait' until after the
+  // launch has completed.
+  containerizer->wait(containerId)
+    .onAny(defer(self(),
+                 &Self::executorTerminated,
+                 frameworkId,
+                 executorId,
+                 lambda::_1));
+
   if (!future.isReady()) {
     // The containerizer will clean up if the launch fails; we'll just log
     // this and leave the executor registration to time out.
@@ -3516,14 +3531,6 @@ Executor* Framework::launchExecutor(
                containerId,
                lambda::_1));
 
-  // Set up callback for executor termination.
-  slave->containerizer->wait(containerId)
-    .onAny(defer(slave,
-                 &Slave::executorTerminated,
-                 id,
-                 executor->id,
-                 lambda::_1));
-
   // Make sure the executor registers within the given timeout.
   delay(slave->flags.executor_registration_timeout,
         slave,