You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/12/30 00:13:33 UTC
[2/4] mesos git commit: Add support for the ContainerLogger to the
Mesos Containerizer.
Add support for the ContainerLogger to the Mesos Containerizer.
Changes the `MesosContainerizer` to create and initialize the
`ContainerLogger`.
The `MesosContainerizer` modifies the arguments to `launcher->fork()`
(in `::_launch`) by calling the `ContainerLogger` beforehand.
Review: https://reviews.apache.org/r/41167/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/17ef06d8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/17ef06d8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/17ef06d8
Branch: refs/heads/master
Commit: 17ef06d883a1771ce66fcc5301b765e0799ebeb8
Parents: ed7bfa9
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Tue Dec 29 11:25:50 2015 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Dec 29 15:13:23 2015 -0800
----------------------------------------------------------------------
src/slave/containerizer/mesos/containerizer.cpp | 170 +++++++++++--------
src/slave/containerizer/mesos/containerizer.hpp | 5 +
2 files changed, 100 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/17ef06d8/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index 8242190..db8c746 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -16,6 +16,7 @@
#include <mesos/module/isolator.hpp>
+#include <mesos/slave/container_logger.hpp>
#include <mesos/slave/isolator.hpp>
#include <process/collect.hpp>
@@ -90,6 +91,7 @@ namespace slave {
using mesos::modules::ModuleManager;
using mesos::slave::ContainerLimitation;
+using mesos::slave::ContainerLogger;
using mesos::slave::ContainerPrepareInfo;
using mesos::slave::ContainerState;
using mesos::slave::Isolator;
@@ -106,6 +108,14 @@ Try<MesosContainerizer*> MesosContainerizer::create(
bool local,
Fetcher* fetcher)
{
+ // Create and initialize the container logger module.
+ Try<ContainerLogger*> logger =
+ ContainerLogger::create(flags.container_logger);
+
+ if (logger.isError()) {
+ return Error("Failed to create container logger: " + logger.error());
+ }
+
string isolation;
if (flags.isolation == "process") {
@@ -245,6 +255,7 @@ Try<MesosContainerizer*> MesosContainerizer::create(
flags_,
local,
fetcher,
+ Owned<ContainerLogger>(logger.get()),
Owned<Launcher>(launcher.get()),
isolators);
}
@@ -254,12 +265,14 @@ MesosContainerizer::MesosContainerizer(
const Flags& flags,
bool local,
Fetcher* fetcher,
+ const Owned<ContainerLogger>& logger,
const Owned<Launcher>& launcher,
const vector<Owned<Isolator>>& isolators)
: process(new MesosContainerizerProcess(
flags,
local,
fetcher,
+ logger,
launcher,
isolators))
{
@@ -790,89 +803,96 @@ Future<bool> MesosContainerizerProcess::_launch(
JSON::Object commands;
commands.values["commands"] = commandArray;
- // Use a pipe to block the child until it's been isolated.
- int pipes[2];
-
- // We assume this should not fail under reasonable conditions so we
- // use CHECK.
- CHECK(pipe(pipes) == 0);
-
- // Prepare the flags to pass to the launch process.
- MesosContainerizerLaunch::Flags launchFlags;
-
- launchFlags.command = JSON::protobuf(executorInfo.command());
-
- launchFlags.directory = rootfs.isSome() ? flags.sandbox_directory : directory;
- launchFlags.rootfs = rootfs;
- launchFlags.user = user;
- launchFlags.pipe_read = pipes[0];
- launchFlags.pipe_write = pipes[1];
- launchFlags.commands = commands;
-
- // Fork the child using launcher.
- vector<string> argv(2);
- argv[0] = MESOS_CONTAINERIZER;
- argv[1] = MesosContainerizerLaunch::NAME;
-
- Try<pid_t> forked = launcher->fork(
- containerId,
- path::join(flags.launcher_dir, MESOS_CONTAINERIZER),
- argv,
- Subprocess::FD(STDIN_FILENO),
- (local ? Subprocess::FD(STDOUT_FILENO)
- : Subprocess::PATH(path::join(directory, "stdout"))),
- (local ? Subprocess::FD(STDERR_FILENO)
- : Subprocess::PATH(path::join(directory, "stderr"))),
- launchFlags,
- environment,
- None(),
- namespaces); // 'namespaces' will be ignored by PosixLauncher.
-
- if (forked.isError()) {
- return Failure("Failed to fork executor: " + forked.error());
- }
- pid_t pid = forked.get();
+ return logger->prepare(executorInfo, directory)
+ .then(defer(
+ self(),
+ [=](const ContainerLogger::SubprocessInfo& subprocessInfo)
+ -> Future<bool> {
+ // Use a pipe to block the child until it's been isolated.
+ int pipes[2];
+
+ // We assume this should not fail under reasonable conditions so we
+ // use CHECK.
+ CHECK(pipe(pipes) == 0);
+
+ // Prepare the flags to pass to the launch process.
+ MesosContainerizerLaunch::Flags launchFlags;
+
+ launchFlags.command = JSON::protobuf(executorInfo.command());
+
+ launchFlags.directory =
+ rootfs.isSome() ? flags.sandbox_directory : directory;
+ launchFlags.rootfs = rootfs;
+ launchFlags.user = user;
+ launchFlags.pipe_read = pipes[0];
+ launchFlags.pipe_write = pipes[1];
+ launchFlags.commands = commands;
+
+ // Fork the child using launcher.
+ vector<string> argv(2);
+ argv[0] = MESOS_CONTAINERIZER;
+ argv[1] = MesosContainerizerLaunch::NAME;
+
+ Try<pid_t> forked = launcher->fork(
+ containerId,
+ path::join(flags.launcher_dir, MESOS_CONTAINERIZER),
+ argv,
+ Subprocess::FD(STDIN_FILENO),
+ (local ? Subprocess::FD(STDOUT_FILENO)
+ : (Subprocess::IO) subprocessInfo.out),
+ (local ? Subprocess::FD(STDERR_FILENO)
+ : (Subprocess::IO) subprocessInfo.err),
+ launchFlags,
+ environment,
+ None(),
+ namespaces); // 'namespaces' will be ignored by PosixLauncher.
+
+ if (forked.isError()) {
+ return Failure("Failed to fork executor: " + forked.error());
+ }
+ pid_t pid = forked.get();
- // Checkpoint the executor's pid if requested.
- if (checkpoint) {
- const string& path = slave::paths::getForkedPidPath(
- slave::paths::getMetaRootDir(flags.work_dir),
- slaveId,
- executorInfo.framework_id(),
- executorInfo.executor_id(),
- containerId);
+ // Checkpoint the executor's pid if requested.
+ if (checkpoint) {
+ const string& path = slave::paths::getForkedPidPath(
+ slave::paths::getMetaRootDir(flags.work_dir),
+ slaveId,
+ executorInfo.framework_id(),
+ executorInfo.executor_id(),
+ containerId);
- LOG(INFO) << "Checkpointing executor's forked pid " << pid
- << " to '" << path << "'";
+ LOG(INFO) << "Checkpointing executor's forked pid " << pid
+ << " to '" << path << "'";
- Try<Nothing> checkpointed =
- slave::state::checkpoint(path, stringify(pid));
+ Try<Nothing> checkpointed =
+ slave::state::checkpoint(path, stringify(pid));
- if (checkpointed.isError()) {
- LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
- << path << "': " << checkpointed.error();
+ if (checkpointed.isError()) {
+ LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
+ << path << "': " << checkpointed.error();
- return Failure("Could not checkpoint executor's pid");
+ return Failure("Could not checkpoint executor's pid");
+ }
}
- }
- // Monitor the executor's pid. We keep the future because we'll
- // refer to it again during container destroy.
- Future<Option<int>> status = process::reap(pid);
- status.onAny(defer(self(), &Self::reaped, containerId));
- containers_[containerId]->status = status;
+ // Monitor the executor's pid. We keep the future because we'll
+ // refer to it again during container destroy.
+ Future<Option<int>> status = process::reap(pid);
+ status.onAny(defer(self(), &Self::reaped, containerId));
+ containers_[containerId]->status = status;
- return isolate(containerId, pid)
- .then(defer(self(),
- &Self::fetch,
- containerId,
- executorInfo.command(),
- directory,
- user,
- slaveId))
- .then(defer(self(), &Self::exec, containerId, pipes[1]))
- .onAny(lambda::bind(&os::close, pipes[0]))
- .onAny(lambda::bind(&os::close, pipes[1]));
+ return isolate(containerId, pid)
+ .then(defer(self(),
+ &Self::fetch,
+ containerId,
+ executorInfo.command(),
+ directory,
+ user,
+ slaveId))
+ .then(defer(self(), &Self::exec, containerId, pipes[1]))
+ .onAny(lambda::bind(&os::close, pipes[0]))
+ .onAny(lambda::bind(&os::close, pipes[1]));
+ }));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/17ef06d8/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index ab87cbc..89d1862 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -20,6 +20,7 @@
#include <list>
#include <vector>
+#include <mesos/slave/container_logger.hpp>
#include <mesos/slave/isolator.hpp>
#include <process/metrics/counter.hpp>
@@ -54,6 +55,7 @@ public:
const Flags& flags,
bool local,
Fetcher* fetcher,
+ const process::Owned<mesos::slave::ContainerLogger>& logger,
const process::Owned<Launcher>& launcher,
const std::vector<process::Owned<mesos::slave::Isolator>>& isolators);
@@ -111,11 +113,13 @@ public:
const Flags& _flags,
bool _local,
Fetcher* _fetcher,
+ const process::Owned<mesos::slave::ContainerLogger>& _logger,
const process::Owned<Launcher>& _launcher,
const std::vector<process::Owned<mesos::slave::Isolator>>& _isolators)
: flags(_flags),
local(_local),
fetcher(_fetcher),
+ logger(_logger),
launcher(_launcher),
isolators(_isolators) {}
@@ -233,6 +237,7 @@ private:
const Flags flags;
const bool local;
Fetcher* fetcher;
+ process::Owned<mesos::slave::ContainerLogger> logger;
const process::Owned<Launcher> launcher;
const std::vector<process::Owned<mesos::slave::Isolator>> isolators;