You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/04/28 09:07:44 UTC

git commit: Added External Containerizer.

Repository: mesos
Updated Branches:
  refs/heads/master f6b838a0b -> bd6a0dadd


Added External Containerizer.

This patch adds the so-called external containerizer. This
containerizer delegates all containerizer calls directly to
an external containerizer program (which can be specified on
start-up).

The protocol for the interactions with the external program
is as follows:

COMMAND < INPUT-PROTO > RESULT-PROTO

launch < Launch
update < Update
usage < Usage > ResourceStatistics
wait < Wait > Termination
destroy < Destroy

When protocol buffers need to be exchanged, the Mesos side of
the external containerizer implementation serializes the input
proto to the program's stdin and reads the result proto back
from its stdout, as sketched in the scheme above.

Review: https://reviews.apache.org/r/17567


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bd6a0dad
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bd6a0dad
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bd6a0dad

Branch: refs/heads/master
Commit: bd6a0dadd84b6c996d49baa46a3e8cb0bb68251d
Parents: f6b838a
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Apr 27 20:38:06 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Apr 28 00:06:39 2014 -0700

----------------------------------------------------------------------
 configure.ac                                    |   2 +
 src/Makefile.am                                 |   8 +-
 src/examples/python/test-containerizer.in       |  47 +
 src/examples/python/test_containerizer.py       | 353 ++++++++
 src/slave/containerizer/containerizer.cpp       |   5 +
 src/slave/containerizer/containerizer.hpp       |   1 +
 .../containerizer/external_containerizer.cpp    | 897 +++++++++++++++++++
 .../containerizer/external_containerizer.hpp    | 260 ++++++
 src/slave/containerizer/mesos_containerizer.cpp |   1 +
 src/slave/flags.hpp                             |  18 +-
 src/tests/external_containerizer_test.cpp       | 257 ++++++
 11 files changed, 1844 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 01182b3..4967fe4 100644
--- a/configure.ac
+++ b/configure.ac
@@ -691,6 +691,8 @@ There are two possible workarounds for this issue:
                   [chmod +x src/examples/python/test-executor])
   AC_CONFIG_FILES([src/examples/python/test-framework],
                   [chmod +x src/examples/python/test-framework])
+  AC_CONFIG_FILES([src/examples/python/test-containerizer],
+                  [chmod +x src/examples/python/test-containerizer])
   AC_CONFIG_FILES([src/python/setup.py])
 
   # When clang is being used, make sure that the distutils python-

http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index c2029c7..d2e006d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -212,6 +212,7 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	slave/containerizer/isolator.cpp				\
 	slave/containerizer/launcher.cpp				\
 	slave/containerizer/mesos_containerizer.cpp			\
+	slave/containerizer/external_containerizer.cpp			\
 	slave/status_update_manager.cpp					\
 	exec/exec.cpp							\
 	common/lock.cpp							\
@@ -290,6 +291,7 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp		\
 	slave/containerizer/isolators/posix.hpp				\
 	slave/containerizer/launcher.hpp				\
 	slave/containerizer/mesos_containerizer.hpp			\
+	slave/containerizer/external_containerizer.hpp			\
 	slave/flags.hpp slave/gc.hpp slave/monitor.hpp			\
 	slave/paths.hpp slave/state.hpp					\
 	slave/status_update_manager.hpp					\
@@ -889,6 +891,7 @@ mesos_tests_SOURCES =				\
   tests/flags.cpp				\
   tests/gc_tests.cpp				\
   tests/isolator_tests.cpp			\
+  tests/external_containerizer_test.cpp		\
   tests/log_tests.cpp				\
   tests/logging_tests.cpp			\
   tests/main.cpp				\
@@ -956,8 +959,9 @@ if HAS_PYTHON
   EXAMPLESCRIPTSPYTHON = examples/python/test_framework.py		\
 			 examples/python/test-framework			\
 			 examples/python/test_executor.py		\
-			 examples/python/test-executor
-
+			 examples/python/test-executor			\
+			 examples/python/test_containerizer.py		\
+			 examples/python/test-containerizer
   check_SCRIPTS += $(EXAMPLESCRIPTSPYTHON)
   mesos_tests_DEPENDENCIES += $(EXAMPLESCRIPTSPYTHON)
 endif

http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/examples/python/test-containerizer.in
----------------------------------------------------------------------
diff --git a/src/examples/python/test-containerizer.in b/src/examples/python/test-containerizer.in
new file mode 100644
index 0000000..569519b
--- /dev/null
+++ b/src/examples/python/test-containerizer.in
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+
+# This script uses MESOS_SOURCE_DIR and MESOS_BUILD_DIR which come
+# from configuration substitutions.
+MESOS_SOURCE_DIR=@abs_top_srcdir@
+MESOS_BUILD_DIR=@abs_top_builddir@
+
+# Use colors for errors.
+. ${MESOS_SOURCE_DIR}/support/colors.sh
+
+# Force the use of the Python interpreter configured during building.
+test ! -z "${PYTHON}" && \
+  echo "${RED}Ignoring PYTHON environment variable (using @PYTHON@)${NORMAL}"
+
+PYTHON=@PYTHON@
+
+DISTRIBUTE_EGG=${MESOS_BUILD_DIR}/3rdparty/distribute-0.6.26/dist/
+DISTRIBUTE_EGG+=distribute-0.6.26@PYTHON_EGG_PUREPY_POSTFIX@.egg
+
+test ! -e ${DISTRIBUTE_EGG} && \
+  echo "${RED}Failed to find ${DISTRIBUTE_EGG}${NORMAL}" && \
+  exit 1
+
+PROTOBUF=${MESOS_BUILD_DIR}/3rdparty/libprocess/3rdparty/protobuf-2.5.0
+
+PROTOBUF_EGG=${PROTOBUF}/python/dist/
+PROTOBUF_EGG+=protobuf-2.5.0@PYTHON_EGG_PUREPY_POSTFIX@.egg
+
+test ! -e ${PROTOBUF_EGG} && \
+  echo "${RED}Failed to find ${PROTOBUF_EGG}${NORMAL}" && \
+  exit 1
+
+MESOS_EGG=${MESOS_BUILD_DIR}/src/python/dist/
+MESOS_EGG+=mesos-@PACKAGE_VERSION@@PYTHON_EGG_POSTFIX@.egg
+
+test ! -e ${MESOS_EGG} && \
+  echo "${RED}Failed to find ${MESOS_EGG}${NORMAL}" && \
+  exit 1
+
+SCRIPT=${MESOS_SOURCE_DIR}/src/examples/python/test_containerizer.py
+
+test ! -e ${SCRIPT} && \
+  echo "${RED}Failed to find ${SCRIPT}${NORMAL}" && \
+  exit 1
+
+PYTHONPATH="${DISTRIBUTE_EGG}:${MESOS_EGG}:${PROTOBUF_EGG}" \
+  exec ${PYTHON} ${SCRIPT} "${@}"

http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/examples/python/test_containerizer.py
----------------------------------------------------------------------
diff --git a/src/examples/python/test_containerizer.py b/src/examples/python/test_containerizer.py
new file mode 100644
index 0000000..57b4bb6
--- /dev/null
+++ b/src/examples/python/test_containerizer.py
@@ -0,0 +1,353 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The scheme an external containerizer has to adhere to is:
+#
+# COMMAND < INPUT-PROTO > RESULT-PROTO
+#
+# launch < Launch
+# update < Update
+# usage < Usage > ResourceStatistics
+# wait < Wait > Termination
+# destroy < Destroy
+#
+# 'wait' is expected to block until the task command/executor has
+# terminated.
+
+# TODO(tillt): Implement a protocol for external containerizer
+# recovery by defining needed protobuf/s.
+# Currently we expect to cover recovery entirely on the slave side.
+
+import fcntl
+import multiprocessing
+import os
+import signal
+import subprocess
+import sys
+import struct
+import time
+import google
+import mesos
+import mesos_pb2
+import containerizer_pb2
+
+
+# Render a string describing how to use this script.
+def use(argv0, methods):
+    out = "Usage: %s <command>\n" % argv0
+    out += "Valid commands: " + ', '.join(methods)
+
+    return out
+
+
+# Read a data chunk prefixed by its total size from stdin.
+def receive():
+    # Read size (uint32 => 4 bytes).
+    size = struct.unpack('I', sys.stdin.read(4))
+    if size[0] <= 0:
+        print >> sys.stderr, "Expected protobuf size over stdin. " \
+                             "Received 0 bytes."
+        return ""
+
+    # Read payload.
+    data = sys.stdin.read(size[0])
+    if len(data) != size[0]:
+        print >> sys.stderr, "Expected %d bytes protobuf over stdin. " \
+                             "Received %d bytes." % (size[0], len(data))
+        return ""
+
+    return data
+
+
+# Write a protobuf message prefixed by its total size (aka recordio)
+# to stdout.
+def send(data):
+    # Write size (uint32 => 4 bytes).
+    sys.stdout.write(struct.pack('I', len(data)))
+
+    # Write payload.
+    sys.stdout.write(data)
+
+
+# Start a containerized executor. Expects to receive a Launch
+# protobuf via stdin.
+def launch():
+    try:
+        data = receive()
+        if len(data) == 0:
+            return 1
+
+        launch = containerizer_pb2.Launch()
+        launch.ParseFromString(data)
+
+        if launch.task_info.HasField("executor"):
+            command = ["sh",
+                       "-c",
+                       launch.task_info.executor.command.value]
+        else:
+            print >> sys.stderr, "No executor passed; using mesos-executor!"
+            executor = os.path.join(os.environ['MESOS_LIBEXEC_DIRECTORY'],
+                                    "mesos-executor")
+            command = [executor,
+                       "sh",
+                       "-c",
+                       launch.task_info.command.value]
+            print >> sys.stderr, "command " + str(command)
+
+        lock_dir = os.path.join("/tmp/mesos-test-containerizer",
+                                launch.container_id.value)
+        subprocess.check_call(["mkdir", "-p", lock_dir])
+
+        # Fork a child process for allowing a blocking wait.
+        pid = os.fork()
+        if pid == 0:
+            # We are in the child.
+            proc = subprocess.Popen(command, env=os.environ.copy())
+
+            # Wait and serialize the process status when done.
+            lock = os.path.join(lock_dir, "wait")
+            with open(lock, "w+") as lk:
+                fcntl.flock(lk, fcntl.LOCK_EX)
+
+                status = proc.wait()
+
+                lk.write(str(status) + "\n")
+
+            sys.exit(status)
+        else:
+            # We are in the parent.
+
+            # Serialize the subprocess pid.
+            lock = os.path.join(lock_dir, "pid")
+            with open(lock, "w+") as lk:
+                fcntl.flock(lk, fcntl.LOCK_EX)
+
+                lk.write(str(pid) + "\n")
+
+    except google.protobuf.message.DecodeError:
+        print >> sys.stderr, "Could not deserialise Launch protobuf"
+        return 1
+
+    except OSError as e:
+        print >> sys.stderr, e.strerror
+        return 1
+
+    except ValueError:
+        print >> sys.stderr, "Value is invalid"
+        return 1
+
+    return 0
+
+
+# Update the container's resources.
+# Expects to receive an Update protobuf via stdin.
+def update():
+    try:
+        data = receive()
+        if len(data) == 0:
+            return 1
+
+        update = containerizer_pb2.Update()
+        update.ParseFromString(data)
+
+        print >> sys.stderr, "Received "                \
+                           + str(len(update.resources)) \
+                           + " resource elements."
+
+    except google.protobuf.message.DecodeError:
+        print >> sys.stderr, "Could not deserialise Update protobuf."
+        return 1
+
+    except OSError as e:
+        print >> sys.stderr, e.strerror
+        return 1
+
+    except ValueError:
+        print >> sys.stderr, "Value is invalid"
+        return 1
+
+    return 0
+
+
+# Gather resource usage statistics for the containerized executor.
+# Delivers an ResourceStatistics protobut via stdout when
+# successful.
+def usage():
+    try:
+        data = receive()
+        if len(data) == 0:
+            return 1
+        usage = containerizer_pb2.Usage()
+        usage.ParseFromString(data)
+
+        statistics = mesos_pb2.ResourceStatistics()
+
+        statistics.timestamp = time.time()
+
+        # Return hardcoded dummy statistics.
+        # TODO(tillt): Make use of mesos-usage here for capturing real
+        # statistics.
+        statistics.mem_rss_bytes = 1073741824
+        statistics.mem_limit_bytes = 1073741824
+        statistics.cpus_limit = 2
+        statistics.cpus_user_time_secs = 0.12
+        statistics.cpus_system_time_secs = 0.5
+
+        send(statistics.SerializeToString())
+
+    except google.protobuf.message.DecodeError:
+        print >> sys.stderr, "Could not deserialise ContainerID protobuf."
+        return 1
+
+    except google.protobuf.message.EncodeError:
+        print >> sys.stderr, "Could not serialise ResourceStatistics protobuf."
+        return 1
+
+    except OSError as e:
+        print >> sys.stderr, e.strerror
+        return 1
+
+    return 0
+
+
+# Terminate the containerized executor.
+def destroy():
+    try:
+        data = receive()
+        if len(data) == 0:
+            return 1
+        destroy = containerizer_pb2.Destroy()
+        destroy.ParseFromString(data)
+
+        lock_dir = os.path.join("/tmp/mesos-test-containerizer",
+                                destroy.container_id.value)
+        lock = os.path.join(lock_dir, "pid")
+
+        # Obtain our shared lock once it becomes available, read
+        # the pid and kill that process.
+        with open(lock, "r") as lk:
+            fcntl.flock(lk, fcntl.LOCK_SH)
+
+            pid = int(lk.read())
+
+            os.kill(pid, signal.SIGKILL)
+
+    except google.protobuf.message.DecodeError:
+        print >> sys.stderr, "Could not deserialise ContainerID protobuf."
+        return 1
+
+    except OSError as e:
+        print >> sys.stderr, e.strerror
+        return 1
+
+    return 0
+
+
+# Recover the containerized executor.
+# TODO(tillt): It is yet to be defined which data will be forwarded
+# for the external containerizer to be able to recover its internal
+# states. Currently this command is not being invoked.
+def recover():
+    try:
+        data = receive()
+        if len(data) == 0:
+            return 1
+        containerId = mesos_pb2.ContainerID()
+        containerId.ParseFromString(data)
+
+    except google.protobuf.message.DecodeError:
+        print >> sys.stderr, "Could not deserialise ContainerID protobuf."
+        return 1
+
+    except OSError as e:
+        print >> sys.stderr, e.strerror
+        return 1
+
+    return 0
+
+
+# Get the containerized executor's Termination.
+# Delivers a Termination protobuf filled with the information
+# gathered from launch's wait via stdout.
+def wait():
+    try:
+        data = receive()
+        if len(data) == 0:
+            return 1
+        wait = containerizer_pb2.Wait()
+        wait.ParseFromString(data)
+
+        lock_dir = os.path.join("/tmp/mesos-test-containerizer",
+                                wait.container_id.value)
+        lock = os.path.join(lock_dir, "wait")
+
+        # Obtain our shared lock once it becomes available and read
+        # the status code.
+        with open(lock, "r") as lk:
+            fcntl.flock(lk, fcntl.LOCK_SH)
+            status = int(lk.read())
+
+        # Deliver the termination protobuf back to the slave.
+        termination = containerizer_pb2.Termination()
+        termination.killed = false
+        termination.status = status
+        termination.message = ""
+
+        send(termination.SerializeToString())
+
+    except google.protobuf.message.DecodeError:
+        print >> sys.stderr, "Could not deserialise ContainerID protobuf."
+        return 1
+
+    except google.protobuf.message.EncodeError:
+        print >> sys.stderr, "Could not serialise Termination protobuf."
+        return 1
+
+    except OSError as e:
+        print >> sys.stderr, e.strerror
+        return 1
+
+    return 0
+
+
+if __name__ == "__main__":
+    methods = { "launch":  launch,
+                "update":  update,
+                "destroy": destroy,
+                "recover": recover,
+                "usage":   usage,
+                "wait":    wait }
+
+    if sys.argv[1:2] == ["--help"] or sys.argv[1:2] == ["-h"]:
+        print use(sys.argv[0], methods.keys())
+        sys.exit(0)
+
+    if len(sys.argv) < 2:
+        print >> sys.stderr, "Please pass a command"
+        print >> sys.stderr, use(sys.argv[0], methods.keys())
+        sys.exit(1)
+
+    command = sys.argv[1]
+    if command not in methods:
+        print >> sys.stderr, "Invalid command passed"
+        print >> sys.stderr, use(sys.argv[0], methods.keys())
+        sys.exit(2)
+
+    method = methods.get(command)
+
+    sys.exit(method())

http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index 344872a..9321bbd 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -38,6 +38,7 @@
 #include "slave/containerizer/isolator.hpp"
 #include "slave/containerizer/launcher.hpp"
 #include "slave/containerizer/mesos_containerizer.hpp"
+#include "slave/containerizer/external_containerizer.hpp"
 
 #include "slave/containerizer/isolators/posix.hpp"
 #ifdef __linux__
@@ -176,6 +177,10 @@ Try<Containerizer*> Containerizer::create(
 
   LOG(INFO) << "Using isolation: " << isolation;
 
+  if (isolation == "external") {
+    return new ExternalContainerizer(flags);
+  }
+
   // Create a MesosContainerizerProcess using isolators and a launcher.
   hashmap<std::string, Try<Isolator*> (*)(const Flags&)> creators;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/slave/containerizer/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.hpp b/src/slave/containerizer/containerizer.hpp
index 9a50fba..a9f89fc 100644
--- a/src/slave/containerizer/containerizer.hpp
+++ b/src/slave/containerizer/containerizer.hpp
@@ -35,6 +35,7 @@
 #include <stout/option.hpp>
 #include <stout/try.hpp>
 
+
 namespace mesos {
 namespace internal {
 namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/slave/containerizer/external_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp
new file mode 100644
index 0000000..e51ac66
--- /dev/null
+++ b/src/slave/containerizer/external_containerizer.cpp
@@ -0,0 +1,897 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+#include <iomanip>
+#include <list>
+
+#include <errno.h>
+#include <poll.h>
+#include <signal.h>
+#include <stdio.h>
+
+#include <process/async.hpp>
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/id.hpp>
+#include <process/io.hpp>
+#include <process/reap.hpp>
+
+#include <stout/check.hpp>
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/strings.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/type_utils.hpp"
+
+#include "slave/paths.hpp"
+
+#include "slave/containerizer/external_containerizer.hpp"
+
+
+using lambda::bind;
+
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+using tuples::tuple;
+
+using namespace process;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+using state::ExecutorState;
+using state::FrameworkState;
+using state::RunState;
+using state::SlaveState;
+
+
+// Validate the invocation result.
+static Option<Error> validate(
+    const Future<Option<int> >& future)
+{
+  if (!future.isReady()) {
+    return Error("Status not ready");
+  }
+
+  Option<int> status = future.get();
+  if (status.isNone()) {
+    return Error("External containerizer has no status available");
+  }
+
+  // The status is a waitpid-result which has to be checked for SIGNAL
+  // based termination before masking out the exit-code.
+  if (!WIFEXITED(status.get())) {
+    return Error(string("External containerizer terminated by signal ") +
+                 strsignal(WTERMSIG(status.get())));
+  }
+
+  if (WEXITSTATUS(status.get()) != 0) {
+    return Error("External containerizer failed (status: " +
+                 stringify(WEXITSTATUS(status.get())) + ")");
+  }
+
+  return None();
+}
+
+
+// Validate the invocation results and extract a piped protobuf
+// message.
+template<typename T>
+static Try<T> result(
+    const process::Future<tuples::tuple<
+        process::Future<Result<T> >,
+        process::Future<Option<int> > > >& future)
+{
+  if (!future.isReady()) {
+    return Error("Could not receive any result");
+  }
+
+  Option<Error> error = validate(tuples::get<1>(future.get()));
+  if (error.isSome()) {
+    return error.get();
+  }
+
+  process::Future<Result<T> > result = tuples::get<0>(future.get());
+  if (result.isFailed()) {
+    return Error("Could not receive any result: " + result.failure());
+  }
+
+  if (result.get().isError()) {
+    return Error("Could not receive any result: " + result.get().error());
+  }
+
+  if (result.get().isNone()) {
+    return Error("Could not receive any result");
+  }
+
+  return result.get().get();
+}
+
+
+ExternalContainerizer::ExternalContainerizer(const Flags& flags)
+{
+  process = new ExternalContainerizerProcess(flags);
+  spawn(process);
+}
+
+
+ExternalContainerizer::~ExternalContainerizer()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+Future<Nothing> ExternalContainerizer::recover(
+    const Option<state::SlaveState>& state)
+{
+  return dispatch(process, &ExternalContainerizerProcess::recover, state);
+}
+
+
+Future<Nothing> ExternalContainerizer::launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return dispatch(process,
+                  &ExternalContainerizerProcess::launch,
+                  containerId,
+                  None(),
+                  executorInfo,
+                  directory,
+                  user,
+                  slaveId,
+                  slavePid,
+                  checkpoint);
+}
+
+
+Future<Nothing> ExternalContainerizer::launch(
+    const ContainerID& containerId,
+    const TaskInfo& taskInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return dispatch(process,
+                  &ExternalContainerizerProcess::launch,
+                  containerId,
+                  taskInfo,
+                  executorInfo,
+                  directory,
+                  user,
+                  slaveId,
+                  slavePid,
+                  checkpoint);
+}
+
+
+Future<Nothing> ExternalContainerizer::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  return dispatch(process,
+                  &ExternalContainerizerProcess::update,
+                  containerId,
+                  resources);
+}
+
+
+Future<ResourceStatistics> ExternalContainerizer::usage(
+    const ContainerID& containerId)
+{
+  return dispatch(process, &ExternalContainerizerProcess::usage, containerId);
+}
+
+
+Future<containerizer::Termination> ExternalContainerizer::wait(
+    const ContainerID& containerId)
+{
+  return dispatch(process, &ExternalContainerizerProcess::wait, containerId);
+}
+
+
+void ExternalContainerizer::destroy(const ContainerID& containerId)
+{
+  dispatch(process, &ExternalContainerizerProcess::destroy, containerId);
+}
+
+
+Future<hashset<ContainerID> > ExternalContainerizer::containers()
+{
+  return dispatch(process, &ExternalContainerizerProcess::containers);
+}
+
+
+ExternalContainerizerProcess::ExternalContainerizerProcess(
+    const Flags& _flags) : flags(_flags) {}
+
+
+Future<Nothing> ExternalContainerizerProcess::recover(
+    const Option<state::SlaveState>& state)
+{
+  // TODO(tillt): Consider forwarding the recover command to the
+  // external containerizer. For now, recovery should be entirely
+  // covered by the slave itself.
+  return Nothing();
+}
+
+
+Future<Nothing> ExternalContainerizerProcess::launch(
+    const ContainerID& containerId,
+    const Option<TaskInfo>& taskInfo,
+    const ExecutorInfo& executor,
+    const std::string& directory,
+    const Option<std::string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  LOG(INFO) << "Launching container '" << containerId << "'";
+
+  if (actives.contains(containerId)) {
+    return Failure("Cannot start already running container '" +
+                   containerId.value() + "'");
+  }
+
+  map<string, string> environment = executorEnvironment(
+      executor,
+      directory,
+      slaveId,
+      slavePid,
+      checkpoint,
+      flags.recovery_timeout);
+
+  // TODO(tillt): Consider moving this into
+  // Containerizer::executorEnvironment.
+  if (!flags.hadoop_home.empty()) {
+    environment["HADOOP_HOME"] = flags.hadoop_home;
+  }
+
+  if (flags.default_container_image.isSome()) {
+    environment["MESOS_DEFAULT_CONTAINER_IMAGE"] =
+      flags.default_container_image.get();
+  }
+
+  containerizer::Launch launch;
+  launch.mutable_container_id()->CopyFrom(containerId);
+  if (taskInfo.isSome()) {
+    launch.mutable_task_info()->CopyFrom(taskInfo.get());
+  } else {
+    launch.mutable_executor_info()->CopyFrom(executor);
+  }
+  launch.set_directory(directory);
+  if (user.isSome()) {
+    launch.set_user(user.get());
+  }
+  launch.mutable_slave_id()->CopyFrom(slaveId);
+  launch.set_slave_pid(slavePid);
+  launch.set_checkpoint(checkpoint);
+
+  Sandbox sandbox(directory, user);
+
+  Try<Subprocess> invoked = invoke(
+      "launch",
+      sandbox,
+      launch,
+      environment);
+
+  if (invoked.isError()) {
+    return Failure("Launch of container '" + containerId.value() +
+                   "' failed: " + invoked.error());
+  }
+
+  // Record the container launch intent.
+  actives.put(containerId, Owned<Container>(new Container(sandbox)));
+
+  return invoked.get().status()
+    .then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::_launch,
+        containerId,
+        lambda::_1))
+    .onAny(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::__launch,
+        containerId,
+        lambda::_1));
+}
+
+
+Future<Nothing> ExternalContainerizerProcess::_launch(
+    const ContainerID& containerId,
+    const Future<Option<int> >& future)
+{
+  VLOG(1) << "Launch validation callback triggered on container '"
+          << containerId << "'";
+
+  Option<Error> error = validate(future);
+  if (error.isSome()) {
+    return Failure("Could not launch container '" +
+                   containerId.value() + "': " + error.get().message);
+  }
+
+  VLOG(1) << "Launch finishing up for container '" << containerId << "'";
+
+  // Launch is done, we can now process all other commands that might
+  // have gotten chained up.
+  actives[containerId]->launched.set(Nothing());
+
+  return Nothing();
+}
+
+
+void ExternalContainerizerProcess::__launch(
+    const ContainerID& containerId,
+    const Future<Nothing>& future)
+{
+  VLOG(1) << "Launch confirmation callback triggered on container '"
+          << containerId << "'";
+
+  // We need to cleanup whenever this callback was invoked due to a
+  // failure or discarded future.
+  if (!future.isReady()) {
+    cleanup(containerId);
+  }
+}
+
+
+Future<containerizer::Termination> ExternalContainerizerProcess::wait(
+    const ContainerID& containerId)
+{
+  VLOG(1) << "Wait triggered on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  // Defer wait until launch is done.
+  return actives[containerId]->launched.future()
+    .then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::_wait,
+        containerId));
+}
+
+
+Future<containerizer::Termination> ExternalContainerizerProcess::_wait(
+    const ContainerID& containerId)
+{
+  VLOG(1) << "Wait continuation triggered on container '"
+          << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  containerizer::Wait wait;
+  wait.mutable_container_id()->CopyFrom(containerId);
+
+  Try<Subprocess> invoked = invoke(
+      "wait",
+      actives[containerId]->sandbox,
+      wait);
+
+  if (invoked.isError()) {
+    // 'wait' has failed, we need to tear down everything now.
+    unwait(containerId);
+    return Failure("Wait on container '" + containerId.value() +
+                   "' failed: " + invoked.error());
+  }
+
+  actives[containerId]->pid = invoked.get().pid();
+
+  // Invoke the protobuf::read asynchronously.
+  // TODO(tillt): Consider moving protobuf::read into libprocess and
+  // making it work fully asynchronously.
+  Result<containerizer::Termination>(*read)(int, bool, bool) =
+    &::protobuf::read<containerizer::Termination>;
+
+  Future<Result<containerizer::Termination> > future = async(
+      read, invoked.get().out(), false, false);
+
+  // Await both, a protobuf Message from the subprocess as well as
+  // its exit.
+  await(future, invoked.get().status())
+    .onAny(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::__wait,
+        containerId,
+        lambda::_1));
+
+  return actives[containerId]->termination.future();
+}
+
+
+void ExternalContainerizerProcess::__wait(
+    const ContainerID& containerId,
+    const Future<tuples::tuple<
+        Future<Result<containerizer::Termination> >,
+        Future<Option<int> > > >& future)
+{
+  VLOG(1) << "Wait callback triggered on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    LOG(ERROR) << "Container '" << containerId << "' not running";
+    return;
+  }
+
+  Try<containerizer::Termination> outcome =
+    result<containerizer::Termination>(future);
+
+  if (outcome.isSome()) {
+    // Publish the termination to everyone waiting on this container.
+    actives[containerId]->termination.set(outcome.get());
+  } else {
+    // The 'wait' invocation failed: fail the waiters, then tear down.
+    actives[containerId]->termination.fail(outcome.error());
+    unwait(containerId);
+  }
+
+  // Either way the wait is over, so the bookkeeping entry can go.
+  cleanup(containerId);
+}
+
+
+// Updates a container's resources; runs _update() once launch is done.
+Future<Nothing> ExternalContainerizerProcess::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  VLOG(1) << "Update triggered on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  // Defer update until launch is done.
+  return actives[containerId]->launched.future()
+    .then(defer(PID<ExternalContainerizerProcess>(this),
+                &ExternalContainerizerProcess::_update,
+                containerId,
+                resources));
+}
+
+
+// Continuation of update(): records the new resources and invokes the
+// external 'update' command; __update() then checks its exit status.
+Future<Nothing> ExternalContainerizerProcess::_update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  VLOG(1) << "Update continuation triggered on container '"
+          << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  actives[containerId]->resources = resources;
+
+  containerizer::Update update;
+  update.mutable_container_id()->CopyFrom(containerId);
+  update.mutable_resources()->CopyFrom(resources);
+
+  Try<Subprocess> invoked =
+    invoke("update", actives[containerId]->sandbox, update);
+
+  if (invoked.isError()) {
+    return Failure("Update of container '" + containerId.value() +
+                   "' failed: " + invoked.error());
+  }
+
+  return invoked.get().status()
+    .then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::__update,
+        containerId,
+        lambda::_1));
+}
+
+
+Future<Nothing> ExternalContainerizerProcess::__update(
+    const ContainerID& containerId,
+    const Future<Option<int> >& future)
+{
+  VLOG(1) << "Update callback triggered on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  // An Error returned by validate() turns into a failed update.
+  Option<Error> problem = validate(future);
+  if (problem.isSome()) {
+    return Failure(problem.get());
+  }
+  return Nothing();
+}
+
+
+// Gathers usage statistics; runs _usage() once launch is done.
+Future<ResourceStatistics> ExternalContainerizerProcess::usage(
+    const ContainerID& containerId)
+{
+  VLOG(1) << "Usage triggered on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  // Defer usage until launch is done.
+  return actives[containerId]->launched.future()
+    .then(defer(PID<ExternalContainerizerProcess>(this),
+                &ExternalContainerizerProcess::_usage,
+                containerId));
+}
+
+
+// Continuation of usage(): invokes the external 'usage' command and
+// reads a ResourceStatistics message from its output pipe.
+Future<ResourceStatistics> ExternalContainerizerProcess::_usage(
+    const ContainerID& containerId)
+{
+  VLOG(1) << "Usage continuation on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  containerizer::Usage usage;
+  usage.mutable_container_id()->CopyFrom(containerId);
+
+  Try<Subprocess> invoked =
+    invoke("usage", actives[containerId]->sandbox, usage);
+
+  if (invoked.isError()) {
+    // 'usage' has failed but we keep the container alive for now.
+    return Failure("Usage on container '" + containerId.value() +
+                   "' failed: " + invoked.error());
+  }
+
+  Result<ResourceStatistics>(*read)(int, bool, bool) =
+    &::protobuf::read<ResourceStatistics>;
+
+  Future<Result<ResourceStatistics> > future = async(
+      read, invoked.get().out(), false, false);
+
+  // Await both, a protobuf Message from the subprocess as well as
+  // its exit.
+  return await(future, invoked.get().status())
+    .then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::__usage,
+        containerId,
+        lambda::_1));
+}
+
+
+Future<ResourceStatistics> ExternalContainerizerProcess::__usage(
+    const ContainerID& containerId,
+    const Future<tuples::tuple<
+        Future<Result<ResourceStatistics> >,
+        Future<Option<int> > > >& future)
+{
+  VLOG(1) << "Usage callback triggered on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    return Failure("Container '" + containerId.value() + "' not running");
+  }
+
+  Try<ResourceStatistics> parsed = result<ResourceStatistics>(future);
+  if (parsed.isError()) {
+    return Failure(parsed.error());
+  }
+  // Log the headline numbers at verbosity 2 before handing them back.
+  const ResourceStatistics& stats = parsed.get();
+  VLOG(2) << "Container '" << containerId << "' "
+          << "total mem usage "
+          << stats.mem_rss_bytes() << " "
+          << "total CPU user usage "
+          << stats.cpus_user_time_secs() << " "
+          << "total CPU system usage "
+          << stats.cpus_system_time_secs();
+
+  return stats;
+}
+
+
+void ExternalContainerizerProcess::destroy(const ContainerID& containerId)
+{
+  VLOG(1) << "Destroy triggered on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    LOG(ERROR) << "Container '" << containerId << "' not running";
+    return;
+  }
+
+  // Queue _destroy behind the launch promise of this container.
+  Future<Nothing> launched = actives[containerId]->launched.future();
+  launched.onAny(defer(
+      PID<ExternalContainerizerProcess>(this),
+      &ExternalContainerizerProcess::_destroy,
+      containerId));
+}
+
+
+// Continuation of destroy(): runs the external 'destroy' command; its
+// exit status is handled by __destroy().
+void ExternalContainerizerProcess::_destroy(const ContainerID& containerId)
+{
+  VLOG(1) << "Destroy continuation on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    LOG(ERROR) << "Container '" << containerId << "' not running";
+    return;
+  }
+
+  containerizer::Destroy request;
+  request.mutable_container_id()->CopyFrom(containerId);
+
+  Try<Subprocess> child =
+    invoke("destroy", actives[containerId]->sandbox, request);
+
+  if (child.isError()) {
+    LOG(ERROR) << "Destroy of container '" << containerId
+               << "' failed: " << child.error();
+    unwait(containerId);
+    return;
+  }
+
+  child.get().status()
+    .onAny(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::__destroy,
+        containerId,
+        lambda::_1));
+}
+
+
+void ExternalContainerizerProcess::__destroy(
+    const ContainerID& containerId,
+    const Future<Option<int> >& future)
+{
+  VLOG(1) << "Destroy callback triggered on container '" << containerId << "'";
+
+  if (!actives.contains(containerId)) {
+    LOG(ERROR) << "Container '" << containerId.value() << "' not running";
+    return;
+  }
+
+  Option<Error> failure = validate(future);
+  if (failure.isSome()) {
+    LOG(ERROR) << "Destroy of container '" << containerId
+               << "' failed: " << failure.get().message;
+  }
+
+  // Whatever the external destroy-command reported, the pending
+  // external "wait" process still has to be terminated.
+  unwait(containerId);
+}
+
+
+Future<hashset<ContainerID> > ExternalContainerizerProcess::containers()
+{
+  return actives.keys();  // Ids of all entries currently in 'actives'.
+}
+
+
+void ExternalContainerizerProcess::cleanup(const ContainerID& containerId)
+{
+  VLOG(1) << "Callback performing final cleanup of running state";
+
+  if (!actives.contains(containerId)) {
+    LOG(WARNING) << "Container '" << containerId << "' not running anymore";
+    return;
+  }
+  actives.erase(containerId);  // Forget the container's bookkeeping entry.
+}
+
+
+void ExternalContainerizerProcess::unwait(const ContainerID& containerId)
+{
+  if (!actives.contains(containerId)) {
+    LOG(WARNING) << "Container '" << containerId << "' not running";
+    return;
+  }
+
+  const Option<pid_t> waiter = actives[containerId]->pid;
+
+  // _wait() stores the pid of the external "wait" command; without
+  // one there is no subprocess to terminate.
+  if (waiter.isNone()) {
+    // If we reached this, launch most likely failed due to some error
+    // on the external containerizer's side (e.g. returned non zero on
+    // launch).
+    LOG(WARNING) << "Container '" << containerId << "' not being waited on";
+    cleanup(containerId);
+    return;
+  }
+
+  // Terminate the containerizer.
+  VLOG(2) << "About to send a SIGKILL to containerizer pid: " << waiter.get();
+
+  // TODO(tillt): Add graceful termination as soon as we have an
+  // accepted way to do that in place.
+  Try<list<os::ProcessTree> > killed =
+    os::killtree(waiter.get(), SIGKILL, true, true);
+
+  if (killed.isError()) {
+    LOG(WARNING) << "Failed to kill the process tree rooted at pid "
+                 << waiter.get() << ": " << killed.error();
+    cleanup(containerId);
+    return;
+  }
+
+  LOG(INFO) << "Killed the following process tree/s:\n"
+            << stringify(killed.get());
+
+  // No cleanup here: __wait fires when the external containerizer's
+  // "wait" subprocess terminates and performs the cleanup for this
+  // container.
+}
+
+
+// Post fork, pre exec hook: returns 0 on success, else the failing errno.
+// TODO(tillt): Consider having the kernel notify us when our parent
+// process dies e.g. by invoking prctl(PR_SET_PDEATHSIG, ..) on linux.
+static int setup(const string& directory)
+{
+  // Put child into its own process session to prevent slave suicide
+  // on child process SIGKILL/SIGTERM.
+  if (::setsid() == -1) {
+    return errno;
+  }
+
+  // Re/establish the sandbox conditions: run from the sandbox directory.
+  if (::chdir(directory.c_str()) == -1) {
+    return errno;
+  }
+
+  // Sync parent and child: invoke() reads this int from our stdout.
+  int sync = 0;
+  while (::write(STDOUT_FILENO, &sync, sizeof(sync)) == -1 &&
+         errno == EINTR);  // Retry only on EINTR; other errors are ignored.
+
+  return 0;
+}
+
+
+// Spawns "<containerizer_path> <command>" inside the sandbox directory,
+// splices its stderr into the sandbox's "stderr" file and writes 'message'
+// to its stdin. Returns the Subprocess, or an Error on any set-up failure.
+Try<process::Subprocess> ExternalContainerizerProcess::invoke(
+    const string& command,
+    const Sandbox& sandbox,
+    const google::protobuf::Message& message,
+    const map<string, string>& commandEnvironment)
+{
+  CHECK_SOME(flags.containerizer_path) << "containerizer_path not set";
+
+  VLOG(1) << "Invoking external containerizer for method '" << command << "'";
+
+  // Prepare a default environment.
+  map<string, string> environment;
+  environment["MESOS_LIBEXEC_DIRECTORY"] = flags.launcher_dir;
+
+  // Update default environment with command specific one.
+  environment.insert(commandEnvironment.begin(), commandEnvironment.end());
+
+  // Construct the command to execute.
+  string execute = flags.containerizer_path.get() + " " + command;
+
+  VLOG(2) << "calling: [" << execute << "]";
+  VLOG(2) << "directory: " << sandbox.directory;
+  // glog's signature is VLOG_IF(verboselevel, condition).
+  VLOG_IF(2, sandbox.user.isSome()) << "user: " << sandbox.user.get();
+
+  // Re/establish the sandbox conditions for the containerizer.
+  if (sandbox.user.isSome()) {
+    Try<Nothing> chown = os::chown(sandbox.user.get(), sandbox.directory);
+    if (chown.isError()) {
+      return Error("Failed to chown work directory: " + chown.error());
+    }
+  }
+
+  // Fork exec of external process. Run a chdir and a setsid within
+  // the child-context.
+  Try<Subprocess> external = process::subprocess(
+      execute, environment, lambda::bind(&setup, sandbox.directory));
+
+  if (external.isError()) {
+    return Error("Failed to execute external containerizer: " +
+                 external.error());
+  }
+
+  // Sync parent and child process to make sure we have done the
+  // setsid within the child context before continuing.
+  int sync = 0;
+  while (::read(external.get().out(), &sync, sizeof(sync)) == -1 &&
+         errno == EINTR);
+
+  // Set stderr into non-blocking mode.
+  Try<Nothing> nonblock = os::nonblock(external.get().err());
+  if (nonblock.isError()) {
+    return Error("Failed to accept nonblock: " + nonblock.error());
+  }
+
+  // We are not setting stdin or stdout into non-blocking mode as
+  // protobuf::read / write do currently not support it.
+
+  // Redirect output (stderr) from the external containerizer to log
+  // file in the executor work directory, chown'ing it if a user is
+  // specified.
+  const string stderrPath = path::join(sandbox.directory, "stderr");
+  Try<int> err = os::open(
+      stderrPath,
+      O_WRONLY | O_CREAT | O_APPEND | O_NONBLOCK,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+  if (err.isError()) {
+    return Error("Failed to redirect stderr: " + err.error());
+  }
+
+  if (sandbox.user.isSome()) {
+    Try<Nothing> chown = os::chown(sandbox.user.get(), stderrPath);
+    if (chown.isError()) {
+      os::close(err.get());  // Do not leak the log file descriptor.
+      return Error("Failed to redirect stderr: " + chown.error());
+    }
+  }
+
+  io::splice(external.get().err(), err.get())
+    .onAny(bind(&os::close, err.get()));
+
+  // Transmit protobuf data via stdin towards the external
+  // containerizer. Each message is prefixed by its total size.
+  Try<Nothing> write = ::protobuf::write(external.get().in(), message);
+  if (write.isError()) {
+    return Error("Failed to write protobuf to pipe: " + write.error());
+  }
+
+  VLOG(2) << "Subprocess pid: " << external.get().pid() << ", "
+          << "output pipe: " << external.get().out();
+
+  return external;
+}
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/slave/containerizer/external_containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.hpp b/src/slave/containerizer/external_containerizer.hpp
new file mode 100644
index 0000000..eb3ff96
--- /dev/null
+++ b/src/slave/containerizer/external_containerizer.hpp
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __EXTERNAL_CONTAINERIZER_HPP__
+#define __EXTERNAL_CONTAINERIZER_HPP__
+
+#include <list>
+#include <sstream>
+#include <string>
+
+#include <process/owned.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/try.hpp>
+#include <stout/tuple.hpp>
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/isolator.hpp"
+#include "slave/containerizer/launcher.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// The scheme an external containerizer has to adhere to is:
+//
+// COMMAND < INPUT-PROTO > RESULT-PROTO
+//
+// launch < containerizer::Launch
+// update < containerizer::Update
+// usage < containerizer::Usage > mesos::ResourceStatistics
+// wait < containerizer::Wait > containerizer::Termination
+// destroy < containerizer::Destroy
+//
+// 'wait' on the external containerizer side is expected to block
+// until the task command/executor has terminated.
+//
+
+// Check src/examples/python/test_containerizer.py for a rough
+// implementation template of this protocol.
+
+// TODO(tillt): Implement a protocol for external containerizer
+// recovery by defining needed protobuf/s.
+// Currently we expect to cover recovery entirely on the slave side.
+
+// For debugging purposes of an external containerizer, it might be
+// helpful to enable verbose logging on the slave (GLOG_v=2).
+
+class ExternalContainerizerProcess;
+
+class ExternalContainerizer : public Containerizer
+{
+public:
+  ExternalContainerizer(const Flags& flags);  // Startup flags.
+
+  virtual ~ExternalContainerizer();
+
+  virtual process::Future<Nothing> recover(
+      const Option<state::SlaveState>& state);
+
+  virtual process::Future<Nothing> launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);  // Overload without a TaskInfo.
+
+  virtual process::Future<Nothing> launch(
+      const ContainerID& containerId,
+      const TaskInfo& task,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);  // Overload that also receives the TaskInfo.
+
+  virtual process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  virtual process::Future<containerizer::Termination> wait(
+      const ContainerID& containerId);
+
+  virtual void destroy(const ContainerID& containerId);
+
+  virtual process::Future<hashset<ContainerID> > containers();
+
+private:
+  ExternalContainerizerProcess* process;  // Declared below in this header.
+};
+
+
+class ExternalContainerizerProcess
+  : public process::Process<ExternalContainerizerProcess>
+{
+public:
+  ExternalContainerizerProcess(const Flags& flags);
+
+  // Recover containerized executors as specified by state. See
+  // containerizer.hpp:recover for more.
+  process::Future<Nothing> recover(const Option<state::SlaveState>& state);
+
+  // Start the containerized executor.
+  process::Future<Nothing> launch(
+      const ContainerID& containerId,
+      const Option<TaskInfo>& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  // Update the container's resources.
+  process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  // Gather resource usage statistics for the containerized executor.
+  process::Future<ResourceStatistics> usage(const ContainerID& containerId);
+
+  // Get a future on the containerized executor's Termination.
+  process::Future<containerizer::Termination> wait(
+      const ContainerID& containerId);
+
+  // Terminate the containerized executor.
+  void destroy(const ContainerID& containerId);
+
+  // Get all active container-id's.
+  process::Future<hashset<ContainerID> > containers();
+
+private:
+  // Startup flags; invoke() reads containerizer_path and launcher_dir.
+  const Flags flags;
+
+  // Information describing a container environment. A sandbox has to
+  // be prepared before the external containerizer can be invoked.
+  struct Sandbox
+  {
+    Sandbox(const std::string& directory, const Option<std::string>& user)
+      : directory(directory), user(user) {}
+
+    const std::string directory;
+    const Option<std::string> user;
+  };
+
+  // Information describing a running container.
+  struct Container
+  {
+    Container(const Sandbox& sandbox) : sandbox(sandbox), pid(None()) {}
+
+    // Keep sandbox information available for subsequent containerizer
+    // invocations.
+    Sandbox sandbox;
+
+    // External containerizer pid as per wait-invocation.
+    // Wait should block on the external containerizer side, hence we
+    // need to keep its pid for terminating if needed.
+    Option<pid_t> pid;
+
+    process::Promise<containerizer::Termination> termination;
+
+    // As described in MESOS-1251, we need to make sure that events
+    // that are triggered before launch has completed, are in fact
+    // queued until then to reduce complexity within external
+    // containerizer program implementations. To achieve that, we
+    // simply queue all events onto this promise.
+    process::Promise<Nothing> launched;
+
+    Resources resources;  // Last value stored by _update().
+  };
+
+  // Stores all active containers, keyed by ContainerID.
+  hashmap<ContainerID, process::Owned<Container> > actives;
+
+  process::Future<Nothing> _launch(
+      const ContainerID& containerId,
+      const process::Future<Option<int> >& future);
+
+  void __launch(
+      const ContainerID& containerId,
+      const process::Future<Nothing>& future);
+
+  process::Future<containerizer::Termination> _wait(
+      const ContainerID& containerId);
+
+  void __wait(
+      const ContainerID& containerId,
+      const process::Future<tuples::tuple<
+          process::Future<Result<containerizer::Termination> >,
+          process::Future<Option<int> > > >& future);
+
+  process::Future<Nothing> _update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  process::Future<Nothing> __update(
+      const ContainerID& containerId,
+      const process::Future<Option<int> >& future);
+
+  process::Future<ResourceStatistics> _usage(
+      const ContainerID& containerId);
+
+  process::Future<ResourceStatistics> __usage(
+      const ContainerID& containerId,
+      const process::Future<tuples::tuple<
+          process::Future<Result<ResourceStatistics> >,
+          process::Future<Option<int> > > >& future);
+
+  void _destroy(const ContainerID& containerId);
+
+  void __destroy(
+      const ContainerID& containerId,
+      const process::Future<Option<int> >& future);
+
+  // SIGKILL the process tree of a pending external "wait" command; if
+  // none is pending, the container is cleaned up immediately.
+  void unwait(const ContainerID& containerId);
+
+  // Erase the container from 'actives'; called from __wait once the
+  // container has been waited on, and from unwait's fallback paths.
+  void cleanup(const ContainerID& containerId);
+
+  Try<process::Subprocess> invoke(  // Runs the external program.
+      const std::string& command,
+      const Sandbox& sandbox,
+      const google::protobuf::Message& message,
+      const std::map<std::string, std::string>& environment =
+        (std::map<std::string, std::string>())); // Wrapped in parens due to: http://llvm.org/bugs/show_bug.cgi?id=13657
+};
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __EXTERNAL_CONTAINERIZER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/slave/containerizer/mesos_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.cpp b/src/slave/containerizer/mesos_containerizer.cpp
index f9cc8e6..27813af 100644
--- a/src/slave/containerizer/mesos_containerizer.cpp
+++ b/src/slave/containerizer/mesos_containerizer.cpp
@@ -157,6 +157,7 @@ Future<Nothing> MesosContainerizer::launch(
                   checkpoint);
 }
 
+
 Future<Nothing> MesosContainerizer::update(
     const ContainerID& containerId,
     const Resources& resources)

http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index f38de51..0a04ad5 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -57,7 +57,7 @@ public:
     add(&Flags::isolation,
         "isolation",
         "Isolation mechanisms to use, e.g., 'posix/cpu,posix/mem'\n"
-        "or 'cgroups/cpu,cgroups/mem'.",
+        "or 'cgroups/cpu,cgroups/mem' or 'external'.",
         "posix/cpu,posix/mem");
 
     add(&Flags::default_role,
@@ -70,8 +70,8 @@ public:
         "*");
 
     add(&Flags::attributes,
-      "attributes",
-      "Attributes of machine");
+        "attributes",
+        "Attributes of machine");
 
     add(&Flags::work_dir,
         "work_dir",
@@ -210,6 +210,16 @@ public:
         "Path to a file containing a single line with\n"
         "the 'principal' and 'secret' separated by whitespace.\n"
         "Path could be of the form 'file:///path/to/file' or '/path/to/file'");
+
+    add(&Flags::containerizer_path,
+        "containerizer_path",
+        "The path to the external containerizer executable used when\n"
+        "external isolation is activated (--isolation=external).\n");
+
+    add(&Flags::default_container_image,
+        "default_container_image",
+        "The default container image to use if not specified by a task,\n"
+        "when using external containerizer");
   }
 
   bool version;
@@ -241,6 +251,8 @@ public:
   Option<std::string> slave_subsystems;
 #endif
   Option<std::string> credential;
+  Option<std::string> containerizer_path;
+  Option<std::string> default_container_image;
 };
 
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/bd6a0dad/src/tests/external_containerizer_test.cpp
----------------------------------------------------------------------
diff --git a/src/tests/external_containerizer_test.cpp b/src/tests/external_containerizer_test.cpp
new file mode 100644
index 0000000..6f7b13e
--- /dev/null
+++ b/src/tests/external_containerizer_test.cpp
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <unistd.h>
+
+#include <gmock/gmock.h>
+
+#include <string>
+#include <vector>
+#include <map>
+
+#include <mesos/resources.hpp>
+
+#include <process/future.hpp>
+
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+
+#include "master/master.hpp"
+#include "master/detector.hpp"
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/external_containerizer.hpp"
+#include "slave/flags.hpp"
+#include "slave/slave.hpp"
+
+#include "tests/mesos.hpp"
+#include "tests/flags.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using namespace process;
+
+using mesos::internal::master::Master;
+using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Slave;
+
+using std::string;
+using std::vector;
+
+using testing::_;
+using testing::DoAll;
+using testing::Return;
+using testing::SaveArg;
+using testing::Invoke;
+
+// TODO(tillt): Update and enhance the ExternalContainerizer tests,
+// possibly following some of the patterns used within the
+// IsolatorTests.
+class ExternalContainerizerTest : public MesosTest {}; // Plain fixture.
+
+
+class MockExternalContainerizer : public slave::ExternalContainerizer
+{
+public:
+  MOCK_METHOD8(  // Mocks the launch() overload that takes a TaskInfo.
+      launch,
+      process::Future<Nothing>(
+          const ContainerID&,
+          const TaskInfo&,
+          const ExecutorInfo&,
+          const std::string&,
+          const Option<std::string>&,
+          const SlaveID&,
+          const process::PID<slave::Slave>&,
+          bool checkpoint));
+
+  MockExternalContainerizer(const slave::Flags& flags)
+    : ExternalContainerizer(flags)
+  {
+    // By default the mocked launch() forwards to _launch() below.
+    // NOTE: See TestContainerizer::setup for why we use
+    // 'EXPECT_CALL' and 'WillRepeatedly' here instead of
+    // 'ON_CALL' and 'WillByDefault'.
+    EXPECT_CALL(*this, launch(_, _, _, _, _, _, _, _))
+      .WillRepeatedly(Invoke(this, &MockExternalContainerizer::_launch));
+  }
+
+  process::Future<Nothing> _launch(  // Calls the real, unmocked launch().
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
+      const Option<string>& user,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint)
+  {
+    return slave::ExternalContainerizer::launch(
+        containerId,
+        taskInfo,
+        executorInfo,
+        directory,
+        user,
+        slaveId,
+        slavePid,
+        checkpoint);
+  }
+};
+
+
+// Launches a task through the external test containerizer, polls its
+// usage() until the expected statistics show up, then kills the task.
+TEST_F(ExternalContainerizerTest, Launch)
+{
+  Try<PID<Master> > master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  Flags testFlags;
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  flags.isolation = "external";
+  flags.containerizer_path =
+    testFlags.build_dir + "/src/examples/python/test-containerizer";
+
+  MockExternalContainerizer containerizer(flags);
+
+  Try<PID<Slave> > slave = this->StartSlave(&containerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(offers);
+
+  // Fatal on purpose: offers.get()[0] is dereferenced right below.
+  ASSERT_NE(0u, offers.get().size());
+
+  TaskInfo task;
+  task.set_name("isolator_test");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->CopyFrom(offers.get()[0].resources());
+
+  Resources resources(offers.get()[0].resources());
+  Option<Bytes> mem = resources.mem();
+  ASSERT_SOME(mem);
+  Option<double> cpus = resources.cpus();
+  ASSERT_SOME(cpus);
+
+  const std::string& file = path::join(flags.work_dir, "ready");
+
+  // This task induces user/system load by running top in a background
+  // child process; the task itself then sleeps for 60 seconds.
+  task.mutable_command()->set_value(
+#ifdef __APPLE__
+      // Use logging mode with 30,000 samples with no interval.
+      "top -l 30000 -s 0 2>&1 > /dev/null & "
+#else
+      // Batch mode, with 30,000 samples with no interval.
+      "top -b -d 0 -n 30000 2>&1 > /dev/null & "
+#endif
+      "touch " + file +  "; " // Signals that the top command is running.
+      "sleep 60");
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return()); // Ignore rest for now
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(containerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&containerizer,
+                           &MockExternalContainerizer::_launch)));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(containerId);
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Wait for the task to begin inducing cpu time, sleeping between polls.
+  while (!os::exists(file)) { os::sleep(Milliseconds(10)); }
+
+  ExecutorID executorId;
+  executorId.set_value(task.task_id().value());
+
+  // We'll wait up to 10 seconds for the child process to induce
+  // 1/8 of a second of user and system cpu time in total.
+  // TODO(bmahler): Also induce rss memory consumption, by re-using
+  // the balloon framework.
+  ResourceStatistics statistics;
+  Duration waited = Duration::zero();
+  do {
+    Future<ResourceStatistics> usage = containerizer.usage(containerId.get());
+    AWAIT_READY(usage);
+
+    statistics = usage.get();
+
+    // If we meet our usage expectations, we're done!
+    // NOTE: We are currently getting dummy-data from the test-
+    // containerizer python script matching these expectations.
+    // TODO(tillt): Consider working with real data.
+    if (statistics.cpus_user_time_secs() >= 0.120 &&
+        statistics.cpus_system_time_secs() >= 0.05 &&
+        statistics.mem_rss_bytes() >= 1024u) {
+      break;
+    }
+
+    os::sleep(Milliseconds(100));
+    waited += Milliseconds(100);
+  } while (waited < Seconds(10));
+
+  EXPECT_GE(statistics.cpus_user_time_secs(), 0.120);
+  EXPECT_GE(statistics.cpus_system_time_secs(), 0.05);
+  EXPECT_EQ(statistics.cpus_limit(), cpus.get());
+  EXPECT_GE(statistics.mem_rss_bytes(), 1024u);
+  EXPECT_EQ(statistics.mem_limit_bytes(), mem.get().bytes());
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.killTask(task.task_id());
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_KILLED, status.get().state());
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown();
+}