You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/12/30 00:13:33 UTC

[2/4] mesos git commit: Add support for the ContainerLogger to the Mesos Containerizer.

Add support for the ContainerLogger to the Mesos Containerizer.

Changes `MesosContainerizer::create` to create and initialize the
`ContainerLogger` (from `flags.container_logger`) and to pass it on to
the `MesosContainerizerProcess`.

`MesosContainerizerProcess::_launch` now calls `ContainerLogger::prepare()`
first and, unless running in local mode, uses the returned
`SubprocessInfo` as the stdout/stderr arguments to `launcher->fork()`.

Review: https://reviews.apache.org/r/41167/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/17ef06d8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/17ef06d8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/17ef06d8

Branch: refs/heads/master
Commit: 17ef06d883a1771ce66fcc5301b765e0799ebeb8
Parents: ed7bfa9
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Tue Dec 29 11:25:50 2015 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Dec 29 15:13:23 2015 -0800

----------------------------------------------------------------------
 src/slave/containerizer/mesos/containerizer.cpp | 170 +++++++++++--------
 src/slave/containerizer/mesos/containerizer.hpp |   5 +
 2 files changed, 100 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/17ef06d8/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index 8242190..db8c746 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -16,6 +16,7 @@
 
 #include <mesos/module/isolator.hpp>
 
+#include <mesos/slave/container_logger.hpp>
 #include <mesos/slave/isolator.hpp>
 
 #include <process/collect.hpp>
@@ -90,6 +91,7 @@ namespace slave {
 using mesos::modules::ModuleManager;
 
 using mesos::slave::ContainerLimitation;
+using mesos::slave::ContainerLogger;
 using mesos::slave::ContainerPrepareInfo;
 using mesos::slave::ContainerState;
 using mesos::slave::Isolator;
@@ -106,6 +108,14 @@ Try<MesosContainerizer*> MesosContainerizer::create(
     bool local,
     Fetcher* fetcher)
 {
+  // Create and initialize the container logger module.
+  Try<ContainerLogger*> logger =
+    ContainerLogger::create(flags.container_logger);
+
+  if (logger.isError()) {
+    return Error("Failed to create container logger: " + logger.error());
+  }
+
   string isolation;
 
   if (flags.isolation == "process") {
@@ -245,6 +255,7 @@ Try<MesosContainerizer*> MesosContainerizer::create(
       flags_,
       local,
       fetcher,
+      Owned<ContainerLogger>(logger.get()),
       Owned<Launcher>(launcher.get()),
       isolators);
 }
@@ -254,12 +265,14 @@ MesosContainerizer::MesosContainerizer(
     const Flags& flags,
     bool local,
     Fetcher* fetcher,
+    const Owned<ContainerLogger>& logger,
     const Owned<Launcher>& launcher,
     const vector<Owned<Isolator>>& isolators)
   : process(new MesosContainerizerProcess(
       flags,
       local,
       fetcher,
+      logger,
       launcher,
       isolators))
 {
@@ -790,89 +803,96 @@ Future<bool> MesosContainerizerProcess::_launch(
   JSON::Object commands;
   commands.values["commands"] = commandArray;
 
-  // Use a pipe to block the child until it's been isolated.
-  int pipes[2];
-
-  // We assume this should not fail under reasonable conditions so we
-  // use CHECK.
-  CHECK(pipe(pipes) == 0);
-
-  // Prepare the flags to pass to the launch process.
-  MesosContainerizerLaunch::Flags launchFlags;
-
-  launchFlags.command = JSON::protobuf(executorInfo.command());
-
-  launchFlags.directory = rootfs.isSome() ? flags.sandbox_directory : directory;
-  launchFlags.rootfs = rootfs;
-  launchFlags.user = user;
-  launchFlags.pipe_read = pipes[0];
-  launchFlags.pipe_write = pipes[1];
-  launchFlags.commands = commands;
-
-  // Fork the child using launcher.
-  vector<string> argv(2);
-  argv[0] = MESOS_CONTAINERIZER;
-  argv[1] = MesosContainerizerLaunch::NAME;
-
-  Try<pid_t> forked = launcher->fork(
-      containerId,
-      path::join(flags.launcher_dir, MESOS_CONTAINERIZER),
-      argv,
-      Subprocess::FD(STDIN_FILENO),
-      (local ? Subprocess::FD(STDOUT_FILENO)
-             : Subprocess::PATH(path::join(directory, "stdout"))),
-      (local ? Subprocess::FD(STDERR_FILENO)
-             : Subprocess::PATH(path::join(directory, "stderr"))),
-      launchFlags,
-      environment,
-      None(),
-      namespaces); // 'namespaces' will be ignored by PosixLauncher.
-
-  if (forked.isError()) {
-    return Failure("Failed to fork executor: " + forked.error());
-  }
-  pid_t pid = forked.get();
+  return logger->prepare(executorInfo, directory)
+    .then(defer(
+        self(),
+        [=](const ContainerLogger::SubprocessInfo& subprocessInfo)
+          -> Future<bool> {
+    // Use a pipe to block the child until it's been isolated.
+    int pipes[2];
+
+    // We assume this should not fail under reasonable conditions so we
+    // use CHECK.
+    CHECK(pipe(pipes) == 0);
+
+    // Prepare the flags to pass to the launch process.
+    MesosContainerizerLaunch::Flags launchFlags;
+
+    launchFlags.command = JSON::protobuf(executorInfo.command());
+
+    launchFlags.directory =
+      rootfs.isSome() ? flags.sandbox_directory : directory;
+    launchFlags.rootfs = rootfs;
+    launchFlags.user = user;
+    launchFlags.pipe_read = pipes[0];
+    launchFlags.pipe_write = pipes[1];
+    launchFlags.commands = commands;
+
+    // Fork the child using launcher.
+    vector<string> argv(2);
+    argv[0] = MESOS_CONTAINERIZER;
+    argv[1] = MesosContainerizerLaunch::NAME;
+
+    Try<pid_t> forked = launcher->fork(
+        containerId,
+        path::join(flags.launcher_dir, MESOS_CONTAINERIZER),
+        argv,
+        Subprocess::FD(STDIN_FILENO),
+        (local ? Subprocess::FD(STDOUT_FILENO)
+               : (Subprocess::IO) subprocessInfo.out),
+        (local ? Subprocess::FD(STDERR_FILENO)
+               : (Subprocess::IO) subprocessInfo.err),
+        launchFlags,
+        environment,
+        None(),
+        namespaces); // 'namespaces' will be ignored by PosixLauncher.
+
+    if (forked.isError()) {
+      return Failure("Failed to fork executor: " + forked.error());
+    }
+    pid_t pid = forked.get();
 
-  // Checkpoint the executor's pid if requested.
-  if (checkpoint) {
-    const string& path = slave::paths::getForkedPidPath(
-        slave::paths::getMetaRootDir(flags.work_dir),
-        slaveId,
-        executorInfo.framework_id(),
-        executorInfo.executor_id(),
-        containerId);
+    // Checkpoint the executor's pid if requested.
+    if (checkpoint) {
+      const string& path = slave::paths::getForkedPidPath(
+          slave::paths::getMetaRootDir(flags.work_dir),
+          slaveId,
+          executorInfo.framework_id(),
+          executorInfo.executor_id(),
+          containerId);
 
-    LOG(INFO) << "Checkpointing executor's forked pid " << pid
-              << " to '" << path <<  "'";
+      LOG(INFO) << "Checkpointing executor's forked pid " << pid
+                << " to '" << path <<  "'";
 
-    Try<Nothing> checkpointed =
-      slave::state::checkpoint(path, stringify(pid));
+      Try<Nothing> checkpointed =
+        slave::state::checkpoint(path, stringify(pid));
 
-    if (checkpointed.isError()) {
-      LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
-                 << path << "': " << checkpointed.error();
+      if (checkpointed.isError()) {
+        LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
+                   << path << "': " << checkpointed.error();
 
-      return Failure("Could not checkpoint executor's pid");
+        return Failure("Could not checkpoint executor's pid");
+      }
     }
-  }
 
-  // Monitor the executor's pid. We keep the future because we'll
-  // refer to it again during container destroy.
-  Future<Option<int>> status = process::reap(pid);
-  status.onAny(defer(self(), &Self::reaped, containerId));
-  containers_[containerId]->status = status;
+    // Monitor the executor's pid. We keep the future because we'll
+    // refer to it again during container destroy.
+    Future<Option<int>> status = process::reap(pid);
+    status.onAny(defer(self(), &Self::reaped, containerId));
+    containers_[containerId]->status = status;
 
-  return isolate(containerId, pid)
-    .then(defer(self(),
-                &Self::fetch,
-                containerId,
-                executorInfo.command(),
-                directory,
-                user,
-                slaveId))
-    .then(defer(self(), &Self::exec, containerId, pipes[1]))
-    .onAny(lambda::bind(&os::close, pipes[0]))
-    .onAny(lambda::bind(&os::close, pipes[1]));
+    return isolate(containerId, pid)
+      .then(defer(self(),
+                  &Self::fetch,
+                  containerId,
+                  executorInfo.command(),
+                  directory,
+                  user,
+                  slaveId))
+      .then(defer(self(), &Self::exec, containerId, pipes[1]))
+      .onAny(lambda::bind(&os::close, pipes[0]))
+      .onAny(lambda::bind(&os::close, pipes[1]));
+  }));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/17ef06d8/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index ab87cbc..89d1862 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -20,6 +20,7 @@
 #include <list>
 #include <vector>
 
+#include <mesos/slave/container_logger.hpp>
 #include <mesos/slave/isolator.hpp>
 
 #include <process/metrics/counter.hpp>
@@ -54,6 +55,7 @@ public:
       const Flags& flags,
       bool local,
       Fetcher* fetcher,
+      const process::Owned<mesos::slave::ContainerLogger>& logger,
       const process::Owned<Launcher>& launcher,
       const std::vector<process::Owned<mesos::slave::Isolator>>& isolators);
 
@@ -111,11 +113,13 @@ public:
       const Flags& _flags,
       bool _local,
       Fetcher* _fetcher,
+      const process::Owned<mesos::slave::ContainerLogger>& _logger,
       const process::Owned<Launcher>& _launcher,
       const std::vector<process::Owned<mesos::slave::Isolator>>& _isolators)
     : flags(_flags),
       local(_local),
       fetcher(_fetcher),
+      logger(_logger),
       launcher(_launcher),
       isolators(_isolators) {}
 
@@ -233,6 +237,7 @@ private:
   const Flags flags;
   const bool local;
   Fetcher* fetcher;
+  process::Owned<mesos::slave::ContainerLogger> logger;
   const process::Owned<Launcher> launcher;
   const std::vector<process::Owned<mesos::slave::Isolator>> isolators;