You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/30 05:58:48 UTC
[1/2] mesos git commit: Consolidate all fetcher env vars into one
that holds a JSON object.
Repository: mesos
Updated Branches:
refs/heads/master 8254f8fa8 -> 699e638ef
Consolidate all fetcher env vars into one that holds a JSON object.
Changed env var ensemble into one env var containing JSON, parsing
this to protobuf for use. Adapted fetcher tests.
Review: https://reviews.apache.org/r/28975
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/699e638e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/699e638e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/699e638e
Branch: refs/heads/master
Commit: 699e638ef80739203ed38ba7238bbc176b107355
Parents: 2fd5659
Author: Bernd Mathiske <be...@mesosphere.io>
Authored: Mon Dec 29 15:42:36 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Dec 29 20:11:17 2014 -0800
----------------------------------------------------------------------
include/mesos/fetcher/fetcher.hpp | 25 +++++
include/mesos/fetcher/fetcher.proto | 37 ++++++++
src/Makefile.am | 70 +++++++++-----
src/launcher/fetcher.cpp | 40 ++++----
src/slave/containerizer/fetcher.cpp | 19 ++--
src/slave/containerizer/fetcher.hpp | 24 +++--
src/tests/fetcher_tests.cpp | 156 +++++++++++++++++++++----------
7 files changed, 267 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/include/mesos/fetcher/fetcher.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/fetcher/fetcher.hpp b/include/mesos/fetcher/fetcher.hpp
new file mode 100644
index 0000000..b7e6a71
--- /dev/null
+++ b/include/mesos/fetcher/fetcher.hpp
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __FETCHER_PROTO_HPP__
+#define __FETCHER_PROTO_HPP__
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include <mesos/fetcher/fetcher.pb.h>
+
+#endif // __FETCHER_PROTO_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/include/mesos/fetcher/fetcher.proto
----------------------------------------------------------------------
diff --git a/include/mesos/fetcher/fetcher.proto b/include/mesos/fetcher/fetcher.proto
new file mode 100644
index 0000000..facb87b
--- /dev/null
+++ b/include/mesos/fetcher/fetcher.proto
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import "mesos/mesos.proto";
+
+package mesos.fetcher;
+
+option java_package = "org.apache.mesos.fetcher";
+option java_outer_classname = "Protos";
+
+
+/**
+ * Encodes the fetcher environment variable sent to the external fetcher
+ * program.
+ */
+message FetcherInfo {
+ required CommandInfo command_info = 1;
+ required string work_directory = 2;
+ optional string user = 3;
+ optional string frameworks_home = 4;
+ optional string hadoop_home = 5;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 3f20213..0521f58 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -134,6 +134,9 @@ MESOS_PROTO = $(top_srcdir)/include/mesos/mesos.proto
CONTAINERIZER_PROTO = \
$(top_srcdir)/include/mesos/containerizer/containerizer.proto
+FETCHER_PROTO = \
+ $(top_srcdir)/include/mesos/fetcher/fetcher.proto
+
SCHEDULER_PROTO = \
$(top_srcdir)/include/mesos/scheduler/scheduler.proto
@@ -142,6 +145,8 @@ CXX_PROTOS = \
../include/mesos/mesos.pb.h \
containerizer/containerizer.pb.cc \
../include/mesos/containerizer/containerizer.pb.h \
+ fetcher/fetcher.pb.cc \
+ ../include/mesos/fetcher/fetcher.pb.h \
scheduler/scheduler.pb.cc \
../include/mesos/scheduler/scheduler.pb.h
@@ -190,6 +195,12 @@ containerizer/%.pb.cc ../include/mesos/containerizer/%.pb.h: $(CONTAINERIZER_PRO
$(PROTOC) $(PROTOCFLAGS) --cpp_out=../include $^
mv ../include/mesos/containerizer/*.pb.cc $(@D)
+fetcher/%.pb.cc ../include/mesos/fetcher/%.pb.h: $(FETCHER_PROTO)
+ $(MKDIR_P) $(@D)
+ $(MKDIR_P) ../include/mesos/fetcher
+ $(PROTOC) $(PROTOCFLAGS) --cpp_out=../include $^
+ mv ../include/mesos/fetcher/*.pb.cc $(@D)
+
scheduler/%.pb.cc ../include/mesos/scheduler/%.pb.h: $(SCHEDULER_PROTO)
$(MKDIR_P) $(@D)
$(MKDIR_P) ../include/mesos/scheduler
@@ -209,6 +220,10 @@ java/generated/org/apache/mesos/containerizer/Protos.java: \
$(MKDIR_P) $(@D)
$(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^
+java/generated/org/apache/mesos/fetcher/Protos.java: $(FETCHER_PROTO)
+ $(MKDIR_P) $(@D)
+ $(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^
+
java/generated/org/apache/mesos/scheduler/Protos.java: $(SCHEDULER_PROTO)
$(MKDIR_P) $(@D)
$(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^
@@ -332,6 +347,14 @@ containerizer_HEADERS = \
nodist_containerizer_HEADERS = ../include/mesos/containerizer/containerizer.pb.h
+fetcherdir = $(pkgincludedir)/fetcher
+
+fetcher_HEADERS = \
+ $(top_srcdir)/include/mesos/fetcher/fetcher.hpp \
+ $(top_srcdir)/include/mesos/fetcher/fetcher.proto
+
+nodist_fetcher_HEADERS = ../include/mesos/fetcher/fetcher.pb.h
+
schedulerdir = $(pkgincludedir)/scheduler
scheduler_HEADERS = \
@@ -597,7 +620,8 @@ libmesos_no_3rdparty_la_LIBADD += libstate.la
lib_LTLIBRARIES += libmesos.la
# Include as part of the distribution.
-libmesos_la_SOURCES = $(MESOS_PROTO) $(CONTAINERIZER_PROTO) $(SCHEDULER_PROTO)
+libmesos_la_SOURCES = \
+ $(MESOS_PROTO) $(CONTAINERIZER_PROTO) $(FETCHER_PROTO) $(SCHEDULER_PROTO)
libmesos_la_LDFLAGS = -release $(PACKAGE_VERSION)
@@ -606,10 +630,10 @@ libmesos_la_LDFLAGS = -release $(PACKAGE_VERSION)
libmesos_la_LIBTOOLFLAGS = --tag=CXX
# Add the convenience library.
-libmesos_la_LIBADD = \
- libmesos_no_3rdparty.la \
- -lsvn_subr-1 \
- -lsvn_delta-1 \
+libmesos_la_LIBADD = \
+ libmesos_no_3rdparty.la \
+ -lsvn_subr-1 \
+ -lsvn_delta-1 \
-lapr-1
libmesos_la_LIBADD += ../$(LIBPROCESS)/libprocess.la
@@ -801,8 +825,8 @@ nodist_sbin_SCRIPTS += deploy/mesos-daemon.sh \
deploy/mesos-start-slaves.sh deploy/mesos-stop-cluster.sh \
deploy/mesos-stop-masters.sh deploy/mesos-stop-slaves.sh
-pkgsysconf_DATA += deploy/mesos-deploy-env.sh.template \
- deploy/mesos-master-env.sh.template \
+pkgsysconf_DATA += deploy/mesos-deploy-env.sh.template \
+ deploy/mesos-master-env.sh.template \
deploy/mesos-slave-env.sh.template
# Need to explicitly add this because by default DATA files are not
@@ -979,21 +1003,21 @@ PHONY_TARGETS += clean-java
# Python files listed outside HAS_PYTHON so they are included with the
# distribution unconditionally.
-PYTHON_SOURCE = \
- python/src/mesos/__init__.py \
- python/interface/src/mesos/__init__.py \
- python/interface/src/mesos/interface/__init__.py \
- python/native/src/mesos/__init__.py \
- python/native/src/mesos/native/__init__.py \
- python/native/src/mesos/native/mesos_executor_driver_impl.cpp \
- python/native/src/mesos/native/mesos_executor_driver_impl.hpp \
- python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp \
- python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp \
- python/native/src/mesos/native/module.cpp \
- python/native/src/mesos/native/module.hpp \
- python/native/src/mesos/native/proxy_executor.cpp \
- python/native/src/mesos/native/proxy_executor.hpp \
- python/native/src/mesos/native/proxy_scheduler.cpp \
+PYTHON_SOURCE = \
+ python/src/mesos/__init__.py \
+ python/interface/src/mesos/__init__.py \
+ python/interface/src/mesos/interface/__init__.py \
+ python/native/src/mesos/__init__.py \
+ python/native/src/mesos/native/__init__.py \
+ python/native/src/mesos/native/mesos_executor_driver_impl.cpp \
+ python/native/src/mesos/native/mesos_executor_driver_impl.hpp \
+ python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp \
+ python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp \
+ python/native/src/mesos/native/module.cpp \
+ python/native/src/mesos/native/module.hpp \
+ python/native/src/mesos/native/proxy_executor.cpp \
+ python/native/src/mesos/native/proxy_executor.hpp \
+ python/native/src/mesos/native/proxy_scheduler.cpp \
python/native/src/mesos/native/proxy_scheduler.hpp
EXTRA_DIST += $(PYTHON_SOURCE)
@@ -1032,7 +1056,7 @@ endif
PHONY_TARGETS += $(PYTHON_SOURCE)
$(PYTHON_SOURCE):
- test "$(top_srcdir)" = "$(top_builddir)" || \
+ test "$(top_srcdir)" = "$(top_builddir)" || \
($(MKDIR_P) $(@D) && cp -pf $(srcdir)/$@ $@)
# We currently require both eggs and wheels to be built. Eggs can be added to
http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/src/launcher/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index 2f54d9e..fed0105 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -20,6 +20,8 @@
#include <mesos/mesos.hpp>
+#include <mesos/fetcher/fetcher.hpp>
+
#include <stout/json.hpp>
#include <stout/net.hpp>
#include <stout/option.hpp>
@@ -35,6 +37,8 @@
using namespace mesos;
using namespace mesos::internal;
+using mesos::fetcher::FetcherInfo;
+
using std::cerr;
using std::cout;
using std::endl;
@@ -264,35 +268,35 @@ int main(int argc, char* argv[])
logging::initialize(argv[0], flags, true); // Catch signals.
- CHECK(os::hasenv("MESOS_COMMAND_INFO"))
- << "Missing MESOS_COMMAND_INFO environment variable";
+ CHECK(os::hasenv("MESOS_FETCHER_INFO"))
+ << "Missing MESOS_FETCHER_INFO environment variable";
Try<JSON::Object> parse =
- JSON::parse<JSON::Object>(os::getenv("MESOS_COMMAND_INFO"));
+ JSON::parse<JSON::Object>(os::getenv("MESOS_FETCHER_INFO"));
if (parse.isError()) {
- EXIT(1) << "Failed to parse MESOS_COMMAND_INFO: " << parse.error();
+ EXIT(1) << "Failed to parse MESOS_FETCHER_INFO: " << parse.error();
}
- Try<CommandInfo> commandInfo = protobuf::parse<CommandInfo>(parse.get());
-
- if (commandInfo.isError()) {
- EXIT(1) << "Failed to parse CommandInfo: " << commandInfo.error();
+ Try<FetcherInfo> fetcherInfo = protobuf::parse<FetcherInfo>(parse.get());
+ if (fetcherInfo.isError()) {
+ EXIT(1) << "Failed to parse FetcherInfo: " << fetcherInfo.error();
}
- CHECK(os::hasenv("MESOS_WORK_DIRECTORY"))
- << "Missing MESOS_WORK_DIRECTORY environment variable";
- std::string directory = os::getenv("MESOS_WORK_DIRECTORY");
+ const CommandInfo& commandInfo = fetcherInfo.get().command_info();
- // We cannot use Some in the ternary expression because the compiler needs to
- // be able to infer the type, thus the explicit Option<string>.
- // TODO(idownes): Add an os::hasenv that returns an Option<string>.
- Option<std::string> user = os::hasenv("MESOS_USER")
- ? Option<std::string>(os::getenv("MESOS_USER")) // Explicit so it compiles.
- : None();
+ const string& directory = fetcherInfo.get().work_directory();
+ if (directory.empty()) {
+ EXIT(1) << "Missing work directory";
+ }
+
+ Option<std::string> user = None();
+ if (fetcherInfo.get().has_user()) {
+ user = fetcherInfo.get().user();
+ }
// Fetch each URI to a local file, chmod, then chown if a user is provided.
- foreach (const CommandInfo::URI& uri, commandInfo.get().uris()) {
+ foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
// Fetch the URI to a local file.
Try<string> fetched = fetch(uri.value(), directory);
if (fetched.isError()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index d04799f..5993670 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+#include <mesos/fetcher/fetcher.hpp>
+
#include <process/dispatch.hpp>
#include <process/process.hpp>
@@ -29,6 +31,8 @@ using std::vector;
using process::Future;
+using mesos::fetcher::FetcherInfo;
+
namespace mesos {
namespace internal {
namespace slave {
@@ -53,24 +57,27 @@ map<string, string> Fetcher::environment(
const Option<string>& user,
const Flags& flags)
{
- map<string, string> result;
+ FetcherInfo fetcherInfo;
- result["MESOS_COMMAND_INFO"] = stringify(JSON::Protobuf(commandInfo));
+ fetcherInfo.mutable_command_info()->CopyFrom(commandInfo);
- result["MESOS_WORK_DIRECTORY"] = directory;
+ fetcherInfo.set_work_directory(directory);
if (user.isSome()) {
- result["MESOS_USER"] = user.get();
+ fetcherInfo.set_user(user.get());
}
if (!flags.frameworks_home.empty()) {
- result["MESOS_FRAMEWORKS_HOME"] = flags.frameworks_home;
+ fetcherInfo.set_frameworks_home(flags.frameworks_home);
}
if (!flags.hadoop_home.empty()) {
- result["HADOOP_HOME"] = flags.hadoop_home;
+ fetcherInfo.set_hadoop_home(flags.hadoop_home);
}
+ map<string, string> result;
+ result["MESOS_FETCHER_INFO"] = stringify(JSON::Protobuf(fetcherInfo));
+
return result;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 8bbdf07..1db0eaf 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -22,14 +22,15 @@
#include <string>
#include <vector>
-#include <stout/hashmap.hpp>
+#include <mesos/mesos.hpp>
#include <process/future.hpp>
#include <process/process.hpp>
#include <process/subprocess.hpp>
-#include <mesos/mesos.hpp>
-#include <slave/flags.hpp>
+#include <stout/hashmap.hpp>
+
+#include "slave/flags.hpp"
namespace mesos {
namespace internal {
@@ -48,6 +49,16 @@ class FetcherProcess;
class Fetcher
{
public:
+ // Builds the environment used to run mesos-fetcher. This
+ // environment contains one variable with the name
+ // "MESOS_FETCHER_INFO", and its value is the JSON encoding of a
+ // protobuf of type mesos::fetcher::FetcherInfo.
+ static std::map<std::string, std::string> environment(
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags);
+
Fetcher();
virtual ~Fetcher();
@@ -77,13 +88,6 @@ public:
// indicated container. Do nothing if no such subprocess exists.
void kill(const ContainerID& containerId);
- // Build the environment passed to the mesos-fetcher program.
- static std::map<std::string, std::string> environment(
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const Flags& flags);
-
private:
process::Owned<FetcherProcess> process;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/699e638e/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index f76182f..8c0b075 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -36,6 +36,8 @@
#include <stout/strings.hpp>
#include <stout/try.hpp>
+#include <mesos/fetcher/fetcher.hpp>
+
#include "slave/containerizer/fetcher.hpp"
#include "slave/flags.hpp"
@@ -58,6 +60,7 @@ using slave::Fetcher;
using std::string;
using std::map;
+using mesos::fetcher::FetcherInfo;
class FetcherEnvironmentTest : public ::testing::Test {};
@@ -79,13 +82,21 @@ TEST_F(FetcherEnvironmentTest, Simple)
map<string, string> environment =
Fetcher::environment(commandInfo, directory, user, flags);
- EXPECT_EQ(5u, environment.size());
+ EXPECT_EQ(1u, environment.size());
+
+ Try<JSON::Object> parse =
+ JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+ ASSERT_SOME(parse);
+
+ Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+ ASSERT_SOME(fetcherInfo);
+
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
- environment["MESOS_COMMAND_INFO"]);
- EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
- EXPECT_EQ(user.get(), environment["MESOS_USER"]);
- EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
- EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+ stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+ EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+ EXPECT_EQ(user.get(), fetcherInfo.get().user());
+ EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+ EXPECT_EQ(flags.hadoop_home, fetcherInfo.get().hadoop_home());
}
@@ -110,13 +121,21 @@ TEST_F(FetcherEnvironmentTest, MultipleURIs)
map<string, string> environment =
Fetcher::environment(commandInfo, directory, user, flags);
- EXPECT_EQ(5u, environment.size());
+ EXPECT_EQ(1u, environment.size());
+
+ Try<JSON::Object> parse =
+ JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+ ASSERT_SOME(parse);
+
+ Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+ ASSERT_SOME(fetcherInfo);
+
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
- environment["MESOS_COMMAND_INFO"]);
- EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
- EXPECT_EQ(user.get(), environment["MESOS_USER"]);
- EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
- EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+ stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+ EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+ EXPECT_EQ(user.get(), fetcherInfo.get().user());
+ EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+ EXPECT_EQ(flags.hadoop_home, fetcherInfo.get().hadoop_home());
}
@@ -136,12 +155,21 @@ TEST_F(FetcherEnvironmentTest, NoUser)
map<string, string> environment =
Fetcher::environment(commandInfo, directory, None(), flags);
- EXPECT_EQ(4u, environment.size());
+ EXPECT_EQ(1u, environment.size());
+
+ Try<JSON::Object> parse =
+ JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+ ASSERT_SOME(parse);
+
+ Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+ ASSERT_SOME(fetcherInfo);
+
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
- environment["MESOS_COMMAND_INFO"]);
- EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
- EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
- EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+ stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+ EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+ EXPECT_FALSE(fetcherInfo.get().has_user());
+ EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+ EXPECT_EQ(flags.hadoop_home, fetcherInfo.get().hadoop_home());
}
@@ -162,12 +190,21 @@ TEST_F(FetcherEnvironmentTest, EmptyHadoop)
map<string, string> environment =
Fetcher::environment(commandInfo, directory, user, flags);
- EXPECT_EQ(4u, environment.size());
+ EXPECT_EQ(1u, environment.size());
+
+ Try<JSON::Object> parse =
+ JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+ ASSERT_SOME(parse);
+
+ Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+ ASSERT_SOME(fetcherInfo);
+
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
- environment["MESOS_COMMAND_INFO"]);
- EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
- EXPECT_EQ(user.get(), environment["MESOS_USER"]);
- EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
+ stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+ EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+ EXPECT_EQ(user.get(), fetcherInfo.get().user());
+ EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+ EXPECT_EQ(flags.hadoop_home, fetcherInfo.get().hadoop_home());
}
@@ -187,12 +224,21 @@ TEST_F(FetcherEnvironmentTest, NoHadoop)
map<string, string> environment =
Fetcher::environment(commandInfo, directory, user, flags);
- EXPECT_EQ(4u, environment.size());
+ EXPECT_EQ(1u, environment.size());
+
+ Try<JSON::Object> parse =
+ JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+ ASSERT_SOME(parse);
+
+ Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+ ASSERT_SOME(fetcherInfo);
+
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
- environment["MESOS_COMMAND_INFO"]);
- EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
- EXPECT_EQ(user.get(), environment["MESOS_USER"]);
- EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
+ stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+ EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+ EXPECT_EQ(user.get(), fetcherInfo.get().user());
+ EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+ EXPECT_FALSE(fetcherInfo.get().has_hadoop_home());
}
@@ -214,13 +260,21 @@ TEST_F(FetcherEnvironmentTest, NoExtractNoExecutable)
map<string, string> environment =
Fetcher::environment(commandInfo, directory, user, flags);
- EXPECT_EQ(5u, environment.size());
+ EXPECT_EQ(1u, environment.size());
+
+ Try<JSON::Object> parse =
+ JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+ ASSERT_SOME(parse);
+
+ Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+ ASSERT_SOME(fetcherInfo);
+
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
- environment["MESOS_COMMAND_INFO"]);
- EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
- EXPECT_EQ(user.get(), environment["MESOS_USER"]);
- EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
- EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+ stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+ EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+ EXPECT_EQ(user.get(), fetcherInfo.get().user());
+ EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+ EXPECT_EQ(flags.hadoop_home, fetcherInfo.get().hadoop_home());
}
@@ -242,13 +296,21 @@ TEST_F(FetcherEnvironmentTest, NoExtractExecutable)
map<string, string> environment =
Fetcher::environment(commandInfo, directory, user, flags);
- EXPECT_EQ(5u, environment.size());
+ EXPECT_EQ(1u, environment.size());
+
+ Try<JSON::Object> parse =
+ JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
+ ASSERT_SOME(parse);
+
+ Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+ ASSERT_SOME(fetcherInfo);
+
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
- environment["MESOS_COMMAND_INFO"]);
- EXPECT_EQ(directory, environment["MESOS_WORK_DIRECTORY"]);
- EXPECT_EQ(user.get(), environment["MESOS_USER"]);
- EXPECT_EQ(flags.frameworks_home, environment["MESOS_FRAMEWORKS_HOME"]);
- EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+ stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
+ EXPECT_EQ(directory, fetcherInfo.get().work_directory());
+ EXPECT_EQ(user.get(), fetcherInfo.get().user());
+ EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+ EXPECT_EQ(flags.hadoop_home, fetcherInfo.get().hadoop_home());
}
@@ -272,13 +334,13 @@ TEST_F(FetcherTest, FileURI)
CommandInfo::URI* uri = commandInfo.add_uris();
uri->set_value("file://" + testFile);
- map<string, string> env =
+ map<string, string> environment =
Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
Try<Subprocess> fetcherSubprocess =
process::subprocess(
path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
- env);
+ environment);
ASSERT_SOME(fetcherSubprocess);
Future<Option<int>> status = fetcherSubprocess.get().status();
@@ -308,13 +370,13 @@ TEST_F(FetcherTest, FilePath)
CommandInfo::URI* uri = commandInfo.add_uris();
uri->set_value(testFile);
- map<string, string> env =
+ map<string, string> environment =
Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
Try<Subprocess> fetcherSubprocess =
process::subprocess(
path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
- env);
+ environment);
ASSERT_SOME(fetcherSubprocess);
Future<Option<int>> status = fetcherSubprocess.get().status();
@@ -361,13 +423,13 @@ TEST_F(FetcherTest, OSNetUriTest)
CommandInfo::URI* uri = commandInfo.add_uris();
uri->set_value(url);
- map<string, string> env =
+ map<string, string> environment =
Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
Try<Subprocess> fetcherSubprocess =
process::subprocess(
path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
- env);
+ environment);
ASSERT_SOME(fetcherSubprocess);
Future<Option<int>> status = fetcherSubprocess.get().status();
@@ -397,13 +459,13 @@ TEST_F(FetcherTest, FileLocalhostURI)
CommandInfo::URI* uri = commandInfo.add_uris();
uri->set_value(path::join("file://localhost", testFile));
- map<string, string> env =
+ map<string, string> environment =
Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
Try<Subprocess> fetcherSubprocess =
process::subprocess(
path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
- env);
+ environment);
ASSERT_SOME(fetcherSubprocess);
Future<Option<int>> status = fetcherSubprocess.get().status();
[2/2] mesos git commit: Refactor fetcher namespace into a
class/process.
Posted by be...@apache.org.
Refactor fetcher namespace into a class/process.
Laid the groundwork for having a fetcher object with a cache dir per
slave. Factored all fetcher-relevant code into a fetcher class and
process. Added a fetcher parameter to a lot of methods related to
launching tasks, mostly in containerizers. The latter are thus not
holders of fetchers, only slaves are. See MESOS-2172.
Review: https://reviews.apache.org/r/28830
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2fd56590
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2fd56590
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2fd56590
Branch: refs/heads/master
Commit: 2fd565901728309bb4f467f599f08f0b1e57eb69
Parents: 8254f8f
Author: Bernd Mathiske <be...@mesosphere.io>
Authored: Mon Dec 29 16:08:56 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Dec 29 20:11:17 2014 -0800
----------------------------------------------------------------------
src/local/local.cpp | 19 +-
src/slave/containerizer/containerizer.cpp | 11 +-
src/slave/containerizer/containerizer.hpp | 6 +-
src/slave/containerizer/docker.cpp | 40 ++--
src/slave/containerizer/docker.hpp | 14 +-
src/slave/containerizer/fetcher.cpp | 191 ++++++++++++++++---
src/slave/containerizer/fetcher.hpp | 174 +++++++++++------
src/slave/containerizer/mesos/containerizer.cpp | 40 ++--
src/slave/containerizer/mesos/containerizer.hpp | 13 +-
src/slave/main.cpp | 7 +-
src/tests/cluster.hpp | 10 +-
src/tests/containerizer_tests.cpp | 27 ++-
src/tests/docker_containerizer_tests.cpp | 83 +++++---
src/tests/fetcher_tests.cpp | 108 ++++++-----
src/tests/health_check_tests.cpp | 30 ++-
src/tests/isolator_tests.cpp | 6 +-
src/tests/slave_recovery_tests.cpp | 170 +++++++++++------
src/tests/slave_tests.cpp | 23 ++-
18 files changed, 668 insertions(+), 304 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 89fed0b..76e73a4 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -51,11 +51,13 @@
#include "module/manager.hpp"
-#include "slave/containerizer/containerizer.hpp"
#include "slave/gc.hpp"
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
#include "state/in_memory.hpp"
#include "state/log.hpp"
#include "state/protobuf.hpp"
@@ -74,6 +76,7 @@ using mesos::internal::master::Registrar;
using mesos::internal::master::Repairer;
using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
using mesos::internal::slave::GarbageCollector;
using mesos::internal::slave::Slave;
using mesos::internal::slave::StatusUpdateManager;
@@ -108,6 +111,7 @@ static Option<Authorizer*> authorizer = None();
static Files* files = NULL;
static vector<GarbageCollector*>* garbageCollectors = NULL;
static vector<StatusUpdateManager*>* statusUpdateManagers = NULL;
+static vector<Fetcher*>* fetchers = NULL;
PID<Master> launch(const Flags& flags, Allocator* _allocator)
@@ -212,6 +216,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
garbageCollectors = new vector<GarbageCollector*>();
statusUpdateManagers = new vector<StatusUpdateManager*>();
+ fetchers = new vector<Fetcher*>();
vector<UPID> pids;
@@ -226,8 +231,11 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
garbageCollectors->push_back(new GarbageCollector());
statusUpdateManagers->push_back(new StatusUpdateManager(flags));
+ fetchers->push_back(new Fetcher());
+
+ Try<Containerizer*> containerizer =
+ Containerizer::create(flags, true, fetchers->back());
- Try<Containerizer*> containerizer = Containerizer::create(flags, true);
if (containerizer.isError()) {
EXIT(1) << "Failed to create a containerizer: " << containerizer.error();
}
@@ -307,6 +315,13 @@ void shutdown()
delete statusUpdateManagers;
statusUpdateManagers = NULL;
+ foreach (Fetcher* fetcher, *fetchers) {
+ delete fetcher;
+ }
+
+ delete fetchers;
+ fetchers = NULL;
+
delete registrar;
registrar = NULL;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index 1448bea..e89511a 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -159,7 +159,10 @@ Try<Resources> Containerizer::resources(const Flags& flags)
}
-Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
+Try<Containerizer*> Containerizer::create(
+ const Flags& flags,
+ bool local,
+ Fetcher* fetcher)
{
if (flags.isolation == "external") {
LOG(WARNING) << "The 'external' isolation flag is deprecated, "
@@ -167,7 +170,7 @@ Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
<< " '--containerizers=external'.";
Try<ExternalContainerizer*> containerizer =
- ExternalContainerizer::create(flags);
+ ExternalContainerizer::create(flags);
if (containerizer.isError()) {
return Error("Could not create ExternalContainerizer: " +
containerizer.error());
@@ -185,7 +188,7 @@ Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
foreach (const string& type, strings::split(flags.containerizers, ",")) {
if (type == "mesos") {
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, local);
+ MesosContainerizer::create(flags, local, fetcher);
if (containerizer.isError()) {
return Error("Could not create MesosContainerizer: " +
containerizer.error());
@@ -194,7 +197,7 @@ Try<Containerizer*> Containerizer::create(const Flags& flags, bool local)
}
} else if (type == "docker") {
Try<DockerContainerizer*> containerizer =
- DockerContainerizer::create(flags);
+ DockerContainerizer::create(flags, fetcher);
if (containerizer.isError()) {
return Error("Could not create DockerContainerizer: " +
containerizer.error());
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.hpp b/src/slave/containerizer/containerizer.hpp
index 02754cd..129e60f 100644
--- a/src/slave/containerizer/containerizer.hpp
+++ b/src/slave/containerizer/containerizer.hpp
@@ -35,6 +35,7 @@
#include <stout/option.hpp>
#include <stout/try.hpp>
+#include "slave/containerizer/fetcher.hpp"
namespace mesos {
namespace internal {
@@ -55,7 +56,10 @@ class Containerizer
{
public:
// Attempts to create a containerizer as specified by 'isolation' in flags.
- static Try<Containerizer*> create(const Flags& flags, bool local);
+ static Try<Containerizer*> create(
+ const Flags& flags,
+ bool local,
+ Fetcher* fetcher);
// Determine slave resources from flags, probing the system or querying a
// delegate.
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 19a6ea2..5f4b4ce 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -103,14 +103,19 @@ Option<ContainerID> parse(const Docker::Container& container)
}
-Try<DockerContainerizer*> DockerContainerizer::create(const Flags& flags)
+Try<DockerContainerizer*> DockerContainerizer::create(
+ const Flags& flags,
+ Fetcher* fetcher)
{
Try<Docker*> docker = Docker::create(flags.docker);
if (docker.isError()) {
return Error(docker.error());
}
- return new DockerContainerizer(flags, Shared<Docker>(docker.get()));
+ return new DockerContainerizer(
+ flags,
+ fetcher,
+ Shared<Docker>(docker.get()));
}
@@ -124,8 +129,9 @@ DockerContainerizer::DockerContainerizer(
DockerContainerizer::DockerContainerizer(
const Flags& flags,
+ Fetcher* fetcher,
Shared<Docker> docker)
- : process(new DockerContainerizerProcess(flags, docker))
+ : process(new DockerContainerizerProcess(flags, fetcher, docker))
{
spawn(process.get());
}
@@ -221,29 +227,12 @@ Future<Nothing> DockerContainerizerProcess::fetch(
CHECK(containers_.contains(containerId));
Container* container = containers_[containerId];
- CommandInfo commandInfo = container->command();
-
- if (commandInfo.uris().size() == 0) {
- return Nothing();
- }
-
- VLOG(1) << "Starting to fetch URIs for container: " << containerId
- << ", directory: " << container->directory;
-
- Try<Subprocess> fetcher = fetcher::run(
- commandInfo,
+ return fetcher->fetch(
+ containerId,
+ container->command(),
container->directory,
None(),
flags);
-
- if (fetcher.isError()) {
- return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
- }
-
- container->fetcher = fetcher.get();
-
- return fetcher.get().status()
- .then(lambda::bind(&fetcher::_run, containerId, lambda::_1));
}
@@ -1191,10 +1180,7 @@ void DockerContainerizerProcess::destroy(
LOG(INFO) << "Destroying Container '"
<< containerId << "' in FETCHING state";
- if (container->fetcher.isSome()) {
- // Best effort kill the entire fetcher tree.
- os::killtree(container->fetcher.get().pid(), SIGKILL);
- }
+ fetcher->kill(containerId);
containerizer::Termination termination;
termination.set_killed(killed);
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index 28ebc62..b7bf54a 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -49,11 +49,14 @@ class DockerContainerizerProcess;
class DockerContainerizer : public Containerizer
{
public:
- static Try<DockerContainerizer*> create(const Flags& flags);
+ static Try<DockerContainerizer*> create(
+ const Flags& flags,
+ Fetcher* fetcher);
// This is only public for tests.
DockerContainerizer(
const Flags& flags,
+ Fetcher* fetcher,
process::Shared<Docker> docker);
// This is only public for tests.
@@ -110,8 +113,10 @@ class DockerContainerizerProcess
public:
DockerContainerizerProcess(
const Flags& _flags,
+ Fetcher* _fetcher,
process::Shared<Docker> _docker)
: flags(_flags),
+ fetcher(_fetcher),
docker(_docker) {}
virtual process::Future<Nothing> recover(
@@ -242,6 +247,8 @@ private:
const Flags flags;
+ Fetcher* fetcher;
+
process::Shared<Docker> docker;
struct Container
@@ -418,11 +425,6 @@ private:
// or ExecutorInfo::resources because they can change dynamically.
Resources resources;
- // The mesos-fetcher subprocess, kept around so that we can do a
- // killtree on it if we're asked to destroy a container while we
- // are fetching.
- Option<Subprocess> fetcher;
-
// The docker pull future is stored so we can discard when
// destroy is called while docker is pulling the image.
Future<Docker::Image> pull;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index d702a9c..d04799f 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -16,19 +16,38 @@
* limitations under the License.
*/
+#include <process/dispatch.hpp>
+#include <process/process.hpp>
+
#include "slave/slave.hpp"
#include "slave/containerizer/fetcher.hpp"
using std::map;
using std::string;
+using std::vector;
+
+using process::Future;
namespace mesos {
namespace internal {
namespace slave {
-namespace fetcher {
-map<string, string> environment(
+
+Fetcher::Fetcher() : process(new FetcherProcess())
+{
+ spawn(process.get());
+}
+
+
+Fetcher::~Fetcher()
+{
+ terminate(process.get());
+ process::wait(process.get());
+}
+
+
+map<string, string> Fetcher::environment(
const CommandInfo& commandInfo,
const string& directory,
const Option<string>& user,
@@ -56,7 +75,134 @@ map<string, string> environment(
}
-Try<Subprocess> run(
+Future<Nothing> Fetcher::fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const string& directory,
+ const Option<string>& user,
+ const Flags& flags,
+ const Option<int>& stdout,
+ const Option<int>& stderr)
+{
+ if (commandInfo.uris().size() == 0) {
+ return Nothing();
+ }
+
+ return dispatch(process.get(),
+ &FetcherProcess::fetch,
+ containerId,
+ commandInfo,
+ directory,
+ user,
+ flags,
+ stdout,
+ stderr);
+}
+
+
+Future<Nothing> Fetcher::fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const string& directory,
+ const Option<string>& user,
+ const Flags& flags)
+{
+ if (commandInfo.uris().size() == 0) {
+ return Nothing();
+ }
+
+ return dispatch(process.get(),
+ &FetcherProcess::fetch,
+ containerId,
+ commandInfo,
+ directory,
+ user,
+ flags);
+}
+
+
+void Fetcher::kill(const ContainerID& containerId)
+{
+ dispatch(process.get(), &FetcherProcess::kill, containerId);
+}
+
+
+FetcherProcess::~FetcherProcess()
+{
+ foreach (const ContainerID& containerId, subprocessPids.keys()) {
+ kill(containerId);
+ }
+}
+
+
+Future<Nothing> FetcherProcess::fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const string& directory,
+ const Option<string>& user,
+ const Flags& flags,
+ const Option<int>& stdout,
+ const Option<int>& stderr)
+{
+ VLOG(1) << "Starting to fetch URIs for container: " << containerId
+ << ", directory: " << directory;
+
+ Try<Subprocess> subprocess =
+ run(commandInfo, directory, user, flags, stdout, stderr);
+
+ if (subprocess.isError()) {
+ return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+ }
+
+ subprocessPids[containerId] = subprocess.get().pid();
+
+ return subprocess.get().status()
+ .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+}
+
+
+Future<Nothing> FetcherProcess::fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const string& directory,
+ const Option<string>& user,
+ const Flags& flags)
+{
+ VLOG(1) << "Starting to fetch URIs for container: " << containerId
+ << ", directory: " << directory;
+
+ Try<Subprocess> subprocess = run(commandInfo, directory, user, flags);
+
+ if (subprocess.isError()) {
+ return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+ }
+
+ subprocessPids[containerId] = subprocess.get().pid();
+
+ return subprocess.get().status()
+ .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+}
+
+
+Future<Nothing> FetcherProcess::_fetch(
+ const ContainerID& containerId,
+ const Option<int>& status)
+{
+ subprocessPids.erase(containerId);
+
+ if (status.isNone()) {
+ return Failure("No status available from fetcher");
+ } else if (status.get() != 0) {
+ return Failure("Failed to fetch URIs for container '" +
+ stringify(containerId) + "'with exit status: " +
+ stringify(status.get()));
+ }
+
+ return Nothing();
+}
+
+
+Try<Subprocess> FetcherProcess::run(
const CommandInfo& commandInfo,
const string& directory,
const Option<string>& user,
@@ -83,7 +229,7 @@ Try<Subprocess> run(
LOG(INFO) << "Fetching URIs using command '" << command << "'";
- Try<Subprocess> fetcher = subprocess(
+ Try<Subprocess> fetcherSubprocess = subprocess(
command,
Subprocess::PIPE(),
stdout.isSome()
@@ -92,17 +238,18 @@ Try<Subprocess> run(
stderr.isSome()
? Subprocess::FD(stderr.get())
: Subprocess::PIPE(),
- environment(commandInfo, directory, user, flags));
+ Fetcher::environment(commandInfo, directory, user, flags));
- if (fetcher.isError()) {
- return Error("Failed to execute mesos-fetcher: " + fetcher.error());
+ if (fetcherSubprocess.isError()) {
+ return Error(
+ "Failed to execute mesos-fetcher: " + fetcherSubprocess.error());
}
- return fetcher;
+ return fetcherSubprocess;
}
-Try<Subprocess> run(
+Try<Subprocess> FetcherProcess::run(
const CommandInfo& commandInfo,
const string& directory,
const Option<string>& user,
@@ -147,7 +294,7 @@ Try<Subprocess> run(
}
}
- Try<Subprocess> fetcher = fetcher::run(
+ Try<Subprocess> subprocess = run(
commandInfo,
directory,
user,
@@ -155,31 +302,25 @@ Try<Subprocess> run(
out.get(),
err.get());
- fetcher.get().status()
+ subprocess.get().status()
.onAny(lambda::bind(&os::close, out.get()))
.onAny(lambda::bind(&os::close, err.get()));
- return fetcher;
+ return subprocess;
}
-Future<Nothing> _run(
- const ContainerID& containerId,
- const Option<int>& status)
+void FetcherProcess::kill(const ContainerID& containerId)
{
- if (status.isNone()) {
- return Failure("No status available from fetcher");
- } else if (status.get() != 0) {
- return Failure("Failed to fetch URIs for container '" +
- stringify(containerId) + "'with exit status: " +
- stringify(status.get()));
- }
+ if (subprocessPids.contains(containerId)) {
+ VLOG(1) << "Killing the fetcher for container '" << containerId << "'";
+ // Best effort kill the entire fetcher tree.
+ os::killtree(subprocessPids.get(containerId).get(), SIGKILL);
- return Nothing();
+ subprocessPids.erase(containerId);
+ }
}
-
-} // namespace fetcher {
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 12b81b1..8bbdf07 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -19,72 +19,134 @@
#ifndef __SLAVE_FETCHER_HPP__
#define __SLAVE_FETCHER_HPP__
-#include <map>
#include <string>
+#include <vector>
+
+#include <stout/hashmap.hpp>
#include <process/future.hpp>
-#include <process/io.hpp>
+#include <process/process.hpp>
#include <process/subprocess.hpp>
-#include <stout/lambda.hpp>
-#include <stout/option.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-#include <stout/result.hpp>
-#include <stout/strings.hpp>
-#include <stout/try.hpp>
-
#include <mesos/mesos.hpp>
-
-#include "slave/flags.hpp"
+#include <slave/flags.hpp>
namespace mesos {
namespace internal {
namespace slave {
-namespace fetcher {
-
-// Defines helpers for running the mesos-fetcher.
-// TODO(benh): Consider moving this into a 'fetcher' subdirectory as
-// well as the actual mesos-fetcher program (in launcher/fetcher.cpp
-// as of the writing of this comment).
-
-// Helper method to build the environment used to run mesos-fetcher.
-std::map<std::string, std::string> environment(
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const Flags& flags);
-
-// Run the mesos-fetcher for the specified arguments. Note that if
-// 'stdout' and 'stderr' file descriptors are provided then respective
-// output from the mesos-fetcher will be redirected to the file
-// descriptors. The file descriptors are duplicated (via dup) because
-// redirecting might still be occuring even after the mesos-fetcher has
-// terminated since there still might be data to be read.
-Try<process::Subprocess> run(
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const Flags& flags,
- const Option<int>& stdout,
- const Option<int>& stderr);
-
-// Run the mesos-fetcher for the specified arguments, creating a
-// "stdout" and "stderr" file in the given directory and using
-// these for output.
-Try<process::Subprocess> run(
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const Flags& flags);
-
-// Check status and return an error if any. Typically used after
-// calling run().
-process::Future<Nothing> _run(
- const ContainerID& containerId,
- const Option<int>& status);
-
-} // namespace fetcher {
+
+// Forward declaration.
+class FetcherProcess;
+
+// Argument passing to and invocation of the external fetcher program.
+// TODO(bernd-mesos): Orchestration and synchronization of fetching
+// phases. Bookkeeping of executor files that are cached after
+// downloading from a URI by the fetcher program. Cache eviction.
+// There has to be exactly one fetcher with a distinct cache dir per
+// active slave. This means that the cache dir can only be fixed
+// after the slave ID has been determined by registration or recovery.
+class Fetcher
+{
+public:
+ Fetcher();
+
+ virtual ~Fetcher();
+
+ // Download the URIs specified in the command info and place the
+ // resulting files into the given work directory. Chmod said files
+ // to the user if given.
+ process::Future<Nothing> fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags,
+ const Option<int>& stdout,
+ const Option<int>& stderr);
+
+ // Same as above, but send stdout and stderr to the files 'stdout'
+ // and 'stderr' in the specified directory.
+ process::Future<Nothing> fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags);
+
+ // Best effort to kill the fetcher subprocess associated with the
+ // indicated container. Do nothing if no such subprocess exists.
+ void kill(const ContainerID& containerId);
+
+ // Build the environment passed to the mesos-fetcher program.
+ static std::map<std::string, std::string> environment(
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags);
+
+private:
+ process::Owned<FetcherProcess> process;
+};
+
+
+class FetcherProcess : public process::Process<FetcherProcess>
+{
+public:
+ FetcherProcess() : ProcessBase("__fetcher__") {}
+
+ virtual ~FetcherProcess();
+
+ // Fetcher implementation.
+ process::Future<Nothing> fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags,
+ const Option<int>& stdout,
+ const Option<int>& stderr);
+
+ process::Future<Nothing> fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags);
+
+ void kill(const ContainerID& containerId);
+
+private:
+ // Check status and return an error if any.
+ process::Future<Nothing> _fetch(
+ const ContainerID& containerId,
+ const Option<int>& status);
+
+ // Run the mesos-fetcher with custom output redirection. If
+ // 'stdout' and 'stderr' file descriptors are provided then respective
+ // output from the mesos-fetcher will be redirected to the file
+ // descriptors. The file descriptors are duplicated (via dup) because
+ // redirecting might still be occurring even after the mesos-fetcher has
+ // terminated since there still might be data to be read.
+ // This method is only "public" for test purposes.
+ Try<process::Subprocess> run(
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags,
+ const Option<int>& stdout,
+ const Option<int>& stderr);
+
+ // Run the mesos-fetcher, creating a "stdout" and "stderr" file
+ // in the given directory and using these for output.
+ Try<process::Subprocess> run(
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags);
+
+ hashmap<ContainerID, pid_t> subprocessPids;
+};
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index d70259b..5c014eb 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -77,7 +77,8 @@ Future<Nothing> _nothing() { return Nothing(); }
Try<MesosContainerizer*> MesosContainerizer::create(
const Flags& flags,
- bool local)
+ bool local,
+ Fetcher* fetcher)
{
string isolation;
if (flags.isolation == "process") {
@@ -164,16 +165,22 @@ Try<MesosContainerizer*> MesosContainerizer::create(
}
return new MesosContainerizer(
- flags_, local, Owned<Launcher>(launcher.get()), isolators);
+ flags_, local, fetcher, Owned<Launcher>(launcher.get()), isolators);
}
MesosContainerizer::MesosContainerizer(
const Flags& flags,
bool local,
+ Fetcher* fetcher,
const Owned<Launcher>& launcher,
const vector<Owned<Isolator>>& isolators)
- : process(new MesosContainerizerProcess(flags, local, launcher, isolators))
+ : process(new MesosContainerizerProcess(
+ flags,
+ local,
+ fetcher,
+ launcher,
+ isolators))
{
spawn(process.get());
}
@@ -540,29 +547,12 @@ Future<Nothing> MesosContainerizerProcess::fetch(
return Failure("Container is already destroyed");
}
- if (commandInfo.uris().size() == 0) {
- return Nothing();
- }
-
- Try<Subprocess> fetcher = fetcher::run(
+ return fetcher->fetch(
+ containerId,
commandInfo,
directory,
user,
flags);
-
- if (fetcher.isError()) {
- return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
- }
-
- // TODO(tnachen): Currently the fetcher won't shutdown when slave
- // exits. This means the fetcher will still be running when slave
- // restarts and after recovering. We won't resume the task since
- // it hasn't checkpointed yet. Once the fetcher supports existing
- // on slave it will be removed automatically.
- containers_[containerId]->fetcher = fetcher.get();
-
- return fetcher.get().status()
- .then(lambda::bind(&fetcher::_run, containerId, lambda::_1));
}
@@ -899,10 +889,8 @@ void MesosContainerizerProcess::destroy(const ContainerID& containerId)
return;
}
- if (container->state == FETCHING && container->fetcher.isSome()) {
- VLOG(1) << "Killing the fetcher for container '" << containerId << "'";
- // Best effort kill the entire fetcher tree.
- os::killtree(container->fetcher.get().pid(), SIGKILL);
+ if (container->state == FETCHING) {
+ fetcher->kill(containerId);
}
if (container->state == ISOLATING) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 0b635d4..802988c 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -39,11 +39,15 @@ class MesosContainerizerProcess;
class MesosContainerizer : public Containerizer
{
public:
- static Try<MesosContainerizer*> create(const Flags& flags, bool local);
+ static Try<MesosContainerizer*> create(
+ const Flags& flags,
+ bool local,
+ Fetcher* fetcher);
MesosContainerizer(
const Flags& flags,
bool local,
+ Fetcher* fetcher,
const process::Owned<Launcher>& launcher,
const std::vector<process::Owned<Isolator>>& isolators);
@@ -100,10 +104,12 @@ public:
MesosContainerizerProcess(
const Flags& _flags,
bool _local,
+ Fetcher* _fetcher,
const process::Owned<Launcher>& _launcher,
const std::vector<process::Owned<Isolator>>& _isolators)
: flags(_flags),
local(_local),
+ fetcher(_fetcher),
launcher(_launcher),
isolators(_isolators) {}
@@ -214,6 +220,7 @@ private:
const Flags flags;
const bool local;
+ Fetcher* fetcher;
const process::Owned<Launcher> launcher;
const std::vector<process::Owned<Isolator>> isolators;
@@ -245,10 +252,6 @@ private:
// determine the cause of an executor termination.
std::vector<Limitation> limitations;
- // The mesos-fetcher subprocess, that we keep around so we can
- // stop the fetcher when the container is destroyed.
- Option<process::Subprocess> fetcher;
-
// We keep track of the resources for each container so we can set the
// ResourceStatistics limits in usage().
Resources resources;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index 087944a..2ff2b0d 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -151,13 +151,18 @@ int main(int argc, char** argv)
LOG(INFO) << "Git SHA: " << build::GIT_SHA.get();
}
- Try<Containerizer*> containerizer = Containerizer::create(flags, false);
+ Fetcher fetcher;
+
+ Try<Containerizer*> containerizer =
+ Containerizer::create(flags, false, &fetcher);
+
if (containerizer.isError()) {
EXIT(1) << "Failed to create a containerizer: "
<< containerizer.error();
}
Try<MasterDetector*> detector = MasterDetector::create(master.get());
+
if (detector.isError()) {
EXIT(1) << "Failed to create a master detector: " << detector.error();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index fa5eeef..74cedb3 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -63,10 +63,12 @@
#include "slave/flags.hpp"
#include "slave/gc.hpp"
-#include "slave/containerizer/containerizer.hpp"
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
#include "state/in_memory.hpp"
#include "state/log.hpp"
#include "state/protobuf.hpp"
@@ -185,6 +187,7 @@ public:
slave::Containerizer* containerizer;
bool createdContainerizer; // Whether we own the containerizer.
+ process::Owned<slave::Fetcher> fetcher;
process::Owned<slave::StatusUpdateManager> statusUpdateManager;
process::Owned<slave::GarbageCollector> gc;
process::Owned<MasterDetector> detector;
@@ -468,8 +471,11 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
if (containerizer.isSome()) {
slave.containerizer = containerizer.get();
} else {
+ // Create a new fetcher.
+ slave.fetcher.reset(new slave::Fetcher());
+
Try<slave::Containerizer*> containerizer =
- slave::Containerizer::create(flags, true);
+ slave::Containerizer::create(flags, true, slave.fetcher.get());
CHECK_SOME(containerizer);
slave.containerizer = containerizer.get();
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer_tests.cpp b/src/tests/containerizer_tests.cpp
index 02a5f15..cfe31a6 100644
--- a/src/tests/containerizer_tests.cpp
+++ b/src/tests/containerizer_tests.cpp
@@ -31,6 +31,7 @@
#include "slave/flags.hpp"
+#include "slave/containerizer/fetcher.hpp"
#include "slave/containerizer/isolator.hpp"
#include "slave/containerizer/launcher.hpp"
@@ -61,6 +62,7 @@ public:
// Construct a MesosContainerizer with TestIsolator(s) which use the provided
// 'prepare' command(s).
Try<MesosContainerizer*> CreateContainerizer(
+ Fetcher* fetcher,
const vector<Option<CommandInfo> >& prepares)
{
vector<Owned<Isolator> > isolators;
@@ -85,18 +87,20 @@ public:
return new MesosContainerizer(
flags,
false,
+ fetcher,
Owned<Launcher>(launcher.get()),
isolators);
}
Try<MesosContainerizer*> CreateContainerizer(
+ Fetcher* fetcher,
const Option<CommandInfo>& prepare)
{
vector<Option<CommandInfo> > prepares;
prepares.push_back(prepare);
- return CreateContainerizer(prepares);
+ return CreateContainerizer(fetcher, prepares);
}
};
@@ -107,7 +111,10 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, ScriptSucceeds)
string directory = os::getcwd(); // We're inside a temporary sandbox.
string file = path::join(directory, "child.script.executed");
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer = CreateContainerizer(
+ &fetcher,
CREATE_COMMAND_INFO("touch " + file));
CHECK_SOME(containerizer);
@@ -151,7 +158,10 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, ScriptFails)
string directory = os::getcwd(); // We're inside a temporary sandbox.
string file = path::join(directory, "child.script.executed");
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer = CreateContainerizer(
+ &fetcher,
CREATE_COMMAND_INFO("touch " + file + " && exit 1"));
CHECK_SOME(containerizer);
@@ -205,7 +215,10 @@ TEST_F(MesosContainerizerIsolatorPreparationTest, MultipleScripts)
// This will fail, either first or after the successful command.
prepares.push_back(CREATE_COMMAND_INFO("touch " + file2 + " && exit 1"));
- Try<MesosContainerizer*> containerizer = CreateContainerizer(prepares);
+ Fetcher fetcher;
+
+ Try<MesosContainerizer*> containerizer =
+ CreateContainerizer(&fetcher, prepares);
CHECK_SOME(containerizer);
ContainerID containerId;
@@ -251,9 +264,11 @@ TEST_F(MesosContainerizerExecuteTest, IoRedirection)
slave::Flags flags;
flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+ Fetcher fetcher;
+
// Use local=false so std{err,out} are redirected to files.
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
ASSERT_SOME(containerizer);
ContainerID containerId;
@@ -306,9 +321,10 @@ public:
MockMesosContainerizerProcess(
const Flags& flags,
bool local,
+ Fetcher* fetcher,
const process::Owned<Launcher>& launcher,
const std::vector<process::Owned<Isolator>>& isolators)
- : MesosContainerizerProcess(flags, local, launcher, isolators)
+ : MesosContainerizerProcess(flags, local, fetcher, launcher, isolators)
{
// NOTE: See TestContainerizer::setup for why we use
// 'EXPECT_CALL' and 'WillRepeatedly' here instead of
@@ -343,9 +359,12 @@ TEST_F(MesosContainerizerDestroyTest, DestroyWhileFetching)
ASSERT_SOME(launcher);
std::vector<process::Owned<Isolator>> isolators;
+ Fetcher fetcher;
+
MockMesosContainerizerProcess* process = new MockMesosContainerizerProcess(
flags,
true,
+ &fetcher,
Owned<Launcher>(launcher.get()),
isolators);
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index bed2d10..2105ae2 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -34,6 +34,8 @@
#include "tests/mesos.hpp"
#include "slave/containerizer/docker.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
#include "slave/paths.hpp"
#include "slave/slave.hpp"
#include "slave/state.hpp"
@@ -49,9 +51,10 @@ using namespace process;
using mesos::internal::master::Master;
-using mesos::internal::slave::Slave;
using mesos::internal::slave::DockerContainerizer;
using mesos::internal::slave::DockerContainerizerProcess;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::Slave;
using process::Future;
using process::Message;
@@ -171,8 +174,9 @@ class MockDockerContainerizer : public DockerContainerizer {
public:
MockDockerContainerizer(
const slave::Flags& flags,
+ Fetcher* fetcher,
Shared<Docker> docker)
- : DockerContainerizer(flags, docker)
+ : DockerContainerizer(flags, fetcher, docker)
{
initialize();
}
@@ -285,8 +289,9 @@ class MockDockerContainerizerProcess : public DockerContainerizerProcess
public:
MockDockerContainerizerProcess(
const slave::Flags& flags,
+ Fetcher* fetcher,
const Shared<Docker>& docker)
- : DockerContainerizerProcess(flags, docker)
+ : DockerContainerizerProcess(flags, fetcher, docker)
{
EXPECT_CALL(*this, fetch(_))
.WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_fetch));
@@ -350,7 +355,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
slave::Flags flags = CreateSlaveFlags();
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -478,7 +485,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor_Bridged)
slave::Flags flags = CreateSlaveFlags();
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -602,7 +611,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
slave::Flags flags = CreateSlaveFlags();
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -715,7 +726,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
slave::Flags flags = CreateSlaveFlags();
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -832,7 +845,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
Invoke((MockDocker*) docker.get(),
&MockDocker::_logs)));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -970,7 +985,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
Invoke((MockDocker*) docker.get(),
&MockDocker::_logs)));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -1126,7 +1143,9 @@ TEST_F(DockerContainerizerTest, DISABLED_ROOT_DOCKER_Recover)
MockDocker* mockDocker = new MockDocker(tests::flags.docker);
Shared<Docker> docker(mockDocker);
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
ContainerID containerId;
containerId.set_value("c1");
@@ -1249,7 +1268,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -1375,7 +1396,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -1502,7 +1525,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -1634,7 +1659,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -1767,10 +1794,12 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
Invoke((MockDocker*) docker.get(),
&MockDocker::_logs)));
+ Fetcher fetcher;
+
// We put the containerizer on the heap so we can more easily
// control its lifetime, i.e., when we invoke the destructor.
MockDockerContainerizer* dockerContainerizer1 =
- new MockDockerContainerizer(flags, docker);
+ new MockDockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
ASSERT_SOME(slave1);
@@ -1854,7 +1883,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
.WillRepeatedly(Return()); // Ignore subsequent updates.
MockDockerContainerizer* dockerContainerizer2 =
- new MockDockerContainerizer(flags, docker);
+ new MockDockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
ASSERT_SOME(slave2);
@@ -1938,8 +1967,10 @@ TEST_F(DockerContainerizerTest,
Invoke((MockDocker*) docker.get(),
&MockDocker::_logs)));
+ Fetcher fetcher;
+
MockDockerContainerizer* dockerContainerizer1 =
- new MockDockerContainerizer(flags, docker);
+ new MockDockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
ASSERT_SOME(slave1);
@@ -2050,7 +2081,7 @@ TEST_F(DockerContainerizerTest,
.WillRepeatedly(Return()); // Ignore subsequent updates.
MockDockerContainerizer* dockerContainerizer2 =
- new MockDockerContainerizer(flags, docker);
+ new MockDockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
ASSERT_SOME(slave2);
@@ -2121,7 +2152,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping)
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -2256,7 +2289,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
.WillRepeatedly(FutureResult(
&logs, Invoke((MockDocker*)docker.get(), &MockDocker::_logs)));
- MockDockerContainerizer dockerContainerizer(flags, docker);
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
@@ -2356,10 +2391,12 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
MockDocker* mockDocker = new MockDocker(tests::flags.docker);
Shared<Docker> docker(mockDocker);
+ Fetcher fetcher;
+
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
- new MockDockerContainerizerProcess(flags, docker);
+ new MockDockerContainerizerProcess(flags, &fetcher, docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
@@ -2461,10 +2498,12 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
MockDocker* mockDocker = new MockDocker(tests::flags.docker);
Shared<Docker> docker(mockDocker);
+ Fetcher fetcher;
+
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
- new MockDockerContainerizerProcess(flags, docker);
+ new MockDockerContainerizerProcess(flags, &fetcher, docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index 9e48392..f76182f 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -53,6 +53,8 @@ using namespace process;
using process::Subprocess;
using process::Future;
+using slave::Fetcher;
+
using std::string;
using std::map;
@@ -75,7 +77,7 @@ TEST_F(FetcherEnvironmentTest, Simple)
flags.hadoop_home = "/tmp/hadoop";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, user, flags);
+ Fetcher::environment(commandInfo, directory, user, flags);
EXPECT_EQ(5u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -106,7 +108,7 @@ TEST_F(FetcherEnvironmentTest, MultipleURIs)
flags.hadoop_home = "/tmp/hadoop";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, user, flags);
+ Fetcher::environment(commandInfo, directory, user, flags);
EXPECT_EQ(5u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -132,7 +134,7 @@ TEST_F(FetcherEnvironmentTest, NoUser)
flags.hadoop_home = "/tmp/hadoop";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, None(), flags);
+ Fetcher::environment(commandInfo, directory, None(), flags);
EXPECT_EQ(4u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -158,7 +160,7 @@ TEST_F(FetcherEnvironmentTest, EmptyHadoop)
flags.hadoop_home = "";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, user, flags);
+ Fetcher::environment(commandInfo, directory, user, flags);
EXPECT_EQ(4u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -183,7 +185,7 @@ TEST_F(FetcherEnvironmentTest, NoHadoop)
flags.frameworks_home = "/tmp/frameworks";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, user, flags);
+ Fetcher::environment(commandInfo, directory, user, flags);
EXPECT_EQ(4u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -210,7 +212,7 @@ TEST_F(FetcherEnvironmentTest, NoExtractNoExecutable)
flags.hadoop_home = "/tmp/hadoop";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, user, flags);
+ Fetcher::environment(commandInfo, directory, user, flags);
EXPECT_EQ(5u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -238,7 +240,7 @@ TEST_F(FetcherEnvironmentTest, NoExtractExecutable)
flags.hadoop_home = "/tmp/hadoop";
map<string, string> environment =
- fetcher::environment(commandInfo, directory, user, flags);
+ Fetcher::environment(commandInfo, directory, user, flags);
EXPECT_EQ(5u, environment.size());
EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
@@ -271,15 +273,15 @@ TEST_F(FetcherTest, FileURI)
uri->set_value("file://" + testFile);
map<string, string> env =
- fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+ Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
- Try<Subprocess> fetcherProcess =
+ Try<Subprocess> fetcherSubprocess =
process::subprocess(
path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
env);
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ ASSERT_SOME(fetcherSubprocess);
+ Future<Option<int>> status = fetcherSubprocess.get().status();
AWAIT_READY(status);
ASSERT_SOME(status.get());
@@ -307,15 +309,15 @@ TEST_F(FetcherTest, FilePath)
uri->set_value(testFile);
map<string, string> env =
- fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+ Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
- Try<Subprocess> fetcherProcess =
+ Try<Subprocess> fetcherSubprocess =
process::subprocess(
path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
env);
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ ASSERT_SOME(fetcherSubprocess);
+ Future<Option<int>> status = fetcherSubprocess.get().status();
AWAIT_READY(status);
ASSERT_SOME(status.get());
@@ -360,15 +362,15 @@ TEST_F(FetcherTest, OSNetUriTest)
uri->set_value(url);
map<string, string> env =
- fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+ Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
- Try<Subprocess> fetcherProcess =
+ Try<Subprocess> fetcherSubprocess =
process::subprocess(
path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
env);
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ ASSERT_SOME(fetcherSubprocess);
+ Future<Option<int>> status = fetcherSubprocess.get().status();
AWAIT_READY(status);
ASSERT_SOME(status.get());
@@ -396,15 +398,15 @@ TEST_F(FetcherTest, FileLocalhostURI)
uri->set_value(path::join("file://localhost", testFile));
map<string, string> env =
- fetcher::environment(commandInfo, os::getcwd(), None(), flags);
+ Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
- Try<Subprocess> fetcherProcess =
+ Try<Subprocess> fetcherSubprocess =
process::subprocess(
path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
env);
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ ASSERT_SOME(fetcherSubprocess);
+ Future<Option<int>> status = fetcherSubprocess.get().status();
AWAIT_READY(status);
ASSERT_SOME(status.get());
@@ -421,12 +423,18 @@ TEST_F(FetcherTest, NoExtractNotExecutable)
ASSERT_SOME(path);
+ ContainerID containerId;
+ containerId.set_value(UUID::random().toString());
+
CommandInfo commandInfo;
CommandInfo::URI* uri = commandInfo.add_uris();
uri->set_value(path.get());
uri->set_executable(false);
uri->set_extract(false);
+ slave::Flags flags;
+ flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
Option<int> stdout = None();
Option<int> stderr = None();
@@ -436,18 +444,12 @@ TEST_F(FetcherTest, NoExtractNotExecutable)
stderr = STDERR_FILENO;
}
- slave::Flags flags;
- flags.launcher_dir = path::join(tests::flags.build_dir, "src");
-
- Try<Subprocess> fetcherProcess =
- fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+ Fetcher fetcher;
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ Future<Nothing> fetch = fetcher.fetch(
+ containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
- AWAIT_READY(status);
- ASSERT_SOME(status.get());
- EXPECT_EQ(0, status.get().get());
+ AWAIT_READY(fetch);
Try<string> basename = os::basename(path.get());
@@ -471,12 +473,18 @@ TEST_F(FetcherTest, NoExtractExecutable)
ASSERT_SOME(path);
+ ContainerID containerId;
+ containerId.set_value(UUID::random().toString());
+
CommandInfo commandInfo;
CommandInfo::URI* uri = commandInfo.add_uris();
uri->set_value(path.get());
uri->set_executable(true);
uri->set_extract(false);
+ slave::Flags flags;
+ flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
Option<int> stdout = None();
Option<int> stderr = None();
@@ -486,18 +494,12 @@ TEST_F(FetcherTest, NoExtractExecutable)
stderr = STDERR_FILENO;
}
- slave::Flags flags;
- flags.launcher_dir = path::join(tests::flags.build_dir, "src");
-
- Try<Subprocess> fetcherProcess =
- fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+ Fetcher fetcher;
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ Future<Nothing> fetch = fetcher.fetch(
+ containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
- AWAIT_READY(status);
- ASSERT_SOME(status.get());
- EXPECT_EQ(0, status.get().get());
+ AWAIT_READY(fetch);
Try<string> basename = os::basename(path.get());
@@ -529,12 +531,18 @@ TEST_F(FetcherTest, ExtractNotExecutable)
ASSERT_SOME(os::tar(path.get(), path.get() + ".tar.gz"));
+ ContainerID containerId;
+ containerId.set_value(UUID::random().toString());
+
CommandInfo commandInfo;
CommandInfo::URI* uri = commandInfo.add_uris();
uri->set_value(path.get() + ".tar.gz");
uri->set_executable(false);
uri->set_extract(true);
+ slave::Flags flags;
+ flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
Option<int> stdout = None();
Option<int> stderr = None();
@@ -544,18 +552,12 @@ TEST_F(FetcherTest, ExtractNotExecutable)
stderr = STDERR_FILENO;
}
- slave::Flags flags;
- flags.launcher_dir = path::join(tests::flags.build_dir, "src");
-
- Try<Subprocess> fetcherProcess =
- fetcher::run(commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+ Fetcher fetcher;
- ASSERT_SOME(fetcherProcess);
- Future<Option<int>> status = fetcherProcess.get().status();
+ Future<Nothing> fetch = fetcher.fetch(
+ containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
- AWAIT_READY(status);
- ASSERT_SOME(status.get());
- EXPECT_EQ(0, status.get().get());
+ AWAIT_READY(fetch);
ASSERT_TRUE(os::exists(path::join(".", path.get())));
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/health_check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/health_check_tests.cpp b/src/tests/health_check_tests.cpp
index ae1dcdf..a707398 100644
--- a/src/tests/health_check_tests.cpp
+++ b/src/tests/health_check_tests.cpp
@@ -25,6 +25,8 @@
#include "slave/slave.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
#include "tests/containerizer.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
@@ -36,10 +38,11 @@ using namespace mesos::internal::tests;
using mesos::internal::master::Master;
-using mesos::internal::slave::Slave;
using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
using mesos::internal::slave::MesosContainerizer;
using mesos::internal::slave::MesosContainerizerProcess;
+using mesos::internal::slave::Slave;
using process::Clock;
using process::Future;
@@ -145,8 +148,10 @@ TEST_F(HealthCheckTest, HealthyTask)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -204,8 +209,10 @@ TEST_F(HealthCheckTest, HealthyTaskNonShell)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -268,8 +275,10 @@ TEST_F(HealthCheckTest, HealthStatusChange)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -358,8 +367,10 @@ TEST_F(HealthCheckTest, DISABLED_ConsecutiveFailures)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -443,9 +454,10 @@ TEST_F(HealthCheckTest, EnvironmentSetup)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
- Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ Fetcher fetcher;
+ Try<MesosContainerizer*> containerizer =
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -506,8 +518,10 @@ TEST_F(HealthCheckTest, DISABLED_GracePeriod)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/isolator_tests.cpp b/src/tests/isolator_tests.cpp
index 01c0239..1f1c26d 100644
--- a/src/tests/isolator_tests.cpp
+++ b/src/tests/isolator_tests.cpp
@@ -58,6 +58,7 @@
#include "slave/containerizer/launcher.hpp"
#ifdef __linux__
+#include "slave/containerizer/fetcher.hpp"
#include "slave/containerizer/linux_launcher.hpp"
#include "slave/containerizer/mesos/containerizer.hpp"
@@ -81,6 +82,7 @@ using mesos::internal::master::Master;
using mesos::internal::slave::CgroupsCpushareIsolatorProcess;
using mesos::internal::slave::CgroupsMemIsolatorProcess;
using mesos::internal::slave::CgroupsPerfEventIsolatorProcess;
+using mesos::internal::slave::Fetcher;
using mesos::internal::slave::SharedFilesystemIsolatorProcess;
#endif // __linux__
using mesos::internal::slave::Isolator;
@@ -952,8 +954,10 @@ TEST_F(NamespacesPidIsolatorTest, ROOT_PidNamespace)
string directory = os::getcwd(); // We're inside a temporary sandbox.
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
ASSERT_SOME(containerizer);
ContainerID containerId;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 8bd0f14..cd4a398 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -50,6 +50,7 @@
#include "slave/state.hpp"
#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/fetcher.hpp"
#include "messages/messages.hpp"
@@ -66,8 +67,9 @@ using namespace process;
using mesos::internal::master::Master;
-using mesos::internal::slave::GarbageCollectorProcess;
using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::GarbageCollectorProcess;
using std::map;
using std::string;
@@ -142,7 +144,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer);
Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -322,7 +326,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -380,7 +386,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
.WillRepeatedly(Return()); // Ignore subsequent updates.
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -407,7 +413,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -460,7 +468,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
.WillRepeatedly(Return()); // Ignore subsequent updates.
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -501,7 +509,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -553,7 +563,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -612,7 +622,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -673,7 +685,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -738,7 +750,9 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_RecoveryTimeout)
slave::Flags flags = this->CreateSlaveFlags();
flags.recovery_timeout = Milliseconds(1);
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -794,7 +808,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_RecoveryTimeout)
Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -831,7 +845,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -884,7 +900,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
_, &GarbageCollectorProcess::schedule);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -921,7 +937,9 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -981,7 +999,7 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
.Times(AtMost(1));
// Restart the slave in 'cleanup' recovery mode with a new isolator.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
flags.recover = "cleanup";
@@ -1020,7 +1038,9 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer);
Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1125,7 +1145,9 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingFramework)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer);
Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1214,7 +1236,9 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingSlave)
Future<RegisterSlaveMessage> registerSlaveMessage =
FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
- Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer);
Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1266,7 +1290,9 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1318,7 +1344,7 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Restart the slave (use same flags) with a new isolator.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -1387,7 +1413,9 @@ TYPED_TEST(SlaveRecoveryTest, Reboot)
slave::Flags flags = this->CreateSlaveFlags();
flags.strict = false;
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1485,7 +1513,7 @@ TYPED_TEST(SlaveRecoveryTest, Reboot)
FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -1523,7 +1551,9 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor)
slave::Flags flags = this->CreateSlaveFlags();
flags.strict = false;
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1608,7 +1638,7 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor)
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Restart the slave (use same flags) with a new isolator.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -1673,7 +1703,9 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -1758,7 +1790,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
.WillRepeatedly(Return()); // Ignore subsequent offers.
// Now restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -1792,7 +1824,9 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer);
Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -1892,7 +1926,9 @@ TYPED_TEST(SlaveRecoveryTest, RegisterDisconnectedSlave)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer);
Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
@@ -2002,7 +2038,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2062,7 +2100,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
.WillOnce(FutureArg<1>(&status));
// Now restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -2103,7 +2141,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2172,7 +2212,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
FUTURE_DISPATCH(_, &Slave::executorTerminated);
// Now restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -2209,7 +2249,9 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
EXPECT_CALL(allocator, addSlave(_, _, _, _));
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2313,7 +2355,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
.WillRepeatedly(Return()); // Ignore subsequent offers.
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -2373,7 +2415,9 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2453,7 +2497,7 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -2535,7 +2579,9 @@ TYPED_TEST(SlaveRecoveryTest, PartitionedSlave)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2638,7 +2684,7 @@ TYPED_TEST(SlaveRecoveryTest, PartitionedSlave)
FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
// Restart the slave (use same flags) with a new isolator.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -2665,7 +2711,9 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2740,7 +2788,7 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Restart the slave (use same flags) with a new isolator.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -2806,7 +2854,9 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
slave::Flags flags = this->CreateSlaveFlags();
- Try<TypeParam*> containerizer1 = TypeParam::create(flags, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -2906,7 +2956,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Restart the slave (use same flags) with a new containerizer.
- Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -3003,7 +3053,9 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
flags1.slave_subsystems = None();
#endif
- Try<TypeParam*> containerizer1 = TypeParam::create(flags1, true);
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags1, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave1 = this->StartSlave(containerizer1.get(), flags1);
@@ -3040,7 +3092,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
flags2.slave_subsystems = None();
#endif
- Try<TypeParam*> containerizer2 = TypeParam::create(flags2, true);
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags2, true, &fetcher);
ASSERT_SOME(containerizer2);
Try<PID<Slave> > slave2 = this->StartSlave(containerizer2.get(), flags2);
@@ -3081,7 +3133,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Restart both slaves using the same flags with new containerizers.
- Try<TypeParam*> containerizer3 = TypeParam::create(flags1, true);
+ Try<TypeParam*> containerizer3 = TypeParam::create(flags1, true, &fetcher);
ASSERT_SOME(containerizer3);
Clock::pause();
@@ -3089,7 +3141,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
slave1 = this->StartSlave(containerizer3.get(), flags1);
ASSERT_SOME(slave1);
- Try<TypeParam*> containerizer4 = TypeParam::create(flags2, true);
+ Try<TypeParam*> containerizer4 = TypeParam::create(flags2, true, &fetcher);
ASSERT_SOME(containerizer4);
slave2 = this->StartSlave(containerizer4.get(), flags2);
@@ -3254,8 +3306,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, ResourceStatistics)
slave::Flags flags = this->CreateSlaveFlags();
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer1 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3305,7 +3359,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, ResourceStatistics)
// Restart the slave (use same flags) with a new containerizer.
Try<MesosContainerizer*> containerizer2 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
slave = this->StartSlave(containerizer2.get(), flags);
@@ -3357,8 +3411,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PerfRollForward)
flags.isolation = "cgroups/cpu,cgroups/mem";
flags.slave_subsystems = "";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer1 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3432,7 +3488,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PerfRollForward)
flags.perf_interval = Milliseconds(500);
Try<MesosContainerizer*> containerizer2 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -3508,8 +3564,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceForward)
flags.isolation = "cgroups/cpu,cgroups/mem";
flags.slave_subsystems = "";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer1 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3570,7 +3628,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceForward)
flags.isolation = "cgroups/cpu,cgroups/mem,namespaces/pid";
Try<MesosContainerizer*> containerizer2 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
@@ -3613,8 +3671,10 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceBackward)
flags.isolation = "cgroups/cpu,cgroups/mem,namespaces/pid";
flags.slave_subsystems = "";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer1 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer1);
Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
@@ -3676,7 +3736,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceBackward)
flags.isolation = "cgroups/cpu,cgroups/mem";
Try<MesosContainerizer*> containerizer2 =
- MesosContainerizer::create(flags, true);
+ MesosContainerizer::create(flags, true, &fetcher);
ASSERT_SOME(containerizer2);
Future<vector<Offer> > offers2;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2fd56590/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index f2896a1..c50cbc7 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -49,6 +49,8 @@
#include "slave/flags.hpp"
#include "slave/slave.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/containerizer.hpp"
@@ -61,11 +63,12 @@ using namespace mesos::internal::tests;
using mesos::internal::master::Master;
-using mesos::internal::slave::GarbageCollectorProcess;
-using mesos::internal::slave::Slave;
using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::GarbageCollectorProcess;
using mesos::internal::slave::MesosContainerizer;
using mesos::internal::slave::MesosContainerizerProcess;
+using mesos::internal::slave::Slave;
using process::Clock;
using process::Future;
@@ -103,8 +106,10 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
// Set the isolation flag so we know a MesosContainerizer will be created.
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -406,8 +411,10 @@ TEST_F(SlaveTest, MesosExecutorCommandTaskWithArgsList)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -528,8 +535,10 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithoutUser)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());
@@ -622,8 +631,10 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithUser)
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "posix/cpu,posix/mem";
+ Fetcher fetcher;
+
Try<MesosContainerizer*> containerizer =
- MesosContainerizer::create(flags, false);
+ MesosContainerizer::create(flags, false, &fetcher);
CHECK_SOME(containerizer);
Try<PID<Slave> > slave = StartSlave(containerizer.get());