You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/04/28 09:07:44 UTC
git commit: Added External Containerizer.
Repository: mesos
Updated Branches:
refs/heads/master f6b838a0b -> bd6a0dadd
Added External Containerizer.
This patch adds the so-called external containerizer. This
containerizer delegates all containerizer calls directly to
an external containerizer program (which can be specified on
start-up).
The protocol for the interactions with the external program
is as follows:
COMMAND < INPUT-PROTO > RESULT-PROTO
launch < Launch
update < Update
usage < Usage > ResourceStatistics
wait < Wait > Termination
destroy < Destroy
When protocol buffers need to be exchanged, the Mesos side of
the external containerizer implementation writes the serialized
input proto to the program's stdin and reads the result proto
from its stdout, as outlined in the scheme above.
Review: https://reviews.apache.org/r/17567
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bd6a0dad
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bd6a0dad
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bd6a0dad
Branch: refs/heads/master
Commit: bd6a0dadd84b6c996d49baa46a3e8cb0bb68251d
Parents: f6b838a
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Apr 27 20:38:06 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Apr 28 00:06:39 2014 -0700
----------------------------------------------------------------------
configure.ac | 2 +
src/Makefile.am | 8 +-
src/examples/python/test-containerizer.in | 47 +
src/examples/python/test_containerizer.py | 353 ++++++++
src/slave/containerizer/containerizer.cpp | 5 +
src/slave/containerizer/containerizer.hpp | 1 +
.../containerizer/external_containerizer.cpp | 897 +++++++++++++++++++
.../containerizer/external_containerizer.hpp | 260 ++++++
src/slave/containerizer/mesos_containerizer.cpp | 1 +
src/slave/flags.hpp | 18 +-
src/tests/external_containerizer_test.cpp | 257 ++++++
11 files changed, 1844 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 01182b3..4967fe4 100644
--- a/configure.ac
+++ b/configure.ac
@@ -691,6 +691,8 @@ There are two possible workarounds for this issue:
[chmod +x src/examples/python/test-executor])
AC_CONFIG_FILES([src/examples/python/test-framework],
[chmod +x src/examples/python/test-framework])
+ AC_CONFIG_FILES([src/examples/python/test-containerizer],
+ [chmod +x src/examples/python/test-containerizer])
AC_CONFIG_FILES([src/python/setup.py])
# When clang is being used, make sure that the distutils python-
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index c2029c7..d2e006d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -212,6 +212,7 @@ libmesos_no_3rdparty_la_SOURCES = \
slave/containerizer/isolator.cpp \
slave/containerizer/launcher.cpp \
slave/containerizer/mesos_containerizer.cpp \
+ slave/containerizer/external_containerizer.cpp \
slave/status_update_manager.cpp \
exec/exec.cpp \
common/lock.cpp \
@@ -290,6 +291,7 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp \
slave/containerizer/isolators/posix.hpp \
slave/containerizer/launcher.hpp \
slave/containerizer/mesos_containerizer.hpp \
+ slave/containerizer/external_containerizer.hpp \
slave/flags.hpp slave/gc.hpp slave/monitor.hpp \
slave/paths.hpp slave/state.hpp \
slave/status_update_manager.hpp \
@@ -889,6 +891,7 @@ mesos_tests_SOURCES = \
tests/flags.cpp \
tests/gc_tests.cpp \
tests/isolator_tests.cpp \
+ tests/external_containerizer_test.cpp \
tests/log_tests.cpp \
tests/logging_tests.cpp \
tests/main.cpp \
@@ -956,8 +959,9 @@ if HAS_PYTHON
EXAMPLESCRIPTSPYTHON = examples/python/test_framework.py \
examples/python/test-framework \
examples/python/test_executor.py \
- examples/python/test-executor
-
+ examples/python/test-executor \
+ examples/python/test_containerizer.py \
+ examples/python/test-containerizer
check_SCRIPTS += $(EXAMPLESCRIPTSPYTHON)
mesos_tests_DEPENDENCIES += $(EXAMPLESCRIPTSPYTHON)
endif
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/examples/python/test-containerizer.in
----------------------------------------------------------------------
diff --git a/src/examples/python/test-containerizer.in b/src/examples/python/test-containerizer.in
new file mode 100644
index 0000000..569519b
--- /dev/null
+++ b/src/examples/python/test-containerizer.in
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+
+# This script uses MESOS_SOURCE_DIR and MESOS_BUILD_DIR which come
+# from configuration substitutions.
+MESOS_SOURCE_DIR=@abs_top_srcdir@
+MESOS_BUILD_DIR=@abs_top_builddir@
+
+# Use colors for errors.
+. ${MESOS_SOURCE_DIR}/support/colors.sh
+
+# Force the use of the Python interpreter configured during building.
+test ! -z "${PYTHON}" && \
+ echo "${RED}Ignoring PYTHON environment variable (using @PYTHON@)${NORMAL}"
+
+PYTHON=@PYTHON@
+
+DISTRIBUTE_EGG=${MESOS_BUILD_DIR}/3rdparty/distribute-0.6.26/dist/
+DISTRIBUTE_EGG+=distribute-0.6.26@PYTHON_EGG_PUREPY_POSTFIX@.egg
+
+test ! -e ${DISTRIBUTE_EGG} && \
+ echo "${RED}Failed to find ${DISTRIBUTE_EGG}${NORMAL}" && \
+ exit 1
+
+PROTOBUF=${MESOS_BUILD_DIR}/3rdparty/libprocess/3rdparty/protobuf-2.5.0
+
+PROTOBUF_EGG=${PROTOBUF}/python/dist/
+PROTOBUF_EGG+=protobuf-2.5.0@PYTHON_EGG_PUREPY_POSTFIX@.egg
+
+test ! -e ${PROTOBUF_EGG} && \
+ echo "${RED}Failed to find ${PROTOBUF_EGG}${NORMAL}" && \
+ exit 1
+
+MESOS_EGG=${MESOS_BUILD_DIR}/src/python/dist/
+MESOS_EGG+=mesos-@PACKAGE_VERSION@@PYTHON_EGG_POSTFIX@.egg
+
+test ! -e ${MESOS_EGG} && \
+ echo "${RED}Failed to find ${MESOS_EGG}${NORMAL}" && \
+ exit 1
+
+SCRIPT=${MESOS_SOURCE_DIR}/src/examples/python/test_containerizer.py
+
+test ! -e ${SCRIPT} && \
+ echo "${RED}Failed to find ${SCRIPT}${NORMAL}" && \
+ exit 1
+
+PYTHONPATH="${DISTRIBUTE_EGG}:${MESOS_EGG}:${PROTOBUF_EGG}" \
+ exec ${PYTHON} ${SCRIPT} "${@}"
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/examples/python/test_containerizer.py
----------------------------------------------------------------------
diff --git a/src/examples/python/test_containerizer.py b/src/examples/python/test_containerizer.py
new file mode 100644
index 0000000..57b4bb6
--- /dev/null
+++ b/src/examples/python/test_containerizer.py
@@ -0,0 +1,353 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The scheme an external containerizer has to adhere to is:
+#
+# COMMAND < INPUT-PROTO > RESULT-PROTO
+#
+# launch < Launch
+# update < Update
+# usage < Usage > ResourceStatistics
+# wait < Wait > Termination
+# destroy < Destroy
+#
+# 'wait' is expected to block until the task command/executor has
+# terminated.
+
+# TODO(tillt): Implement a protocol for external containerizer
+# recovery by defining needed protobuf/s.
+# Currently we expect to cover recovery entirely on the slave side.
+
+import fcntl
+import multiprocessing
+import os
+import signal
+import subprocess
+import sys
+import struct
+import time
+import google
+import mesos
+import mesos_pb2
+import containerizer_pb2
+
+
+# Render a string describing how to use this script.
+def use(argv0, methods):
+ out = "Usage: %s <command>\n" % argv0
+ out += "Valid commands: " + ', '.join(methods)
+
+ return out
+
+
+# Read a data chunk prefixed by its total size from stdin.
+def receive():
+ # Read size (uint32 => 4 bytes).
+ size = struct.unpack('I', sys.stdin.read(4))
+ if size[0] <= 0:
+ print >> sys.stderr, "Expected protobuf size over stdin. " \
+ "Received 0 bytes."
+ return ""
+
+ # Read payload.
+ data = sys.stdin.read(size[0])
+ if len(data) != size[0]:
+ print >> sys.stderr, "Expected %d bytes protobuf over stdin. " \
+ "Received %d bytes." % (size[0], len(data))
+ return ""
+
+ return data
+
+
+# Write a protobuf message prefixed by its total size (aka recordio)
+# to stdout.
+def send(data):
+ # Write size (uint32 => 4 bytes).
+ sys.stdout.write(struct.pack('I', len(data)))
+
+ # Write payload.
+ sys.stdout.write(data)
+
+
+# Start a containerized executor. Expects to receive a Launch
+# protobuf via stdin. Returns 0 on success, 1 on any handled error.
+def launch():
+ try:
+ data = receive()
+ if len(data) == 0:
+ return 1
+
+ launch = containerizer_pb2.Launch()
+ launch.ParseFromString(data)
+ # NOTE(review): only task_info is inspected here; confirm how a Launch carrying just executor_info should run.
+ if launch.task_info.HasField("executor"):
+ command = ["sh",
+ "-c",
+ launch.task_info.executor.command.value]
+ else:
+ print >> sys.stderr, "No executor passed; using mesos-executor!"
+ executor = os.path.join(os.environ['MESOS_LIBEXEC_DIRECTORY'],
+ "mesos-executor")
+ command = [executor,
+ "sh",
+ "-c",
+ launch.task_info.command.value]
+ print >> sys.stderr, "command " + str(command)
+ # Per-container directory holding the "wait" and "pid" lock files read by wait() and destroy().
+ lock_dir = os.path.join("/tmp/mesos-test-containerizer",
+ launch.container_id.value)
+ subprocess.check_call(["mkdir", "-p", lock_dir])
+
+ # Fork a child process for allowing a blocking wait.
+ pid = os.fork()
+ if pid == 0:
+ # We are in the child.
+ proc = subprocess.Popen(command, env=os.environ.copy())
+
+ # Wait and serialize the process status when done.
+ lock = os.path.join(lock_dir, "wait")
+ with open(lock, "w+") as lk:
+ fcntl.flock(lk, fcntl.LOCK_EX)
+ # The exclusive lock is held while the command runs, so shared-lock readers block until the status is written.
+ status = proc.wait()
+
+ lk.write(str(status) + "\n")
+
+ sys.exit(status)
+ else:
+ # We are in the parent.
+
+ # Serialize the subprocess pid (the forked child, not the command it spawns).
+ lock = os.path.join(lock_dir, "pid")
+ with open(lock, "w+") as lk:
+ fcntl.flock(lk, fcntl.LOCK_EX)
+
+ lk.write(str(pid) + "\n")
+
+ except google.protobuf.message.DecodeError:
+ print >> sys.stderr, "Could not deserialise Launch protobuf"
+ return 1
+
+ except OSError as e:
+ print >> sys.stderr, e.strerror
+ return 1
+
+ except ValueError:
+ print >> sys.stderr, "Value is invalid"
+ return 1
+
+ return 0
+
+
+# Update the container's resources.
+# Expects to receive a Update protobuf via stdin.
+def update():
+ try:
+ data = receive()
+ if len(data) == 0:
+ return 1
+
+ update = containerizer_pb2.Update()
+ update.ParseFromString(data)
+
+ print >> sys.stderr, "Received " \
+ + str(len(update.resources)) \
+ + " resource elements."
+
+ except google.protobuf.message.DecodeError:
+ print >> sys.stderr, "Could not deserialise Update protobuf."
+ return 1
+
+ except OSError as e:
+ print >> sys.stderr, e.strerror
+ return 1
+
+ except ValueError:
+ print >> sys.stderr, "Value is invalid"
+ return 1
+
+ return 0
+
+
+# Gather resource usage statistics for the containerized executor.
+# Delivers an ResourceStatistics protobut via stdout when
+# successful.
+def usage():
+ try:
+ data = receive()
+ if len(data) == 0:
+ return 1
+ usage = containerizer_pb2.Usage()
+ usage.ParseFromString(data)
+
+ statistics = mesos_pb2.ResourceStatistics()
+
+ statistics.timestamp = time.time()
+
+ # Return hardcoded dummy statistics.
+ # TODO(tillt): Make use of mesos-usage here for capturing real
+ # statistics.
+ statistics.mem_rss_bytes = 1073741824
+ statistics.mem_limit_bytes = 1073741824
+ statistics.cpus_limit = 2
+ statistics.cpus_user_time_secs = 0.12
+ statistics.cpus_system_time_secs = 0.5
+
+ send(statistics.SerializeToString())
+
+ except google.protobuf.message.DecodeError:
+ print >> sys.stderr, "Could not deserialise ContainerID protobuf."
+ return 1
+
+ except google.protobuf.message.EncodeError:
+ print >> sys.stderr, "Could not serialise ResourceStatistics protobuf."
+ return 1
+
+ except OSError as e:
+ print >> sys.stderr, e.strerror
+ return 1
+
+ return 0
+
+
+# Terminate the containerized executor.
+def destroy():
+ try:
+ data = receive()
+ if len(data) == 0:
+ return 1
+ destroy = containerizer_pb2.Destroy()
+ destroy.ParseFromString(data)
+
+ lock_dir = os.path.join("/tmp/mesos-test-containerizer",
+ destroy.container_id.value)
+ lock = os.path.join(lock_dir, "pid")
+
+ # Obtain our shared lock once it becomes available, read
+ # the pid and kill that process.
+ with open(lock, "r") as lk:
+ fcntl.flock(lk, fcntl.LOCK_SH)
+
+ pid = int(lk.read())
+
+ os.kill(pid, signal.SIGKILL)
+
+ except google.protobuf.message.DecodeError:
+ print >> sys.stderr, "Could not deserialise ContainerID protobuf."
+ return 1
+
+ except OSError as e:
+ print >> sys.stderr, e.strerror
+ return 1
+
+ return 0
+
+
+# Recover the containerized executor.
+# TODO(tillt): It is yet to be defined which data will be forwarded
+# for the external containerizer to be able to recover its internal
+# states. Currently this command is not being invoked.
+def recover():
+ try:
+ data = receive()
+ if len(data) == 0:
+ return 1
+ containerId = mesos_pb2.ContainerID()
+ containerId.ParseFromString(data)
+
+ except google.protobuf.message.DecodeError:
+ print >> sys.stderr, "Could not deserialise ContainerID protobuf."
+ return 1
+
+ except OSError as e:
+ print >> sys.stderr, e.strerror
+ return 1
+
+ return 0
+
+
+# Get the containerized executor's Termination.
+# Delivers a Termination protobuf filled with the information
+# gathered from launch's wait via stdout.
+def wait():
+ try:
+ data = receive()
+ if len(data) == 0:
+ return 1
+ wait = containerizer_pb2.Wait()
+ wait.ParseFromString(data)
+
+ lock_dir = os.path.join("/tmp/mesos-test-containerizer",
+ wait.container_id.value)
+ lock = os.path.join(lock_dir, "wait")
+
+ # Obtain our shared lock once it becomes available and read
+ # the status code.
+ with open(lock, "r") as lk:
+ fcntl.flock(lk, fcntl.LOCK_SH)
+ status = int(lk.read())
+
+ # Deliver the termination protobuf back to the slave.
+ termination = containerizer_pb2.Termination()
+ termination.killed = false
+ termination.status = status
+ termination.message = ""
+
+ send(termination.SerializeToString())
+
+ except google.protobuf.message.DecodeError:
+ print >> sys.stderr, "Could not deserialise ContainerID protobuf."
+ return 1
+
+ except google.protobuf.message.EncodeError:
+ print >> sys.stderr, "Could not serialise Termination protobuf."
+ return 1
+
+ except OSError as e:
+ print >> sys.stderr, e.strerror
+ return 1
+
+ return 0
+
+
+if __name__ == "__main__":
+ methods = { "launch": launch,
+ "update": update,
+ "destroy": destroy,
+ "recover": recover,
+ "usage": usage,
+ "wait": wait }
+
+ if sys.argv[1:2] == ["--help"] or sys.argv[1:2] == ["-h"]:
+ print use(sys.argv[0], methods.keys())
+ sys.exit(0)
+
+ if len(sys.argv) < 2:
+ print >> sys.stderr, "Please pass a command"
+ print >> sys.stderr, use(sys.argv[0], methods.keys())
+ sys.exit(1)
+
+ command = sys.argv[1]
+ if command not in methods:
+ print >> sys.stderr, "Invalid command passed"
+ print >> sys.stderr, use(sys.argv[0], methods.keys())
+ sys.exit(2)
+
+ method = methods.get(command)
+
+ sys.exit(method())
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index 344872a..9321bbd 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -38,6 +38,7 @@
#include "slave/containerizer/isolator.hpp"
#include "slave/containerizer/launcher.hpp"
#include "slave/containerizer/mesos_containerizer.hpp"
+#include "slave/containerizer/external_containerizer.hpp"
#include "slave/containerizer/isolators/posix.hpp"
#ifdef __linux__
@@ -176,6 +177,10 @@ Try<Containerizer*> Containerizer::create(
LOG(INFO) << "Using isolation: " << isolation;
+ if (isolation == "external") {
+ return new ExternalContainerizer(flags);
+ }
+
// Create a MesosContainerizerProcess using isolators and a launcher.
hashmap<std::string, Try<Isolator*> (*)(const Flags&)> creators;
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/slave/containerizer/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.hpp b/src/slave/containerizer/containerizer.hpp
index 9a50fba..a9f89fc 100644
--- a/src/slave/containerizer/containerizer.hpp
+++ b/src/slave/containerizer/containerizer.hpp
@@ -35,6 +35,7 @@
#include <stout/option.hpp>
#include <stout/try.hpp>
+
namespace mesos {
namespace internal {
namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/slave/containerizer/external_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp
new file mode 100644
index 0000000..e51ac66
--- /dev/null
+++ b/src/slave/containerizer/external_containerizer.cpp
@@ -0,0 +1,897 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+#include <iomanip>
+#include <list>
+
+#include <errno.h>
+#include <poll.h>
+#include <signal.h>
+#include <stdio.h>
+
+#include <process/async.hpp>
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/id.hpp>
+#include <process/io.hpp>
+#include <process/reap.hpp>
+
+#include <stout/check.hpp>
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/strings.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/type_utils.hpp"
+
+#include "slave/paths.hpp"
+
+#include "slave/containerizer/external_containerizer.hpp"
+
+
+using lambda::bind;
+
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+using tuples::tuple;
+
+using namespace process;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+using state::ExecutorState;
+using state::FrameworkState;
+using state::RunState;
+using state::SlaveState;
+
+
+// Check the exit status of an external containerizer invocation.
+// Yields None() for a clean zero exit and an Error describing the
+// problem otherwise.
+static Option<Error> validate(
+    const Future<Option<int> >& future)
+{
+  if (!future.isReady()) {
+    return Error("Status not ready");
+  }
+
+  const Option<int> reaped = future.get();
+  if (reaped.isNone()) {
+    return Error("External containerizer has no status available");
+  }
+
+  const int status = reaped.get();
+
+  // 'status' is a waitpid-result: rule out SIGNAL based termination
+  // first, only then is the masked out exit-code meaningful.
+  if (!WIFEXITED(status)) {
+    return Error(string("External containerizer terminated by signal ") +
+                 strsignal(WTERMSIG(status)));
+  }
+
+  const int code = WEXITSTATUS(status);
+  if (code != 0) {
+    return Error("External containerizer failed (status: " +
+                 stringify(code) + ")");
+  }
+
+  return None();
+}
+
+
+// Validate the invocation results and extract a piped protobuf
+// message.
+//
+// 'future' is the awaited pair of (message read from the subprocess,
+// exit status of the subprocess). Returns the message if the
+// subprocess exited cleanly and a message was read, an Error
+// otherwise.
+template<typename T>
+static Try<T> result(
+    const process::Future<tuples::tuple<
+        process::Future<Result<T> >,
+        process::Future<Option<int> > > >& future)
+{
+  if (!future.isReady()) {
+    return Error("Could not receive any result");
+  }
+
+  Option<Error> error = validate(tuples::get<1>(future.get()));
+  if (error.isSome()) {
+    return error.get();
+  }
+
+  process::Future<Result<T> > result = tuples::get<0>(future.get());
+  if (result.isFailed()) {
+    return Error("Could not receive any result: " + result.failure());
+  }
+
+  // Mirror validate(): never call get() on a future that is not
+  // ready (e.g. one that got discarded).
+  if (!result.isReady()) {
+    return Error("Could not receive any result");
+  }
+
+  if (result.get().isError()) {
+    return Error("Could not receive any result: " + result.get().error());
+  }
+
+  if (result.get().isNone()) {
+    return Error("Could not receive any result");
+  }
+
+  return result.get().get();
+}
+
+// Creates the ExternalContainerizerProcess from 'flags' and spawns it.
+ExternalContainerizer::ExternalContainerizer(const Flags& flags)
+{
+ process = new ExternalContainerizerProcess(flags);
+ spawn(process);
+}
+
+// Terminates the process, waits for it to finish and then deletes it.
+ExternalContainerizer::~ExternalContainerizer()
+{
+ terminate(process);
+ process::wait(process);
+ delete process;
+}
+
+// Dispatches to ExternalContainerizerProcess::recover.
+Future<Nothing> ExternalContainerizer::recover(
+ const Option<state::SlaveState>& state)
+{
+ return dispatch(process, &ExternalContainerizerProcess::recover, state);
+}
+
+// Launch without a task: dispatches to the process passing None() as task info.
+Future<Nothing> ExternalContainerizer::launch(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const string& directory,
+ const Option<string>& user,
+ const SlaveID& slaveId,
+ const PID<Slave>& slavePid,
+ bool checkpoint)
+{
+ return dispatch(process,
+ &ExternalContainerizerProcess::launch,
+ containerId,
+ None(),
+ executorInfo,
+ directory,
+ user,
+ slaveId,
+ slavePid,
+ checkpoint);
+}
+
+// Launch for a task: dispatches to the process passing 'taskInfo' along.
+Future<Nothing> ExternalContainerizer::launch(
+ const ContainerID& containerId,
+ const TaskInfo& taskInfo,
+ const ExecutorInfo& executorInfo,
+ const string& directory,
+ const Option<string>& user,
+ const SlaveID& slaveId,
+ const PID<Slave>& slavePid,
+ bool checkpoint)
+{
+ return dispatch(process,
+ &ExternalContainerizerProcess::launch,
+ containerId,
+ taskInfo,
+ executorInfo,
+ directory,
+ user,
+ slaveId,
+ slavePid,
+ checkpoint);
+}
+
+// Dispatches the resource update to ExternalContainerizerProcess::update.
+Future<Nothing> ExternalContainerizer::update(
+ const ContainerID& containerId,
+ const Resources& resources)
+{
+ return dispatch(process,
+ &ExternalContainerizerProcess::update,
+ containerId,
+ resources);
+}
+
+// Dispatches to ExternalContainerizerProcess::usage.
+Future<ResourceStatistics> ExternalContainerizer::usage(
+ const ContainerID& containerId)
+{
+ return dispatch(process, &ExternalContainerizerProcess::usage, containerId);
+}
+
+// Dispatches to ExternalContainerizerProcess::wait.
+Future<containerizer::Termination> ExternalContainerizer::wait(
+ const ContainerID& containerId)
+{
+ return dispatch(process, &ExternalContainerizerProcess::wait, containerId);
+}
+
+// Dispatches to ExternalContainerizerProcess::destroy; no result is returned.
+void ExternalContainerizer::destroy(const ContainerID& containerId)
+{
+ dispatch(process, &ExternalContainerizerProcess::destroy, containerId);
+}
+
+// Dispatches to ExternalContainerizerProcess::containers.
+Future<hashset<ContainerID> > ExternalContainerizer::containers()
+{
+ return dispatch(process, &ExternalContainerizerProcess::containers);
+}
+
+// Stores a copy of the slave flags for later use.
+ExternalContainerizerProcess::ExternalContainerizerProcess(
+ const Flags& _flags) : flags(_flags) {}
+
+// Recovery hook; ignores 'state' and reports success right away.
+Future<Nothing> ExternalContainerizerProcess::recover(
+ const Option<state::SlaveState>& state)
+{
+ // TODO(tillt): Consider forwarding the recover command to the
+ // external containerizer. For now, recovery should be entirely
+ // covered by the slave itself.
+ return Nothing();
+}
+
+// Runs the external 'launch' command for a container and tracks it in 'actives'.
+Future<Nothing> ExternalContainerizerProcess::launch(
+ const ContainerID& containerId,
+ const Option<TaskInfo>& taskInfo,
+ const ExecutorInfo& executor,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const PID<Slave>& slavePid,
+ bool checkpoint)
+{
+ LOG(INFO) << "Launching container '" << containerId << "'";
+
+ if (actives.contains(containerId)) {
+ return Failure("Cannot start already running container '" +
+ containerId.value() + "'");
+ }
+ // Environment variables passed along with the 'launch' invocation.
+ map<string, string> environment = executorEnvironment(
+ executor,
+ directory,
+ slaveId,
+ slavePid,
+ checkpoint,
+ flags.recovery_timeout);
+
+ // TODO(tillt): Consider moving this into
+ // Containerizer::executorEnvironment.
+ if (!flags.hadoop_home.empty()) {
+ environment["HADOOP_HOME"] = flags.hadoop_home;
+ }
+
+ if (flags.default_container_image.isSome()) {
+ environment["MESOS_DEFAULT_CONTAINER_IMAGE"] =
+ flags.default_container_image.get();
+ }
+ // Fill in the Launch message: task info when given, else executor info.
+ containerizer::Launch launch;
+ launch.mutable_container_id()->CopyFrom(containerId);
+ if (taskInfo.isSome()) {
+ launch.mutable_task_info()->CopyFrom(taskInfo.get());
+ } else {
+ launch.mutable_executor_info()->CopyFrom(executor);
+ }
+ launch.set_directory(directory);
+ if (user.isSome()) {
+ launch.set_user(user.get());
+ }
+ launch.mutable_slave_id()->CopyFrom(slaveId);
+ launch.set_slave_pid(slavePid);
+ launch.set_checkpoint(checkpoint);
+
+ Sandbox sandbox(directory, user);
+
+ Try<Subprocess> invoked = invoke(
+ "launch",
+ sandbox,
+ launch,
+ environment);
+
+ if (invoked.isError()) {
+ return Failure("Launch of container '" + containerId.value() +
+ "' failed: " + invoked.error());
+ }
+
+ // Record the container launch intent.
+ actives.put(containerId, Owned<Container>(new Container(sandbox)));
+ // _launch validates the exit status; __launch cleans up if the result is not ready.
+ return invoked.get().status()
+ .then(defer(
+ PID<ExternalContainerizerProcess>(this),
+ &ExternalContainerizerProcess::_launch,
+ containerId,
+ lambda::_1))
+ .onAny(defer(
+ PID<ExternalContainerizerProcess>(this),
+ &ExternalContainerizerProcess::__launch,
+ containerId,
+ lambda::_1));
+}
+
+
+// Continuation of launch(): checks the exit status of the external
+// 'launch' command and, if it succeeded, marks the container as
+// launched so that commands chained onto 'launched' can proceed.
+//
+// Returns a Failure if the container is no longer tracked or if the
+// command did not exit cleanly.
+Future<Nothing> ExternalContainerizerProcess::_launch(
+    const ContainerID& containerId,
+    const Future<Option<int> >& future)
+{
+  VLOG(1) << "Launch validation callback triggered on container '"
+          << containerId << "'";
+
+  // Guard like every other continuation does: indexing 'actives'
+  // with an unknown id must not be followed by a dereference.
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  Option<Error> error = validate(future);
+  if (error.isSome()) {
+    return Failure("Could not launch container '" +
+                   containerId.value() + "': " + error.get().message);
+  }
+
+  VLOG(1) << "Launch finishing up for container '" << containerId << "'";
+
+  // Launch is done, we can now process all other commands that might
+  // have gotten chained up.
+  actives[containerId]->launched.set(Nothing());
+
+  return Nothing();
+}
+
+
+// Final launch callback: drops the container again unless the launch
+// went through.
+void ExternalContainerizerProcess::__launch(
+    const ContainerID& containerId,
+    const Future<Nothing>& future)
+{
+  VLOG(1) << "Launch confirmation callback triggered on container '"
+          << containerId << "'";
+
+  if (future.isReady()) {
+    return;
+  }
+
+  // The future failed or got discarded, so the container has to be
+  // cleaned up.
+  cleanup(containerId);
+}
+
+
+// Entry point for waiting on a container: rejects unknown containers
+// and otherwise chains the actual wait (_wait) behind the launch.
+Future<containerizer::Termination> ExternalContainerizerProcess::wait(
+    const ContainerID& containerId)
+{
+  VLOG(1) << "Wait triggered on container '" << containerId << "'";
+
+  if (actives.contains(containerId)) {
+    // Hold off until the launch is done.
+    Future<Nothing> launched = actives[containerId]->launched.future();
+
+    return launched.then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::_wait,
+        containerId));
+  }
+
+  return Failure("Container '" + containerId.value() + "' not running");
+}
+
+// Runs the external 'wait' command and arranges for __wait to handle its result.
+Future<containerizer::Termination> ExternalContainerizerProcess::_wait(
+ const ContainerID& containerId)
+{
+ VLOG(1) << "Wait continuation triggered on container '"
+ << containerId << "'";
+
+ if (!actives.contains(containerId)) {
+ return Failure("Container '" + containerId.value() + "' not running");
+ }
+
+ containerizer::Wait wait;
+ wait.mutable_container_id()->CopyFrom(containerId);
+
+ Try<Subprocess> invoked = invoke(
+ "wait",
+ actives[containerId]->sandbox,
+ wait);
+
+ if (invoked.isError()) {
+ // 'wait' has failed, we need to tear down everything now.
+ unwait(containerId);
+ return Failure("Wait on container '" + containerId.value() +
+ "' failed: " + invoked.error());
+ }
+ // Remember the pid of the running 'wait' invocation.
+ actives[containerId]->pid = invoked.get().pid();
+
+ // Invoke the protobuf::read asynchronously.
+ // TODO(tillt): Consider moving protobuf::read into libprocess and
+ // making it work fully asynchronously.
+ Result<containerizer::Termination>(*read)(int, bool, bool) =
+ &::protobuf::read<containerizer::Termination>;
+ // NOTE(review): confirm the 'out' descriptor stays open after 'invoked' goes out of scope.
+ Future<Result<containerizer::Termination> > future = async(
+ read, invoked.get().out(), false, false);
+
+ // Await both, a protobuf Message from the subprocess as well as
+ // its exit.
+ await(future, invoked.get().status())
+ .onAny(defer(
+ PID<ExternalContainerizerProcess>(this),
+ &ExternalContainerizerProcess::__wait,
+ containerId,
+ lambda::_1));
+ // The caller gets the container's termination future, which __wait completes.
+ return actives[containerId]->termination.future();
+}
+
+// Completes the container's termination promise from the 'wait' result, then cleans up.
+void ExternalContainerizerProcess::__wait(
+ const ContainerID& containerId,
+ const Future<tuples::tuple<
+ Future<Result<containerizer::Termination> >,
+ Future<Option<int> > > >& future)
+{
+ VLOG(1) << "Wait callback triggered on container '" << containerId << "'";
+
+ if (!actives.contains(containerId)) {
+ LOG(ERROR) << "Container '" << containerId << "' not running";
+ return;
+ }
+ // result() checks the exit status and extracts the Termination message.
+ Try<containerizer::Termination> termination =
+ result<containerizer::Termination>(future);
+
+ if (termination.isError()) {
+ // 'wait' has failed, we need to tear down everything now.
+ actives[containerId]->termination.fail(termination.error());
+ unwait(containerId);
+ } else {
+ // Set the promise to alert others waiting on this container.
+ actives[containerId]->termination.set(termination.get());
+ }
+
+ // The container has been waited on, we can safely cleanup now.
+ cleanup(containerId);
+}
+
+
+// Update the resources of a container.
+//
+// Fails right away if the container is not tracked; otherwise the
+// actual update (_update) is chained onto the container's 'launched'
+// future so it only runs once the launch is done.
+Future<Nothing> ExternalContainerizerProcess::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  VLOG(1) << "Update triggered on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  // Defer update until launch is done.
+  return actives[containerId]->launched.future()
+    .then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::_update,
+        containerId,
+        resources));
+}
+
+
+// Continuation of update(): records the new resources on the
+// container, runs the external 'update' command with an Update
+// message and hands its exit status to __update.
+//
+// Returns a Failure if the container is not tracked anymore or the
+// command could not be invoked.
+Future<Nothing> ExternalContainerizerProcess::_update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  VLOG(1) << "Update continuation triggered on container '"
+          << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  actives[containerId]->resources = resources;
+
+  containerizer::Update update;
+  update.mutable_container_id()->CopyFrom(containerId);
+  update.mutable_resources()->CopyFrom(resources);
+
+  Try<Subprocess> invoked = invoke(
+      "update",
+      actives[containerId]->sandbox,
+      update);
+
+  if (invoked.isError()) {
+    return Failure("Update of container '" + containerId.value() +
+                   "' failed: " + invoked.error());
+  }
+
+  // 'update' produces no result message, only its exit status counts.
+  return invoked.get().status()
+    .then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::__update,
+        containerId,
+        lambda::_1));
+}
+
+// Final update callback: turns the 'update' command's exit status into the result.
+Future<Nothing> ExternalContainerizerProcess::__update(
+ const ContainerID& containerId,
+ const Future<Option<int> >& future)
+{
+ VLOG(1) << "Update callback triggered on container '" << containerId << "'";
+
+ if (!actives.contains(containerId)) {
+ return Failure("Container '" + containerId.value() + "' not running");
+ }
+ // Anything but a clean zero exit fails the update.
+ Option<Error> error = validate(future);
+ if (error.isSome()) {
+ return Failure(error.get());
+ }
+
+ return Nothing();
+}
+
+
+// Gather the resource statistics of a container.
+//
+// Fails right away if the container is not tracked; otherwise the
+// actual query (_usage) is chained onto the container's 'launched'
+// future so it only runs once the launch is done.
+Future<ResourceStatistics> ExternalContainerizerProcess::usage(
+    const ContainerID& containerId)
+{
+  VLOG(1) << "Usage triggered on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  // Defer usage until launch is done.
+  return actives[containerId]->launched.future()
+    .then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::_usage,
+        containerId));
+}
+
+
+// Continuation of usage(), chained onto the container's 'launched'
+// future. Runs the external containerizer's "usage" command with a
+// containerizer::Usage message and reads a ResourceStatistics
+// message back from the command's output pipe. Fails if the
+// container is no longer active or the command could not be invoked;
+// otherwise the outcome is decided by __usage.
+Future<ResourceStatistics> ExternalContainerizerProcess::_usage(
+    const ContainerID& containerId)
+{
+  VLOG(1) << "Usage continuation on container '" << containerId << "'";
+
+  // The container may have gone away while we were waiting for launch.
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  containerizer::Usage usage;
+  usage.mutable_container_id()->CopyFrom(containerId);
+
+  Try<Subprocess> invoked = invoke(
+      "usage",
+      actives[containerId]->sandbox,
+      usage);
+
+  if (invoked.isError()) {
+    // 'usage' has failed but we keep the container alive for now.
+    return Failure("Usage on container '" + containerId.value() +
+                   "' failed: " + invoked.error());
+  }
+
+  // Pin down the ResourceStatistics instantiation of protobuf::read
+  // via a function pointer so that it can be passed to async().
+  Result<ResourceStatistics>(*read)(int, bool, bool) =
+    &::protobuf::read<ResourceStatistics>;
+
+  Future<Result<ResourceStatistics> > future = async(
+      read, invoked.get().out(), false, false);
+
+  // Await both, a protobuf Message from the subprocess as well as
+  // its exit.
+  return await(future, invoked.get().status())
+    .then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::__usage,
+        containerId,
+        lambda::_1));
+}
+
+
+// Final step of a usage request: runs once both the statistics
+// message and the exit status of the external "usage" command are
+// available. Extracts the ResourceStatistics via result<>() and
+// returns them, or a failure if the container is no longer active or
+// the extraction reported an error.
+Future<ResourceStatistics> ExternalContainerizerProcess::__usage(
+    const ContainerID& containerId,
+    const Future<tuples::tuple<
+        Future<Result<ResourceStatistics> >,
+        Future<Option<int> > > >& future)
+{
+  VLOG(1) << "Usage callback triggered on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  const Try<ResourceStatistics> parsed = result<ResourceStatistics>(future);
+  if (parsed.isError()) {
+    return Failure(parsed.error());
+  }
+
+  const ResourceStatistics& stats = parsed.get();
+
+  // Summarize the reported figures for verbose logging.
+  VLOG(2) << "Container '" << containerId
+          << "' total mem usage " << stats.mem_rss_bytes()
+          << " total CPU user usage " << stats.cpus_user_time_secs()
+          << " total CPU system usage " << stats.cpus_system_time_secs();
+
+  return stats;
+}
+
+
+void ExternalContainerizerProcess::destroy(const ContainerID& containerId)
+{
+ VLOG(1) << "Destroy triggered on container '" << containerId << "'";
+
+ if (!actives.contains(containerId)) {
+ LOG(ERROR) << "Container '" << containerId << "' not running";
+ return;
+ }
+
+ // Defer destroy until launch is done.
+ actives[containerId]->launched.future()
+ .onAny(defer(
+ PID<ExternalContainerizerProcess>(this),
+ &ExternalContainerizerProcess::_destroy,
+ containerId));
+}
+
+
+// Continuation of destroy(): runs the external containerizer's
+// "destroy" command with a containerizer::Destroy message. If the
+// command cannot be invoked, the pending "wait" is aborted directly
+// via unwait(); otherwise __destroy takes over once the command's
+// status is known.
+void ExternalContainerizerProcess::_destroy(const ContainerID& containerId)
+{
+  VLOG(1) << "Destroy continuation on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    LOG(ERROR) << "Container '" << containerId << "' not running";
+    return;
+  }
+
+  containerizer::Destroy message;
+  message.mutable_container_id()->CopyFrom(containerId);
+
+  Try<Subprocess> subprocess = invoke(
+      "destroy",
+      actives[containerId]->sandbox,
+      message);
+
+  if (subprocess.isSome()) {
+    subprocess.get().status()
+      .onAny(defer(
+          PID<ExternalContainerizerProcess>(this),
+          &ExternalContainerizerProcess::__destroy,
+          containerId,
+          lambda::_1));
+    return;
+  }
+
+  // The external command could not be started; fall back to
+  // terminating the "wait" process ourselves.
+  LOG(ERROR) << "Destroy of container '" << containerId
+             << "' failed: " << subprocess.error();
+  unwait(containerId);
+}
+
+
+// Final step of a destroy: runs once the status future of the
+// external "destroy" command is available. A validation error is
+// only logged; in either case the container's pending "wait" is
+// aborted via unwait().
+void ExternalContainerizerProcess::__destroy(
+    const ContainerID& containerId,
+    const Future<Option<int> >& future)
+{
+  VLOG(1) << "Destroy callback triggered on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    LOG(ERROR) << "Container '" << containerId.value() << "' not running";
+    return;
+  }
+
+  const Option<Error> problem = validate(future);
+  if (!problem.isNone()) {
+    LOG(ERROR) << "Destroy of container '" << containerId
+               << "' failed: " << problem.get().message;
+  }
+
+  // Besides the optional external destroy-command, the external
+  // containerizer's "wait" process has to be terminated as well.
+  unwait(containerId);
+}
+
+
+// Returns the ids of all containers currently held in 'actives'.
+Future<hashset<ContainerID> > ExternalContainerizerProcess::containers()
+{
+  const hashset<ContainerID> ids = actives.keys();
+  return ids;
+}
+
+
+// Drops the bookkeeping entry of a container from 'actives'. Only
+// logs a warning if no such entry exists (anymore).
+void ExternalContainerizerProcess::cleanup(const ContainerID& containerId)
+{
+  VLOG(1) << "Callback performing final cleanup of running state";
+
+  if (!actives.contains(containerId)) {
+    LOG(WARNING) << "Container '" << containerId << "' not running anymore";
+    return;
+  }
+
+  actives.erase(containerId);
+}
+
+
+// Aborts a possibly pending external "wait" command of a container
+// by sending SIGKILL to the process tree rooted at the recorded pid.
+// If no pid has been recorded, or killing fails, the container state
+// is cleaned up right here.
+void ExternalContainerizerProcess::unwait(const ContainerID& containerId)
+{
+  if (!actives.contains(containerId)) {
+    LOG(WARNING) << "Container '" << containerId << "' not running";
+    return;
+  }
+
+  // Only containers that are being waited on carry the pid of the
+  // "wait" command.
+  const Option<pid_t> waitPid = actives[containerId]->pid;
+
+  if (waitPid.isNone()) {
+    // Getting here most likely means that launch failed due to some
+    // error on the external containerizer's side (e.g. a non zero
+    // return on launch).
+    LOG(WARNING) << "Container '" << containerId << "' not being waited on";
+    cleanup(containerId);
+    return;
+  }
+
+  // Terminate the containerizer.
+  VLOG(2) << "About to send a SIGKILL to containerizer pid: " << waitPid.get();
+
+  // TODO(tillt): Add graceful termination as soon as we have an
+  // accepted way to do that in place.
+  Try<list<os::ProcessTree> > killed =
+    os::killtree(waitPid.get(), SIGKILL, true, true);
+
+  if (killed.isSome()) {
+    // No cleanup here: it gets invoked via __wait, which triggers
+    // once the external containerizer's "wait" subprocess has
+    // terminated.
+    LOG(INFO) << "Killed the following process tree/s:\n"
+              << stringify(killed.get());
+    return;
+  }
+
+  LOG(WARNING) << "Failed to kill the process tree rooted at pid "
+               << waitPid.get() << ": " << killed.error();
+  cleanup(containerId);
+}
+
+
+// Hook that is bound into process::subprocess() by invoke() (see its
+// "Fork exec" comment): starts a new session, changes into the given
+// sandbox directory and finally writes a sync word to stdout for the
+// parent to pick up. Returns 0 on success, or the errno of the
+// failing setsid/chdir call.
+// TODO(tillt): Consider having the kernel notify us when our parent
+// process dies e.g. by invoking prctl(PR_SET_PDEATHSIG, ..) on linux.
+static int setup(const string& directory)
+{
+  // A session of its own keeps a SIGKILL/SIGTERM aimed at the child
+  // from taking the slave down with it.
+  if (::setsid() == -1) {
+    return errno;
+  }
+
+  // Re/establish the sandbox conditions for the containerizer.
+  if (::chdir(directory.c_str()) == -1) {
+    return errno;
+  }
+
+  // Sync parent and child process; retry when interrupted by a signal.
+  const int sync = 0;
+  ssize_t written;
+  do {
+    written = ::write(STDOUT_FILENO, &sync, sizeof(sync));
+  } while (written == -1 && errno == EINTR);
+
+  return 0;
+}
+
+
+// Runs the external containerizer program ('flags.containerizer_path')
+// with the given command as its argument.
+//
+// The child is set up via setup() (own session, working directory set
+// to the sandbox directory), its stderr is spliced into the file
+// "stderr" within the sandbox directory and the given protobuf
+// message is written to its stdin.
+//
+// 'commandEnvironment' is merged into a default environment that
+// contains MESOS_LIBEXEC_DIRECTORY. Returns the subprocess on success
+// or an Error describing the step that failed.
+Try<process::Subprocess> ExternalContainerizerProcess::invoke(
+    const string& command,
+    const Sandbox& sandbox,
+    const google::protobuf::Message& message,
+    const map<string, string>& commandEnvironment)
+{
+  CHECK_SOME(flags.containerizer_path) << "containerizer_path not set";
+
+  VLOG(1) << "Invoking external containerizer for method '" << command << "'";
+
+  // Prepare a default environment.
+  map<string, string> environment;
+  environment["MESOS_LIBEXEC_DIRECTORY"] = flags.launcher_dir;
+
+  // Add the command specific environment. NOTE: map::insert does not
+  // replace keys that are already present in the default environment.
+  environment.insert(commandEnvironment.begin(), commandEnvironment.end());
+
+  // Construct the command to execute.
+  string execute = flags.containerizer_path.get() + " " + command;
+
+  VLOG(2) << "calling: [" << execute << "]";
+  VLOG(2) << "directory: " << sandbox.directory;
+
+  // Only touch 'sandbox.user' when it is set. (glog's VLOG_IF takes
+  // the verbose level first and the condition second; guarding with a
+  // plain 'if' avoids dereferencing a None option.)
+  if (sandbox.user.isSome()) {
+    VLOG(2) << "user: " << sandbox.user.get();
+  }
+
+  // Re/establish the sandbox conditions for the containerizer.
+  if (sandbox.user.isSome()) {
+    Try<Nothing> chown = os::chown(
+        sandbox.user.get(),
+        sandbox.directory);
+    if (chown.isError()) {
+      return Error("Failed to chown work directory: " + chown.error());
+    }
+  }
+
+  // Fork exec of external process. Run a chdir and a setsid within
+  // the child-context.
+  Try<Subprocess> external = process::subprocess(
+      execute,
+      environment,
+      lambda::bind(&setup, sandbox.directory));
+
+  if (external.isError()) {
+    return Error("Failed to execute external containerizer: " +
+                 external.error());
+  }
+
+  // Sync parent and child process to make sure we have done the
+  // setsid within the child context before continuing.
+  int sync;
+  while (::read(external.get().out(), &sync, sizeof(sync)) == -1 &&
+         errno == EINTR);
+
+  // Set stderr into non-blocking mode.
+  Try<Nothing> nonblock = os::nonblock(external.get().err());
+  if (nonblock.isError()) {
+    return Error("Failed to accept nonblock: " + nonblock.error());
+  }
+
+  // We are not setting stdin or stdout into non-blocking mode as
+  // protobuf::read / write do currently not support it.
+
+  // Redirect output (stderr) from the external containerizer to log
+  // file in the executor work directory, chown'ing it if a user is
+  // specified.
+  // NOTE(review): S_IRWXO grants read, write and execute to others
+  // on this log file; S_IROTH may have been intended - confirm.
+  Try<int> err = os::open(
+      path::join(sandbox.directory, "stderr"),
+      O_WRONLY | O_CREAT | O_APPEND | O_NONBLOCK,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+  if (err.isError()) {
+    return Error("Failed to redirect stderr: " + err.error());
+  }
+
+  if (sandbox.user.isSome()) {
+    Try<Nothing> chown = os::chown(
+        sandbox.user.get(),
+        path::join(sandbox.directory, "stderr"));
+    if (chown.isError()) {
+      // Do not leak the descriptor of the log file we just opened.
+      os::close(err.get());
+      return Error("Failed to redirect stderr: " + chown.error());
+    }
+  }
+
+  // Copy the child's stderr into the log file; the log file
+  // descriptor is closed once the splice has completed in any way.
+  io::splice(external.get().err(), err.get())
+    .onAny(bind(&os::close, err.get()));
+
+  // Transmit protobuf data via the child's stdin towards the external
+  // containerizer. Each message is prefixed by its total size.
+  Try<Nothing> write = ::protobuf::write(external.get().in(), message);
+  if (write.isError()) {
+    return Error("Failed to write protobuf to pipe: " + write.error());
+  }
+
+  VLOG(2) << "Subprocess pid: " << external.get().pid() << ", "
+          << "output pipe: " << external.get().out();
+
+  return external;
+}
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/slave/containerizer/external_containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.hpp b/src/slave/containerizer/external_containerizer.hpp
new file mode 100644
index 0000000..eb3ff96
--- /dev/null
+++ b/src/slave/containerizer/external_containerizer.hpp
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __EXTERNAL_CONTAINERIZER_HPP__
+#define __EXTERNAL_CONTAINERIZER_HPP__
+
+#include <list>
+#include <sstream>
+#include <string>
+
+#include <process/owned.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/try.hpp>
+#include <stout/tuple.hpp>
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/isolator.hpp"
+#include "slave/containerizer/launcher.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// The scheme an external containerizer has to adhere to is:
+//
+// COMMAND < INPUT-PROTO > RESULT-PROTO
+//
+// launch < containerizer::Launch
+// update < containerizer::Update
+// usage < containerizer::Usage > mesos::ResourceStatistics
+// wait < containerizer::Wait > containerizer::Termination
+// destroy < containerizer::Destroy
+//
+// 'wait' on the external containerizer side is expected to block
+// until the task command/executor has terminated.
+//
+
+// Check src/examples/python/test_containerizer.py for a rough
+// implementation template of this protocol.
+
+// TODO(tillt): Implement a protocol for external containerizer
+// recovery by defining needed protobuf/s.
+// Currently we expect to cover recovery entirely on the slave side.
+
+// For debugging purposes of an external containerizer, it might be
+// helpful to enable verbose logging on the slave (GLOG_v=2).
+
+class ExternalContainerizerProcess;
+
+// Containerizer implementation for the external containerizer
+// protocol sketched above. The object holds a pointer to an
+// ExternalContainerizerProcess (declared below); the method
+// definitions are not part of this header.
+class ExternalContainerizer : public Containerizer
+{
+public:
+ ExternalContainerizer(const Flags& flags);
+
+ virtual ~ExternalContainerizer();
+
+ // Recover containerized executors as described by the given slave
+ // state (if any).
+ virtual process::Future<Nothing> recover(
+ const Option<state::SlaveState>& state);
+
+ // Launch variant that takes no TaskInfo, only the executor to run.
+ virtual process::Future<Nothing> launch(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ // Launch variant that additionally receives the task.
+ virtual process::Future<Nothing> launch(
+ const ContainerID& containerId,
+ const TaskInfo& task,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ // Update the resources of the given container.
+ virtual process::Future<Nothing> update(
+ const ContainerID& containerId,
+ const Resources& resources);
+
+ // Gather resource usage statistics for the given container.
+ virtual process::Future<ResourceStatistics> usage(
+ const ContainerID& containerId);
+
+ // Get a future on the Termination of the given container.
+ virtual process::Future<containerizer::Termination> wait(
+ const ContainerID& containerId);
+
+ // Terminate the given container.
+ virtual void destroy(const ContainerID& containerId);
+
+ // Get the ids of all containers known to this containerizer.
+ virtual process::Future<hashset<ContainerID> > containers();
+
+private:
+ // NOTE(review): raw pointer; presumably created in the constructor
+ // and released in the destructor - confirm in the .cpp file.
+ ExternalContainerizerProcess* process;
+};
+
+
+// libprocess process that implements the external containerizer
+// protocol. Every public operation is split into a chain of private
+// continuations (single and double underscore prefixed methods) that
+// run as the external commands make progress.
+class ExternalContainerizerProcess
+ : public process::Process<ExternalContainerizerProcess>
+{
+public:
+ ExternalContainerizerProcess(const Flags& flags);
+
+ // Recover containerized executors as specified by state. See
+ // containerizer.hpp:recover for more.
+ process::Future<Nothing> recover(const Option<state::SlaveState>& state);
+
+ // Start the containerized executor.
+ process::Future<Nothing> launch(
+ const ContainerID& containerId,
+ const Option<TaskInfo>& taskInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ // Update the container's resources.
+ process::Future<Nothing> update(
+ const ContainerID& containerId,
+ const Resources& resources);
+
+ // Gather resource usage statistics for the containerized executor.
+ process::Future<ResourceStatistics> usage(const ContainerID& containerId);
+
+ // Get a future on the containerized executor's Termination.
+ process::Future<containerizer::Termination> wait(
+ const ContainerID& containerId);
+
+ // Terminate the containerized executor.
+ void destroy(const ContainerID& containerId);
+
+ // Get all active container-id's.
+ process::Future<hashset<ContainerID> > containers();
+
+private:
+ // Startup flags.
+ const Flags flags;
+
+ // Information describing a container environment. A sandbox has to
+ // be prepared before the external containerizer can be invoked.
+ struct Sandbox
+ {
+ Sandbox(const std::string& directory, const Option<std::string>& user)
+ : directory(directory), user(user) {}
+
+ // Directory the external containerizer gets invoked in.
+ const std::string directory;
+
+ // Optional user the sandbox directory gets chown'ed to.
+ const Option<std::string> user;
+ };
+
+ // Information describing a running container.
+ struct Container
+ {
+ Container(const Sandbox& sandbox) : sandbox(sandbox), pid(None()) {}
+
+ // Keep sandbox information available for subsequent containerizer
+ // invocations.
+ Sandbox sandbox;
+
+ // External containerizer pid as per wait-invocation.
+ // Wait should block on the external containerizer side, hence we
+ // need to keep its pid for terminating if needed.
+ Option<pid_t> pid;
+
+ // NOTE(review): presumably fulfilled with the result of the
+ // external "wait" command - confirm in the .cpp file.
+ process::Promise<containerizer::Termination> termination;
+
+ // As described in MESOS-1251, we need to make sure that events
+ // that are triggered before launch has completed, are in fact
+ // queued until then to reduce complexity within external
+ // containerizer program implementations. To achieve that, we
+ // simply queue all events onto this promise.
+ process::Promise<Nothing> launched;
+
+ // Resources as set by the most recent update continuation.
+ Resources resources;
+ };
+
+ // Stores all active containers.
+ hashmap<ContainerID, process::Owned<Container> > actives;
+
+ // Continuations of launch().
+ process::Future<Nothing> _launch(
+ const ContainerID& containerId,
+ const process::Future<Option<int> >& future);
+
+ void __launch(
+ const ContainerID& containerId,
+ const process::Future<Nothing>& future);
+
+ // Continuations of wait().
+ process::Future<containerizer::Termination> _wait(
+ const ContainerID& containerId);
+
+ void __wait(
+ const ContainerID& containerId,
+ const process::Future<tuples::tuple<
+ process::Future<Result<containerizer::Termination> >,
+ process::Future<Option<int> > > >& future);
+
+ // Continuations of update(): _update runs the external "update"
+ // command, __update evaluates its exit status.
+ process::Future<Nothing> _update(
+ const ContainerID& containerId,
+ const Resources& resources);
+
+ process::Future<Nothing> __update(
+ const ContainerID& containerId,
+ const process::Future<Option<int> >& future);
+
+ // Continuations of usage(): _usage runs the external "usage"
+ // command, __usage evaluates its output and exit status.
+ process::Future<ResourceStatistics> _usage(
+ const ContainerID& containerId);
+
+ process::Future<ResourceStatistics> __usage(
+ const ContainerID& containerId,
+ const process::Future<tuples::tuple<
+ process::Future<Result<ResourceStatistics> >,
+ process::Future<Option<int> > > >& future);
+
+ // Continuations of destroy(): _destroy runs the external "destroy"
+ // command, __destroy evaluates its exit status.
+ void _destroy(const ContainerID& containerId);
+
+ void __destroy(
+ const ContainerID& containerId,
+ const process::Future<Option<int> >& future);
+
+ // Abort a possibly pending "wait" in the external containerizer
+ // process.
+ void unwait(const ContainerID& containerId);
+
+ // Call back for when the containerizer has terminated all processes
+ // in the container.
+ void cleanup(const ContainerID& containerId);
+
+ // Run the external containerizer program with the given command,
+ // sending it the given protobuf message. The optional environment
+ // is merged into the default environment of the invocation.
+ Try<process::Subprocess> invoke(
+ const std::string& command,
+ const Sandbox& sandbox,
+ const google::protobuf::Message& message,
+ const std::map<std::string, std::string>& environment =
+ (std::map<std::string, std::string>())); // Wrapped in parens due to: http://llvm.org/bugs/show_bug.cgi?id=13657
+};
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __EXTERNAL_CONTAINERIZER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/slave/containerizer/mesos_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.cpp b/src/slave/containerizer/mesos_containerizer.cpp
index f9cc8e6..27813af 100644
--- a/src/slave/containerizer/mesos_containerizer.cpp
+++ b/src/slave/containerizer/mesos_containerizer.cpp
@@ -157,6 +157,7 @@ Future<Nothing> MesosContainerizer::launch(
checkpoint);
}
+
Future<Nothing> MesosContainerizer::update(
const ContainerID& containerId,
const Resources& resources)
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index f38de51..0a04ad5 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -57,7 +57,7 @@ public:
add(&Flags::isolation,
"isolation",
"Isolation mechanisms to use, e.g., 'posix/cpu,posix/mem'\n"
- "or 'cgroups/cpu,cgroups/mem'.",
+ "or 'cgroups/cpu,cgroups/mem' or 'external'.",
"posix/cpu,posix/mem");
add(&Flags::default_role,
@@ -70,8 +70,8 @@ public:
"*");
add(&Flags::attributes,
- "attributes",
- "Attributes of machine");
+ "attributes",
+ "Attributes of machine");
add(&Flags::work_dir,
"work_dir",
@@ -210,6 +210,16 @@ public:
"Path to a file containing a single line with\n"
"the 'principal' and 'secret' separated by whitespace.\n"
"Path could be of the form 'file:///path/to/file' or '/path/to/file'");
+
+ add(&Flags::containerizer_path,
+ "containerizer_path",
+ "The path to the external containerizer executable used when\n"
+ "external isolation is activated (--isolation=external).\n");
+
+ add(&Flags::default_container_image,
+ "default_container_image",
+ "The default container image to use if not specified by a task,\n"
+ "when using external containerizer");
}
bool version;
@@ -241,6 +251,8 @@ public:
Option<std::string> slave_subsystems;
#endif
Option<std::string> credential;
+ Option<std::string> containerizer_path;
+ Option<std::string> default_container_image;
};
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/tests/external_containerizer_test.cpp
----------------------------------------------------------------------
diff --git a/src/tests/external_containerizer_test.cpp b/src/tests/external_containerizer_test.cpp
new file mode 100644
index 0000000..6f7b13e
--- /dev/null
+++ b/src/tests/external_containerizer_test.cpp
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <unistd.h>
+
+#include <gmock/gmock.h>
+
+#include <string>
+#include <vector>
+#include <map>
+
+#include <mesos/resources.hpp>
+
+#include <process/future.hpp>
+
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+
+#include "master/master.hpp"
+#include "master/detector.hpp"
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/external_containerizer.hpp"
+#include "slave/flags.hpp"
+#include "slave/slave.hpp"
+
+#include "tests/mesos.hpp"
+#include "tests/flags.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using namespace process;
+
+using mesos::internal::master::Master;
+using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Slave;
+
+using std::string;
+using std::vector;
+
+using testing::_;
+using testing::DoAll;
+using testing::Return;
+using testing::SaveArg;
+using testing::Invoke;
+
+// TODO(tillt): Update and enhance the ExternalContainerizer tests,
+// possibly following some of the patterns used within the
+// IsolatorTests.
+class ExternalContainerizerTest : public MesosTest {};
+
+
+// ExternalContainerizer whose task-taking launch() overload is
+// mocked so that tests can observe its arguments. By default the
+// mocked method forwards to the real implementation via _launch.
+class MockExternalContainerizer : public slave::ExternalContainerizer
+{
+public:
+ // Mocks the eight argument launch() overload (the one that takes a
+ // TaskInfo).
+ MOCK_METHOD8(
+ launch,
+ process::Future<Nothing>(
+ const ContainerID&,
+ const TaskInfo&,
+ const ExecutorInfo&,
+ const std::string&,
+ const Option<std::string>&,
+ const SlaveID&,
+ const process::PID<slave::Slave>&,
+ bool checkpoint));
+
+ MockExternalContainerizer(const slave::Flags& flags)
+ : ExternalContainerizer(flags)
+ {
+ // Set up defaults for mocked methods.
+ // NOTE: See TestContainerizer::setup for why we use
+ // 'EXPECT_CALL' and 'WillRepeatedly' here instead of
+ // 'ON_CALL' and 'WillByDefault'.
+ EXPECT_CALL(*this, launch(_, _, _, _, _, _, _, _))
+ .WillRepeatedly(Invoke(this, &MockExternalContainerizer::_launch));
+ }
+
+ // Forwards all arguments unchanged to the non-mocked
+ // ExternalContainerizer::launch.
+ process::Future<Nothing> _launch(
+ const ContainerID& containerId,
+ const TaskInfo& taskInfo,
+ const ExecutorInfo& executorInfo,
+ const string& directory,
+ const Option<string>& user,
+ const SlaveID& slaveId,
+ const PID<Slave>& slavePid,
+ bool checkpoint)
+ {
+ return slave::ExternalContainerizer::launch(
+ containerId,
+ taskInfo,
+ executorInfo,
+ directory,
+ user,
+ slaveId,
+ slavePid,
+ checkpoint);
+ }
+};
+
+
+// Launches a task through the external containerizer (using the
+// python test-containerizer), checks the reported resource usage and
+// finally kills the task again.
+TEST_F(ExternalContainerizerTest, Launch)
+{
+  Try<PID<Master> > master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  Flags testFlags;
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  flags.isolation = "external";
+  flags.containerizer_path =
+    testFlags.build_dir + "/src/examples/python/test-containerizer";
+
+  MockExternalContainerizer containerizer(flags);
+
+  Try<PID<Slave> > slave = this->StartSlave(&containerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(offers);
+
+  // The first offer gets indexed below, hence a missing offer has to
+  // abort the test rather than merely being recorded.
+  ASSERT_NE(0u, offers.get().size());
+
+  TaskInfo task;
+  task.set_name("isolator_test");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->CopyFrom(offers.get()[0].resources());
+
+  Resources resources(offers.get()[0].resources());
+  Option<Bytes> mem = resources.mem();
+  ASSERT_SOME(mem);
+  Option<double> cpus = resources.cpus();
+  ASSERT_SOME(cpus);
+
+  const string file = path::join(flags.work_dir, "ready");
+
+  // This task induces user/system load by running top in the
+  // background; the command itself then sleeps for 60 seconds.
+  task.mutable_command()->set_value(
+#ifdef __APPLE__
+      // Use logging mode with 30,000 samples with no interval.
+      "top -l 30000 -s 0 2>&1 > /dev/null & "
+#else
+      // Batch mode, with 30,000 samples with no interval.
+      "top -b -d 0 -n 30000 2>&1 > /dev/null & "
+#endif
+      "touch " + file + "; " // Signals that the top command is running.
+      "sleep 60");
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return()); // Ignore rest for now
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(containerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&containerizer,
+                           &MockExternalContainerizer::_launch)));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(containerId);
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Wait for the task to begin inducing cpu time. The wait is bounded
+  // and sleeps between polls so that a task that never creates the
+  // file fails the test instead of spinning forever.
+  Duration polled = Duration::zero();
+  while (!os::exists(file) && polled < Seconds(10)) {
+    os::sleep(Milliseconds(100));
+    polled += Milliseconds(100);
+  }
+  ASSERT_TRUE(os::exists(file));
+
+  ExecutorID executorId;
+  executorId.set_value(task.task_id().value());
+
+  // We'll wait up to 10 seconds for the child process to induce
+  // 1/8 of a second of user and system cpu time in total.
+  // TODO(bmahler): Also induce rss memory consumption, by re-using
+  // the balloon framework.
+  ResourceStatistics statistics;
+  Duration waited = Duration::zero();
+  do {
+    Future<ResourceStatistics> usage = containerizer.usage(containerId.get());
+    AWAIT_READY(usage);
+
+    statistics = usage.get();
+
+    // If we meet our usage expectations, we're done!
+    // NOTE: We are currently getting dummy-data from the test-
+    // containerizer python script matching these expectations.
+    // TODO(tillt): Consider working with real data.
+    if (statistics.cpus_user_time_secs() >= 0.120 &&
+        statistics.cpus_system_time_secs() >= 0.05 &&
+        statistics.mem_rss_bytes() >= 1024u) {
+      break;
+    }
+
+    os::sleep(Milliseconds(100));
+    waited += Milliseconds(100);
+  } while (waited < Seconds(10));
+
+  EXPECT_GE(statistics.cpus_user_time_secs(), 0.120);
+  EXPECT_GE(statistics.cpus_system_time_secs(), 0.05);
+  EXPECT_EQ(statistics.cpus_limit(), cpus.get());
+  EXPECT_GE(statistics.mem_rss_bytes(), 1024u);
+  EXPECT_EQ(statistics.mem_limit_bytes(), mem.get().bytes());
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.killTask(task.task_id());
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_KILLED, status.get().state());
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown();
+}