You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by nn...@apache.org on 2014/05/13 01:16:13 UTC

git commit: Updated 'containers' implementation to forward to the ECP.

Repository: mesos
Updated Branches:
  refs/heads/master 48247e156 -> 3baf491c2


Updated 'containers' implementation to forward to the ECP.

Updates the 'containers' implementation within the ExternalContainerizer
to forward the call to the ECP (external containerizer program). Adds a
new 'Containers' protobuf message to containerizer.proto.

Also includes minor cleanups, such as removing needless namespace
qualifications that slipped through.

NOTE: The test_containerizer.py implementation of 'containers' is
incomplete; it is merely a mock-up. NOTE: This does *not* fix
MESOS-1257. A complete C++ test_containerizer implementation is
upcoming to address both of these notes.

Review: https://reviews.apache.org/r/21040


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3baf491c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3baf491c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3baf491c

Branch: refs/heads/master
Commit: 3baf491c2df89469a474d7fffe20fd43fb25bf52
Parents: 48247e1
Author: Till Toenshoff <to...@me.com>
Authored: Mon May 12 16:03:20 2014 -0700
Committer: Niklas Q. Nielsen <ni...@mesosphere.io>
Committed: Mon May 12 16:03:20 2014 -0700

----------------------------------------------------------------------
 include/mesos/containerizer/containerizer.proto |   9 ++
 src/examples/python/test_containerizer.py       |  41 +++--
 .../containerizer/external_containerizer.cpp    | 161 +++++++++++++------
 .../containerizer/external_containerizer.hpp    |  31 +++-
 4 files changed, 181 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3baf491c/include/mesos/containerizer/containerizer.proto
----------------------------------------------------------------------
diff --git a/include/mesos/containerizer/containerizer.proto b/include/mesos/containerizer/containerizer.proto
index c926a7f..29df785 100644
--- a/include/mesos/containerizer/containerizer.proto
+++ b/include/mesos/containerizer/containerizer.proto
@@ -90,3 +90,12 @@ message Termination {
   // Exit status of the process.
   optional int32 status = 3;
 }
+
+
+/**
+ * Information on all active containers returned by the containerizer
+ * to the slave.
+ */
+message Containers {
+  repeated ContainerID containers = 1;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baf491c/src/examples/python/test_containerizer.py
----------------------------------------------------------------------
diff --git a/src/examples/python/test_containerizer.py b/src/examples/python/test_containerizer.py
index 7bebfee..7d1d4b3 100644
--- a/src/examples/python/test_containerizer.py
+++ b/src/examples/python/test_containerizer.py
@@ -210,7 +210,7 @@ def usage():
         send(statistics.SerializeToString())
 
     except google.protobuf.message.DecodeError:
-        print >> sys.stderr, "Could not deserialise ContainerID protobuf."
+        print >> sys.stderr, "Could not deserialise Usage protobuf."
         return 1
 
     except google.protobuf.message.EncodeError:
@@ -247,7 +247,7 @@ def destroy():
             os.kill(pid, signal.SIGKILL)
 
     except google.protobuf.message.DecodeError:
-        print >> sys.stderr, "Could not deserialise ContainerID protobuf."
+        print >> sys.stderr, "Could not deserialise Destroy protobuf."
         return 1
 
     except OSError as e:
@@ -310,7 +310,7 @@ def wait():
         send(termination.SerializeToString())
 
     except google.protobuf.message.DecodeError:
-        print >> sys.stderr, "Could not deserialise ContainerID protobuf."
+        print >> sys.stderr, "Could not deserialise Termination protobuf."
         return 1
 
     except google.protobuf.message.EncodeError:
@@ -324,13 +324,36 @@ def wait():
     return 0
 
 
+def containers():
+    try:
+        containers = containerizer_pb2.Containers()
+
+        # This currently does not fill in any active containers and
+        # should therefore be regarded as incomplete.
+        # A complete implementation would fill the containers message
+        # with all active ContainerIDs.
+
+        send(containers.SerializeToString())
+
+    except google.protobuf.message.EncodeError:
+        print >> sys.stderr, "Could not serialise Containers protobuf."
+        return 1
+
+    except OSError as e:
+        print >> sys.stderr, e.strerror
+        return 1
+
+    return 0
+
+
 if __name__ == "__main__":
-    methods = { "launch":  launch,
-                "update":  update,
-                "destroy": destroy,
-                "recover": recover,
-                "usage":   usage,
-                "wait":    wait }
+    methods = { "launch":       launch,
+                "update":       update,
+                "destroy":      destroy,
+                "containers":   containers,
+                "recover":      recover,
+                "usage":        usage,
+                "wait":         wait }
 
     if sys.argv[1:2] == ["--help"] or sys.argv[1:2] == ["-h"]:
         print use(sys.argv[0], methods.keys())

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baf491c/src/slave/containerizer/external_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp
index e51ac66..385dac6 100644
--- a/src/slave/containerizer/external_containerizer.cpp
+++ b/src/slave/containerizer/external_containerizer.cpp
@@ -105,9 +105,7 @@ static Option<Error> validate(
 // message.
 template<typename T>
 static Try<T> result(
-    const process::Future<tuples::tuple<
-        process::Future<Result<T> >,
-        process::Future<Option<int> > > >& future)
+    const Future<tuple<Future<Result<T> >, Future<Option<int> > > >& future)
 {
   if (!future.isReady()) {
     return Error("Could not receive any result");
@@ -308,8 +306,8 @@ Future<Nothing> ExternalContainerizerProcess::launch(
 
   Try<Subprocess> invoked = invoke(
       "launch",
-      sandbox,
       launch,
+      sandbox,
       environment);
 
   if (invoked.isError()) {
@@ -405,8 +403,8 @@ Future<containerizer::Termination> ExternalContainerizerProcess::_wait(
 
   Try<Subprocess> invoked = invoke(
       "wait",
-      actives[containerId]->sandbox,
-      wait);
+      wait,
+      actives[containerId]->sandbox);
 
   if (invoked.isError()) {
     // 'wait' has failed, we need to tear down everything now.
@@ -441,7 +439,7 @@ Future<containerizer::Termination> ExternalContainerizerProcess::_wait(
 
 void ExternalContainerizerProcess::__wait(
     const ContainerID& containerId,
-    const Future<tuples::tuple<
+    const Future<tuple<
         Future<Result<containerizer::Termination> >,
         Future<Option<int> > > >& future)
 {
@@ -508,8 +506,8 @@ Future<Nothing> ExternalContainerizerProcess::_update(
 
   Try<Subprocess> invoked = invoke(
       "update",
-      actives[containerId]->sandbox,
-      update);
+      update,
+      actives[containerId]->sandbox);
 
   if (invoked.isError()) {
     return Failure("Update of container '" + containerId.value() +
@@ -576,8 +574,8 @@ Future<ResourceStatistics> ExternalContainerizerProcess::_usage(
 
   Try<Subprocess> invoked = invoke(
       "usage",
-      actives[containerId]->sandbox,
-      usage);
+      usage,
+      actives[containerId]->sandbox);
 
   if (invoked.isError()) {
     // 'usage' has failed but we keep the container alive for now.
@@ -604,7 +602,7 @@ Future<ResourceStatistics> ExternalContainerizerProcess::_usage(
 
 Future<ResourceStatistics> ExternalContainerizerProcess::__usage(
     const ContainerID& containerId,
-    const Future<tuples::tuple<
+    const Future<tuple<
         Future<Result<ResourceStatistics> >,
         Future<Option<int> > > >& future)
 {
@@ -664,8 +662,8 @@ void ExternalContainerizerProcess::_destroy(const ContainerID& containerId)
 
   Try<Subprocess> invoked = invoke(
       "destroy",
-      actives[containerId]->sandbox,
-      destroy);
+      destroy,
+      actives[containerId]->sandbox);
 
   if (invoked.isError()) {
     LOG(ERROR) << "Destroy of container '" << containerId
@@ -708,7 +706,50 @@ void ExternalContainerizerProcess::__destroy(
 
 Future<hashset<ContainerID> > ExternalContainerizerProcess::containers()
 {
-  return actives.keys();
+  VLOG(1) << "Containers triggered";
+
+  Try<Subprocess> invoked = invoke("containers");
+
+  if (invoked.isError()) {
+    return Failure("Containers failed: " + invoked.error());
+  }
+
+  Result<containerizer::Containers>(*read)(int, bool, bool) =
+    &::protobuf::read<containerizer::Containers>;
+
+  Future<Result<containerizer::Containers> > future = async(
+      read, invoked.get().out(), false, false);
+
+  // Await both, a protobuf Message from the subprocess as well as
+  // its exit.
+  return await(future, invoked.get().status())
+    .then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::_containers,
+        lambda::_1));
+}
+
+
+Future<hashset<ContainerID> > ExternalContainerizerProcess::_containers(
+    const Future<tuple<
+        Future<Result<containerizer::Containers> >,
+        Future<Option<int> > > >& future)
+{
+  VLOG(1) << "Containers callback triggered";
+
+  Try<containerizer::Containers> containers =
+    result<containerizer::Containers>(future);
+
+  if (containers.isError()) {
+    return Failure(containers.error());
+  }
+
+  hashset<ContainerID> result;
+  foreach (const ContainerID& containerId, containers.get().containers()) {
+    result.insert(containerId);
+  }
+
+  return result;
 }
 
 
@@ -780,8 +821,10 @@ static int setup(const string& directory)
   }
 
   // Re/establish the sandbox conditions for the containerizer.
-  if (::chdir(directory.c_str()) == -1) {
-    return errno;
+  if (!directory.empty()) {
+    if (::chdir(directory.c_str()) == -1) {
+      return errno;
+    }
   }
 
   // Sync parent and child process.
@@ -793,11 +836,10 @@ static int setup(const string& directory)
 }
 
 
-Try<process::Subprocess> ExternalContainerizerProcess::invoke(
+Try<Subprocess> ExternalContainerizerProcess::invoke(
     const string& command,
-    const Sandbox& sandbox,
-    const google::protobuf::Message& message,
-    const map<string, string>& commandEnvironment)
+    const Option<Sandbox>& sandbox,
+    const Option<map<string, string> >& commandEnvironment)
 {
   CHECK_SOME(flags.containerizer_path) << "containerizer_path not set";
 
@@ -806,22 +848,28 @@ Try<process::Subprocess> ExternalContainerizerProcess::invoke(
   // Prepare a default environment.
   map<string, string> environment;
   environment["MESOS_LIBEXEC_DIRECTORY"] = flags.launcher_dir;
+  environment["MESOS_WORK_DIRECTORY"] = flags.work_dir;
 
   // Update default environment with command specific one.
-  environment.insert(commandEnvironment.begin(), commandEnvironment.end());
+  if (commandEnvironment.isSome()) {
+    environment.insert(
+        commandEnvironment.get().begin(),
+        commandEnvironment.get().end());
+  }
 
   // Construct the command to execute.
   string execute = flags.containerizer_path.get() + " " + command;
 
   VLOG(2) << "calling: [" << execute << "]";
-  VLOG(2) << "directory: " << sandbox.directory;
-  VLOG_IF(sandbox.user.isSome(), 2) << "user: " << sandbox.user.get();
+  VLOG_IF(2, sandbox.isSome()) << "directory: " << sandbox.get().directory;
+  VLOG_IF(2, sandbox.isSome() &&
+      sandbox.get().user.isSome()) << "user: " << sandbox.get().user.get();
 
   // Re/establish the sandbox conditions for the containerizer.
-  if (sandbox.user.isSome()) {
+  if (sandbox.isSome() && sandbox.get().user.isSome()) {
     Try<Nothing> chown = os::chown(
-        sandbox.user.get(),
-        sandbox.directory);
+        sandbox.get().user.get(),
+        sandbox.get().directory);
     if (chown.isError()) {
       return Error("Failed to chown work directory: " + chown.error());
     }
@@ -832,7 +880,8 @@ Try<process::Subprocess> ExternalContainerizerProcess::invoke(
   Try<Subprocess> external = process::subprocess(
       execute,
       environment,
-      lambda::bind(&setup, sandbox.directory));
+      lambda::bind(&setup, sandbox.isSome() ? sandbox.get().directory
+                                            : string()));
 
   if (external.isError()) {
     return Error("Failed to execute external containerizer: " +
@@ -857,26 +906,46 @@ Try<process::Subprocess> ExternalContainerizerProcess::invoke(
   // Redirect output (stderr) from the external containerizer to log
   // file in the executor work directory, chown'ing it if a user is
   // specified.
-  Try<int> err = os::open(
-      path::join(sandbox.directory, "stderr"),
-      O_WRONLY | O_CREAT | O_APPEND | O_NONBLOCK,
-      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
-
-  if (err.isError()) {
-    return Error("Failed to redirect stderr: " + err.error());
-  }
+  if (sandbox.isSome()) {
+    Try<int> err = os::open(
+        path::join(sandbox.get().directory, "stderr"),
+        O_WRONLY | O_CREAT | O_APPEND | O_NONBLOCK,
+        S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+    if (err.isError()) {
+      return Error("Failed to redirect stderr: " + err.error());
+    }
 
-  if (sandbox.user.isSome()) {
-    Try<Nothing> chown = os::chown(
-        sandbox.user.get(),
-        path::join(sandbox.directory, "stderr"));
-    if (chown.isError()) {
-      return Error("Failed to redirect stderr:" + chown.error());
+    if (sandbox.get().user.isSome()) {
+      Try<Nothing> chown = os::chown(
+          sandbox.get().user.get(),
+          path::join(sandbox.get().directory, "stderr"));
+      if (chown.isError()) {
+        return Error("Failed to redirect stderr:" + chown.error());
+      }
     }
+
+    io::splice(external.get().err(), err.get())
+      .onAny(bind(&os::close, err.get()));
   }
 
-  io::splice(external.get().err(), err.get())
-    .onAny(bind(&os::close, err.get()));
+  VLOG(2) << "Subprocess pid: " << external.get().pid() << ", "
+          << "output pipe: " << external.get().out();
+
+  return external;
+}
+
+
+Try<Subprocess> ExternalContainerizerProcess::invoke(
+    const string& command,
+    const google::protobuf::Message& message,
+    const Option<Sandbox>& sandbox,
+    const Option<map<string, string> >& commandEnvironment)
+{
+  Try<Subprocess> external = invoke(command, sandbox, commandEnvironment);
+  if (external.isError()) {
+    return external;
+  }
 
   // Transmit protobuf data via stdout towards the external
   // containerizer. Each message is prefixed by its total size.
@@ -885,13 +954,9 @@ Try<process::Subprocess> ExternalContainerizerProcess::invoke(
     return Error("Failed to write protobuf to pipe: " + write.error());
   }
 
-  VLOG(2) << "Subprocess pid: " << external.get().pid() << ", "
-          << "output pipe: " << external.get().out();
-
   return external;
 }
 
-
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baf491c/src/slave/containerizer/external_containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.hpp b/src/slave/containerizer/external_containerizer.hpp
index 8fdf097..a955a38 100644
--- a/src/slave/containerizer/external_containerizer.hpp
+++ b/src/slave/containerizer/external_containerizer.hpp
@@ -48,10 +48,19 @@ namespace slave {
 // usage < containerizer::Usage > mesos::ResourceStatistics
 // wait < containerizer::Wait > containerizer::Termination
 // destroy < containerizer::Destroy
+// containers > containerizer::Containers
 //
 // 'wait' on the external containerizer side is expected to block
 // until the task command/executor has terminated.
 //
+// Additionally, the following environment variables are set for
+// external containerizer programs:
+// MESOS_LIBEXEC_DIRECTORY = path to mesos-executor, mesos-usage, ...
+// MESOS_WORK_DIRECTORY = slave work directory. This should be used
+// for distinguishing slave instances.
+// MESOS_DEFAULT_CONTAINER_IMAGE = default image as provided via
+// slave flags (default_container_image). This variable is provided
+// only in calls to 'launch'.
 
 // Check src/examples/python/test_containerizer.py for a rough
 // implementation template of this protocol.
@@ -236,6 +245,11 @@ private:
       const ContainerID& containerId,
       const process::Future<Option<int> >& future);
 
+  process::Future<hashset<ContainerID> > _containers(
+      const process::Future<tuples::tuple<
+          process::Future<Result<containerizer::Containers> >,
+          process::Future<Option<int> > > >& future);
+
   // Abort a possibly pending "wait" in the external containerizer
   // process.
   void unwait(const ContainerID& containerId);
@@ -244,13 +258,22 @@ private:
   // in the container.
   void cleanup(const ContainerID& containerId);
 
+  // Invoke the external containerizer with the given command.
+  Try<process::Subprocess> invoke(
+      const std::string& command,
+      const Option<Sandbox>& sandbox = None(),
+      const Option<std::map<std::string, std::string> >& environment = None());
+
+  // Invoke the external containerizer with the given command and
+  // a protobuf message to be piped into its stdin.
+  // There cannot be an Option<google::protobuf::Message> because that
+  // class is abstract (it has pure virtual members), hence this
+  // overload is needed.
   Try<process::Subprocess> invoke(
       const std::string& command,
-      const Sandbox& sandbox,
       const google::protobuf::Message& message,
-      const std::map<std::string, std::string>& environment =
-        // Default in parens due to: http://llvm.org/bugs/show_bug.cgi?id=13657
-        (std::map<std::string, std::string>()));
+      const Option<Sandbox>& sandbox = None(),
+      const Option<std::map<std::string, std::string> >& environment = None());
 };