You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/30 05:58:49 UTC

[2/2] mesos git commit: Refactor fetcher namespace into a class/process.

Refactor fetcher namespace into a class/process.

Laid the groundwork for having a fetcher object with a cache dir per
slave. Factored all fetcher-relevant code into a fetcher class and
process. Added a fetcher parameter to a lot of methods related to
launching tasks, mostly in containerizers. The latter are thus not
holders of fetchers; only slaves are. See MESOS-2172.

Review: https://reviews.apache.org/r/28830


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2fd56590
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2fd56590
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2fd56590

Branch: refs/heads/master
Commit: 2fd565901728309bb4f467f599f08f0b1e57eb69
Parents: 8254f8f
Author: Bernd Mathiske <be...@mesosphere.io>
Authored: Mon Dec 29 16:08:56 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Dec 29 20:11:17 2014 -0800

----------------------------------------------------------------------
 src/local/local.cpp                             |  19 +-
 src/slave/containerizer/containerizer.cpp       |  11 +-
 src/slave/containerizer/containerizer.hpp       |   6 +-
 src/slave/containerizer/docker.cpp              |  40 ++--
 src/slave/containerizer/docker.hpp              |  14 +-
 src/slave/containerizer/fetcher.cpp             | 191 ++++++++++++++++---
 src/slave/containerizer/fetcher.hpp             | 174 +++++++++++------
 src/slave/containerizer/mesos/containerizer.cpp |  40 ++--
 src/slave/containerizer/mesos/containerizer.hpp |  13 +-
 src/slave/main.cpp                              |   7 +-
 src/tests/cluster.hpp                           |  10 +-
 src/tests/containerizer_tests.cpp               |  27 ++-
 src/tests/docker_containerizer_tests.cpp        |  83 +++++---
 src/tests/fetcher_tests.cpp                     | 108 ++++++-----
 src/tests/health_check_tests.cpp                |  30 ++-
 src/tests/isolator_tests.cpp                    |   6 +-
 src/tests/slave_recovery_tests.cpp              | 170 +++++++++++------
 src/tests/slave_tests.cpp                       |  23 ++-
 18 files changed, 668 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 89fed0b..76e73a4 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -51,11 +51,13 @@
 
 #include "module/manager.hpp"
 
-#include "slave/containerizer/containerizer.hpp"
 #include "slave/gc.hpp"
 #include "slave/slave.hpp"
 #include "slave/status_update_manager.hpp"
 
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
 #include "state/in_memory.hpp"
 #include "state/log.hpp"
 #include "state/protobuf.hpp"
@@ -74,6 +76,7 @@ using mesos::internal::master::Registrar;
 using mesos::internal::master::Repairer;
 
 using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
 using mesos::internal::slave::GarbageCollector;
 using mesos::internal::slave::Slave;
 using mesos::internal::slave::StatusUpdateManager;
@@ -108,6 +111,7 @@ static Option<Authorizer*> authorizer = None();
 static Files* files = NULL;
 static vector<GarbageCollector*>* garbageCollectors = NULL;
 static vector<StatusUpdateManager*>* statusUpdateManagers = NULL;
+static vector<Fetcher*>* fetchers = NULL;
 
 
 PID<Master> launch(const Flags& flags, Allocator* _allocator)
@@ -212,6 +216,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
 
   garbageCollectors = new vector<GarbageCollector*>();
   statusUpdateManagers = new vector<StatusUpdateManager*>();
+  fetchers = new vector<Fetcher*>();
 
   vector<UPID> pids;
 
@@ -226,8 +231,11 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
 
     garbageCollectors->push_back(new GarbageCollector());
     statusUpdateManagers->push_back(new StatusUpdateManager(flags));
+    fetchers->push_back(new Fetcher());
+
+    Try<Containerizer*> containerizer =
+      Containerizer::create(flags, true, fetchers->back());
 
-    Try<Containerizer*> containerizer = Containerizer::create(flags, true);
     if (containerizer.isError()) {
       EXIT(1) << "Failed to create a containerizer: " << containerizer.error();
     }
@@ -307,6 +315,13 @@ void shutdown()
     delete statusUpdateManagers;
     statusUpdateManagers = NULL;
 
+    foreach (Fetcher* fetcher, *fetchers) {
+      delete fetcher;
+    }
+
+    delete fetchers;
+    fetchers = NULL;
+
     delete registrar;
     registrar = NULL;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index 1448bea..e89511a 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -159,7 +159,10 @@ Try<Resources> Containerizer::resources(const Flags& flags)
 }
 
 
-Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
+Try<Containerizer*> Containerizer::create(
+    const Flags& flags,
+    bool local,
+    Fetcher* fetcher)
 {
   if (flags.isolation == "external") {
     LOG(WARNING) << "The 'external' isolation flag is deprecated, "
@@ -167,7 +170,7 @@ Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
                  << " '--containerizers=external'.";
 
     Try<ExternalContainerizer*> containerizer =
-        ExternalContainerizer::create(flags);
+      ExternalContainerizer::create(flags);
     if (containerizer.isError()) {
       return Error("Could not create ExternalContainerizer: " +
                    containerizer.error());
@@ -185,7 +188,7 @@ Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
   foreach (const string& type, strings::split(flags.containerizers, ",")) {
     if (type == "mesos") {
       Try<MesosContainerizer*> containerizer =
-        MesosContainerizer::create(flags, local);
+        MesosContainerizer::create(flags, local, fetcher);
       if (containerizer.isError()) {
         return Error("Could not create MesosContainerizer: " +
                      containerizer.error());
@@ -194,7 +197,7 @@ Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
       }
     } else if (type == "docker") {
       Try<DockerContainerizer*> containerizer =
-        DockerContainerizer::create(flags);
+        DockerContainerizer::create(flags, fetcher);
       if (containerizer.isError()) {
         return Error("Could not create DockerContainerizer: " +
                      containerizer.error());

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.hpp b/src/slave/containerizer/containerizer.hpp
index 02754cd..129e60f 100644
--- a/src/slave/containerizer/containerizer.hpp
+++ b/src/slave/containerizer/containerizer.hpp
@@ -35,6 +35,7 @@
 #include <stout/option.hpp>
 #include <stout/try.hpp>
 
+#include "slave/containerizer/fetcher.hpp"
 
 namespace mesos {
 namespace internal {
@@ -55,7 +56,10 @@ class Containerizer
 {
 public:
   // Attempts to create a containerizer as specified by 'isolation' in flags.
-  static Try<Containerizer*> create(const Flags& flags, bool local);
+  static Try<Containerizer*> create(
+      const Flags& flags,
+      bool local,
+      Fetcher* fetcher);
 
   // Determine slave resources from flags, probing the system or querying a
   // delegate.

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 19a6ea2..5f4b4ce 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -103,14 +103,19 @@ Option<ContainerID> parse(const Docker::Container& container)
 }
 
 
-Try<DockerContainerizer*> DockerContainerizer::create(const Flags& flags)
+Try<DockerContainerizer*> DockerContainerizer::create(
+    const Flags& flags,
+    Fetcher* fetcher)
 {
   Try<Docker*> docker = Docker::create(flags.docker);
   if (docker.isError()) {
     return Error(docker.error());
   }
 
-  return new DockerContainerizer(flags, Shared<Docker>(docker.get()));
+  return new DockerContainerizer(
+      flags,
+      fetcher,
+      Shared<Docker>(docker.get()));
 }
 
 
@@ -124,8 +129,9 @@ DockerContainerizer::DockerContainerizer(
 
 DockerContainerizer::DockerContainerizer(
     const Flags& flags,
+    Fetcher* fetcher,
     Shared<Docker> docker)
-  : process(new DockerContainerizerProcess(flags, docker))
+  : process(new DockerContainerizerProcess(flags, fetcher, docker))
 {
   spawn(process.get());
 }
@@ -221,29 +227,12 @@ Future<Nothing> DockerContainerizerProcess::fetch(
   CHECK(containers_.contains(containerId));
   Container* container = containers_[containerId];
 
-  CommandInfo commandInfo = container->command();
-
-  if (commandInfo.uris().size() == 0) {
-    return Nothing();
-  }
-
-  VLOG(1) << "Starting to fetch URIs for container: " << containerId
-          << ", directory: " << container->directory;
-
-  Try<Subprocess> fetcher = fetcher::run(
-      commandInfo,
+  return fetcher->fetch(
+      containerId,
+      container->command(),
       container->directory,
       None(),
       flags);
-
-  if (fetcher.isError()) {
-    return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
-  }
-
-  container->fetcher = fetcher.get();
-
-  return fetcher.get().status()
-    .then(lambda::bind(&fetcher::_run, containerId, lambda::_1));
 }
 
 
@@ -1191,10 +1180,7 @@ void DockerContainerizerProcess::destroy(
     LOG(INFO) << "Destroying Container '"
               << containerId << "' in FETCHING state";
 
-    if (container->fetcher.isSome()) {
-      // Best effort kill the entire fetcher tree.
-      os::killtree(container->fetcher.get().pid(), SIGKILL);
-    }
+    fetcher->kill(containerId);
 
     containerizer::Termination termination;
     termination.set_killed(killed);

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index 28ebc62..b7bf54a 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -49,11 +49,14 @@ class DockerContainerizerProcess;
 class DockerContainerizer : public Containerizer
 {
 public:
-  static Try<DockerContainerizer*> create(const Flags& flags);
+  static Try<DockerContainerizer*> create(
+      const Flags& flags,
+      Fetcher* fetcher);
 
   // This is only public for tests.
   DockerContainerizer(
       const Flags& flags,
+      Fetcher* fetcher,
       process::Shared<Docker> docker);
 
   // This is only public for tests.
@@ -110,8 +113,10 @@ class DockerContainerizerProcess
 public:
   DockerContainerizerProcess(
       const Flags& _flags,
+      Fetcher* _fetcher,
       process::Shared<Docker> _docker)
     : flags(_flags),
+      fetcher(_fetcher),
       docker(_docker) {}
 
   virtual process::Future<Nothing> recover(
@@ -242,6 +247,8 @@ private:
 
   const Flags flags;
 
+  Fetcher* fetcher;
+
   process::Shared<Docker> docker;
 
   struct Container
@@ -418,11 +425,6 @@ private:
     // or ExecutorInfo::resources because they can change dynamically.
     Resources resources;
 
-    // The mesos-fetcher subprocess, kept around so that we can do a
-    // killtree on it if we're asked to destroy a container while we
-    // are fetching.
-    Option<Subprocess> fetcher;
-
     // The docker pull future is stored so we can discard when
     // destroy is called while docker is pulling the image.
     Future<Docker::Image> pull;

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index d702a9c..d04799f 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -16,19 +16,38 @@
  * limitations under the License.
  */
 
+#include <process/dispatch.hpp>
+#include <process/process.hpp>
+
 #include "slave/slave.hpp"
 
 #include "slave/containerizer/fetcher.hpp"
 
 using std::map;
 using std::string;
+using std::vector;
+
+using process::Future;
 
 namespace mesos {
 namespace internal {
 namespace slave {
-namespace fetcher {
 
-map<string, string> environment(
+
+Fetcher::Fetcher() : process(new FetcherProcess())
+{
+  spawn(process.get());
+}
+
+
+Fetcher::~Fetcher()
+{
+  terminate(process.get());
+  process::wait(process.get());
+}
+
+
+map<string, string> Fetcher::environment(
     const CommandInfo& commandInfo,
     const string& directory,
     const Option<string>& user,
@@ -56,7 +75,134 @@ map<string, string> environment(
 }
 
 
-Try<Subprocess> run(
+Future<Nothing> Fetcher::fetch(
+    const ContainerID& containerId,
+    const CommandInfo& commandInfo,
+    const string& directory,
+    const Option<string>& user,
+    const Flags& flags,
+    const Option<int>& stdout,
+    const Option<int>& stderr)
+{
+  if (commandInfo.uris().size() == 0) {
+    return Nothing();
+  }
+
+  return dispatch(process.get(),
+                  &FetcherProcess::fetch,
+                  containerId,
+                  commandInfo,
+                  directory,
+                  user,
+                  flags,
+                  stdout,
+                  stderr);
+}
+
+
+Future<Nothing> Fetcher::fetch(
+    const ContainerID& containerId,
+    const CommandInfo& commandInfo,
+    const string& directory,
+    const Option<string>& user,
+    const Flags& flags)
+{
+  if (commandInfo.uris().size() == 0) {
+    return Nothing();
+  }
+
+  return dispatch(process.get(),
+                  &FetcherProcess::fetch,
+                  containerId,
+                  commandInfo,
+                  directory,
+                  user,
+                  flags);
+}
+
+
+void Fetcher::kill(const ContainerID& containerId)
+{
+  dispatch(process.get(), &FetcherProcess::kill, containerId);
+}
+
+
+FetcherProcess::~FetcherProcess()
+{
+  foreach (const ContainerID& containerId, subprocessPids.keys()) {
+    kill(containerId);
+  }
+}
+
+
+Future<Nothing> FetcherProcess::fetch(
+    const ContainerID& containerId,
+    const CommandInfo& commandInfo,
+    const string& directory,
+    const Option<string>& user,
+    const Flags& flags,
+    const Option<int>& stdout,
+    const Option<int>& stderr)
+{
+  VLOG(1) << "Starting to fetch URIs for container: " << containerId
+        << ", directory: " << directory;
+
+  Try<Subprocess> subprocess =
+    run(commandInfo, directory, user, flags, stdout, stderr);
+
+  if (subprocess.isError()) {
+    return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+  }
+
+  subprocessPids[containerId] = subprocess.get().pid();
+
+  return subprocess.get().status()
+    .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+}
+
+
+Future<Nothing> FetcherProcess::fetch(
+    const ContainerID& containerId,
+    const CommandInfo& commandInfo,
+    const string& directory,
+    const Option<string>& user,
+    const Flags& flags)
+{
+  VLOG(1) << "Starting to fetch URIs for container: " << containerId
+        << ", directory: " << directory;
+
+  Try<Subprocess> subprocess = run(commandInfo, directory, user, flags);
+
+  if (subprocess.isError()) {
+    return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+  }
+
+  subprocessPids[containerId] = subprocess.get().pid();
+
+  return subprocess.get().status()
+    .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+}
+
+
+Future<Nothing> FetcherProcess::_fetch(
+    const ContainerID& containerId,
+    const Option<int>& status)
+{
+  subprocessPids.erase(containerId);
+
+  if (status.isNone()) {
+    return Failure("No status available from fetcher");
+  } else if (status.get() != 0) {
+    return Failure("Failed to fetch URIs for container '" +
+                   stringify(containerId) + "'with exit status: " +
+                   stringify(status.get()));
+  }
+
+  return Nothing();
+}
+
+
+Try<Subprocess> FetcherProcess::run(
     const CommandInfo& commandInfo,
     const string& directory,
     const Option<string>& user,
@@ -83,7 +229,7 @@ Try<Subprocess> run(
 
   LOG(INFO) << "Fetching URIs using command '" << command << "'";
 
-  Try<Subprocess> fetcher = subprocess(
+  Try<Subprocess> fetcherSubprocess = subprocess(
     command,
     Subprocess::PIPE(),
     stdout.isSome()
@@ -92,17 +238,18 @@ Try<Subprocess> run(
     stderr.isSome()
       ? Subprocess::FD(stderr.get())
       : Subprocess::PIPE(),
-    environment(commandInfo, directory, user, flags));
+    Fetcher::environment(commandInfo, directory, user, flags));
 
-  if (fetcher.isError()) {
-    return Error("Failed to execute mesos-fetcher: " + fetcher.error());
+  if (fetcherSubprocess.isError()) {
+    return Error(
+        "Failed to execute mesos-fetcher: " +  fetcherSubprocess.error());
   }
 
-  return fetcher;
+  return fetcherSubprocess;
 }
 
 
-Try<Subprocess> run(
+Try<Subprocess> FetcherProcess::run(
     const CommandInfo& commandInfo,
     const string& directory,
     const Option<string>& user,
@@ -147,7 +294,7 @@ Try<Subprocess> run(
     }
   }
 
-  Try<Subprocess> fetcher = fetcher::run(
+  Try<Subprocess> subprocess = run(
       commandInfo,
       directory,
       user,
@@ -155,31 +302,25 @@ Try<Subprocess> run(
       out.get(),
       err.get());
 
-  fetcher.get().status()
+  subprocess.get().status()
     .onAny(lambda::bind(&os::close, out.get()))
     .onAny(lambda::bind(&os::close, err.get()));
 
-  return fetcher;
+  return subprocess;
 }
 
 
-Future<Nothing> _run(
-    const ContainerID& containerId,
-    const Option<int>& status)
+void FetcherProcess::kill(const ContainerID& containerId)
 {
-  if (status.isNone()) {
-    return Failure("No status available from fetcher");
-  } else if (status.get() != 0) {
-    return Failure("Failed to fetch URIs for container '" +
-                   stringify(containerId) + "'with exit status: " +
-                   stringify(status.get()));
-  }
+  if (subprocessPids.contains(containerId)) {
+    VLOG(1) << "Killing the fetcher for container '" << containerId << "'";
+    // Best effort kill the entire fetcher tree.
+    os::killtree(subprocessPids.get(containerId).get(), SIGKILL);
 
-  return Nothing();
+    subprocessPids.erase(containerId);
+  }
 }
 
-
-} // namespace fetcher {
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 12b81b1..8bbdf07 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -19,72 +19,134 @@
 #ifndef __SLAVE_FETCHER_HPP__
 #define __SLAVE_FETCHER_HPP__
 
-#include <map>
 #include <string>
+#include <vector>
+
+#include <stout/hashmap.hpp>
 
 #include <process/future.hpp>
-#include <process/io.hpp>
+#include <process/process.hpp>
 #include <process/subprocess.hpp>
 
-#include <stout/lambda.hpp>
-#include <stout/option.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-#include <stout/result.hpp>
-#include <stout/strings.hpp>
-#include <stout/try.hpp>
-
 #include <mesos/mesos.hpp>
-
-#include "slave/flags.hpp"
+#include <slave/flags.hpp>
 
 namespace mesos {
 namespace internal {
 namespace slave {
-namespace fetcher {
-
-// Defines helpers for running the mesos-fetcher.
-// TODO(benh): Consider moving this into a 'fetcher' subdirectory as
-// well as the actual mesos-fetcher program (in launcher/fetcher.cpp
-// as of the writing of this comment).
-
-// Helper method to build the environment used to run mesos-fetcher.
-std::map<std::string, std::string> environment(
-    const CommandInfo& commandInfo,
-    const std::string& directory,
-    const Option<std::string>& user,
-    const Flags& flags);
-
-// Run the mesos-fetcher for the specified arguments. Note that if
-// 'stdout' and 'stderr' file descriptors are provided then respective
-// output from the mesos-fetcher will be redirected to the file
-// descriptors. The file descriptors are duplicated (via dup) because
-// redirecting might still be occuring even after the mesos-fetcher has
-// terminated since there still might be data to be read.
-Try<process::Subprocess> run(
-    const CommandInfo& commandInfo,
-    const std::string& directory,
-    const Option<std::string>& user,
-    const Flags& flags,
-    const Option<int>& stdout,
-    const Option<int>& stderr);
-
-// Run the mesos-fetcher for the specified arguments, creating a
-// "stdout" and "stderr" file in the given directory and using
-// these for output.
-Try<process::Subprocess> run(
-    const CommandInfo& commandInfo,
-    const std::string& directory,
-    const Option<std::string>& user,
-    const Flags& flags);
-
-// Check status and return an error if any. Typically used after
-// calling run().
-process::Future<Nothing> _run(
-    const ContainerID& containerId,
-    const Option<int>& status);
-
-} // namespace fetcher {
+
+// Forward declaration.
+class FetcherProcess;
+
+// Argument passing to and invocation of the external fetcher program.
+// TODO(bernd-mesos) : Orchestration and synchronization of fetching
+// phases. Bookkeeping of executor files that are cached after
+// downloading from a URI by the fetcher program. Cache eviction.
+// There has to be exactly one fetcher with a distinct cache dir per
+// active slave. This means that the cache dir can only be fixed
+// after the slave ID has been determined by registration or recovery.
+class Fetcher
+{
+public:
+  Fetcher();
+
+  virtual ~Fetcher();
+
+  // Download the URIs specified in the command info and place the
+  // resulting files into the given work directory. Chmod said files
+  // to the user if given.
+  process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags,
+      const Option<int>& stdout,
+      const Option<int>& stderr);
+
+  // Same as above, but send stdout and stderr to the files 'stdout'
+  // and 'stderr' in the specified directory.
+  process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags);
+
+  // Best effort to kill the fetcher subprocess associated with the
+  // indicated container. Do nothing if no such subprocess exists.
+  void kill(const ContainerID& containerId);
+
+  // Build the environment passed to the mesos-fetcher program.
+  static std::map<std::string, std::string> environment(
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags);
+
+private:
+  process::Owned<FetcherProcess> process;
+};
+
+
+class FetcherProcess : public process::Process<FetcherProcess>
+{
+public:
+  FetcherProcess() : ProcessBase("__fetcher__") {}
+
+  virtual ~FetcherProcess();
+
+  // Fetcher implementation.
+  process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags,
+      const Option<int>& stdout,
+      const Option<int>& stderr);
+
+  process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags);
+
+  void kill(const ContainerID& containerId);
+
+private:
+  // Check status and return an error if any.
+  process::Future<Nothing> _fetch(
+      const ContainerID& containerId,
+      const Option<int>& status);
+
+  // Run the mesos-fetcher with custom output redirection. If
+  // 'stdout' and 'stderr' file descriptors are provided then respective
+  // output from the mesos-fetcher will be redirected to the file
+  // descriptors. The file descriptors are duplicated (via dup) because
+  // redirecting might still be occuring even after the mesos-fetcher has
+  // terminated since there still might be data to be read.
+  // This method is only "public" for test purposes.
+  Try<process::Subprocess> run(
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags,
+      const Option<int>& stdout,
+      const Option<int>& stderr);
+
+  // Run the mesos-fetcher, creating a "stdout" and "stderr" file
+  // in the given directory and using these for output.
+  Try<process::Subprocess> run(
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags);
+
+  hashmap<ContainerID, pid_t> subprocessPids;
+};
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index d70259b..5c014eb 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -77,7 +77,8 @@ Future<Nothing> _nothing() { return Nothing(); }
 
 Try<MesosContainerizer*> MesosContainerizer::create(
     const Flags& flags,
-    bool local)
+    bool local,
+    Fetcher* fetcher)
 {
   string isolation;
   if (flags.isolation == "process") {
@@ -164,16 +165,22 @@ Try<MesosContainerizer*> MesosContainerizer::create(
   }
 
   return new MesosContainerizer(
-      flags_, local, Owned<Launcher>(launcher.get()), isolators);
+      flags_, local, fetcher, Owned<Launcher>(launcher.get()), isolators);
 }
 
 
 MesosContainerizer::MesosContainerizer(
     const Flags& flags,
     bool local,
+    Fetcher* fetcher,
     const Owned<Launcher>& launcher,
     const vector<Owned<Isolator>>& isolators)
-  : process(new MesosContainerizerProcess(flags, local, launcher, isolators))
+  : process(new MesosContainerizerProcess(
+      flags,
+      local,
+      fetcher,
+      launcher,
+      isolators))
 {
   spawn(process.get());
 }
@@ -540,29 +547,12 @@ Future<Nothing> MesosContainerizerProcess::fetch(
     return Failure("Container is already destroyed");
   }
 
-  if (commandInfo.uris().size() == 0) {
-    return Nothing();
-  }
-
-  Try<Subprocess> fetcher = fetcher::run(
+  return fetcher->fetch(
+      containerId,
       commandInfo,
       directory,
       user,
       flags);
-
-  if (fetcher.isError()) {
-    return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
-  }
-
-  // TODO(tnachen): Currently the fetcher won't shutdown when slave
-  // exits. This means the fetcher will still be running when slave
-  // restarts and after recovering. We won't resume the task since
-  // it hasn't checkpointed yet. Once the fetcher supports existing
-  // on slave it will be removed automatically.
-  containers_[containerId]->fetcher = fetcher.get();
-
-  return fetcher.get().status()
-    .then(lambda::bind(&fetcher::_run, containerId, lambda::_1));
 }
 
 
@@ -899,10 +889,8 @@ void MesosContainerizerProcess::destroy(const ContainerID& containerId)
     return;
   }
 
-  if (container->state == FETCHING && container->fetcher.isSome()) {
-    VLOG(1) << "Killing the fetcher for container '" << containerId << "'";
-    // Best effort kill the entire fetcher tree.
-    os::killtree(container->fetcher.get().pid(), SIGKILL);
+  if (container->state == FETCHING) {
+    fetcher->kill(containerId);
   }
 
   if (container->state == ISOLATING) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 0b635d4..802988c 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -39,11 +39,15 @@ class MesosContainerizerProcess;
 class MesosContainerizer : public Containerizer
 {
 public:
-  static Try<MesosContainerizer*> create(const Flags& flags, bool local);
+  static Try<MesosContainerizer*> create(
+      const Flags& flags,
+      bool local,
+      Fetcher* fetcher);
 
   MesosContainerizer(
       const Flags& flags,
       bool local,
+      Fetcher* fetcher,
       const process::Owned<Launcher>& launcher,
       const std::vector<process::Owned<Isolator>>& isolators);
 
@@ -100,10 +104,12 @@ public:
   MesosContainerizerProcess(
       const Flags& _flags,
       bool _local,
+      Fetcher* _fetcher,
       const process::Owned<Launcher>& _launcher,
       const std::vector<process::Owned<Isolator>>& _isolators)
     : flags(_flags),
       local(_local),
+      fetcher(_fetcher),
       launcher(_launcher),
       isolators(_isolators) {}
 
@@ -214,6 +220,7 @@ private:
 
   const Flags flags;
   const bool local;
+  Fetcher* fetcher;
   const process::Owned<Launcher> launcher;
   const std::vector<process::Owned<Isolator>> isolators;
 
@@ -245,10 +252,6 @@ private:
     // determine the cause of an executor termination.
     std::vector<Limitation> limitations;
 
-    // The mesos-fetcher subprocess, that we keep around so we can
-    // stop the fetcher when the container is destroyed.
-    Option<process::Subprocess> fetcher;
-
     // We keep track of the resources for each container so we can set the
     // ResourceStatistics limits in usage().
     Resources resources;

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index 087944a..2ff2b0d 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -151,13 +151,18 @@ int main(int argc, char** argv)
     LOG(INFO) << "Git SHA: " << build::GIT_SHA.get();
   }
 
-  Try<Containerizer*> containerizer = Containerizer::create(flags, false);
+  Fetcher fetcher;
+
+  Try<Containerizer*> containerizer =
+    Containerizer::create(flags, false, &fetcher);
+
   if (containerizer.isError()) {
     EXIT(1) << "Failed to create a containerizer: "
             << containerizer.error();
   }
 
   Try<MasterDetector*> detector = MasterDetector::create(master.get());
+
   if (detector.isError()) {
     EXIT(1) << "Failed to create a master detector: " << detector.error();
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index fa5eeef..74cedb3 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -63,10 +63,12 @@
 
 #include "slave/flags.hpp"
 #include "slave/gc.hpp"
-#include "slave/containerizer/containerizer.hpp"
 #include "slave/slave.hpp"
 #include "slave/status_update_manager.hpp"
 
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
 #include "state/in_memory.hpp"
 #include "state/log.hpp"
 #include "state/protobuf.hpp"
@@ -185,6 +187,7 @@ public:
       slave::Containerizer* containerizer;
       bool createdContainerizer; // Whether we own the containerizer.
 
+      process::Owned<slave::Fetcher> fetcher;
       process::Owned<slave::StatusUpdateManager> statusUpdateManager;
       process::Owned<slave::GarbageCollector> gc;
       process::Owned<MasterDetector> detector;
@@ -468,8 +471,11 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
   if (containerizer.isSome()) {
     slave.containerizer = containerizer.get();
   } else {
+    // Create a new fetcher.
+    slave.fetcher.reset(new slave::Fetcher());
+
     Try<slave::Containerizer*> containerizer =
-      slave::Containerizer::create(flags, true);
+      slave::Containerizer::create(flags, true, slave.fetcher.get());
     CHECK_SOME(containerizer);
 
     slave.containerizer = containerizer.get();

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer_tests.cpp b/src/tests/containerizer_tests.cpp
index 02a5f15..cfe31a6 100644
--- a/src/tests/containerizer_tests.cpp
+++ b/src/tests/containerizer_tests.cpp
@@ -31,6 +31,7 @@
 
 #include "slave/flags.hpp"
 
+#include "slave/containerizer/fetcher.hpp"
 #include "slave/containerizer/isolator.hpp"
 #include "slave/containerizer/launcher.hpp"
 
@@ -61,6 +62,7 @@ public:
   // Construct a MesosContainerizer with TestIsolator(s) which use the provided
   // 'prepare' command(s).
   Try<MesosContainerizer*> CreateContainerizer(
+      Fetcher* fetcher,
       const vector<Option<CommandInfo> >& prepares)
   {
     vector<Owned<Isolator> > isolators;
@@ -85,18 +87,20 @@ public:
     return new MesosContainerizer(
         flags,
         false,
+        fetcher,
         Owned<Launcher>(launcher.get()),
         isolators);
   }
 
 
   Try<MesosContainerizer*> CreateContainerizer(
+      Fetcher* fetcher,
       const Option<CommandInfo>& prepare)
   {
     vector<Option<CommandInfo> > prepares;
     prepares.push_back(prepare);
 
-    return CreateContainerizer(prepares);
+    return CreateContainerizer(fetcher, prepares);
   }
 };
 
@@ -107,7 +111,10 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, ScriptSucceeds)
   string directory = os::getcwd(); // We're inside a temporary sandbox.
   string file = path::join(directory, "child.script.executed");
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer = CreateContainerizer(
+      &fetcher,
       CREATE_COMMAND_INFO("touch " + file));
   CHECK_SOME(containerizer);
 
@@ -151,7 +158,10 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, ScriptFails)
   string directory = os::getcwd(); // We're inside a temporary sandbox.
   string file = path::join(directory, "child.script.executed");
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer = CreateContainerizer(
+      &fetcher,
       CREATE_COMMAND_INFO("touch " + file + " && exit 1"));
   CHECK_SOME(containerizer);
 
@@ -205,7 +215,10 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, MultipleScripts)
   // This will fail, either first or after the successful command.
   prepares.push_back(CREATE_COMMAND_INFO("touch " + file2 + " && exit 1"));
 
-  Try<MesosContainerizer*> containerizer = CreateContainerizer(prepares);
+  Fetcher fetcher;
+
+  Try<MesosContainerizer*> containerizer =
+    CreateContainerizer(&fetcher, prepares);
   CHECK_SOME(containerizer);
 
   ContainerID containerId;
@@ -251,9 +264,11 @@ TEST_F(MesosContainerizerExecuteTest, IoRedirection)
   slave::Flags flags;
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
+  Fetcher fetcher;
+
   // Use local=false so std{err,out} are redirected to files.
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   ASSERT_SOME(containerizer);
 
   ContainerID containerId;
@@ -306,9 +321,10 @@ public:
   MockMesosContainerizerProcess(
       const Flags& flags,
       bool local,
+      Fetcher* fetcher,
       const process::Owned<Launcher>& launcher,
       const std::vector<process::Owned<Isolator>>& isolators)
-    : MesosContainerizerProcess(flags, local, launcher, isolators)
+    : MesosContainerizerProcess(flags, local, fetcher, launcher, isolators)
   {
     // NOTE: See TestContainerizer::setup for why we use
     // 'EXPECT_CALL' and 'WillRepeatedly' here instead of
@@ -343,9 +359,12 @@ TEST_F(MesosContainerizerDestroyTest, DestroyWhileFetching)
   ASSERT_SOME(launcher);
   std::vector<process::Owned<Isolator>> isolators;
 
+  Fetcher fetcher;
+
   MockMesosContainerizerProcess* process = new MockMesosContainerizerProcess(
       flags,
       true,
+      &fetcher,
       Owned<Launcher>(launcher.get()),
       isolators);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index bed2d10..2105ae2 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -34,6 +34,8 @@
 #include "tests/mesos.hpp"
 
 #include "slave/containerizer/docker.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
 #include "slave/paths.hpp"
 #include "slave/slave.hpp"
 #include "slave/state.hpp"
@@ -49,9 +51,10 @@ using namespace process;
 
 using mesos::internal::master::Master;
 
-using mesos::internal::slave::Slave;
 using mesos::internal::slave::DockerContainerizer;
 using mesos::internal::slave::DockerContainerizerProcess;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::Slave;
 
 using process::Future;
 using process::Message;
@@ -171,8 +174,9 @@ class MockDockerContainerizer : public DockerContainerizer {
 public:
   MockDockerContainerizer(
       const slave::Flags& flags,
+      Fetcher* fetcher,
       Shared<Docker> docker)
-    : DockerContainerizer(flags, docker)
+    : DockerContainerizer(flags, fetcher, docker)
   {
     initialize();
   }
@@ -285,8 +289,9 @@ class MockDockerContainerizerProcess : public DockerContainerizerProcess
 public:
   MockDockerContainerizerProcess(
       const slave::Flags& flags,
+      Fetcher* fetcher,
       const Shared<Docker>& docker)
-    : DockerContainerizerProcess(flags, docker)
+    : DockerContainerizerProcess(flags, fetcher, docker)
   {
     EXPECT_CALL(*this, fetch(_))
       .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_fetch));
@@ -350,7 +355,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
 
   slave::Flags flags = CreateSlaveFlags();
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -478,7 +485,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor_Bridged)
 
   slave::Flags flags = CreateSlaveFlags();
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -602,7 +611,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
 
   slave::Flags flags = CreateSlaveFlags();
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -715,7 +726,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
 
   slave::Flags flags = CreateSlaveFlags();
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -832,7 +845,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
                            Invoke((MockDocker*) docker.get(),
                                   &MockDocker::_logs)));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -970,7 +985,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
                            Invoke((MockDocker*) docker.get(),
                                   &MockDocker::_logs)));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -1126,7 +1143,9 @@ TEST_F(DockerContainerizerTest, DISABLED_ROOT_DOCKER_Recover)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   ContainerID containerId;
   containerId.set_value("c1");
@@ -1249,7 +1268,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
   EXPECT_CALL(*mockDocker, stop(_, _, _))
     .WillRepeatedly(Return(Nothing()));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -1375,7 +1396,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
   EXPECT_CALL(*mockDocker, stop(_, _, _))
     .WillRepeatedly(Return(Nothing()));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -1502,7 +1525,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
   EXPECT_CALL(*mockDocker, stop(_, _, _))
     .WillRepeatedly(Return(Nothing()));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -1634,7 +1659,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
   EXPECT_CALL(*mockDocker, stop(_, _, _))
     .WillRepeatedly(Return(Nothing()));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -1767,10 +1794,12 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
                            Invoke((MockDocker*) docker.get(),
                                   &MockDocker::_logs)));
 
+  Fetcher fetcher;
+
   // We put the containerizer on the heap so we can more easily
   // control it's lifetime, i.e., when we invoke the destructor.
   MockDockerContainerizer* dockerContainerizer1 =
-    new MockDockerContainerizer(flags, docker);
+    new MockDockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
   ASSERT_SOME(slave1);
@@ -1854,7 +1883,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   MockDockerContainerizer* dockerContainerizer2 =
-    new MockDockerContainerizer(flags, docker);
+    new MockDockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
   ASSERT_SOME(slave2);
@@ -1938,8 +1967,10 @@ TEST_F(DockerContainerizerTest,
                            Invoke((MockDocker*) docker.get(),
                                   &MockDocker::_logs)));
 
+  Fetcher fetcher;
+
   MockDockerContainerizer* dockerContainerizer1 =
-    new MockDockerContainerizer(flags, docker);
+    new MockDockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
   ASSERT_SOME(slave1);
@@ -2050,7 +2081,7 @@ TEST_F(DockerContainerizerTest,
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   MockDockerContainerizer* dockerContainerizer2 =
-    new MockDockerContainerizer(flags, docker);
+    new MockDockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
   ASSERT_SOME(slave2);
@@ -2121,7 +2152,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping)
   EXPECT_CALL(*mockDocker, stop(_, _, _))
     .WillRepeatedly(Return(Nothing()));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -2256,7 +2289,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
     .WillRepeatedly(FutureResult(
         &logs, Invoke((MockDocker*)docker.get(), &MockDocker::_logs)));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -2356,10 +2391,12 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
+  Fetcher fetcher;
+
   // The docker containerizer will free the process, so we must
   // allocate on the heap.
   MockDockerContainerizerProcess* process =
-    new MockDockerContainerizerProcess(flags, docker);
+    new MockDockerContainerizerProcess(flags, &fetcher, docker);
 
   MockDockerContainerizer dockerContainerizer(
       (Owned<DockerContainerizerProcess>(process)));
@@ -2461,10 +2498,12 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
+  Fetcher fetcher;
+
   // The docker containerizer will free the process, so we must
   // allocate on the heap.
   MockDockerContainerizerProcess* process =
-    new MockDockerContainerizerProcess(flags, docker);
+    new MockDockerContainerizerProcess(flags, &fetcher, docker);
 
   MockDockerContainerizer dockerContainerizer(
       (Owned<DockerContainerizerProcess>(process)));

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index 9e48392..f76182f 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -53,6 +53,8 @@ using namespace process;
 using process::Subprocess;
 using process::Future;
 
+using slave::Fetcher;
+
 using std::string;
 using std::map;
 
@@ -75,7 +77,7 @@ TEST_F(FetcherEnvironmentTest, Simple)
   flags.hadoop_home = "/tmp/hadoop";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, user, flags);
+    Fetcher::environment(commandInfo, directory, user, flags);
 
   EXPECT_EQ(5u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -106,7 +108,7 @@ TEST_F(FetcherEnvironmentTest, MultipleURIs)
   flags.hadoop_home = "/tmp/hadoop";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, user, flags);
+    Fetcher::environment(commandInfo, directory, user, flags);
 
   EXPECT_EQ(5u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -132,7 +134,7 @@ TEST_F(FetcherEnvironmentTest, NoUser)
   flags.hadoop_home = "/tmp/hadoop";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, None(), flags);
+    Fetcher::environment(commandInfo, directory, None(), flags);
 
   EXPECT_EQ(4u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -158,7 +160,7 @@ TEST_F(FetcherEnvironmentTest, EmptyHadoop)
   flags.hadoop_home = "";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, user, flags);
+    Fetcher::environment(commandInfo, directory, user, flags);
 
   EXPECT_EQ(4u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -183,7 +185,7 @@ TEST_F(FetcherEnvironmentTest, NoHadoop)
   flags.frameworks_home = "/tmp/frameworks";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, user, flags);
+    Fetcher::environment(commandInfo, directory, user, flags);
 
   EXPECT_EQ(4u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -210,7 +212,7 @@ TEST_F(FetcherEnvironmentTest, NoExtractNoExecutable)
   flags.hadoop_home = "/tmp/hadoop";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, user, flags);
+    Fetcher::environment(commandInfo, directory, user, flags);
 
   EXPECT_EQ(5u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -238,7 +240,7 @@ TEST_F(FetcherEnvironmentTest, NoExtractExecutable)
   flags.hadoop_home = "/tmp/hadoop";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, user, flags);
+    Fetcher::environment(commandInfo, directory, user, flags);
 
   EXPECT_EQ(5u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -271,15 +273,15 @@ TEST_F(FetcherTest, FileURI)
   uri->set_value("file://" + testFile);
 
   map<string, string> env =
-    fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
 
-  Try<Subprocess> fetcherProcess =
+  Try<Subprocess> fetcherSubprocess =
     process::subprocess(
       path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
       env);
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  ASSERT_SOME(fetcherSubprocess);
+  Future<Option<int>> status = fetcherSubprocess.get().status();
 
   AWAIT_READY(status);
   ASSERT_SOME(status.get());
@@ -307,15 +309,15 @@ TEST_F(FetcherTest, FilePath)
   uri->set_value(testFile);
 
   map<string, string> env =
-    fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
 
-  Try<Subprocess> fetcherProcess =
+  Try<Subprocess> fetcherSubprocess =
     process::subprocess(
       path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
       env);
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  ASSERT_SOME(fetcherSubprocess);
+  Future<Option<int>> status = fetcherSubprocess.get().status();
 
   AWAIT_READY(status);
   ASSERT_SOME(status.get());
@@ -360,15 +362,15 @@ TEST_F(FetcherTest, OSNetUriTest)
   uri->set_value(url);
 
   map<string, string> env =
-    fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
 
-  Try<Subprocess> fetcherProcess =
+  Try<Subprocess> fetcherSubprocess =
     process::subprocess(
       path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
       env);
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  ASSERT_SOME(fetcherSubprocess);
+  Future<Option<int>> status = fetcherSubprocess.get().status();
 
   AWAIT_READY(status);
   ASSERT_SOME(status.get());
@@ -396,15 +398,15 @@ TEST_F(FetcherTest, FileLocalhostURI)
   uri->set_value(path::join("file://localhost", testFile));
 
   map<string, string> env =
-    fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
 
-  Try<Subprocess> fetcherProcess =
+  Try<Subprocess> fetcherSubprocess =
     process::subprocess(
       path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
       env);
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  ASSERT_SOME(fetcherSubprocess);
+  Future<Option<int>> status = fetcherSubprocess.get().status();
 
   AWAIT_READY(status);
   ASSERT_SOME(status.get());
@@ -421,12 +423,18 @@ TEST_F(FetcherTest, NoExtractNotExecutable)
 
   ASSERT_SOME(path);
 
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
+
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(path.get());
   uri->set_executable(false);
   uri->set_extract(false);
 
+  slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
   Option<int> stdout = None();
   Option<int> stderr = None();
 
@@ -436,18 +444,12 @@ TEST_F(FetcherTest, NoExtractNotExecutable)
     stderr = STDERR_FILENO;
   }
 
-  slave::Flags flags;
-  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
-
-  Try<Subprocess> fetcherProcess =
-    fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+  Fetcher fetcher;
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  AWAIT_READY(fetch);
 
   Try<string> basename = os::basename(path.get());
 
@@ -471,12 +473,18 @@ TEST_F(FetcherTest, NoExtractExecutable)
 
   ASSERT_SOME(path);
 
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
+
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(path.get());
   uri->set_executable(true);
   uri->set_extract(false);
 
+  slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
   Option<int> stdout = None();
   Option<int> stderr = None();
 
@@ -486,18 +494,12 @@ TEST_F(FetcherTest, NoExtractExecutable)
     stderr = STDERR_FILENO;
   }
 
-  slave::Flags flags;
-  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
-
-  Try<Subprocess> fetcherProcess =
-    fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+  Fetcher fetcher;
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  AWAIT_READY(fetch);
 
   Try<string> basename = os::basename(path.get());
 
@@ -529,12 +531,18 @@ TEST_F(FetcherTest, ExtractNotExecutable)
 
   ASSERT_SOME(os::tar(path.get(), path.get() + ".tar.gz"));
 
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
+
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(path.get() + ".tar.gz");
   uri->set_executable(false);
   uri->set_extract(true);
 
+  slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
   Option<int> stdout = None();
   Option<int> stderr = None();
 
@@ -544,18 +552,12 @@ TEST_F(FetcherTest, ExtractNotExecutable)
     stderr = STDERR_FILENO;
   }
 
-  slave::Flags flags;
-  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
-
-  Try<Subprocess> fetcherProcess =
-    fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+  Fetcher fetcher;
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  AWAIT_READY(fetch);
 
   ASSERT_TRUE(os::exists(path::join(".", path.get())));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/health_check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/health_check_tests.cpp b/src/tests/health_check_tests.cpp
index ae1dcdf..a707398 100644
--- a/src/tests/health_check_tests.cpp
+++ b/src/tests/health_check_tests.cpp
@@ -25,6 +25,8 @@
 
 #include "slave/slave.hpp"
 
+#include "slave/containerizer/fetcher.hpp"
+
 #include "tests/containerizer.hpp"
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
@@ -36,10 +38,11 @@ using namespace mesos::internal::tests;
 
 using mesos::internal::master::Master;
 
-using mesos::internal::slave::Slave;
 using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
 using mesos::internal::slave::MesosContainerizer;
 using mesos::internal::slave::MesosContainerizerProcess;
+using mesos::internal::slave::Slave;
 
 using process::Clock;
 using process::Future;
@@ -145,8 +148,10 @@ TEST_F(HealthCheckTest, HealthyTask)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -204,8 +209,10 @@ TEST_F(HealthCheckTest, HealthyTaskNonShell)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -268,8 +275,10 @@ TEST_F(HealthCheckTest, HealthStatusChange)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -358,8 +367,10 @@ TEST_F(HealthCheckTest, DISABLED_ConsecutiveFailures)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -443,9 +454,10 @@ TEST_F(HealthCheckTest, EnvironmentSetup)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
-  Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+  Fetcher fetcher;
 
+  Try<MesosContainerizer*> containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -506,8 +518,10 @@ TEST_F(HealthCheckTest, DISABLED_GracePeriod)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-  MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/isolator_tests.cpp b/src/tests/isolator_tests.cpp
index 01c0239..1f1c26d 100644
--- a/src/tests/isolator_tests.cpp
+++ b/src/tests/isolator_tests.cpp
@@ -58,6 +58,7 @@
 
 #include "slave/containerizer/launcher.hpp"
 #ifdef __linux__
+#include "slave/containerizer/fetcher.hpp"
 #include "slave/containerizer/linux_launcher.hpp"
 
 #include "slave/containerizer/mesos/containerizer.hpp"
@@ -81,6 +82,7 @@ using mesos::internal::master::Master;
 using mesos::internal::slave::CgroupsCpushareIsolatorProcess;
 using mesos::internal::slave::CgroupsMemIsolatorProcess;
 using mesos::internal::slave::CgroupsPerfEventIsolatorProcess;
+using mesos::internal::slave::Fetcher;
 using mesos::internal::slave::SharedFilesystemIsolatorProcess;
 #endif // __linux__
 using mesos::internal::slave::Isolator;
@@ -952,8 +954,10 @@ TEST_F(NamespacesPidIsolatorTest, ROOT_PidNamespace)
 
   string directory = os::getcwd(); // We're inside a temporary sandbox.
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   ASSERT_SOME(containerizer);
 
   ContainerID containerId;

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 8bd0f14..cd4a398 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -50,6 +50,7 @@
 #include "slave/state.hpp"
 
 #include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
 
 #include "messages/messages.hpp"
 
@@ -66,8 +67,9 @@ using namespace process;
 
 using mesos::internal::master::Master;
 
-using mesos::internal::slave::GarbageCollectorProcess;
 using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::GarbageCollectorProcess;
 
 using std::map;
 using std::string;
@@ -142,7 +144,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -322,7 +326,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -380,7 +386,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -407,7 +413,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -460,7 +468,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -501,7 +509,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -553,7 +563,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
   Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -612,7 +622,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -673,7 +685,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
   Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -738,7 +750,9 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_RecoveryTimeout)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.recovery_timeout = Milliseconds(1);
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -794,7 +808,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_RecoveryTimeout)
   Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -831,7 +845,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -884,7 +900,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
       _, &GarbageCollectorProcess::schedule);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -921,7 +937,9 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -981,7 +999,7 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
     .Times(AtMost(1));
 
   // Restart the slave in 'cleanup' recovery mode with a new isolator.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   flags.recover = "cleanup";
@@ -1020,7 +1038,9 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1125,7 +1145,9 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingFramework)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1214,7 +1236,9 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingSlave)
   Future<RegisterSlaveMessage> registerSlaveMessage =
     FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
 
-  Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1266,7 +1290,9 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1318,7 +1344,7 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Restart the slave (use same flags) with a new isolator.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -1387,7 +1413,9 @@ TYPED_TEST(SlaveRecoveryTest, Reboot)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.strict = false;
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1485,7 +1513,7 @@ TYPED_TEST(SlaveRecoveryTest, Reboot)
     FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -1523,7 +1551,9 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.strict = false;
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1608,7 +1638,7 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor)
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Restart the slave (use same flags) with a new isolator.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -1673,7 +1703,9 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1758,7 +1790,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
     .WillRepeatedly(Return());        // Ignore subsequent offers.
 
   // Now restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -1792,7 +1824,9 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1892,7 +1926,9 @@ TYPED_TEST(SlaveRecoveryTest, RegisterDisconnectedSlave)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -2002,7 +2038,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2062,7 +2100,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
     .WillOnce(FutureArg<1>(&status));
 
   // Now restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -2103,7 +2141,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2172,7 +2212,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
     FUTURE_DISPATCH(_, &Slave::executorTerminated);
 
   // Now restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -2209,7 +2249,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
 
   EXPECT_CALL(allocator, addSlave(_, _, _, _));
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2313,7 +2355,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
     .WillRepeatedly(Return());        // Ignore subsequent offers.
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -2373,7 +2415,9 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2453,7 +2497,7 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -2535,7 +2579,9 @@ TYPED_TEST(SlaveRecoveryTest, PartitionedSlave)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2638,7 +2684,7 @@ TYPED_TEST(SlaveRecoveryTest, PartitionedSlave)
     FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
 
   // Restart the slave (use same flags) with a new isolator.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -2665,7 +2711,9 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2740,7 +2788,7 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Restart the slave (use same flags) with a new isolator.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -2806,7 +2854,9 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2906,7 +2956,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -3003,7 +3053,9 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
   flags1.slave_subsystems = None();
 #endif
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags1, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags1, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave1 = this->StartSlave(containerizer1.get(), flags1);
@@ -3040,7 +3092,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
   flags2.slave_subsystems = None();
 #endif
 
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags2, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags2, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Try<PID<Slave> > slave2 = this->StartSlave(containerizer2.get(), flags2);
@@ -3081,7 +3133,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Restart both slaves using the same flags with new containerizers.
-  Try<TypeParam*> containerizer3 = TypeParam::create(flags1, true);
+  Try<TypeParam*> containerizer3 = TypeParam::create(flags1, true, &fetcher);
   ASSERT_SOME(containerizer3);
 
   Clock::pause();
@@ -3089,7 +3141,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
   slave1 = this->StartSlave(containerizer3.get(), flags1);
   ASSERT_SOME(slave1);
 
-  Try<TypeParam*> containerizer4 = TypeParam::create(flags2, true);
+  Try<TypeParam*> containerizer4 = TypeParam::create(flags2, true, &fetcher);
   ASSERT_SOME(containerizer4);
 
   slave2 = this->StartSlave(containerizer4.get(), flags2);
@@ -3254,8 +3306,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, ResourceStatistics)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer1 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3305,7 +3359,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, ResourceStatistics)
 
   // Restart the slave (use same flags) with a new containerizer.
   Try<MesosContainerizer*> containerizer2 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -3357,8 +3411,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PerfRollForward)
   flags.isolation = "cgroups/cpu,cgroups/mem";
   flags.slave_subsystems = "";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer1 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3432,7 +3488,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PerfRollForward)
   flags.perf_interval = Milliseconds(500);
 
   Try<MesosContainerizer*> containerizer2 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -3508,8 +3564,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceForward)
   flags.isolation = "cgroups/cpu,cgroups/mem";
   flags.slave_subsystems = "";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer1 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3570,7 +3628,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceForward)
   flags.isolation = "cgroups/cpu,cgroups/mem,namespaces/pid";
 
   Try<MesosContainerizer*> containerizer2 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -3613,8 +3671,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceBackward)
   flags.isolation = "cgroups/cpu,cgroups/mem,namespaces/pid";
   flags.slave_subsystems = "";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer1 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3676,7 +3736,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceBackward)
   flags.isolation = "cgroups/cpu,cgroups/mem";
 
   Try<MesosContainerizer*> containerizer2 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index f2896a1..c50cbc7 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -49,6 +49,8 @@
 #include "slave/flags.hpp"
 #include "slave/slave.hpp"
 
+#include "slave/containerizer/fetcher.hpp"
+
 #include "slave/containerizer/mesos/containerizer.hpp"
 
 #include "tests/containerizer.hpp"
@@ -61,11 +63,12 @@ using namespace mesos::internal::tests;
 
 using mesos::internal::master::Master;
 
-using mesos::internal::slave::GarbageCollectorProcess;
-using mesos::internal::slave::Slave;
 using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::GarbageCollectorProcess;
 using mesos::internal::slave::MesosContainerizer;
 using mesos::internal::slave::MesosContainerizerProcess;
+using mesos::internal::slave::Slave;
 
 using process::Clock;
 using process::Future;
@@ -103,8 +106,10 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
   // Set the isolation flag so we know a MesoContainerizer will be created.
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -406,8 +411,10 @@ TEST_F(SlaveTest, MesosExecutorCommandTaskWithArgsList)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -528,8 +535,10 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithoutUser)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -622,8 +631,10 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithUser)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());