You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/30 05:58:48 UTC

[1/2] mesos git commit: Consolidate all fetcher env vars into one that holds a JSON object.

Repository: mesos
Updated Branches:
  refs/heads/master 8254f8fa8 -> 699e638ef


Consolidate all fetcher env vars into one that holds a JSON object.

Changed env var ensemble into one env var containing JSON, parsing
this to protobuf for use. Adapted fetcher tests.

Review: https://reviews.apache.org/r/28975


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/699e638e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/699e638e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/699e638e

Branch: refs/heads/master
Commit: 699e638ef80739203ed38ba7238bbc176b107355
Parents: 2fd5659
Author: Bernd Mathiske <be...@mesosphere.io>
Authored: Mon Dec 29 15:42:36 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Dec 29 20:11:17 2014 -0800

----------------------------------------------------------------------
 include/mesos/fetcher/fetcher.hpp   |  25 +++++
 include/mesos/fetcher/fetcher.proto |  37 ++++++++
 src/Makefile.am                     |  70 +++++++++-----
 src/launcher/fetcher.cpp            |  40 ++++----
 src/slave/containerizer/fetcher.cpp |  19 ++--
 src/slave/containerizer/fetcher.hpp |  24 +++--
 src/tests/fetcher_tests.cpp         | 156 +++++++++++++++++++++----------
 7 files changed, 267 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/include/mesos/fetcher/fetcher.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/fetcher/fetcher.hpp b/include/mesos/fetcher/fetcher.hpp
new file mode 100644
index 0000000..b7e6a71
--- /dev/null
+++ b/include/mesos/fetcher/fetcher.hpp
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __FETCHER_PROTO_HPP__
+#define __FETCHER_PROTO_HPP__
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include <mesos/fetcher/fetcher.pb.h>
+
+#endif // __FETCHER_PROTO_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/include/mesos/fetcher/fetcher.proto
----------------------------------------------------------------------
diff --git a/include/mesos/fetcher/fetcher.proto b/include/mesos/fetcher/fetcher.proto
new file mode 100644
index 0000000..facb87b
--- /dev/null
+++ b/include/mesos/fetcher/fetcher.proto
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import "mesos/mesos.proto";
+
+package mesos.fetcher;
+
+option java_package = "org.apache.mesos.fetcher";
+option java_outer_classname = "Protos";
+
+
+/**
+ * Encodes the fetcher environment variable sent to the external fetcher
+ * program.
+ */
+message FetcherInfo {
+  required CommandInfo command_info = 1;
+  required string work_directory = 2;
+  optional string user = 3;
+  optional string frameworks_home = 4;
+  optional string hadoop_home = 5;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 3f20213..0521f58 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -134,6 +134,9 @@ MESOS_PROTO = $(top_srcdir)/include/mesos/mesos.proto
 CONTAINERIZER_PROTO =							\
   $(top_srcdir)/include/mesos/containerizer/containerizer.proto
 
+FETCHER_PROTO =								\
+  $(top_srcdir)/include/mesos/fetcher/fetcher.proto
+
 SCHEDULER_PROTO =							\
   $(top_srcdir)/include/mesos/scheduler/scheduler.proto
 
@@ -142,6 +145,8 @@ CXX_PROTOS =								\
   ../include/mesos/mesos.pb.h						\
   containerizer/containerizer.pb.cc					\
   ../include/mesos/containerizer/containerizer.pb.h			\
+  fetcher/fetcher.pb.cc							\
+  ../include/mesos/fetcher/fetcher.pb.h					\
   scheduler/scheduler.pb.cc						\
   ../include/mesos/scheduler/scheduler.pb.h
 
@@ -190,6 +195,12 @@ containerizer/%.pb.cc ../include/mesos/containerizer/%.pb.h: $(CONTAINERIZER_PRO
 	$(PROTOC) $(PROTOCFLAGS) --cpp_out=../include $^
 	mv ../include/mesos/containerizer/*.pb.cc $(@D)
 
+fetcher/%.pb.cc ../include/mesos/fetcher/%.pb.h: $(FETCHER_PROTO)
+	$(MKDIR_P) $(@D)
+	$(MKDIR_P) ../include/mesos/fetcher
+	$(PROTOC) $(PROTOCFLAGS) --cpp_out=../include $^
+	mv ../include/mesos/fetcher/*.pb.cc $(@D)
+
 scheduler/%.pb.cc ../include/mesos/scheduler/%.pb.h: $(SCHEDULER_PROTO)
 	$(MKDIR_P) $(@D)
 	$(MKDIR_P) ../include/mesos/scheduler
@@ -209,6 +220,10 @@ java/generated/org/apache/mesos/containerizer/Protos.java:		\
 	$(MKDIR_P)  $(@D)
 	$(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^
 
+java/generated/org/apache/mesos/fetcher/Protos.java: $(FETCHER_PROTO)
+	$(MKDIR_P)  $(@D)
+	$(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^
+
 java/generated/org/apache/mesos/scheduler/Protos.java: $(SCHEDULER_PROTO)
 	$(MKDIR_P)  $(@D)
 	$(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^
@@ -332,6 +347,14 @@ containerizer_HEADERS =							\
 
 nodist_containerizer_HEADERS = ../include/mesos/containerizer/containerizer.pb.h
 
+fetcherdir = $(pkgincludedir)/fetcher
+
+fetcher_HEADERS =							\
+  $(top_srcdir)/include/mesos/fetcher/fetcher.hpp			\
+  $(top_srcdir)/include/mesos/fetcher/fetcher.proto
+
+nodist_fetcher_HEADERS = ../include/mesos/fetcher/fetcher.pb.h
+
 schedulerdir = $(pkgincludedir)/scheduler
 
 scheduler_HEADERS =							\
@@ -597,7 +620,8 @@ libmesos_no_3rdparty_la_LIBADD += libstate.la
 lib_LTLIBRARIES += libmesos.la
 
 # Include as part of the distribution.
-libmesos_la_SOURCES = $(MESOS_PROTO) $(CONTAINERIZER_PROTO) $(SCHEDULER_PROTO)
+libmesos_la_SOURCES = 							\
+  $(MESOS_PROTO) $(CONTAINERIZER_PROTO) $(FETCHER_PROTO) $(SCHEDULER_PROTO)
 
 libmesos_la_LDFLAGS = -release $(PACKAGE_VERSION)
 
@@ -606,10 +630,10 @@ libmesos_la_LDFLAGS = -release $(PACKAGE_VERSION)
 libmesos_la_LIBTOOLFLAGS = --tag=CXX
 
 # Add the convenience library.
-libmesos_la_LIBADD = 		\
-  libmesos_no_3rdparty.la	\
-  -lsvn_subr-1			\
-  -lsvn_delta-1			\
+libmesos_la_LIBADD = 							\
+  libmesos_no_3rdparty.la						\
+  -lsvn_subr-1								\
+  -lsvn_delta-1								\
   -lapr-1
 
 libmesos_la_LIBADD += ../$(LIBPROCESS)/libprocess.la
@@ -801,8 +825,8 @@ nodist_sbin_SCRIPTS += deploy/mesos-daemon.sh				\
   deploy/mesos-start-slaves.sh deploy/mesos-stop-cluster.sh		\
   deploy/mesos-stop-masters.sh deploy/mesos-stop-slaves.sh
 
-pkgsysconf_DATA += deploy/mesos-deploy-env.sh.template	\
-                   deploy/mesos-master-env.sh.template	\
+pkgsysconf_DATA += deploy/mesos-deploy-env.sh.template			\
+                   deploy/mesos-master-env.sh.template			\
                    deploy/mesos-slave-env.sh.template
 
 # Need to explicitly add this because by default DATA files are not
@@ -979,21 +1003,21 @@ PHONY_TARGETS += clean-java
 
 # Python files listed outside HAS_PYTHON so they are included with the
 # distribution unconditionally.
-PYTHON_SOURCE =									\
-	python/src/mesos/__init__.py						\
-	python/interface/src/mesos/__init__.py					\
-	python/interface/src/mesos/interface/__init__.py			\
-	python/native/src/mesos/__init__.py					\
-	python/native/src/mesos/native/__init__.py				\
-	python/native/src/mesos/native/mesos_executor_driver_impl.cpp		\
-	python/native/src/mesos/native/mesos_executor_driver_impl.hpp		\
-	python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp		\
-	python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp		\
-	python/native/src/mesos/native/module.cpp				\
-	python/native/src/mesos/native/module.hpp				\
-	python/native/src/mesos/native/proxy_executor.cpp			\
-	python/native/src/mesos/native/proxy_executor.hpp			\
-	python/native/src/mesos/native/proxy_scheduler.cpp			\
+PYTHON_SOURCE =								\
+	python/src/mesos/__init__.py					\
+	python/interface/src/mesos/__init__.py				\
+	python/interface/src/mesos/interface/__init__.py		\
+	python/native/src/mesos/__init__.py				\
+	python/native/src/mesos/native/__init__.py			\
+	python/native/src/mesos/native/mesos_executor_driver_impl.cpp	\
+	python/native/src/mesos/native/mesos_executor_driver_impl.hpp	\
+	python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp	\
+	python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp	\
+	python/native/src/mesos/native/module.cpp			\
+	python/native/src/mesos/native/module.hpp			\
+	python/native/src/mesos/native/proxy_executor.cpp		\
+	python/native/src/mesos/native/proxy_executor.hpp		\
+	python/native/src/mesos/native/proxy_scheduler.cpp		\
 	python/native/src/mesos/native/proxy_scheduler.hpp
 
 EXTRA_DIST += $(PYTHON_SOURCE)
@@ -1032,7 +1056,7 @@ endif
 PHONY_TARGETS += $(PYTHON_SOURCE)
 
 $(PYTHON_SOURCE):
-	test "$(top_srcdir)" = "$(top_builddir)" ||				\
+	test "$(top_srcdir)" = "$(top_builddir)" ||			\
 		($(MKDIR_P) $(@D) && cp -pf $(srcdir)/$@ $@)
 
 # We currently require both eggs and wheels to be built. Eggs can be added to

http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/src/launcher/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index 2f54d9e..fed0105 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -20,6 +20,8 @@
 
 #include <mesos/mesos.hpp>
 
+#include <mesos/fetcher/fetcher.hpp>
+
 #include <stout/json.hpp>
 #include <stout/net.hpp>
 #include <stout/option.hpp>
@@ -35,6 +37,8 @@
 using namespace mesos;
 using namespace mesos::internal;
 
+using mesos::fetcher::FetcherInfo;
+
 using std::cerr;
 using std::cout;
 using std::endl;
@@ -264,35 +268,35 @@ int main(int argc, char* argv[])
 
   logging::initialize(argv[0], flags, true); // Catch signals.
 
-  CHECK(os::hasenv("MESOS_COMMAND_INFO"))
-    << "Missing MESOS_COMMAND_INFO environment variable";
+  CHECK(os::hasenv("MESOS_FETCHER_INFO"))
+    << "Missing MESOS_FETCHER_INFO environment variable";
 
   Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(os::getenv("MESOS_COMMAND_INFO"));
+    JSON::parse<JSON::Object>(os::getenv("MESOS_FETCHER_INFO"));
 
   if (parse.isError()) {
-    EXIT(1) << "Failed to parse MESOS_COMMAND_INFO: " << parse.error();
+    EXIT(1) << "Failed to parse MESOS_FETCHER_INFO: " << parse.error();
   }
 
-  Try<CommandInfo> commandInfo = protobuf::parse<CommandInfo>(parse.get());
-
-  if (commandInfo.isError()) {
-    EXIT(1) << "Failed to parse CommandInfo: " << commandInfo.error();
+  Try<FetcherInfo> fetcherInfo = protobuf::parse<FetcherInfo>(parse.get());
+  if (fetcherInfo.isError()) {
+    EXIT(1) << "Failed to parse FetcherInfo: " << fetcherInfo.error();
   }
 
-  CHECK(os::hasenv("MESOS_WORK_DIRECTORY"))
-    << "Missing MESOS_WORK_DIRECTORY environment variable";
-  std::string directory = os::getenv("MESOS_WORK_DIRECTORY");
+  const CommandInfo& commandInfo = fetcherInfo.get().command_info();
 
-  // We cannot use Some in the ternary expression because the compiler needs to
-  // be able to infer the type, thus the explicit Option<string>.
-  // TODO(idownes): Add an os::hasenv that returns an Option<string>.
-  Option<std::string> user = os::hasenv("MESOS_USER")
-    ? Option<std::string>(os::getenv("MESOS_USER")) // Explicit so it compiles.
-    : None();
+  const string& directory = fetcherInfo.get().work_directory();
+  if (directory.empty()) {
+    EXIT(1) << "Missing work directory";
+  }
+
+  Option<std::string> user = None();
+  if (fetcherInfo.get().has_user()) {
+    user = fetcherInfo.get().user();
+  }
 
   // Fetch each URI to a local file, chmod, then chown if a user is provided.
-  foreach (const CommandInfo::URI& uri, commandInfo.get().uris()) {
+  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
     // Fetch the URI to a local file.
     Try<string> fetched = fetch(uri.value(), directory);
     if (fetched.isError()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index d04799f..5993670 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+#include <mesos/fetcher/fetcher.hpp>
+
 #include <process/dispatch.hpp>
 #include <process/process.hpp>
 
@@ -29,6 +31,8 @@ using std::vector;
 
 using process::Future;
 
+using mesos::fetcher::FetcherInfo;
+
 namespace mesos {
 namespace internal {
 namespace slave {
@@ -53,24 +57,27 @@ map<string, string> Fetcher::environment(
     const Option<string>& user,
     const Flags& flags)
 {
-  map<string, string> result;
+  FetcherInfo fetcherInfo;
 
-  result["MESOS_COMMAND_INFO"] = stringify(JSON::Protobuf(commandInfo));
+  fetcherInfo.mutable_command_info()->CopyFrom(commandInfo);
 
-  result["MESOS_WORK_DIRECTORY"] = directory;
+  fetcherInfo.set_work_directory(directory);
 
   if (user.isSome()) {
-    result["MESOS_USER"] = user.get();
+    fetcherInfo.set_user(user.get());
   }
 
   if (!flags.frameworks_home.empty()) {
-    result["MESOS_FRAMEWORKS_HOME"] = flags.frameworks_home;
+    fetcherInfo.set_frameworks_home(flags.frameworks_home);
   }
 
   if (!flags.hadoop_home.empty()) {
-    result["HADOOP_HOME"] = flags.hadoop_home;
+    fetcherInfo.set_hadoop_home(flags.hadoop_home);
   }
 
+  map<string, string> result;
+  result["MESOS_FETCHER_INFO"] = stringify(JSON::Protobuf(fetcherInfo));
+
   return result;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 8bbdf07..1db0eaf 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -22,14 +22,15 @@
 #include <string>
 #include <vector>
 
-#include <stout/hashmap.hpp>
+#include <mesos/mesos.hpp>
 
 #include <process/future.hpp>
 #include <process/process.hpp>
 #include <process/subprocess.hpp>
 
-#include <mesos/mesos.hpp>
-#include <slave/flags.hpp>
+#include <stout/hashmap.hpp>
+
+#include "slave/flags.hpp"
 
 namespace mesos {
 namespace internal {
@@ -48,6 +49,16 @@ class FetcherProcess;
 class Fetcher
 {
 public:
+  // Builds the environment used to run mesos-fetcher. This
+  // environment contains one variable with the name
+  // "MESOS_FETCHER_INFO", and its value is a protobuf of type
+  // mesos::fetcher::FetcherInfo.
+  static std::map<std::string, std::string> environment(
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags);
+
   Fetcher();
 
   virtual ~Fetcher();
@@ -77,13 +88,6 @@ public:
   // indicated container. Do nothing if no such subprocess exists.
   void kill(const ContainerID& containerId);
 
-  // Build the environment passed to the mesos-fetcher program.
-  static std::map<std::string, std::string> environment(
-      const CommandInfo& commandInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const Flags& flags);
-
 private:
   process::Owned<FetcherProcess> process;
 };

http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index f76182f..8c0b075 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -36,6 +36,8 @@
 #include <stout/strings.hpp>
 #include <stout/try.hpp>
 
+#include <mesos/fetcher/fetcher.hpp>
+
 #include "slave/containerizer/fetcher.hpp"
 #include "slave/flags.hpp"
 
@@ -58,6 +60,7 @@ using slave::Fetcher;
 using std::string;
 using std::map;
 
+using mesos::fetcher::FetcherInfo;
 
 class FetcherEnvironmentTest : public ::testing::Test {};
 
@@ -79,13 +82,21 @@ TEST_F(FetcherEnvironmentTest, Simple)
   map<string, string> environment =
     Fetcher::environment(commandInfo, directory, user, flags);
 
-  EXPECT_EQ(5u, environment.size());
+  EXPECT_EQ(1u, environment.size());
+
+  Try<JSON::Object> parse =
+    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+  ASSERT_SOME(parse);
+
+  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+  ASSERT_SOME(fetcherInfo);
+
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            environment["MESOS_COMMAND_INFO"]);
-  EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
-  EXPECT_EQ(user.get(), environment["MESOS_USER"]);
-  EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+  EXPECT_EQ(user.get(), fetcherInfo.get().user());
+  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  EXPECT_EQ(flags.hadoop_home, fetcherInfo.get().hadoop_home());
 }
 
 
@@ -110,13 +121,21 @@ TEST_F(FetcherEnvironmentTest, MultipleURIs)
   map<string, string> environment =
     Fetcher::environment(commandInfo, directory, user, flags);
 
-  EXPECT_EQ(5u, environment.size());
+  EXPECT_EQ(1u, environment.size());
+
+  Try<JSON::Object> parse =
+    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+  ASSERT_SOME(parse);
+
+  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+  ASSERT_SOME(fetcherInfo);
+
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            environment["MESOS_COMMAND_INFO"]);
-  EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
-  EXPECT_EQ(user.get(), environment["MESOS_USER"]);
-  EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+  EXPECT_EQ(user.get(), fetcherInfo.get().user());
+  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  EXPECT_EQ(flags.hadoop_home, fetcherInfo.get().hadoop_home());
 }
 
 
@@ -136,12 +155,21 @@ TEST_F(FetcherEnvironmentTest, NoUser)
   map<string, string> environment =
     Fetcher::environment(commandInfo, directory, None(), flags);
 
-  EXPECT_EQ(4u, environment.size());
+  EXPECT_EQ(1u, environment.size());
+
+  Try<JSON::Object> parse =
+    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+  ASSERT_SOME(parse);
+
+  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+  ASSERT_SOME(fetcherInfo);
+
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            environment["MESOS_COMMAND_INFO"]);
-  EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
-  EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+  EXPECT_FALSE(fetcherInfo.get().has_user());
+  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  EXPECT_EQ(flags.hadoop_home, fetcherInfo.get().hadoop_home());
 }
 
 
@@ -162,12 +190,21 @@ TEST_F(FetcherEnvironmentTest, EmptyHadoop)
   map<string, string> environment =
     Fetcher::environment(commandInfo, directory, user, flags);
 
-  EXPECT_EQ(4u, environment.size());
+  EXPECT_EQ(1u, environment.size());
+
+  Try<JSON::Object> parse =
+    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+  ASSERT_SOME(parse);
+
+  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+  ASSERT_SOME(fetcherInfo);
+
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            environment["MESOS_COMMAND_INFO"]);
-  EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
-  EXPECT_EQ(user.get(), environment["MESOS_USER"]);
-  EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
+            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+  EXPECT_EQ(user.get(), fetcherInfo.get().user());
+  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  EXPECT_EQ(flags.hadoop_home, fetcherInfo.get().hadoop_home());
 }
 
 
@@ -187,12 +224,21 @@ TEST_F(FetcherEnvironmentTest, NoHadoop)
   map<string, string> environment =
     Fetcher::environment(commandInfo, directory, user, flags);
 
-  EXPECT_EQ(4u, environment.size());
+  EXPECT_EQ(1u, environment.size());
+
+  Try<JSON::Object> parse =
+    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+  ASSERT_SOME(parse);
+
+  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+  ASSERT_SOME(fetcherInfo);
+
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            environment["MESOS_COMMAND_INFO"]);
-  EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
-  EXPECT_EQ(user.get(), environment["MESOS_USER"]);
-  EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
+            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+  EXPECT_EQ(user.get(), fetcherInfo.get().user());
+  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  EXPECT_FALSE(fetcherInfo.get().has_hadoop_home());
 }
 
 
@@ -214,13 +260,21 @@ TEST_F(FetcherEnvironmentTest, NoExtractNoExecutable)
   map<string, string> environment =
     Fetcher::environment(commandInfo, directory, user, flags);
 
-  EXPECT_EQ(5u, environment.size());
+  EXPECT_EQ(1u, environment.size());
+
+  Try<JSON::Object> parse =
+    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+  ASSERT_SOME(parse);
+
+  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+  ASSERT_SOME(fetcherInfo);
+
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            environment["MESOS_COMMAND_INFO"]);
-  EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
-  EXPECT_EQ(user.get(), environment["MESOS_USER"]);
-  EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+  EXPECT_EQ(user.get(), fetcherInfo.get().user());
+  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  EXPECT_EQ(flags.hadoop_home, fetcherInfo.get().hadoop_home());
 }
 
 
@@ -242,13 +296,21 @@ TEST_F(FetcherEnvironmentTest, NoExtractExecutable)
   map<string, string> environment =
     Fetcher::environment(commandInfo, directory, user, flags);
 
-  EXPECT_EQ(5u, environment.size());
+  EXPECT_EQ(1u, environment.size());
+
+  Try<JSON::Object> parse =
+    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+  ASSERT_SOME(parse);
+
+  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+  ASSERT_SOME(fetcherInfo);
+
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            environment["MESOS_COMMAND_INFO"]);
-  EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
-  EXPECT_EQ(user.get(), environment["MESOS_USER"]);
-  EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+  EXPECT_EQ(user.get(), fetcherInfo.get().user());
+  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  EXPECT_EQ(flags.hadoop_home, fetcherInfo.get().hadoop_home());
 }
 
 
@@ -272,13 +334,13 @@ TEST_F(FetcherTest, FileURI)
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value("file://" + testFile);
 
-  map<string, string> env =
+  map<string, string> environment =
     Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
 
   Try<Subprocess> fetcherSubprocess =
     process::subprocess(
       path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      env);
+      environment);
 
   ASSERT_SOME(fetcherSubprocess);
   Future<Option<int>> status = fetcherSubprocess.get().status();
@@ -308,13 +370,13 @@ TEST_F(FetcherTest, FilePath)
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(testFile);
 
-  map<string, string> env =
+  map<string, string> environment =
     Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
 
   Try<Subprocess> fetcherSubprocess =
     process::subprocess(
       path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      env);
+      environment);
 
   ASSERT_SOME(fetcherSubprocess);
   Future<Option<int>> status = fetcherSubprocess.get().status();
@@ -361,13 +423,13 @@ TEST_F(FetcherTest, OSNetUriTest)
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(url);
 
-  map<string, string> env =
+  map<string, string> environment =
     Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
 
   Try<Subprocess> fetcherSubprocess =
     process::subprocess(
       path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      env);
+      environment);
 
   ASSERT_SOME(fetcherSubprocess);
   Future<Option<int>> status = fetcherSubprocess.get().status();
@@ -397,13 +459,13 @@ TEST_F(FetcherTest, FileLocalhostURI)
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(path::join("file://localhost", testFile));
 
-  map<string, string> env =
+  map<string, string> environment =
     Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
 
   Try<Subprocess> fetcherSubprocess =
     process::subprocess(
       path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      env);
+      environment);
 
   ASSERT_SOME(fetcherSubprocess);
   Future<Option<int>> status = fetcherSubprocess.get().status();


[2/2] mesos git commit: Refactor fetcher namespace into a class/process.

Posted by be...@apache.org.
Refactor fetcher namespace into a class/process.

Layed the groundwork for having a fetcher object with a cache dir per
slave. Factored all fetcher-relevant code into a fetcher class and
process. Added a fetcher parameter to a lot of methods related to
launching tasks, mostly in containerizers. The latter are thus not
holders of fetchers, only slaves are. See MESOS-2172.

Review: https://reviews.apache.org/r/28830


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2fd56590
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2fd56590
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2fd56590

Branch: refs/heads/master
Commit: 2fd565901728309bb4f467f599f08f0b1e57eb69
Parents: 8254f8f
Author: Bernd Mathiske <be...@mesosphere.io>
Authored: Mon Dec 29 16:08:56 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Dec 29 20:11:17 2014 -0800

----------------------------------------------------------------------
 src/local/local.cpp                             |  19 +-
 src/slave/containerizer/containerizer.cpp       |  11 +-
 src/slave/containerizer/containerizer.hpp       |   6 +-
 src/slave/containerizer/docker.cpp              |  40 ++--
 src/slave/containerizer/docker.hpp              |  14 +-
 src/slave/containerizer/fetcher.cpp             | 191 ++++++++++++++++---
 src/slave/containerizer/fetcher.hpp             | 174 +++++++++++------
 src/slave/containerizer/mesos/containerizer.cpp |  40 ++--
 src/slave/containerizer/mesos/containerizer.hpp |  13 +-
 src/slave/main.cpp                              |   7 +-
 src/tests/cluster.hpp                           |  10 +-
 src/tests/containerizer_tests.cpp               |  27 ++-
 src/tests/docker_containerizer_tests.cpp        |  83 +++++---
 src/tests/fetcher_tests.cpp                     | 108 ++++++-----
 src/tests/health_check_tests.cpp                |  30 ++-
 src/tests/isolator_tests.cpp                    |   6 +-
 src/tests/slave_recovery_tests.cpp              | 170 +++++++++++------
 src/tests/slave_tests.cpp                       |  23 ++-
 18 files changed, 668 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 89fed0b..76e73a4 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -51,11 +51,13 @@
 
 #include "module/manager.hpp"
 
-#include "slave/containerizer/containerizer.hpp"
 #include "slave/gc.hpp"
 #include "slave/slave.hpp"
 #include "slave/status_update_manager.hpp"
 
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
 #include "state/in_memory.hpp"
 #include "state/log.hpp"
 #include "state/protobuf.hpp"
@@ -74,6 +76,7 @@ using mesos::internal::master::Registrar;
 using mesos::internal::master::Repairer;
 
 using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
 using mesos::internal::slave::GarbageCollector;
 using mesos::internal::slave::Slave;
 using mesos::internal::slave::StatusUpdateManager;
@@ -108,6 +111,7 @@ static Option<Authorizer*> authorizer = None();
 static Files* files = NULL;
 static vector<GarbageCollector*>* garbageCollectors = NULL;
 static vector<StatusUpdateManager*>* statusUpdateManagers = NULL;
+static vector<Fetcher*>* fetchers = NULL;
 
 
 PID<Master> launch(const Flags& flags, Allocator* _allocator)
@@ -212,6 +216,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
 
   garbageCollectors = new vector<GarbageCollector*>();
   statusUpdateManagers = new vector<StatusUpdateManager*>();
+  fetchers = new vector<Fetcher*>();
 
   vector<UPID> pids;
 
@@ -226,8 +231,11 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
 
     garbageCollectors->push_back(new GarbageCollector());
     statusUpdateManagers->push_back(new StatusUpdateManager(flags));
+    fetchers->push_back(new Fetcher());
+
+    Try<Containerizer*> containerizer =
+      Containerizer::create(flags, true, fetchers->back());
 
-    Try<Containerizer*> containerizer = Containerizer::create(flags, true);
     if (containerizer.isError()) {
       EXIT(1) << "Failed to create a containerizer: " << containerizer.error();
     }
@@ -307,6 +315,13 @@ void shutdown()
     delete statusUpdateManagers;
     statusUpdateManagers = NULL;
 
+    foreach (Fetcher* fetcher, *fetchers) {
+      delete fetcher;
+    }
+
+    delete fetchers;
+    fetchers = NULL;
+
     delete registrar;
     registrar = NULL;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index 1448bea..e89511a 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -159,7 +159,10 @@ Try<Resources> Containerizer::resources(const Flags& flags)
 }
 
 
-Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
+Try<Containerizer*> Containerizer::create(
+    const Flags& flags,
+    bool local,
+    Fetcher* fetcher)
 {
   if (flags.isolation == "external") {
     LOG(WARNING) << "The 'external' isolation flag is deprecated, "
@@ -167,7 +170,7 @@ Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
                  << " '--containerizers=external'.";
 
     Try<ExternalContainerizer*> containerizer =
-        ExternalContainerizer::create(flags);
+      ExternalContainerizer::create(flags);
     if (containerizer.isError()) {
       return Error("Could not create ExternalContainerizer: " +
                    containerizer.error());
@@ -185,7 +188,7 @@ Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
   foreach (const string& type, strings::split(flags.containerizers, ",")) {
     if (type == "mesos") {
       Try<MesosContainerizer*> containerizer =
-        MesosContainerizer::create(flags, local);
+        MesosContainerizer::create(flags, local, fetcher);
       if (containerizer.isError()) {
         return Error("Could not create MesosContainerizer: " +
                      containerizer.error());
@@ -194,7 +197,7 @@ Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
       }
     } else if (type == "docker") {
       Try<DockerContainerizer*> containerizer =
-        DockerContainerizer::create(flags);
+        DockerContainerizer::create(flags, fetcher);
       if (containerizer.isError()) {
         return Error("Could not create DockerContainerizer: " +
                      containerizer.error());

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.hpp b/src/slave/containerizer/containerizer.hpp
index 02754cd..129e60f 100644
--- a/src/slave/containerizer/containerizer.hpp
+++ b/src/slave/containerizer/containerizer.hpp
@@ -35,6 +35,7 @@
 #include <stout/option.hpp>
 #include <stout/try.hpp>
 
+#include "slave/containerizer/fetcher.hpp"
 
 namespace mesos {
 namespace internal {
@@ -55,7 +56,10 @@ class Containerizer
 {
 public:
   // Attempts to create a containerizer as specified by 'isolation' in flags.
-  static Try<Containerizer*> create(const Flags& flags, bool local);
+  static Try<Containerizer*> create(
+      const Flags& flags,
+      bool local,
+      Fetcher* fetcher);
 
   // Determine slave resources from flags, probing the system or querying a
   // delegate.

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 19a6ea2..5f4b4ce 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -103,14 +103,19 @@ Option<ContainerID> parse(const Docker::Container& container)
 }
 
 
-Try<DockerContainerizer*> DockerContainerizer::create(const Flags& flags)
+Try<DockerContainerizer*> DockerContainerizer::create(
+    const Flags& flags,
+    Fetcher* fetcher)
 {
   Try<Docker*> docker = Docker::create(flags.docker);
   if (docker.isError()) {
     return Error(docker.error());
   }
 
-  return new DockerContainerizer(flags, Shared<Docker>(docker.get()));
+  return new DockerContainerizer(
+      flags,
+      fetcher,
+      Shared<Docker>(docker.get()));
 }
 
 
@@ -124,8 +129,9 @@ DockerContainerizer::DockerContainerizer(
 
 DockerContainerizer::DockerContainerizer(
     const Flags& flags,
+    Fetcher* fetcher,
     Shared<Docker> docker)
-  : process(new DockerContainerizerProcess(flags, docker))
+  : process(new DockerContainerizerProcess(flags, fetcher, docker))
 {
   spawn(process.get());
 }
@@ -221,29 +227,12 @@ Future<Nothing> DockerContainerizerProcess::fetch(
   CHECK(containers_.contains(containerId));
   Container* container = containers_[containerId];
 
-  CommandInfo commandInfo = container->command();
-
-  if (commandInfo.uris().size() == 0) {
-    return Nothing();
-  }
-
-  VLOG(1) << "Starting to fetch URIs for container: " << containerId
-          << ", directory: " << container->directory;
-
-  Try<Subprocess> fetcher = fetcher::run(
-      commandInfo,
+  return fetcher->fetch(
+      containerId,
+      container->command(),
       container->directory,
       None(),
       flags);
-
-  if (fetcher.isError()) {
-    return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
-  }
-
-  container->fetcher = fetcher.get();
-
-  return fetcher.get().status()
-    .then(lambda::bind(&fetcher::_run, containerId, lambda::_1));
 }
 
 
@@ -1191,10 +1180,7 @@ void DockerContainerizerProcess::destroy(
     LOG(INFO) << "Destroying Container '"
               << containerId << "' in FETCHING state";
 
-    if (container->fetcher.isSome()) {
-      // Best effort kill the entire fetcher tree.
-      os::killtree(container->fetcher.get().pid(), SIGKILL);
-    }
+    fetcher->kill(containerId);
 
     containerizer::Termination termination;
     termination.set_killed(killed);

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index 28ebc62..b7bf54a 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -49,11 +49,14 @@ class DockerContainerizerProcess;
 class DockerContainerizer : public Containerizer
 {
 public:
-  static Try<DockerContainerizer*> create(const Flags& flags);
+  static Try<DockerContainerizer*> create(
+      const Flags& flags,
+      Fetcher* fetcher);
 
   // This is only public for tests.
   DockerContainerizer(
       const Flags& flags,
+      Fetcher* fetcher,
       process::Shared<Docker> docker);
 
   // This is only public for tests.
@@ -110,8 +113,10 @@ class DockerContainerizerProcess
 public:
   DockerContainerizerProcess(
       const Flags& _flags,
+      Fetcher* _fetcher,
       process::Shared<Docker> _docker)
     : flags(_flags),
+      fetcher(_fetcher),
       docker(_docker) {}
 
   virtual process::Future<Nothing> recover(
@@ -242,6 +247,8 @@ private:
 
   const Flags flags;
 
+  Fetcher* fetcher;
+
   process::Shared<Docker> docker;
 
   struct Container
@@ -418,11 +425,6 @@ private:
     // or ExecutorInfo::resources because they can change dynamically.
     Resources resources;
 
-    // The mesos-fetcher subprocess, kept around so that we can do a
-    // killtree on it if we're asked to destroy a container while we
-    // are fetching.
-    Option<Subprocess> fetcher;
-
     // The docker pull future is stored so we can discard when
     // destroy is called while docker is pulling the image.
     Future<Docker::Image> pull;

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index d702a9c..d04799f 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -16,19 +16,38 @@
  * limitations under the License.
  */
 
+#include <process/dispatch.hpp>
+#include <process/process.hpp>
+
 #include "slave/slave.hpp"
 
 #include "slave/containerizer/fetcher.hpp"
 
 using std::map;
 using std::string;
+using std::vector;
+
+using process::Future;
 
 namespace mesos {
 namespace internal {
 namespace slave {
-namespace fetcher {
 
-map<string, string> environment(
+
+Fetcher::Fetcher() : process(new FetcherProcess())
+{
+  spawn(process.get());
+}
+
+
+Fetcher::~Fetcher()
+{
+  terminate(process.get());
+  process::wait(process.get());
+}
+
+
+map<string, string> Fetcher::environment(
     const CommandInfo& commandInfo,
     const string& directory,
     const Option<string>& user,
@@ -56,7 +75,134 @@ map<string, string> environment(
 }
 
 
-Try<Subprocess> run(
+Future<Nothing> Fetcher::fetch(
+    const ContainerID& containerId,
+    const CommandInfo& commandInfo,
+    const string& directory,
+    const Option<string>& user,
+    const Flags& flags,
+    const Option<int>& stdout,
+    const Option<int>& stderr)
+{
+  if (commandInfo.uris().size() == 0) {
+    return Nothing();
+  }
+
+  return dispatch(process.get(),
+                  &FetcherProcess::fetch,
+                  containerId,
+                  commandInfo,
+                  directory,
+                  user,
+                  flags,
+                  stdout,
+                  stderr);
+}
+
+
+Future<Nothing> Fetcher::fetch(
+    const ContainerID& containerId,
+    const CommandInfo& commandInfo,
+    const string& directory,
+    const Option<string>& user,
+    const Flags& flags)
+{
+  if (commandInfo.uris().size() == 0) {
+    return Nothing();
+  }
+
+  return dispatch(process.get(),
+                  &FetcherProcess::fetch,
+                  containerId,
+                  commandInfo,
+                  directory,
+                  user,
+                  flags);
+}
+
+
+void Fetcher::kill(const ContainerID& containerId)
+{
+  dispatch(process.get(), &FetcherProcess::kill, containerId);
+}
+
+
+FetcherProcess::~FetcherProcess()
+{
+  foreach (const ContainerID& containerId, subprocessPids.keys()) {
+    kill(containerId);
+  }
+}
+
+
+Future<Nothing> FetcherProcess::fetch(
+    const ContainerID& containerId,
+    const CommandInfo& commandInfo,
+    const string& directory,
+    const Option<string>& user,
+    const Flags& flags,
+    const Option<int>& stdout,
+    const Option<int>& stderr)
+{
+  VLOG(1) << "Starting to fetch URIs for container: " << containerId
+        << ", directory: " << directory;
+
+  Try<Subprocess> subprocess =
+    run(commandInfo, directory, user, flags, stdout, stderr);
+
+  if (subprocess.isError()) {
+    return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+  }
+
+  subprocessPids[containerId] = subprocess.get().pid();
+
+  return subprocess.get().status()
+    .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+}
+
+
+Future<Nothing> FetcherProcess::fetch(
+    const ContainerID& containerId,
+    const CommandInfo& commandInfo,
+    const string& directory,
+    const Option<string>& user,
+    const Flags& flags)
+{
+  VLOG(1) << "Starting to fetch URIs for container: " << containerId
+        << ", directory: " << directory;
+
+  Try<Subprocess> subprocess = run(commandInfo, directory, user, flags);
+
+  if (subprocess.isError()) {
+    return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+  }
+
+  subprocessPids[containerId] = subprocess.get().pid();
+
+  return subprocess.get().status()
+    .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+}
+
+
+Future<Nothing> FetcherProcess::_fetch(
+    const ContainerID& containerId,
+    const Option<int>& status)
+{
+  subprocessPids.erase(containerId);
+
+  if (status.isNone()) {
+    return Failure("No status available from fetcher");
+  } else if (status.get() != 0) {
+    return Failure("Failed to fetch URIs for container '" +
+                   stringify(containerId) + "'with exit status: " +
+                   stringify(status.get()));
+  }
+
+  return Nothing();
+}
+
+
+Try<Subprocess> FetcherProcess::run(
     const CommandInfo& commandInfo,
     const string& directory,
     const Option<string>& user,
@@ -83,7 +229,7 @@ Try<Subprocess> run(
 
   LOG(INFO) << "Fetching URIs using command '" << command << "'";
 
-  Try<Subprocess> fetcher = subprocess(
+  Try<Subprocess> fetcherSubprocess = subprocess(
     command,
     Subprocess::PIPE(),
     stdout.isSome()
@@ -92,17 +238,18 @@ Try<Subprocess> run(
     stderr.isSome()
       ? Subprocess::FD(stderr.get())
       : Subprocess::PIPE(),
-    environment(commandInfo, directory, user, flags));
+    Fetcher::environment(commandInfo, directory, user, flags));
 
-  if (fetcher.isError()) {
-    return Error("Failed to execute mesos-fetcher: " + fetcher.error());
+  if (fetcherSubprocess.isError()) {
+    return Error(
+        "Failed to execute mesos-fetcher: " +  fetcherSubprocess.error());
   }
 
-  return fetcher;
+  return fetcherSubprocess;
 }
 
 
-Try<Subprocess> run(
+Try<Subprocess> FetcherProcess::run(
     const CommandInfo& commandInfo,
     const string& directory,
     const Option<string>& user,
@@ -147,7 +294,7 @@ Try<Subprocess> run(
     }
   }
 
-  Try<Subprocess> fetcher = fetcher::run(
+  Try<Subprocess> subprocess = run(
       commandInfo,
       directory,
       user,
@@ -155,31 +302,25 @@ Try<Subprocess> run(
       out.get(),
       err.get());
 
-  fetcher.get().status()
+  subprocess.get().status()
     .onAny(lambda::bind(&os::close, out.get()))
     .onAny(lambda::bind(&os::close, err.get()));
 
-  return fetcher;
+  return subprocess;
 }
 
 
-Future<Nothing> _run(
-    const ContainerID& containerId,
-    const Option<int>& status)
+void FetcherProcess::kill(const ContainerID& containerId)
 {
-  if (status.isNone()) {
-    return Failure("No status available from fetcher");
-  } else if (status.get() != 0) {
-    return Failure("Failed to fetch URIs for container '" +
-                   stringify(containerId) + "'with exit status: " +
-                   stringify(status.get()));
-  }
+  if (subprocessPids.contains(containerId)) {
+    VLOG(1) << "Killing the fetcher for container '" << containerId << "'";
+    // Best effort kill the entire fetcher tree.
+    os::killtree(subprocessPids.get(containerId).get(), SIGKILL);
 
-  return Nothing();
+    subprocessPids.erase(containerId);
+  }
 }
 
-
-} // namespace fetcher {
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 12b81b1..8bbdf07 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -19,72 +19,134 @@
 #ifndef __SLAVE_FETCHER_HPP__
 #define __SLAVE_FETCHER_HPP__
 
-#include <map>
 #include <string>
+#include <vector>
+
+#include <stout/hashmap.hpp>
 
 #include <process/future.hpp>
-#include <process/io.hpp>
+#include <process/process.hpp>
 #include <process/subprocess.hpp>
 
-#include <stout/lambda.hpp>
-#include <stout/option.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-#include <stout/result.hpp>
-#include <stout/strings.hpp>
-#include <stout/try.hpp>
-
 #include <mesos/mesos.hpp>
-
-#include "slave/flags.hpp"
+#include <slave/flags.hpp>
 
 namespace mesos {
 namespace internal {
 namespace slave {
-namespace fetcher {
-
-// Defines helpers for running the mesos-fetcher.
-// TODO(benh): Consider moving this into a 'fetcher' subdirectory as
-// well as the actual mesos-fetcher program (in launcher/fetcher.cpp
-// as of the writing of this comment).
-
-// Helper method to build the environment used to run mesos-fetcher.
-std::map<std::string, std::string> environment(
-    const CommandInfo& commandInfo,
-    const std::string& directory,
-    const Option<std::string>& user,
-    const Flags& flags);
-
-// Run the mesos-fetcher for the specified arguments. Note that if
-// 'stdout' and 'stderr' file descriptors are provided then respective
-// output from the mesos-fetcher will be redirected to the file
-// descriptors. The file descriptors are duplicated (via dup) because
-// redirecting might still be occuring even after the mesos-fetcher has
-// terminated since there still might be data to be read.
-Try<process::Subprocess> run(
-    const CommandInfo& commandInfo,
-    const std::string& directory,
-    const Option<std::string>& user,
-    const Flags& flags,
-    const Option<int>& stdout,
-    const Option<int>& stderr);
-
-// Run the mesos-fetcher for the specified arguments, creating a
-// "stdout" and "stderr" file in the given directory and using
-// these for output.
-Try<process::Subprocess> run(
-    const CommandInfo& commandInfo,
-    const std::string& directory,
-    const Option<std::string>& user,
-    const Flags& flags);
-
-// Check status and return an error if any. Typically used after
-// calling run().
-process::Future<Nothing> _run(
-    const ContainerID& containerId,
-    const Option<int>& status);
-
-} // namespace fetcher {
+
+// Forward declaration.
+class FetcherProcess;
+
+// Argument passing to and invocation of the external fetcher program.
+// TODO(bernd-mesos) : Orchestration and synchronization of fetching
+// phases. Bookkeeping of executor files that are cached after
+// downloading from a URI by the fetcher program. Cache eviction.
+// There has to be exactly one fetcher with a distinct cache dir per
+// active slave. This means that the cache dir can only be fixed
+// after the slave ID has been determined by registration or recovery.
+class Fetcher
+{
+public:
+  Fetcher();
+
+  virtual ~Fetcher();
+
+  // Download the URIs specified in the command info and place the
+  // resulting files into the given work directory. Chmod said files
+  // to the user if given.
+  process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags,
+      const Option<int>& stdout,
+      const Option<int>& stderr);
+
+  // Same as above, but send stdout and stderr to the files 'stdout'
+  // and 'stderr' in the specified directory.
+  process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags);
+
+  // Best effort to kill the fetcher subprocess associated with the
+  // indicated container. Do nothing if no such subprocess exists.
+  void kill(const ContainerID& containerId);
+
+  // Build the environment passed to the mesos-fetcher program.
+  static std::map<std::string, std::string> environment(
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags);
+
+private:
+  process::Owned<FetcherProcess> process;
+};
+
+
+class FetcherProcess : public process::Process<FetcherProcess>
+{
+public:
+  FetcherProcess() : ProcessBase("__fetcher__") {}
+
+  virtual ~FetcherProcess();
+
+  // Fetcher implementation.
+  process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags,
+      const Option<int>& stdout,
+      const Option<int>& stderr);
+
+  process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags);
+
+  void kill(const ContainerID& containerId);
+
+private:
+  // Check status and return an error if any.
+  process::Future<Nothing> _fetch(
+      const ContainerID& containerId,
+      const Option<int>& status);
+
+  // Run the mesos-fetcher with custom output redirection. If
+  // 'stdout' and 'stderr' file descriptors are provided then respective
+  // output from the mesos-fetcher will be redirected to the file
+  // descriptors. The file descriptors are duplicated (via dup) because
+  // redirecting might still be occuring even after the mesos-fetcher has
+  // terminated since there still might be data to be read.
+  // This method is only "public" for test purposes.
+  Try<process::Subprocess> run(
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags,
+      const Option<int>& stdout,
+      const Option<int>& stderr);
+
+  // Run the mesos-fetcher, creating a "stdout" and "stderr" file
+  // in the given directory and using these for output.
+  Try<process::Subprocess> run(
+      const CommandInfo& commandInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const Flags& flags);
+
+  hashmap<ContainerID, pid_t> subprocessPids;
+};
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index d70259b..5c014eb 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -77,7 +77,8 @@ Future<Nothing> _nothing() { return Nothing(); }
 
 Try<MesosContainerizer*> MesosContainerizer::create(
     const Flags& flags,
-    bool local)
+    bool local,
+    Fetcher* fetcher)
 {
   string isolation;
   if (flags.isolation == "process") {
@@ -164,16 +165,22 @@ Try<MesosContainerizer*> MesosContainerizer::create(
   }
 
   return new MesosContainerizer(
-      flags_, local, Owned<Launcher>(launcher.get()), isolators);
+      flags_, local, fetcher, Owned<Launcher>(launcher.get()), isolators);
 }
 
 
 MesosContainerizer::MesosContainerizer(
     const Flags& flags,
     bool local,
+    Fetcher* fetcher,
     const Owned<Launcher>& launcher,
     const vector<Owned<Isolator>>& isolators)
-  : process(new MesosContainerizerProcess(flags, local, launcher, isolators))
+  : process(new MesosContainerizerProcess(
+      flags,
+      local,
+      fetcher,
+      launcher,
+      isolators))
 {
   spawn(process.get());
 }
@@ -540,29 +547,12 @@ Future<Nothing> MesosContainerizerProcess::fetch(
     return Failure("Container is already destroyed");
   }
 
-  if (commandInfo.uris().size() == 0) {
-    return Nothing();
-  }
-
-  Try<Subprocess> fetcher = fetcher::run(
+  return fetcher->fetch(
+      containerId,
       commandInfo,
       directory,
       user,
       flags);
-
-  if (fetcher.isError()) {
-    return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
-  }
-
-  // TODO(tnachen): Currently the fetcher won't shutdown when slave
-  // exits. This means the fetcher will still be running when slave
-  // restarts and after recovering. We won't resume the task since
-  // it hasn't checkpointed yet. Once the fetcher supports existing
-  // on slave it will be removed automatically.
-  containers_[containerId]->fetcher = fetcher.get();
-
-  return fetcher.get().status()
-    .then(lambda::bind(&fetcher::_run, containerId, lambda::_1));
 }
 
 
@@ -899,10 +889,8 @@ void MesosContainerizerProcess::destroy(const ContainerID& containerId)
     return;
   }
 
-  if (container->state == FETCHING && container->fetcher.isSome()) {
-    VLOG(1) << "Killing the fetcher for container '" << containerId << "'";
-    // Best effort kill the entire fetcher tree.
-    os::killtree(container->fetcher.get().pid(), SIGKILL);
+  if (container->state == FETCHING) {
+    fetcher->kill(containerId);
   }
 
   if (container->state == ISOLATING) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 0b635d4..802988c 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -39,11 +39,15 @@ class MesosContainerizerProcess;
 class MesosContainerizer : public Containerizer
 {
 public:
-  static Try<MesosContainerizer*> create(const Flags& flags, bool local);
+  static Try<MesosContainerizer*> create(
+      const Flags& flags,
+      bool local,
+      Fetcher* fetcher);
 
   MesosContainerizer(
       const Flags& flags,
       bool local,
+      Fetcher* fetcher,
       const process::Owned<Launcher>& launcher,
       const std::vector<process::Owned<Isolator>>& isolators);
 
@@ -100,10 +104,12 @@ public:
   MesosContainerizerProcess(
       const Flags& _flags,
       bool _local,
+      Fetcher* _fetcher,
       const process::Owned<Launcher>& _launcher,
       const std::vector<process::Owned<Isolator>>& _isolators)
     : flags(_flags),
       local(_local),
+      fetcher(_fetcher),
       launcher(_launcher),
       isolators(_isolators) {}
 
@@ -214,6 +220,7 @@ private:
 
   const Flags flags;
   const bool local;
+  Fetcher* fetcher;
   const process::Owned<Launcher> launcher;
   const std::vector<process::Owned<Isolator>> isolators;
 
@@ -245,10 +252,6 @@ private:
     // determine the cause of an executor termination.
     std::vector<Limitation> limitations;
 
-    // The mesos-fetcher subprocess, that we keep around so we can
-    // stop the fetcher when the container is destroyed.
-    Option<process::Subprocess> fetcher;
-
     // We keep track of the resources for each container so we can set the
     // ResourceStatistics limits in usage().
     Resources resources;

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index 087944a..2ff2b0d 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -151,13 +151,18 @@ int main(int argc, char** argv)
     LOG(INFO) << "Git SHA: " << build::GIT_SHA.get();
   }
 
-  Try<Containerizer*> containerizer = Containerizer::create(flags, false);
+  Fetcher fetcher;
+
+  Try<Containerizer*> containerizer =
+    Containerizer::create(flags, false, &fetcher);
+
   if (containerizer.isError()) {
     EXIT(1) << "Failed to create a containerizer: "
             << containerizer.error();
   }
 
   Try<MasterDetector*> detector = MasterDetector::create(master.get());
+
   if (detector.isError()) {
     EXIT(1) << "Failed to create a master detector: " << detector.error();
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index fa5eeef..74cedb3 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -63,10 +63,12 @@
 
 #include "slave/flags.hpp"
 #include "slave/gc.hpp"
-#include "slave/containerizer/containerizer.hpp"
 #include "slave/slave.hpp"
 #include "slave/status_update_manager.hpp"
 
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
 #include "state/in_memory.hpp"
 #include "state/log.hpp"
 #include "state/protobuf.hpp"
@@ -185,6 +187,7 @@ public:
       slave::Containerizer* containerizer;
       bool createdContainerizer; // Whether we own the containerizer.
 
+      process::Owned<slave::Fetcher> fetcher;
       process::Owned<slave::StatusUpdateManager> statusUpdateManager;
       process::Owned<slave::GarbageCollector> gc;
       process::Owned<MasterDetector> detector;
@@ -468,8 +471,11 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
   if (containerizer.isSome()) {
     slave.containerizer = containerizer.get();
   } else {
+    // Create a new fetcher.
+    slave.fetcher.reset(new slave::Fetcher());
+
     Try<slave::Containerizer*> containerizer =
-      slave::Containerizer::create(flags, true);
+      slave::Containerizer::create(flags, true, slave.fetcher.get());
     CHECK_SOME(containerizer);
 
     slave.containerizer = containerizer.get();

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer_tests.cpp b/src/tests/containerizer_tests.cpp
index 02a5f15..cfe31a6 100644
--- a/src/tests/containerizer_tests.cpp
+++ b/src/tests/containerizer_tests.cpp
@@ -31,6 +31,7 @@
 
 #include "slave/flags.hpp"
 
+#include "slave/containerizer/fetcher.hpp"
 #include "slave/containerizer/isolator.hpp"
 #include "slave/containerizer/launcher.hpp"
 
@@ -61,6 +62,7 @@ public:
   // Construct a MesosContainerizer with TestIsolator(s) which use the provided
   // 'prepare' command(s).
   Try<MesosContainerizer*> CreateContainerizer(
+      Fetcher* fetcher,
       const vector<Option<CommandInfo> >& prepares)
   {
     vector<Owned<Isolator> > isolators;
@@ -85,18 +87,20 @@ public:
     return new MesosContainerizer(
         flags,
         false,
+        fetcher,
         Owned<Launcher>(launcher.get()),
         isolators);
   }
 
 
   Try<MesosContainerizer*> CreateContainerizer(
+      Fetcher* fetcher,
       const Option<CommandInfo>& prepare)
   {
     vector<Option<CommandInfo> > prepares;
     prepares.push_back(prepare);
 
-    return CreateContainerizer(prepares);
+    return CreateContainerizer(fetcher, prepares);
   }
 };
 
@@ -107,7 +111,10 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, ScriptSucceeds)
   string directory = os::getcwd(); // We're inside a temporary sandbox.
   string file = path::join(directory, "child.script.executed");
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer = CreateContainerizer(
+      &fetcher,
       CREATE_COMMAND_INFO("touch " + file));
   CHECK_SOME(containerizer);
 
@@ -151,7 +158,10 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, ScriptFails)
   string directory = os::getcwd(); // We're inside a temporary sandbox.
   string file = path::join(directory, "child.script.executed");
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer = CreateContainerizer(
+      &fetcher,
       CREATE_COMMAND_INFO("touch " + file + " && exit 1"));
   CHECK_SOME(containerizer);
 
@@ -205,7 +215,10 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, MultipleScripts)
   // This will fail, either first or after the successful command.
   prepares.push_back(CREATE_COMMAND_INFO("touch " + file2 + " && exit 1"));
 
-  Try<MesosContainerizer*> containerizer = CreateContainerizer(prepares);
+  Fetcher fetcher;
+
+  Try<MesosContainerizer*> containerizer =
+    CreateContainerizer(&fetcher, prepares);
   CHECK_SOME(containerizer);
 
   ContainerID containerId;
@@ -251,9 +264,11 @@ TEST_F(MesosContainerizerExecuteTest, IoRedirection)
   slave::Flags flags;
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
+  Fetcher fetcher;
+
   // Use local=false so std{err,out} are redirected to files.
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   ASSERT_SOME(containerizer);
 
   ContainerID containerId;
@@ -306,9 +321,10 @@ public:
   MockMesosContainerizerProcess(
       const Flags& flags,
       bool local,
+      Fetcher* fetcher,
       const process::Owned<Launcher>& launcher,
       const std::vector<process::Owned<Isolator>>& isolators)
-    : MesosContainerizerProcess(flags, local, launcher, isolators)
+    : MesosContainerizerProcess(flags, local, fetcher, launcher, isolators)
   {
     // NOTE: See TestContainerizer::setup for why we use
     // 'EXPECT_CALL' and 'WillRepeatedly' here instead of
@@ -343,9 +359,12 @@ TEST_F(MesosContainerizerDestroyTest, DestroyWhileFetching)
   ASSERT_SOME(launcher);
   std::vector<process::Owned<Isolator>> isolators;
 
+  Fetcher fetcher;
+
   MockMesosContainerizerProcess* process = new MockMesosContainerizerProcess(
       flags,
       true,
+      &fetcher,
       Owned<Launcher>(launcher.get()),
       isolators);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index bed2d10..2105ae2 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -34,6 +34,8 @@
 #include "tests/mesos.hpp"
 
 #include "slave/containerizer/docker.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
 #include "slave/paths.hpp"
 #include "slave/slave.hpp"
 #include "slave/state.hpp"
@@ -49,9 +51,10 @@ using namespace process;
 
 using mesos::internal::master::Master;
 
-using mesos::internal::slave::Slave;
 using mesos::internal::slave::DockerContainerizer;
 using mesos::internal::slave::DockerContainerizerProcess;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::Slave;
 
 using process::Future;
 using process::Message;
@@ -171,8 +174,9 @@ class MockDockerContainerizer : public DockerContainerizer {
 public:
   MockDockerContainerizer(
       const slave::Flags& flags,
+      Fetcher* fetcher,
       Shared<Docker> docker)
-    : DockerContainerizer(flags, docker)
+    : DockerContainerizer(flags, fetcher, docker)
   {
     initialize();
   }
@@ -285,8 +289,9 @@ class MockDockerContainerizerProcess : public DockerContainerizerProcess
 public:
   MockDockerContainerizerProcess(
       const slave::Flags& flags,
+      Fetcher* fetcher,
       const Shared<Docker>& docker)
-    : DockerContainerizerProcess(flags, docker)
+    : DockerContainerizerProcess(flags, fetcher, docker)
   {
     EXPECT_CALL(*this, fetch(_))
       .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_fetch));
@@ -350,7 +355,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
 
   slave::Flags flags = CreateSlaveFlags();
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -478,7 +485,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor_Bridged)
 
   slave::Flags flags = CreateSlaveFlags();
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -602,7 +611,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
 
   slave::Flags flags = CreateSlaveFlags();
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -715,7 +726,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
 
   slave::Flags flags = CreateSlaveFlags();
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -832,7 +845,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
                            Invoke((MockDocker*) docker.get(),
                                   &MockDocker::_logs)));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -970,7 +985,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
                            Invoke((MockDocker*) docker.get(),
                                   &MockDocker::_logs)));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -1126,7 +1143,9 @@ TEST_F(DockerContainerizerTest, DISABLED_ROOT_DOCKER_Recover)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   ContainerID containerId;
   containerId.set_value("c1");
@@ -1249,7 +1268,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
   EXPECT_CALL(*mockDocker, stop(_, _, _))
     .WillRepeatedly(Return(Nothing()));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -1375,7 +1396,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
   EXPECT_CALL(*mockDocker, stop(_, _, _))
     .WillRepeatedly(Return(Nothing()));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -1502,7 +1525,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
   EXPECT_CALL(*mockDocker, stop(_, _, _))
     .WillRepeatedly(Return(Nothing()));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -1634,7 +1659,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
   EXPECT_CALL(*mockDocker, stop(_, _, _))
     .WillRepeatedly(Return(Nothing()));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -1767,10 +1794,12 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
                            Invoke((MockDocker*) docker.get(),
                                   &MockDocker::_logs)));
 
+  Fetcher fetcher;
+
   // We put the containerizer on the heap so we can more easily
   // control it's lifetime, i.e., when we invoke the destructor.
   MockDockerContainerizer* dockerContainerizer1 =
-    new MockDockerContainerizer(flags, docker);
+    new MockDockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
   ASSERT_SOME(slave1);
@@ -1854,7 +1883,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   MockDockerContainerizer* dockerContainerizer2 =
-    new MockDockerContainerizer(flags, docker);
+    new MockDockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
   ASSERT_SOME(slave2);
@@ -1938,8 +1967,10 @@ TEST_F(DockerContainerizerTest,
                            Invoke((MockDocker*) docker.get(),
                                   &MockDocker::_logs)));
 
+  Fetcher fetcher;
+
   MockDockerContainerizer* dockerContainerizer1 =
-    new MockDockerContainerizer(flags, docker);
+    new MockDockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
   ASSERT_SOME(slave1);
@@ -2050,7 +2081,7 @@ TEST_F(DockerContainerizerTest,
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   MockDockerContainerizer* dockerContainerizer2 =
-    new MockDockerContainerizer(flags, docker);
+    new MockDockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
   ASSERT_SOME(slave2);
@@ -2121,7 +2152,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping)
   EXPECT_CALL(*mockDocker, stop(_, _, _))
     .WillRepeatedly(Return(Nothing()));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -2256,7 +2289,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
     .WillRepeatedly(FutureResult(
         &logs, Invoke((MockDocker*)docker.get(), &MockDocker::_logs)));
 
-  MockDockerContainerizer dockerContainerizer(flags, docker);
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
 
   Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
@@ -2356,10 +2391,12 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
+  Fetcher fetcher;
+
   // The docker containerizer will free the process, so we must
   // allocate on the heap.
   MockDockerContainerizerProcess* process =
-    new MockDockerContainerizerProcess(flags, docker);
+    new MockDockerContainerizerProcess(flags, &fetcher, docker);
 
   MockDockerContainerizer dockerContainerizer(
       (Owned<DockerContainerizerProcess>(process)));
@@ -2461,10 +2498,12 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
+  Fetcher fetcher;
+
   // The docker containerizer will free the process, so we must
   // allocate on the heap.
   MockDockerContainerizerProcess* process =
-    new MockDockerContainerizerProcess(flags, docker);
+    new MockDockerContainerizerProcess(flags, &fetcher, docker);
 
   MockDockerContainerizer dockerContainerizer(
       (Owned<DockerContainerizerProcess>(process)));

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index 9e48392..f76182f 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -53,6 +53,8 @@ using namespace process;
 using process::Subprocess;
 using process::Future;
 
+using slave::Fetcher;
+
 using std::string;
 using std::map;
 
@@ -75,7 +77,7 @@ TEST_F(FetcherEnvironmentTest, Simple)
   flags.hadoop_home = "/tmp/hadoop";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, user, flags);
+    Fetcher::environment(commandInfo, directory, user, flags);
 
   EXPECT_EQ(5u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -106,7 +108,7 @@ TEST_F(FetcherEnvironmentTest, MultipleURIs)
   flags.hadoop_home = "/tmp/hadoop";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, user, flags);
+    Fetcher::environment(commandInfo, directory, user, flags);
 
   EXPECT_EQ(5u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -132,7 +134,7 @@ TEST_F(FetcherEnvironmentTest, NoUser)
   flags.hadoop_home = "/tmp/hadoop";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, None(), flags);
+    Fetcher::environment(commandInfo, directory, None(), flags);
 
   EXPECT_EQ(4u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -158,7 +160,7 @@ TEST_F(FetcherEnvironmentTest, EmptyHadoop)
   flags.hadoop_home = "";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, user, flags);
+    Fetcher::environment(commandInfo, directory, user, flags);
 
   EXPECT_EQ(4u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -183,7 +185,7 @@ TEST_F(FetcherEnvironmentTest, NoHadoop)
   flags.frameworks_home = "/tmp/frameworks";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, user, flags);
+    Fetcher::environment(commandInfo, directory, user, flags);
 
   EXPECT_EQ(4u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -210,7 +212,7 @@ TEST_F(FetcherEnvironmentTest, NoExtractNoExecutable)
   flags.hadoop_home = "/tmp/hadoop";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, user, flags);
+    Fetcher::environment(commandInfo, directory, user, flags);
 
   EXPECT_EQ(5u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -238,7 +240,7 @@ TEST_F(FetcherEnvironmentTest, NoExtractExecutable)
   flags.hadoop_home = "/tmp/hadoop";
 
   map<string, string> environment =
-    fetcher::environment(commandInfo, directory, user, flags);
+    Fetcher::environment(commandInfo, directory, user, flags);
 
   EXPECT_EQ(5u, environment.size());
   EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -271,15 +273,15 @@ TEST_F(FetcherTest, FileURI)
   uri->set_value("file://" + testFile);
 
   map<string, string> env =
-    fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
 
-  Try<Subprocess> fetcherProcess =
+  Try<Subprocess> fetcherSubprocess =
     process::subprocess(
       path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
       env);
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  ASSERT_SOME(fetcherSubprocess);
+  Future<Option<int>> status = fetcherSubprocess.get().status();
 
   AWAIT_READY(status);
   ASSERT_SOME(status.get());
@@ -307,15 +309,15 @@ TEST_F(FetcherTest, FilePath)
   uri->set_value(testFile);
 
   map<string, string> env =
-    fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
 
-  Try<Subprocess> fetcherProcess =
+  Try<Subprocess> fetcherSubprocess =
     process::subprocess(
       path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
       env);
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  ASSERT_SOME(fetcherSubprocess);
+  Future<Option<int>> status = fetcherSubprocess.get().status();
 
   AWAIT_READY(status);
   ASSERT_SOME(status.get());
@@ -360,15 +362,15 @@ TEST_F(FetcherTest, OSNetUriTest)
   uri->set_value(url);
 
   map<string, string> env =
-    fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
 
-  Try<Subprocess> fetcherProcess =
+  Try<Subprocess> fetcherSubprocess =
     process::subprocess(
       path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
       env);
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  ASSERT_SOME(fetcherSubprocess);
+  Future<Option<int>> status = fetcherSubprocess.get().status();
 
   AWAIT_READY(status);
   ASSERT_SOME(status.get());
@@ -396,15 +398,15 @@ TEST_F(FetcherTest, FileLocalhostURI)
   uri->set_value(path::join("file://localhost", testFile));
 
   map<string, string> env =
-    fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
 
-  Try<Subprocess> fetcherProcess =
+  Try<Subprocess> fetcherSubprocess =
     process::subprocess(
       path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
       env);
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  ASSERT_SOME(fetcherSubprocess);
+  Future<Option<int>> status = fetcherSubprocess.get().status();
 
   AWAIT_READY(status);
   ASSERT_SOME(status.get());
@@ -421,12 +423,18 @@ TEST_F(FetcherTest, NoExtractNotExecutable)
 
   ASSERT_SOME(path);
 
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
+
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(path.get());
   uri->set_executable(false);
   uri->set_extract(false);
 
+  slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
   Option<int> stdout = None();
   Option<int> stderr = None();
 
@@ -436,18 +444,12 @@ TEST_F(FetcherTest, NoExtractNotExecutable)
     stderr = STDERR_FILENO;
   }
 
-  slave::Flags flags;
-  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
-
-  Try<Subprocess> fetcherProcess =
-    fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+  Fetcher fetcher;
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  AWAIT_READY(fetch);
 
   Try<string> basename = os::basename(path.get());
 
@@ -471,12 +473,18 @@ TEST_F(FetcherTest, NoExtractExecutable)
 
   ASSERT_SOME(path);
 
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
+
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(path.get());
   uri->set_executable(true);
   uri->set_extract(false);
 
+  slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
   Option<int> stdout = None();
   Option<int> stderr = None();
 
@@ -486,18 +494,12 @@ TEST_F(FetcherTest, NoExtractExecutable)
     stderr = STDERR_FILENO;
   }
 
-  slave::Flags flags;
-  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
-
-  Try<Subprocess> fetcherProcess =
-    fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+  Fetcher fetcher;
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  AWAIT_READY(fetch);
 
   Try<string> basename = os::basename(path.get());
 
@@ -529,12 +531,18 @@ TEST_F(FetcherTest, ExtractNotExecutable)
 
   ASSERT_SOME(os::tar(path.get(), path.get() + ".tar.gz"));
 
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
+
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(path.get() + ".tar.gz");
   uri->set_executable(false);
   uri->set_extract(true);
 
+  slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
   Option<int> stdout = None();
   Option<int> stderr = None();
 
@@ -544,18 +552,12 @@ TEST_F(FetcherTest, ExtractNotExecutable)
     stderr = STDERR_FILENO;
   }
 
-  slave::Flags flags;
-  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
-
-  Try<Subprocess> fetcherProcess =
-    fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+  Fetcher fetcher;
 
-  ASSERT_SOME(fetcherProcess);
-  Future<Option<int>> status = fetcherProcess.get().status();
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  AWAIT_READY(fetch);
 
   ASSERT_TRUE(os::exists(path::join(".", path.get())));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/health_check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/health_check_tests.cpp b/src/tests/health_check_tests.cpp
index ae1dcdf..a707398 100644
--- a/src/tests/health_check_tests.cpp
+++ b/src/tests/health_check_tests.cpp
@@ -25,6 +25,8 @@
 
 #include "slave/slave.hpp"
 
+#include "slave/containerizer/fetcher.hpp"
+
 #include "tests/containerizer.hpp"
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
@@ -36,10 +38,11 @@ using namespace mesos::internal::tests;
 
 using mesos::internal::master::Master;
 
-using mesos::internal::slave::Slave;
 using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
 using mesos::internal::slave::MesosContainerizer;
 using mesos::internal::slave::MesosContainerizerProcess;
+using mesos::internal::slave::Slave;
 
 using process::Clock;
 using process::Future;
@@ -145,8 +148,10 @@ TEST_F(HealthCheckTest, HealthyTask)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -204,8 +209,10 @@ TEST_F(HealthCheckTest, HealthyTaskNonShell)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -268,8 +275,10 @@ TEST_F(HealthCheckTest, HealthStatusChange)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -358,8 +367,10 @@ TEST_F(HealthCheckTest, DISABLED_ConsecutiveFailures)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -443,9 +454,10 @@ TEST_F(HealthCheckTest, EnvironmentSetup)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
-  Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+  Fetcher fetcher;
 
+  Try<MesosContainerizer*> containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -506,8 +518,10 @@ TEST_F(HealthCheckTest, DISABLED_GracePeriod)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-  MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/isolator_tests.cpp b/src/tests/isolator_tests.cpp
index 01c0239..1f1c26d 100644
--- a/src/tests/isolator_tests.cpp
+++ b/src/tests/isolator_tests.cpp
@@ -58,6 +58,7 @@
 
 #include "slave/containerizer/launcher.hpp"
 #ifdef __linux__
+#include "slave/containerizer/fetcher.hpp"
 #include "slave/containerizer/linux_launcher.hpp"
 
 #include "slave/containerizer/mesos/containerizer.hpp"
@@ -81,6 +82,7 @@ using mesos::internal::master::Master;
 using mesos::internal::slave::CgroupsCpushareIsolatorProcess;
 using mesos::internal::slave::CgroupsMemIsolatorProcess;
 using mesos::internal::slave::CgroupsPerfEventIsolatorProcess;
+using mesos::internal::slave::Fetcher;
 using mesos::internal::slave::SharedFilesystemIsolatorProcess;
 #endif // __linux__
 using mesos::internal::slave::Isolator;
@@ -952,8 +954,10 @@ TEST_F(NamespacesPidIsolatorTest, ROOT_PidNamespace)
 
   string directory = os::getcwd(); // We're inside a temporary sandbox.
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   ASSERT_SOME(containerizer);
 
   ContainerID containerId;

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 8bd0f14..cd4a398 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -50,6 +50,7 @@
 #include "slave/state.hpp"
 
 #include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
 
 #include "messages/messages.hpp"
 
@@ -66,8 +67,9 @@ using namespace process;
 
 using mesos::internal::master::Master;
 
-using mesos::internal::slave::GarbageCollectorProcess;
 using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::GarbageCollectorProcess;
 
 using std::map;
 using std::string;
@@ -142,7 +144,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -322,7 +326,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -380,7 +386,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -407,7 +413,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -460,7 +468,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -501,7 +509,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -553,7 +563,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
   Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -612,7 +622,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -673,7 +685,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
   Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -738,7 +750,9 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_RecoveryTimeout)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.recovery_timeout = Milliseconds(1);
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -794,7 +808,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_RecoveryTimeout)
   Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -831,7 +845,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -884,7 +900,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
       _, &GarbageCollectorProcess::schedule);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -921,7 +937,9 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -981,7 +999,7 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
     .Times(AtMost(1));
 
   // Restart the slave in 'cleanup' recovery mode with a new isolator.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   flags.recover = "cleanup";
@@ -1020,7 +1038,9 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1125,7 +1145,9 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingFramework)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1214,7 +1236,9 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingSlave)
   Future<RegisterSlaveMessage> registerSlaveMessage =
     FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
 
-  Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1266,7 +1290,9 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1318,7 +1344,7 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Restart the slave (use same flags) with a new isolator.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -1387,7 +1413,9 @@ TYPED_TEST(SlaveRecoveryTest, Reboot)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.strict = false;
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1485,7 +1513,7 @@ TYPED_TEST(SlaveRecoveryTest, Reboot)
     FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -1523,7 +1551,9 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.strict = false;
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1608,7 +1638,7 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor)
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Restart the slave (use same flags) with a new isolator.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -1673,7 +1703,9 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1758,7 +1790,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
     .WillRepeatedly(Return());        // Ignore subsequent offers.
 
   // Now restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -1792,7 +1824,9 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1892,7 +1926,9 @@ TYPED_TEST(SlaveRecoveryTest, RegisterDisconnectedSlave)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -2002,7 +2038,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2062,7 +2100,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
     .WillOnce(FutureArg<1>(&status));
 
   // Now restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -2103,7 +2141,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2172,7 +2212,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
     FUTURE_DISPATCH(_, &Slave::executorTerminated);
 
   // Now restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -2209,7 +2249,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
 
   EXPECT_CALL(allocator, addSlave(_, _, _, _));
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2313,7 +2355,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
     .WillRepeatedly(Return());        // Ignore subsequent offers.
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -2373,7 +2415,9 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2453,7 +2497,7 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -2535,7 +2579,9 @@ TYPED_TEST(SlaveRecoveryTest, PartitionedSlave)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2638,7 +2684,7 @@ TYPED_TEST(SlaveRecoveryTest, PartitionedSlave)
     FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
 
   // Restart the slave (use same flags) with a new isolator.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -2665,7 +2711,9 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2740,7 +2788,7 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Restart the slave (use same flags) with a new isolator.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -2806,7 +2854,9 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2906,7 +2956,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Restart the slave (use same flags) with a new containerizer.
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -3003,7 +3053,9 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
   flags1.slave_subsystems = None();
 #endif
 
-  Try<TypeParam*> containerizer1 = TypeParam::create(flags1, true);
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags1, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave1 = this->StartSlave(containerizer1.get(), flags1);
@@ -3040,7 +3092,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
   flags2.slave_subsystems = None();
 #endif
 
-  Try<TypeParam*> containerizer2 = TypeParam::create(flags2, true);
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags2, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Try<PID<Slave> > slave2 = this->StartSlave(containerizer2.get(), flags2);
@@ -3081,7 +3133,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Restart both slaves using the same flags with new containerizers.
-  Try<TypeParam*> containerizer3 = TypeParam::create(flags1, true);
+  Try<TypeParam*> containerizer3 = TypeParam::create(flags1, true, &fetcher);
   ASSERT_SOME(containerizer3);
 
   Clock::pause();
@@ -3089,7 +3141,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
   slave1 = this->StartSlave(containerizer3.get(), flags1);
   ASSERT_SOME(slave1);
 
-  Try<TypeParam*> containerizer4 = TypeParam::create(flags2, true);
+  Try<TypeParam*> containerizer4 = TypeParam::create(flags2, true, &fetcher);
   ASSERT_SOME(containerizer4);
 
   slave2 = this->StartSlave(containerizer4.get(), flags2);
@@ -3254,8 +3306,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, ResourceStatistics)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer1 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3305,7 +3359,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, ResourceStatistics)
 
   // Restart the slave (use same flags) with a new containerizer.
   Try<MesosContainerizer*> containerizer2 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   slave = this->StartSlave(containerizer2.get(), flags);
@@ -3357,8 +3411,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PerfRollForward)
   flags.isolation = "cgroups/cpu,cgroups/mem";
   flags.slave_subsystems = "";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer1 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3432,7 +3488,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PerfRollForward)
   flags.perf_interval = Milliseconds(500);
 
   Try<MesosContainerizer*> containerizer2 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -3508,8 +3564,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceForward)
   flags.isolation = "cgroups/cpu,cgroups/mem";
   flags.slave_subsystems = "";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer1 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3570,7 +3628,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceForward)
   flags.isolation = "cgroups/cpu,cgroups/mem,namespaces/pid";
 
   Try<MesosContainerizer*> containerizer2 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;
@@ -3613,8 +3671,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceBackward)
   flags.isolation = "cgroups/cpu,cgroups/mem,namespaces/pid";
   flags.slave_subsystems = "";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer1 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer1);
 
   Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3676,7 +3736,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceBackward)
   flags.isolation = "cgroups/cpu,cgroups/mem";
 
   Try<MesosContainerizer*> containerizer2 =
-    MesosContainerizer::create(flags, true);
+    MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
   Future<vector<Offer> > offers2;

http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index f2896a1..c50cbc7 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -49,6 +49,8 @@
 #include "slave/flags.hpp"
 #include "slave/slave.hpp"
 
+#include "slave/containerizer/fetcher.hpp"
+
 #include "slave/containerizer/mesos/containerizer.hpp"
 
 #include "tests/containerizer.hpp"
@@ -61,11 +63,12 @@ using namespace mesos::internal::tests;
 
 using mesos::internal::master::Master;
 
-using mesos::internal::slave::GarbageCollectorProcess;
-using mesos::internal::slave::Slave;
 using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::GarbageCollectorProcess;
 using mesos::internal::slave::MesosContainerizer;
 using mesos::internal::slave::MesosContainerizerProcess;
+using mesos::internal::slave::Slave;
 
 using process::Clock;
 using process::Future;
@@ -103,8 +106,10 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
   // Set the isolation flag so we know a MesoContainerizer will be created.
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -406,8 +411,10 @@ TEST_F(SlaveTest, MesosExecutorCommandTaskWithArgsList)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -528,8 +535,10 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithoutUser)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -622,8 +631,10 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithUser)
   slave::Flags flags = CreateSlaveFlags();
   flags.isolation = "posix/cpu,posix/mem";
 
+  Fetcher fetcher;
+
   Try<MesosContainerizer*> containerizer =
-    MesosContainerizer::create(flags, false);
+    MesosContainerizer::create(flags, false, &fetcher);
   CHECK_SOME(containerizer);
 
   Try<PID<Slave> > slave = StartSlave(containerizer.get());