You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/30 05:58:49 UTC
[2/2] mesos git commit: Refactor fetcher namespace into a
class/process.
Refactor fetcher namespace into a class/process.
Layed the groundwork for having a fetcher object with a cache dir per
slave. Factored all fetcher-relevant code into a fetcher class and
process. Added a fetcher parameter to a lot of methods related to
launching tasks, mostly in containerizers. The latter are thus not
holders of fetchers, only slaves are. See MESOS-2172.
Review: https://reviews.apache.org/r/28830
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2fd56590
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2fd56590
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2fd56590
Branch: refs/heads/master
Commit: 2fd565901728309bb4f467f599f08f0b1e57eb69
Parents: 8254f8f
Author: Bernd Mathiske <be...@mesosphere.io>
Authored: Mon Dec 29 16:08:56 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Dec 29 20:11:17 2014 -0800
----------------------------------------------------------------------
src/local/local.cpp | 19 +-
src/slave/containerizer/containerizer.cpp | 11 +-
src/slave/containerizer/containerizer.hpp | 6 +-
src/slave/containerizer/docker.cpp | 40 ++--
src/slave/containerizer/docker.hpp | 14 +-
src/slave/containerizer/fetcher.cpp | 191 ++++++++++++++++---
src/slave/containerizer/fetcher.hpp | 174 +++++++++++------
src/slave/containerizer/mesos/containerizer.cpp | 40 ++--
src/slave/containerizer/mesos/containerizer.hpp | 13 +-
src/slave/main.cpp | 7 +-
src/tests/cluster.hpp | 10 +-
src/tests/containerizer_tests.cpp | 27 ++-
src/tests/docker_containerizer_tests.cpp | 83 +++++---
src/tests/fetcher_tests.cpp | 108 ++++++-----
src/tests/health_check_tests.cpp | 30 ++-
src/tests/isolator_tests.cpp | 6 +-
src/tests/slave_recovery_tests.cpp | 170 +++++++++++------
src/tests/slave_tests.cpp | 23 ++-
18 files changed, 668 insertions(+), 304 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 89fed0b..76e73a4 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -51,11 +51,13 @@
#include "module/manager.hpp"
-#include "slave/containerizer/containerizer.hpp"
#include "slave/gc.hpp"
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
#include "state/in_memory.hpp"
#include "state/log.hpp"
#include "state/protobuf.hpp"
@@ -74,6 +76,7 @@ using mesos::internal::master::Registrar;
using mesos::internal::master::Repairer;
using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
using mesos::internal::slave::GarbageCollector;
using mesos::internal::slave::Slave;
using mesos::internal::slave::StatusUpdateManager;
@@ -108,6 +111,7 @@ static Option<Authorizer*> authorizer = None();
static Files* files = NULL;
static vector<GarbageCollector*>* garbageCollectors = NULL;
static vector<StatusUpdateManager*>* statusUpdateManagers = NULL;
+static vector<Fetcher*>* fetchers = NULL;
PID<Master> launch(const Flags& flags, Allocator* _allocator)
@@ -212,6 +216,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
garbageCollectors = new vector<GarbageCollector*>();
statusUpdateManagers = new vector<StatusUpdateManager*>();
+ fetchers = new vector<Fetcher*>();
vector<UPID> pids;
@@ -226,8 +231,11 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
garbageCollectors->push_back(new GarbageCollector());
statusUpdateManagers->push_back(new StatusUpdateManager(flags));
+ fetchers->push_back(new Fetcher());
+
+ Try<Containerizer*> containerizer =
+ Containerizer::create(flags, true, fetchers->back());
- Try<Containerizer*> containerizer = Containerizer::create(flags, true);
if (containerizer.isError()) {
EXIT(1) << "Failed to create a containerizer: " << containerizer.error();
}
@@ -307,6 +315,13 @@ void shutdown()
delete statusUpdateManagers;
statusUpdateManagers = NULL;
+ foreach (Fetcher* fetcher, *fetchers) {
+ delete fetcher;
+ }
+
+ delete fetchers;
+ fetchers = NULL;
+
delete registrar;
registrar = NULL;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index 1448bea..e89511a 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -159,7 +159,10 @@ Try<Resources> Containerizer::resources(const Flags& flags)
}
-Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
+Try<Containerizer*> Containerizer::create(
+ const Flags& flags,
+ bool local,
+ Fetcher* fetcher)
{
if (flags.isolation == "external") {
LOG(WARNING) << "The 'external' isolation flag is deprecated, "
@@ -167,7 +170,7 @@ Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
<< " '--containerizers=external'.";
Try<ExternalContainerizer*> containerizer =
- ExternalContainerizer::create(flags);
+ ExternalContainerizer::create(flags);
if (containerizer.isError()) {
return Error("Could not create ExternalContainerizer: " +
containerizer.error());
@@ -185,7 +188,7 @@ Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
foreach (const string& type, strings::split(flags.containerizers, ",")) {
if (type == "mesos") {
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, local);
+ MesosContainerizer::create(flags, local, fetcher);
if (containerizer.isError()) {
return Error("Could not create MesosContainerizer: " +
containerizer.error());
@@ -194,7 +197,7 @@ Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
}
} else if (type == "docker") {
Try<DockerContainerizer*> containerizer =
- DockerContainerizer::create(flags);
+ DockerContainerizer::create(flags, fetcher);
if (containerizer.isError()) {
return Error("Could not create DockerContainerizer: " +
containerizer.error());
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.hpp b/src/slave/containerizer/containerizer.hpp
index 02754cd..129e60f 100644
--- a/src/slave/containerizer/containerizer.hpp
+++ b/src/slave/containerizer/containerizer.hpp
@@ -35,6 +35,7 @@
#include <stout/option.hpp>
#include <stout/try.hpp>
+#include "slave/containerizer/fetcher.hpp"
namespace mesos {
namespace internal {
@@ -55,7 +56,10 @@ class Containerizer
{
public:
// Attempts to create a containerizer as specified by 'isolation' in flags.
- static Try<Containerizer*> create(const Flags& flags, bool local);
+ static Try<Containerizer*> create(
+ const Flags& flags,
+ bool local,
+ Fetcher* fetcher);
// Determine slave resources from flags, probing the system or querying a
// delegate.
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 19a6ea2..5f4b4ce 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -103,14 +103,19 @@ Option<ContainerID> parse(const Docker::Container& container)
}
-Try<DockerContainerizer*> DockerContainerizer::create(const Flags& flags)
+Try<DockerContainerizer*> DockerContainerizer::create(
+ const Flags& flags,
+ Fetcher* fetcher)
{
Try<Docker*> docker = Docker::create(flags.docker);
if (docker.isError()) {
return Error(docker.error());
}
- return new DockerContainerizer(flags, Shared<Docker>(docker.get()));
+ return new DockerContainerizer(
+ flags,
+ fetcher,
+ Shared<Docker>(docker.get()));
}
@@ -124,8 +129,9 @@ DockerContainerizer::DockerContainerizer(
DockerContainerizer::DockerContainerizer(
const Flags& flags,
+ Fetcher* fetcher,
Shared<Docker> docker)
- : process(new DockerContainerizerProcess(flags, docker))
+ : process(new DockerContainerizerProcess(flags, fetcher, docker))
{
spawn(process.get());
}
@@ -221,29 +227,12 @@ Future<Nothing> DockerContainerizerProcess::fetch(
CHECK(containers_.contains(containerId));
Container* container = containers_[containerId];
- CommandInfo commandInfo = container->command();
-
- if (commandInfo.uris().size() == 0) {
- return Nothing();
- }
-
- VLOG(1) << "Starting to fetch URIs for container: " << containerId
- << ", directory: " << container->directory;
-
- Try<Subprocess> fetcher = fetcher::run(
- commandInfo,
+ return fetcher->fetch(
+ containerId,
+ container->command(),
container->directory,
None(),
flags);
-
- if (fetcher.isError()) {
- return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
- }
-
- container->fetcher = fetcher.get();
-
- return fetcher.get().status()
- .then(lambda::bind(&fetcher::_run, containerId, lambda::_1));
}
@@ -1191,10 +1180,7 @@ void DockerContainerizerProcess::destroy(
LOG(INFO) << "Destroying Container '"
<< containerId << "' in FETCHING state";
- if (container->fetcher.isSome()) {
- // Best effort kill the entire fetcher tree.
- os::killtree(container->fetcher.get().pid(), SIGKILL);
- }
+ fetcher->kill(containerId);
containerizer::Termination termination;
termination.set_killed(killed);
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index 28ebc62..b7bf54a 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -49,11 +49,14 @@ class DockerContainerizerProcess;
class DockerContainerizer : public Containerizer
{
public:
- static Try<DockerContainerizer*> create(const Flags& flags);
+ static Try<DockerContainerizer*> create(
+ const Flags& flags,
+ Fetcher* fetcher);
// This is only public for tests.
DockerContainerizer(
const Flags& flags,
+ Fetcher* fetcher,
process::Shared<Docker> docker);
// This is only public for tests.
@@ -110,8 +113,10 @@ class DockerContainerizerProcess
public:
DockerContainerizerProcess(
const Flags& _flags,
+ Fetcher* _fetcher,
process::Shared<Docker> _docker)
: flags(_flags),
+ fetcher(_fetcher),
docker(_docker) {}
virtual process::Future<Nothing> recover(
@@ -242,6 +247,8 @@ private:
const Flags flags;
+ Fetcher* fetcher;
+
process::Shared<Docker> docker;
struct Container
@@ -418,11 +425,6 @@ private:
// or ExecutorInfo::resources because they can change dynamically.
Resources resources;
- // The mesos-fetcher subprocess, kept around so that we can do a
- // killtree on it if we're asked to destroy a container while we
- // are fetching.
- Option<Subprocess> fetcher;
-
// The docker pull future is stored so we can discard when
// destroy is called while docker is pulling the image.
Future<Docker::Image> pull;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index d702a9c..d04799f 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -16,19 +16,38 @@
* limitations under the License.
*/
+#include <process/dispatch.hpp>
+#include <process/process.hpp>
+
#include "slave/slave.hpp"
#include "slave/containerizer/fetcher.hpp"
using std::map;
using std::string;
+using std::vector;
+
+using process::Future;
namespace mesos {
namespace internal {
namespace slave {
-namespace fetcher {
-map<string, string> environment(
+
+Fetcher::Fetcher() : process(new FetcherProcess())
+{
+ spawn(process.get());
+}
+
+
+Fetcher::~Fetcher()
+{
+ terminate(process.get());
+ process::wait(process.get());
+}
+
+
+map<string, string> Fetcher::environment(
const CommandInfo& commandInfo,
const string& directory,
const Option<string>& user,
@@ -56,7 +75,134 @@ map<string, string> environment(
}
-Try<Subprocess> run(
+Future<Nothing> Fetcher::fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const string& directory,
+ const Option<string>& user,
+ const Flags& flags,
+ const Option<int>& stdout,
+ const Option<int>& stderr)
+{
+ if (commandInfo.uris().size() == 0) {
+ return Nothing();
+ }
+
+ return dispatch(process.get(),
+ &FetcherProcess::fetch,
+ containerId,
+ commandInfo,
+ directory,
+ user,
+ flags,
+ stdout,
+ stderr);
+}
+
+
+Future<Nothing> Fetcher::fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const string& directory,
+ const Option<string>& user,
+ const Flags& flags)
+{
+ if (commandInfo.uris().size() == 0) {
+ return Nothing();
+ }
+
+ return dispatch(process.get(),
+ &FetcherProcess::fetch,
+ containerId,
+ commandInfo,
+ directory,
+ user,
+ flags);
+}
+
+
+void Fetcher::kill(const ContainerID& containerId)
+{
+ dispatch(process.get(), &FetcherProcess::kill, containerId);
+}
+
+
+FetcherProcess::~FetcherProcess()
+{
+ foreach (const ContainerID& containerId, subprocessPids.keys()) {
+ kill(containerId);
+ }
+}
+
+
+Future<Nothing> FetcherProcess::fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const string& directory,
+ const Option<string>& user,
+ const Flags& flags,
+ const Option<int>& stdout,
+ const Option<int>& stderr)
+{
+ VLOG(1) << "Starting to fetch URIs for container: " << containerId
+ << ", directory: " << directory;
+
+ Try<Subprocess> subprocess =
+ run(commandInfo, directory, user, flags, stdout, stderr);
+
+ if (subprocess.isError()) {
+ return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+ }
+
+ subprocessPids[containerId] = subprocess.get().pid();
+
+ return subprocess.get().status()
+ .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+}
+
+
+Future<Nothing> FetcherProcess::fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const string& directory,
+ const Option<string>& user,
+ const Flags& flags)
+{
+ VLOG(1) << "Starting to fetch URIs for container: " << containerId
+ << ", directory: " << directory;
+
+ Try<Subprocess> subprocess = run(commandInfo, directory, user, flags);
+
+ if (subprocess.isError()) {
+ return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+ }
+
+ subprocessPids[containerId] = subprocess.get().pid();
+
+ return subprocess.get().status()
+ .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+}
+
+
+Future<Nothing> FetcherProcess::_fetch(
+ const ContainerID& containerId,
+ const Option<int>& status)
+{
+ subprocessPids.erase(containerId);
+
+ if (status.isNone()) {
+ return Failure("No status available from fetcher");
+ } else if (status.get() != 0) {
+ return Failure("Failed to fetch URIs for container '" +
+ stringify(containerId) + "'with exit status: " +
+ stringify(status.get()));
+ }
+
+ return Nothing();
+}
+
+
+Try<Subprocess> FetcherProcess::run(
const CommandInfo& commandInfo,
const string& directory,
const Option<string>& user,
@@ -83,7 +229,7 @@ Try<Subprocess> run(
LOG(INFO) << "Fetching URIs using command '" << command << "'";
- Try<Subprocess> fetcher = subprocess(
+ Try<Subprocess> fetcherSubprocess = subprocess(
command,
Subprocess::PIPE(),
stdout.isSome()
@@ -92,17 +238,18 @@ Try<Subprocess> run(
stderr.isSome()
? Subprocess::FD(stderr.get())
: Subprocess::PIPE(),
- environment(commandInfo, directory, user, flags));
+ Fetcher::environment(commandInfo, directory, user, flags));
- if (fetcher.isError()) {
- return Error("Failed to execute mesos-fetcher: " + fetcher.error());
+ if (fetcherSubprocess.isError()) {
+ return Error(
+ "Failed to execute mesos-fetcher: " + fetcherSubprocess.error());
}
- return fetcher;
+ return fetcherSubprocess;
}
-Try<Subprocess> run(
+Try<Subprocess> FetcherProcess::run(
const CommandInfo& commandInfo,
const string& directory,
const Option<string>& user,
@@ -147,7 +294,7 @@ Try<Subprocess> run(
}
}
- Try<Subprocess> fetcher = fetcher::run(
+ Try<Subprocess> subprocess = run(
commandInfo,
directory,
user,
@@ -155,31 +302,25 @@ Try<Subprocess> run(
out.get(),
err.get());
- fetcher.get().status()
+ subprocess.get().status()
.onAny(lambda::bind(&os::close, out.get()))
.onAny(lambda::bind(&os::close, err.get()));
- return fetcher;
+ return subprocess;
}
-Future<Nothing> _run(
- const ContainerID& containerId,
- const Option<int>& status)
+void FetcherProcess::kill(const ContainerID& containerId)
{
- if (status.isNone()) {
- return Failure("No status available from fetcher");
- } else if (status.get() != 0) {
- return Failure("Failed to fetch URIs for container '" +
- stringify(containerId) + "'with exit status: " +
- stringify(status.get()));
- }
+ if (subprocessPids.contains(containerId)) {
+ VLOG(1) << "Killing the fetcher for container '" << containerId << "'";
+ // Best effort kill the entire fetcher tree.
+ os::killtree(subprocessPids.get(containerId).get(), SIGKILL);
- return Nothing();
+ subprocessPids.erase(containerId);
+ }
}
-
-} // namespace fetcher {
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 12b81b1..8bbdf07 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -19,72 +19,134 @@
#ifndef __SLAVE_FETCHER_HPP__
#define __SLAVE_FETCHER_HPP__
-#include <map>
#include <string>
+#include <vector>
+
+#include <stout/hashmap.hpp>
#include <process/future.hpp>
-#include <process/io.hpp>
+#include <process/process.hpp>
#include <process/subprocess.hpp>
-#include <stout/lambda.hpp>
-#include <stout/option.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-#include <stout/result.hpp>
-#include <stout/strings.hpp>
-#include <stout/try.hpp>
-
#include <mesos/mesos.hpp>
-
-#include "slave/flags.hpp"
+#include <slave/flags.hpp>
namespace mesos {
namespace internal {
namespace slave {
-namespace fetcher {
-
-// Defines helpers for running the mesos-fetcher.
-// TODO(benh): Consider moving this into a 'fetcher' subdirectory as
-// well as the actual mesos-fetcher program (in launcher/fetcher.cpp
-// as of the writing of this comment).
-
-// Helper method to build the environment used to run mesos-fetcher.
-std::map<std::string, std::string> environment(
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const Flags& flags);
-
-// Run the mesos-fetcher for the specified arguments. Note that if
-// 'stdout' and 'stderr' file descriptors are provided then respective
-// output from the mesos-fetcher will be redirected to the file
-// descriptors. The file descriptors are duplicated (via dup) because
-// redirecting might still be occuring even after the mesos-fetcher has
-// terminated since there still might be data to be read.
-Try<process::Subprocess> run(
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const Flags& flags,
- const Option<int>& stdout,
- const Option<int>& stderr);
-
-// Run the mesos-fetcher for the specified arguments, creating a
-// "stdout" and "stderr" file in the given directory and using
-// these for output.
-Try<process::Subprocess> run(
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const Flags& flags);
-
-// Check status and return an error if any. Typically used after
-// calling run().
-process::Future<Nothing> _run(
- const ContainerID& containerId,
- const Option<int>& status);
-
-} // namespace fetcher {
+
+// Forward declaration.
+class FetcherProcess;
+
+// Argument passing to and invocation of the external fetcher program.
+// TODO(bernd-mesos) : Orchestration and synchronization of fetching
+// phases. Bookkeeping of executor files that are cached after
+// downloading from a URI by the fetcher program. Cache eviction.
+// There has to be exactly one fetcher with a distinct cache dir per
+// active slave. This means that the cache dir can only be fixed
+// after the slave ID has been determined by registration or recovery.
+class Fetcher
+{
+public:
+ Fetcher();
+
+ virtual ~Fetcher();
+
+ // Download the URIs specified in the command info and place the
+ // resulting files into the given work directory. Chmod said files
+ // to the user if given.
+ process::Future<Nothing> fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags,
+ const Option<int>& stdout,
+ const Option<int>& stderr);
+
+ // Same as above, but send stdout and stderr to the files 'stdout'
+ // and 'stderr' in the specified directory.
+ process::Future<Nothing> fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags);
+
+ // Best effort to kill the fetcher subprocess associated with the
+ // indicated container. Do nothing if no such subprocess exists.
+ void kill(const ContainerID& containerId);
+
+ // Build the environment passed to the mesos-fetcher program.
+ static std::map<std::string, std::string> environment(
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags);
+
+private:
+ process::Owned<FetcherProcess> process;
+};
+
+
+class FetcherProcess : public process::Process<FetcherProcess>
+{
+public:
+ FetcherProcess() : ProcessBase("__fetcher__") {}
+
+ virtual ~FetcherProcess();
+
+ // Fetcher implementation.
+ process::Future<Nothing> fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags,
+ const Option<int>& stdout,
+ const Option<int>& stderr);
+
+ process::Future<Nothing> fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags);
+
+ void kill(const ContainerID& containerId);
+
+private:
+ // Check status and return an error if any.
+ process::Future<Nothing> _fetch(
+ const ContainerID& containerId,
+ const Option<int>& status);
+
+ // Run the mesos-fetcher with custom output redirection. If
+ // 'stdout' and 'stderr' file descriptors are provided then respective
+ // output from the mesos-fetcher will be redirected to the file
+ // descriptors. The file descriptors are duplicated (via dup) because
+ // redirecting might still be occuring even after the mesos-fetcher has
+ // terminated since there still might be data to be read.
+ // This method is only "public" for test purposes.
+ Try<process::Subprocess> run(
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags,
+ const Option<int>& stdout,
+ const Option<int>& stderr);
+
+ // Run the mesos-fetcher, creating a "stdout" and "stderr" file
+ // in the given directory and using these for output.
+ Try<process::Subprocess> run(
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags);
+
+ hashmap<ContainerID, pid_t> subprocessPids;
+};
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index d70259b..5c014eb 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -77,7 +77,8 @@ Future<Nothing> _nothing() { return Nothing(); }
Try<MesosContainerizer*> MesosContainerizer::create(
const Flags& flags,
- bool local)
+ bool local,
+ Fetcher* fetcher)
{
string isolation;
if (flags.isolation == "process") {
@@ -164,16 +165,22 @@ Try<MesosContainerizer*> MesosContainerizer::create(
}
return new MesosContainerizer(
- flags_, local, Owned<Launcher>(launcher.get()), isolators);
+ flags_, local, fetcher, Owned<Launcher>(launcher.get()), isolators);
}
MesosContainerizer::MesosContainerizer(
const Flags& flags,
bool local,
+ Fetcher* fetcher,
const Owned<Launcher>& launcher,
const vector<Owned<Isolator>>& isolators)
- : process(new MesosContainerizerProcess(flags, local, launcher, isolators))
+ : process(new MesosContainerizerProcess(
+ flags,
+ local,
+ fetcher,
+ launcher,
+ isolators))
{
spawn(process.get());
}
@@ -540,29 +547,12 @@ Future<Nothing> MesosContainerizerProcess::fetch(
return Failure("Container is already destroyed");
}
- if (commandInfo.uris().size() == 0) {
- return Nothing();
- }
-
- Try<Subprocess> fetcher = fetcher::run(
+ return fetcher->fetch(
+ containerId,
commandInfo,
directory,
user,
flags);
-
- if (fetcher.isError()) {
- return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
- }
-
- // TODO(tnachen): Currently the fetcher won't shutdown when slave
- // exits. This means the fetcher will still be running when slave
- // restarts and after recovering. We won't resume the task since
- // it hasn't checkpointed yet. Once the fetcher supports existing
- // on slave it will be removed automatically.
- containers_[containerId]->fetcher = fetcher.get();
-
- return fetcher.get().status()
- .then(lambda::bind(&fetcher::_run, containerId, lambda::_1));
}
@@ -899,10 +889,8 @@ void MesosContainerizerProcess::destroy(const ContainerID& containerId)
return;
}
- if (container->state == FETCHING && container->fetcher.isSome()) {
- VLOG(1) << "Killing the fetcher for container '" << containerId << "'";
- // Best effort kill the entire fetcher tree.
- os::killtree(container->fetcher.get().pid(), SIGKILL);
+ if (container->state == FETCHING) {
+ fetcher->kill(containerId);
}
if (container->state == ISOLATING) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 0b635d4..802988c 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -39,11 +39,15 @@ class MesosContainerizerProcess;
class MesosContainerizer : public Containerizer
{
public:
- static Try<MesosContainerizer*> create(const Flags& flags, bool local);
+ static Try<MesosContainerizer*> create(
+ const Flags& flags,
+ bool local,
+ Fetcher* fetcher);
MesosContainerizer(
const Flags& flags,
bool local,
+ Fetcher* fetcher,
const process::Owned<Launcher>& launcher,
const std::vector<process::Owned<Isolator>>& isolators);
@@ -100,10 +104,12 @@ public:
MesosContainerizerProcess(
const Flags& _flags,
bool _local,
+ Fetcher* _fetcher,
const process::Owned<Launcher>& _launcher,
const std::vector<process::Owned<Isolator>>& _isolators)
: flags(_flags),
local(_local),
+ fetcher(_fetcher),
launcher(_launcher),
isolators(_isolators) {}
@@ -214,6 +220,7 @@ private:
const Flags flags;
const bool local;
+ Fetcher* fetcher;
const process::Owned<Launcher> launcher;
const std::vector<process::Owned<Isolator>> isolators;
@@ -245,10 +252,6 @@ private:
// determine the cause of an executor termination.
std::vector<Limitation> limitations;
- // The mesos-fetcher subprocess, that we keep around so we can
- // stop the fetcher when the container is destroyed.
- Option<process::Subprocess> fetcher;
-
// We keep track of the resources for each container so we can set the
// ResourceStatistics limits in usage().
Resources resources;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index 087944a..2ff2b0d 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -151,13 +151,18 @@ int main(int argc, char** argv)
LOG(INFO) << "Git SHA: " << build::GIT_SHA.get();
}
- Try<Containerizer*> containerizer = Containerizer::create(flags, false);
+ Fetcher fetcher;
+
+ Try<Containerizer*> containerizer =
+ Containerizer::create(flags, false, &fetcher);
+
if (containerizer.isError()) {
EXIT(1) << "Failed to create a containerizer: "
<< containerizer.error();
}
Try<MasterDetector*> detector = MasterDetector::create(master.get());
+
if (detector.isError()) {
EXIT(1) << "Failed to create a master detector: " << detector.error();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index fa5eeef..74cedb3 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -63,10 +63,12 @@
#include "slave/flags.hpp"
#include "slave/gc.hpp"
-#include "slave/containerizer/containerizer.hpp"
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
#include "state/in_memory.hpp"
#include "state/log.hpp"
#include "state/protobuf.hpp"
@@ -185,6 +187,7 @@ public:
slave::Containerizer* containerizer;
bool createdContainerizer; // Whether we own the containerizer.
+ process::Owned<slave::Fetcher> fetcher;
process::Owned<slave::StatusUpdateManager> statusUpdateManager;
process::Owned<slave::GarbageCollector> gc;
process::Owned<MasterDetector> detector;
@@ -468,8 +471,11 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
if (containerizer.isSome()) {
slave.containerizer = containerizer.get();
} else {
+ // Create a new fetcher.
+ slave.fetcher.reset(new slave::Fetcher());
+
Try<slave::Containerizer*> containerizer =
- slave::Containerizer::create(flags, true);
+ slave::Containerizer::create(flags, true, slave.fetcher.get());
CHECK_SOME(containerizer);
slave.containerizer = containerizer.get();
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer_tests.cpp b/src/tests/containerizer_tests.cpp
index 02a5f15..cfe31a6 100644
--- a/src/tests/containerizer_tests.cpp
+++ b/src/tests/containerizer_tests.cpp
@@ -31,6 +31,7 @@
#include "slave/flags.hpp"
+#include "slave/containerizer/fetcher.hpp"
#include "slave/containerizer/isolator.hpp"
#include "slave/containerizer/launcher.hpp"
@@ -61,6 +62,7 @@ public:
// Construct a MesosContainerizer with TestIsolator(s) which use the provided
// 'prepare' command(s).
Try<MesosContainerizer*> CreateContainerizer(
+ Fetcher* fetcher,
const vector<Option<CommandInfo> >& prepares)
{
vector<Owned<Isolator> > isolators;
@@ -85,18 +87,20 @@ public:
return new MesosContainerizer(
flags,
false,
+ fetcher,
Owned<Launcher>(launcher.get()),
isolators);
}
Try<MesosContainerizer*> CreateContainerizer(
+ Fetcher* fetcher,
const Option<CommandInfo>& prepare)
{
vector<Option<CommandInfo> > prepares;
prepares.push_back(prepare);
- return CreateContainerizer(prepares);
+ return CreateContainerizer(fetcher, prepares);
}
};
@@ -107,7 +111,10 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, ScriptSucceeds)
string directory = os::getcwd(); // We're inside a temporary sandbox.
string file = path::join(directory, "child.script.executed");
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer = CreateContainerizer(
+ &fetcher,
CREATE_COMMAND_INFO("touch " + file));
CHECK_SOME(containerizer);
@@ -151,7 +158,10 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, ScriptFails)
string directory = os::getcwd(); // We're inside a temporary sandbox.
string file = path::join(directory, "child.script.executed");
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer = CreateContainerizer(
+ &fetcher,
CREATE_COMMAND_INFO("touch " + file + " && exit 1"));
CHECK_SOME(containerizer);
@@ -205,7 +215,10 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, MultipleScripts)
// This will fail, either first or after the successful command.
prepares.push_back(CREATE_COMMAND_INFO("touch " + file2 + " && exit 1"));
- Try<MesosContainerizer*> containerizer = CreateContainerizer(prepares);
+ Fetcher fetcher;
+
+ Try<MesosContainerizer*> containerizer =
+ CreateContainerizer(&fetcher, prepares);
CHECK_SOME(containerizer);
ContainerID containerId;
@@ -251,9 +264,11 @@ TEST_F(MesosContainerizerExecuteTest, IoRedirection)
slave::Flags flags;
flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+ Fetcher fetcher;
+
// Use local=false so std{err,out} are redirected to files.
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
ASSERT_SOME(containerizer);
ContainerID containerId;
@@ -306,9 +321,10 @@ public:
MockMesosContainerizerProcess(
const Flags& flags,
bool local,
+ Fetcher* fetcher,
const process::Owned<Launcher>& launcher,
const std::vector<process::Owned<Isolator>>& isolators)
- : MesosContainerizerProcess(flags, local, launcher, isolators)
+ : MesosContainerizerProcess(flags, local, fetcher, launcher, isolators)
{
// NOTE: See TestContainerizer::setup for why we use
// 'EXPECT_CALL' and 'WillRepeatedly' here instead of
@@ -343,9 +359,12 @@ TEST_F(MesosContainerizerDestroyTest, DestroyWhileFetching)
ASSERT_SOME(launcher);
std::vector<process::Owned<Isolator>> isolators;
+ Fetcher fetcher;
+
MockMesosContainerizerProcess* process = new MockMesosContainerizerProcess(
flags,
true,
+ &fetcher,
Owned<Launcher>(launcher.get()),
isolators);
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index bed2d10..2105ae2 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -34,6 +34,8 @@
#include "tests/mesos.hpp"
#include "slave/containerizer/docker.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
#include "slave/paths.hpp"
#include "slave/slave.hpp"
#include "slave/state.hpp"
@@ -49,9 +51,10 @@ using namespace process;
using mesos::internal::master::Master;
-using mesos::internal::slave::Slave;
using mesos::internal::slave::DockerContainerizer;
using mesos::internal::slave::DockerContainerizerProcess;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::Slave;
using process::Future;
using process::Message;
@@ -171,8 +174,9 @@ class MockDockerContainerizer : public DockerContainerizer {
public:
MockDockerContainerizer(
const slave::Flags& flags,
+ Fetcher* fetcher,
Shared<Docker> docker)
- : DockerContainerizer(flags, docker)
+ : DockerContainerizer(flags, fetcher, docker)
{
initialize();
}
@@ -285,8 +289,9 @@ class MockDockerContainerizerProcess : public DockerContainerizerProcess
public:
MockDockerContainerizerProcess(
const slave::Flags& flags,
+ Fetcher* fetcher,
const Shared<Docker>& docker)
- : DockerContainerizerProcess(flags, docker)
+ : DockerContainerizerProcess(flags, fetcher, docker)
{
EXPECT_CALL(*this, fetch(_))
.WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_fetch));
@@ -350,7 +355,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
slave::Flags flags = CreateSlaveFlags();
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -478,7 +485,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor_Bridged)
slave::Flags flags = CreateSlaveFlags();
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -602,7 +611,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
slave::Flags flags = CreateSlaveFlags();
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -715,7 +726,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
slave::Flags flags = CreateSlaveFlags();
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -832,7 +845,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
Invoke((MockDocker*) docker.get(),
&MockDocker::_logs)));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -970,7 +985,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
Invoke((MockDocker*) docker.get(),
&MockDocker::_logs)));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -1126,7 +1143,9 @@ TEST_F(DockerContainerizerTest, DISABLED_ROOT_DOCKER_Recover)
MockDocker* mockDocker = new MockDocker(tests::flags.docker);
Shared<Docker> docker(mockDocker);
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
ContainerID containerId;
containerId.set_value("c1");
@@ -1249,7 +1268,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -1375,7 +1396,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -1502,7 +1525,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -1634,7 +1659,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -1767,10 +1794,12 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
Invoke((MockDocker*) docker.get(),
&MockDocker::_logs)));
+ Fetcher fetcher;
+
// We put the containerizer on the heap so we can more easily
// control it's lifetime, i.e., when we invoke the destructor.
MockDockerContainerizer* dockerContainerizer1 =
- new MockDockerContainerizer(flags, docker);
+ new MockDockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
ASSERT_SOME(slave1);
@@ -1854,7 +1883,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
.WillRepeatedly(Return()); // Ignore subsequent updates.
MockDockerContainerizer* dockerContainerizer2 =
- new MockDockerContainerizer(flags, docker);
+ new MockDockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
ASSERT_SOME(slave2);
@@ -1938,8 +1967,10 @@ TEST_F(DockerContainerizerTest,
Invoke((MockDocker*) docker.get(),
&MockDocker::_logs)));
+ Fetcher fetcher;
+
MockDockerContainerizer* dockerContainerizer1 =
- new MockDockerContainerizer(flags, docker);
+ new MockDockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
ASSERT_SOME(slave1);
@@ -2050,7 +2081,7 @@ TEST_F(DockerContainerizerTest,
.WillRepeatedly(Return()); // Ignore subsequent updates.
MockDockerContainerizer* dockerContainerizer2 =
- new MockDockerContainerizer(flags, docker);
+ new MockDockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
ASSERT_SOME(slave2);
@@ -2121,7 +2152,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping)
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -2256,7 +2289,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
.WillRepeatedly(FutureResult(
&logs, Invoke((MockDocker*)docker.get(), &MockDocker::_logs)));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -2356,10 +2391,12 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
MockDocker* mockDocker = new MockDocker(tests::flags.docker);
Shared<Docker> docker(mockDocker);
+ Fetcher fetcher;
+
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
- new MockDockerContainerizerProcess(flags, docker);
+ new MockDockerContainerizerProcess(flags, &fetcher, docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
@@ -2461,10 +2498,12 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
MockDocker* mockDocker = new MockDocker(tests::flags.docker);
Shared<Docker> docker(mockDocker);
+ Fetcher fetcher;
+
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
- new MockDockerContainerizerProcess(flags, docker);
+ new MockDockerContainerizerProcess(flags, &fetcher, docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index 9e48392..f76182f 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -53,6 +53,8 @@ using namespace process;
using process::Subprocess;
using process::Future;
+using slave::Fetcher;
+
using std::string;
using std::map;
@@ -75,7 +77,7 @@ TEST_F(FetcherEnvironmentTest, Simple)
flags.hadoop_home = "/tmp/hadoop";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, user, flags);
+ Fetcher::environment(commandInfo, directory, user, flags);
EXPECT_EQ(5u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -106,7 +108,7 @@ TEST_F(FetcherEnvironmentTest, MultipleURIs)
flags.hadoop_home = "/tmp/hadoop";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, user, flags);
+ Fetcher::environment(commandInfo, directory, user, flags);
EXPECT_EQ(5u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -132,7 +134,7 @@ TEST_F(FetcherEnvironmentTest, NoUser)
flags.hadoop_home = "/tmp/hadoop";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, None(), flags);
+ Fetcher::environment(commandInfo, directory, None(), flags);
EXPECT_EQ(4u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -158,7 +160,7 @@ TEST_F(FetcherEnvironmentTest, EmptyHadoop)
flags.hadoop_home = "";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, user, flags);
+ Fetcher::environment(commandInfo, directory, user, flags);
EXPECT_EQ(4u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -183,7 +185,7 @@ TEST_F(FetcherEnvironmentTest, NoHadoop)
flags.frameworks_home = "/tmp/frameworks";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, user, flags);
+ Fetcher::environment(commandInfo, directory, user, flags);
EXPECT_EQ(4u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -210,7 +212,7 @@ TEST_F(FetcherEnvironmentTest, NoExtractNoExecutable)
flags.hadoop_home = "/tmp/hadoop";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, user, flags);
+ Fetcher::environment(commandInfo, directory, user, flags);
EXPECT_EQ(5u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -238,7 +240,7 @@ TEST_F(FetcherEnvironmentTest, NoExtractExecutable)
flags.hadoop_home = "/tmp/hadoop";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, user, flags);
+ Fetcher::environment(commandInfo, directory, user, flags);
EXPECT_EQ(5u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -271,15 +273,15 @@ TEST_F(FetcherTest, FileURI)
uri->set_value("file://" + testFile);
map<string, string> env =
- fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+ Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
- Try<Subprocess> fetcherProcess =
+ Try<Subprocess> fetcherSubprocess =
process::subprocess(
path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
env);
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ ASSERT_SOME(fetcherSubprocess);
+ Future<Option<int>> status = fetcherSubprocess.get().status();
AWAIT_READY(status);
ASSERT_SOME(status.get());
@@ -307,15 +309,15 @@ TEST_F(FetcherTest, FilePath)
uri->set_value(testFile);
map<string, string> env =
- fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+ Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
- Try<Subprocess> fetcherProcess =
+ Try<Subprocess> fetcherSubprocess =
process::subprocess(
path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
env);
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ ASSERT_SOME(fetcherSubprocess);
+ Future<Option<int>> status = fetcherSubprocess.get().status();
AWAIT_READY(status);
ASSERT_SOME(status.get());
@@ -360,15 +362,15 @@ TEST_F(FetcherTest, OSNetUriTest)
uri->set_value(url);
map<string, string> env =
- fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+ Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
- Try<Subprocess> fetcherProcess =
+ Try<Subprocess> fetcherSubprocess =
process::subprocess(
path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
env);
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ ASSERT_SOME(fetcherSubprocess);
+ Future<Option<int>> status = fetcherSubprocess.get().status();
AWAIT_READY(status);
ASSERT_SOME(status.get());
@@ -396,15 +398,15 @@ TEST_F(FetcherTest, FileLocalhostURI)
uri->set_value(path::join("file://localhost", testFile));
map<string, string> env =
- fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+ Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
- Try<Subprocess> fetcherProcess =
+ Try<Subprocess> fetcherSubprocess =
process::subprocess(
path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
env);
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ ASSERT_SOME(fetcherSubprocess);
+ Future<Option<int>> status = fetcherSubprocess.get().status();
AWAIT_READY(status);
ASSERT_SOME(status.get());
@@ -421,12 +423,18 @@ TEST_F(FetcherTest, NoExtractNotExecutable)
ASSERT_SOME(path);
+ ContainerID containerId;
+ containerId.set_value(UUID::random().toString());
+
CommandInfo commandInfo;
CommandInfo::URI* uri = commandInfo.add_uris();
uri->set_value(path.get());
uri->set_executable(false);
uri->set_extract(false);
+ slave::Flags flags;
+ flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
Option<int> stdout = None();
Option<int> stderr = None();
@@ -436,18 +444,12 @@ TEST_F(FetcherTest, NoExtractNotExecutable)
stderr = STDERR_FILENO;
}
- slave::Flags flags;
- flags.launcher_dir = path::join(tests::flags.build_dir, "src");
-
- Try<Subprocess> fetcherProcess =
- fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+ Fetcher fetcher;
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ Future<Nothing> fetch = fetcher.fetch(
+ containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
- AWAIT_READY(status);
- ASSERT_SOME(status.get());
- EXPECT_EQ(0, status.get().get());
+ AWAIT_READY(fetch);
Try<string> basename = os::basename(path.get());
@@ -471,12 +473,18 @@ TEST_F(FetcherTest, NoExtractExecutable)
ASSERT_SOME(path);
+ ContainerID containerId;
+ containerId.set_value(UUID::random().toString());
+
CommandInfo commandInfo;
CommandInfo::URI* uri = commandInfo.add_uris();
uri->set_value(path.get());
uri->set_executable(true);
uri->set_extract(false);
+ slave::Flags flags;
+ flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
Option<int> stdout = None();
Option<int> stderr = None();
@@ -486,18 +494,12 @@ TEST_F(FetcherTest, NoExtractExecutable)
stderr = STDERR_FILENO;
}
- slave::Flags flags;
- flags.launcher_dir = path::join(tests::flags.build_dir, "src");
-
- Try<Subprocess> fetcherProcess =
- fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+ Fetcher fetcher;
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ Future<Nothing> fetch = fetcher.fetch(
+ containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
- AWAIT_READY(status);
- ASSERT_SOME(status.get());
- EXPECT_EQ(0, status.get().get());
+ AWAIT_READY(fetch);
Try<string> basename = os::basename(path.get());
@@ -529,12 +531,18 @@ TEST_F(FetcherTest, ExtractNotExecutable)
ASSERT_SOME(os::tar(path.get(), path.get() + ".tar.gz"));
+ ContainerID containerId;
+ containerId.set_value(UUID::random().toString());
+
CommandInfo commandInfo;
CommandInfo::URI* uri = commandInfo.add_uris();
uri->set_value(path.get() + ".tar.gz");
uri->set_executable(false);
uri->set_extract(true);
+ slave::Flags flags;
+ flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
Option<int> stdout = None();
Option<int> stderr = None();
@@ -544,18 +552,12 @@ TEST_F(FetcherTest, ExtractNotExecutable)
stderr = STDERR_FILENO;
}
- slave::Flags flags;
- flags.launcher_dir = path::join(tests::flags.build_dir, "src");
-
- Try<Subprocess> fetcherProcess =
- fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+ Fetcher fetcher;
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ Future<Nothing> fetch = fetcher.fetch(
+ containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
- AWAIT_READY(status);
- ASSERT_SOME(status.get());
- EXPECT_EQ(0, status.get().get());
+ AWAIT_READY(fetch);
ASSERT_TRUE(os::exists(path::join(".", path.get())));
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/health_check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/health_check_tests.cpp b/src/tests/health_check_tests.cpp
index ae1dcdf..a707398 100644
--- a/src/tests/health_check_tests.cpp
+++ b/src/tests/health_check_tests.cpp
@@ -25,6 +25,8 @@
#include "slave/slave.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
#include "tests/containerizer.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
@@ -36,10 +38,11 @@ using namespace mesos::internal::tests;
using mesos::internal::master::Master;
-using mesos::internal::slave::Slave;
using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
using mesos::internal::slave::MesosContainerizer;
using mesos::internal::slave::MesosContainerizerProcess;
+using mesos::internal::slave::Slave;
using process::Clock;
using process::Future;
@@ -145,8 +148,10 @@ TEST_F(HealthCheckTest, HealthyTask)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -204,8 +209,10 @@ TEST_F(HealthCheckTest, HealthyTaskNonShell)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -268,8 +275,10 @@ TEST_F(HealthCheckTest, HealthStatusChange)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -358,8 +367,10 @@ TEST_F(HealthCheckTest, DISABLED_ConsecutiveFailures)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -443,9 +454,10 @@ TEST_F(HealthCheckTest, EnvironmentSetup)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
- Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ Fetcher fetcher;
+ Try<MesosContainerizer*> containerizer =
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -506,8 +518,10 @@ TEST_F(HealthCheckTest, DISABLED_GracePeriod)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/isolator_tests.cpp b/src/tests/isolator_tests.cpp
index 01c0239..1f1c26d 100644
--- a/src/tests/isolator_tests.cpp
+++ b/src/tests/isolator_tests.cpp
@@ -58,6 +58,7 @@
#include "slave/containerizer/launcher.hpp"
#ifdef __linux__
+#include "slave/containerizer/fetcher.hpp"
#include "slave/containerizer/linux_launcher.hpp"
#include "slave/containerizer/mesos/containerizer.hpp"
@@ -81,6 +82,7 @@ using mesos::internal::master::Master;
using mesos::internal::slave::CgroupsCpushareIsolatorProcess;
using mesos::internal::slave::CgroupsMemIsolatorProcess;
using mesos::internal::slave::CgroupsPerfEventIsolatorProcess;
+using mesos::internal::slave::Fetcher;
using mesos::internal::slave::SharedFilesystemIsolatorProcess;
#endif // __linux__
using mesos::internal::slave::Isolator;
@@ -952,8 +954,10 @@ TEST_F(NamespacesPidIsolatorTest, ROOT_PidNamespace)
string directory = os::getcwd(); // We're inside a temporary sandbox.
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
ASSERT_SOME(containerizer);
ContainerID containerId;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 8bd0f14..cd4a398 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -50,6 +50,7 @@
#include "slave/state.hpp"
#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
#include "messages/messages.hpp"
@@ -66,8 +67,9 @@ using namespace process;
using mesos::internal::master::Master;
-using mesos::internal::slave::GarbageCollectorProcess;
using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::GarbageCollectorProcess;
using std::map;
using std::string;
@@ -142,7 +144,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer);
Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -322,7 +326,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -380,7 +386,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
.WillRepeatedly(Return()); // Ignore subsequent updates.
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -407,7 +413,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -460,7 +468,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
.WillRepeatedly(Return()); // Ignore subsequent updates.
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -501,7 +509,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -553,7 +563,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -612,7 +622,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -673,7 +685,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -738,7 +750,9 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_RecoveryTimeout)
slave::Flags flags = this->CreateSlaveFlags();
flags.recovery_timeout = Milliseconds(1);
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -794,7 +808,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_RecoveryTimeout)
Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -831,7 +845,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -884,7 +900,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
_, &GarbageCollectorProcess::schedule);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -921,7 +937,9 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -981,7 +999,7 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
.Times(AtMost(1));
// Restart the slave in 'cleanup' recovery mode with a new isolator.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
flags.recover = "cleanup";
@@ -1020,7 +1038,9 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer);
Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1125,7 +1145,9 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingFramework)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer);
Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1214,7 +1236,9 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingSlave)
Future<RegisterSlaveMessage> registerSlaveMessage =
FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
- Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer);
Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1266,7 +1290,9 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1318,7 +1344,7 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Restart the slave (use same flags) with a new isolator.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -1387,7 +1413,9 @@ TYPED_TEST(SlaveRecoveryTest, Reboot)
slave::Flags flags = this->CreateSlaveFlags();
flags.strict = false;
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1485,7 +1513,7 @@ TYPED_TEST(SlaveRecoveryTest, Reboot)
FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -1523,7 +1551,9 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor)
slave::Flags flags = this->CreateSlaveFlags();
flags.strict = false;
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1608,7 +1638,7 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor)
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Restart the slave (use same flags) with a new isolator.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -1673,7 +1703,9 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1758,7 +1790,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
.WillRepeatedly(Return()); // Ignore subsequent offers.
// Now restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -1792,7 +1824,9 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer);
Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1892,7 +1926,9 @@ TYPED_TEST(SlaveRecoveryTest, RegisterDisconnectedSlave)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer);
Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -2002,7 +2038,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2062,7 +2100,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
.WillOnce(FutureArg<1>(&status));
// Now restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -2103,7 +2141,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2172,7 +2212,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
FUTURE_DISPATCH(_, &Slave::executorTerminated);
// Now restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -2209,7 +2249,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
EXPECT_CALL(allocator, addSlave(_, _, _, _));
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2313,7 +2355,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
.WillRepeatedly(Return()); // Ignore subsequent offers.
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -2373,7 +2415,9 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2453,7 +2497,7 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -2535,7 +2579,9 @@ TYPED_TEST(SlaveRecoveryTest, PartitionedSlave)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2638,7 +2684,7 @@ TYPED_TEST(SlaveRecoveryTest, PartitionedSlave)
FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
// Restart the slave (use same flags) with a new isolator.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -2665,7 +2711,9 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2740,7 +2788,7 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Restart the slave (use same flags) with a new isolator.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -2806,7 +2854,9 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2906,7 +2956,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -3003,7 +3053,9 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
flags1.slave_subsystems = None();
#endif
- Try<TypeParam*> containerizer1 = TypeParam::create(flags1, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags1, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave1 = this->StartSlave(containerizer1.get(), flags1);
@@ -3040,7 +3092,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
flags2.slave_subsystems = None();
#endif
- Try<TypeParam*> containerizer2 = TypeParam::create(flags2, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags2, true, &fetcher);
ASSERT_SOME(containerizer2);
Try<PID<Slave> > slave2 = this->StartSlave(containerizer2.get(), flags2);
@@ -3081,7 +3133,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Restart both slaves using the same flags with new containerizers.
- Try<TypeParam*> containerizer3 = TypeParam::create(flags1, true);
+ Try<TypeParam*> containerizer3 = TypeParam::create(flags1, true, &fetcher);
ASSERT_SOME(containerizer3);
Clock::pause();
@@ -3089,7 +3141,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
slave1 = this->StartSlave(containerizer3.get(), flags1);
ASSERT_SOME(slave1);
- Try<TypeParam*> containerizer4 = TypeParam::create(flags2, true);
+ Try<TypeParam*> containerizer4 = TypeParam::create(flags2, true, &fetcher);
ASSERT_SOME(containerizer4);
slave2 = this->StartSlave(containerizer4.get(), flags2);
@@ -3254,8 +3306,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, ResourceStatistics)
slave::Flags flags = this->CreateSlaveFlags();
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer1 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3305,7 +3359,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, ResourceStatistics)
// Restart the slave (use same flags) with a new containerizer.
Try<MesosContainerizer*> containerizer2 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -3357,8 +3411,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PerfRollForward)
flags.isolation = "cgroups/cpu,cgroups/mem";
flags.slave_subsystems = "";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer1 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3432,7 +3488,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PerfRollForward)
flags.perf_interval = Milliseconds(500);
Try<MesosContainerizer*> containerizer2 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -3508,8 +3564,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceForward)
flags.isolation = "cgroups/cpu,cgroups/mem";
flags.slave_subsystems = "";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer1 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3570,7 +3628,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceForward)
flags.isolation = "cgroups/cpu,cgroups/mem,namespaces/pid";
Try<MesosContainerizer*> containerizer2 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -3613,8 +3671,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceBackward)
flags.isolation = "cgroups/cpu,cgroups/mem,namespaces/pid";
flags.slave_subsystems = "";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer1 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3676,7 +3736,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceBackward)
flags.isolation = "cgroups/cpu,cgroups/mem";
Try<MesosContainerizer*> containerizer2 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index f2896a1..c50cbc7 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -49,6 +49,8 @@
#include "slave/flags.hpp"
#include "slave/slave.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/containerizer.hpp"
@@ -61,11 +63,12 @@ using namespace mesos::internal::tests;
using mesos::internal::master::Master;
-using mesos::internal::slave::GarbageCollectorProcess;
-using mesos::internal::slave::Slave;
using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::GarbageCollectorProcess;
using mesos::internal::slave::MesosContainerizer;
using mesos::internal::slave::MesosContainerizerProcess;
+using mesos::internal::slave::Slave;
using process::Clock;
using process::Future;
@@ -103,8 +106,10 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
// Set the isolation flag so we know a MesoContainerizer will be created.
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -406,8 +411,10 @@ TEST_F(SlaveTest, MesosExecutorCommandTaskWithArgsList)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -528,8 +535,10 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithoutUser)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -622,8 +631,10 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithUser)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());