You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/01 15:46:00 UTC
[2/4] mesos git commit: Added a cache to the Fetcher.
http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index 9e9e9d0..c519bff 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -16,27 +16,40 @@
* limitations under the License.
*/
-#include <mesos/fetcher/fetcher.hpp>
+#include <unordered_map>
+#include <process/async.hpp>
+#include <process/check.hpp>
+#include <process/collect.hpp>
#include <process/dispatch.hpp>
-#include <process/process.hpp>
+
+#include <stout/net.hpp>
+#include <stout/path.hpp>
+
+#include "hdfs/hdfs.hpp"
#include "slave/slave.hpp"
#include "slave/containerizer/fetcher.hpp"
+using std::list;
using std::map;
+using std::shared_ptr;
using std::string;
+using std::transform;
using std::vector;
using process::Future;
-using mesos::fetcher::FetcherInfo;
-
namespace mesos {
namespace internal {
namespace slave {
+// URI prefixes that Fetcher::uriToLocalPath() strips to obtain a
+// local file system path.
+static const string FILE_URI_PREFIX = "file://";
+static const string FILE_URI_LOCALHOST = "file://localhost";
+
+// Prefix of every cache file name produced by
+// FetcherProcess::Cache::nextFilename(); FetcherProcess::cacheFiles()
+// passes it to os::find() to locate cache files.
+static const string CACHE_FILE_NAME_PREFIX = "c";
+
Fetcher::Fetcher() : process(new FetcherProcess())
{
@@ -44,6 +57,13 @@ Fetcher::Fetcher() : process(new FetcherProcess())
}
+// Constructs a Fetcher around a caller-supplied FetcherProcess
+// (presumably so that tests can inject their own instance - TODO
+// confirm) and spawns it.
+// NOTE: inside the body, 'process' names the constructor parameter,
+// which shadows the member of the same name; both refer to the same
+// FetcherProcess once the member initializer has run.
+Fetcher::Fetcher(const process::Owned<FetcherProcess>& process)
+ : process(process)
+{
+ spawn(process.get());
+}
+
+
Fetcher::~Fetcher()
{
terminate(process.get());
@@ -51,68 +71,144 @@ Fetcher::~Fetcher()
}
-map<string, string> Fetcher::environment(
- const CommandInfo& commandInfo,
- const string& directory,
- const Option<string>& user,
- const Flags& flags)
+// Recovers the fetcher cache for 'slaveId' by recursively deleting
+// this slave's cache directory (derived from flags.fetcher_cache_dir)
+// if it exists, i.e. previously cached files are simply discarded.
+// Returns an Error if the directory path cannot be resolved or the
+// directory cannot be removed.
+Try<Nothing> Fetcher::recover(const SlaveID& slaveId, const Flags& flags)
{
- FetcherInfo fetcherInfo;
+ // Good enough for now, simple, least-effort recovery.
+ VLOG(1) << "Clearing fetcher cache";
+
+ string cacheDirectory = paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
+ Result<string> path = os::realpath(cacheDirectory);
+ if (path.isError()) {
+ LOG(ERROR) << "Malformed fetcher cache directory path '" << cacheDirectory
+ << "', error: " + path.error();
- fetcherInfo.mutable_command_info()->CopyFrom(commandInfo);
+ return Error(path.error());
+ }
- fetcherInfo.set_work_directory(directory);
+ // Only a resolved path that exists is removed (recursively);
+ // otherwise there is nothing to clear.
+ if (path.isSome() && os::exists(path.get())) {
+ Try<Nothing> rmdir = os::rmdir(path.get(), true);
+ if (rmdir.isError()) {
+ LOG(ERROR) << "Could not delete fetcher cache directory '"
+ << cacheDirectory << "', error: " + rmdir.error();
- if (user.isSome()) {
- fetcherInfo.set_user(user.get());
+ return rmdir;
+ }
}
- if (!flags.frameworks_home.empty()) {
- fetcherInfo.set_frameworks_home(flags.frameworks_home);
+ return Nothing();
+}
+
+
+// Extracts the base name of 'uri'. For URIs with a "<scheme>://"
+// prefix this is whatever follows the last '/' after the prefix;
+// anything else is treated as a file path and passed to
+// os::basename(). Returns an Error for URIs containing a backslash,
+// a single quote or a NUL character, and for scheme-prefixed URIs
+// that have no '/' after the prefix or nothing after the first one.
+Try<string> Fetcher::basename(const string& uri)
+{
+ // TODO(bernd-mesos): full URI parsing, then move this to stout.
+ // There is a bug (or is it a feature?) in the original fetcher
+ // code without caching that remains in effect here. URIs are
+ // treated like file paths, looking for occurrences of "/",
+ // but ignoring other separators that can show up
+ // (e.g. "?", "=" in HTTP URLs).
+
+ if (uri.find_first_of('\\') != string::npos ||
+ uri.find_first_of('\'') != string::npos ||
+ uri.find_first_of('\0') != string::npos) {
+ return Error("Illegal characters in URI");
}
- map<string, string> result;
+ size_t index = uri.find("://");
+ // A scheme must consist of at least two characters.
+ if (index != string::npos && 1 < index) {
+ // URI starts with protocol specifier, e.g., http://, https://,
+ // ftp://, ftps://, hdfs://, hftp://, s3://, s3n://.
- if (!flags.hadoop_home.empty()) {
- result["HADOOP_HOME"] = flags.hadoop_home;
+ string path = uri.substr(index + 3);
+ if (!strings::contains(path, "/") || path.size() <= path.find("/") + 1) {
+ return Error("Malformed URI (missing path): " + uri);
+ }
+
+ return path.substr(path.find_last_of("/") + 1);
}
+ return os::basename(uri);
+}
- result["MESOS_FETCHER_INFO"] = stringify(JSON::Protobuf(fetcherInfo));
- return result;
+// Validates 'uri' by checking that basename() accepts it.
+Try<Nothing> Fetcher::validateUri(const string& uri)
+{
+ Try<string> result = basename(uri);
+ if (result.isError()) {
+ return Error(result.error());
+ }
+
+ return Nothing();
}
-Future<Nothing> Fetcher::fetch(
- const ContainerID& containerId,
- const CommandInfo& commandInfo,
- const string& directory,
- const Option<string>& user,
- const Flags& flags,
- const Option<int>& stdout,
- const Option<int>& stderr)
+// Returns the first error Fetcher::validateUri() reports for any of
+// the URIs in 'commandInfo', or Nothing() if all of them are valid.
+static Try<Nothing> validateUris(const CommandInfo& commandInfo)
{
- if (commandInfo.uris().size() == 0) {
- return Nothing();
+ foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
+ Try<Nothing> validation = Fetcher::validateUri(uri.value());
+ if (validation.isError()) {
+ return Error(validation.error());
+ }
}
- return dispatch(process.get(),
- &FetcherProcess::fetch,
- containerId,
- commandInfo,
- directory,
- user,
- flags,
- stdout,
- stderr);
+ return Nothing();
+}
+
+
+// Maps a URI to a local file system path if it refers to one.
+//
+// Returns None() for URIs that contain "://" but do not start with
+// "file://"; those have to be fetched by other means. For "file://"
+// and "file://localhost" URIs the prefix is stripped and the remainder
+// must be an absolute path. Plain absolute paths are returned as-is;
+// relative paths are resolved against 'frameworksHome', which then has
+// to be set and non-empty. Violations are reported as an Error.
+Result<string> Fetcher::uriToLocalPath(
+    const string& uri,
+    const Option<string>& frameworksHome)
+{
+  // Use the same prefix constant as the stripping logic below so the
+  // two checks cannot drift apart.
+  if (!strings::startsWith(uri, FILE_URI_PREFIX) &&
+      strings::contains(uri, "://")) {
+    return None();
+  }
+
+  string path = uri;
+  bool fileUri = false;
+
+  // Test the longer "file://localhost" prefix first, since it also
+  // starts with "file://".
+  if (strings::startsWith(path, FILE_URI_LOCALHOST)) {
+    path = path.substr(FILE_URI_LOCALHOST.size());
+    fileUri = true;
+  } else if (strings::startsWith(path, FILE_URI_PREFIX)) {
+    path = path.substr(FILE_URI_PREFIX.size());
+    fileUri = true;
+  }
+
+  // An empty path counts as relative.
+  const bool absolute = strings::startsWith(path, "/");
+
+  if (fileUri && !absolute) {
+    return Error("File URI only supports absolute paths");
+  }
+
+  if (!absolute) {
+    if (frameworksHome.isNone() || frameworksHome.get().empty()) {
+      return Error("A relative path was passed for the resource but the "
+                   "Mesos framework home was not specified. "
+                   "Please either provide this config option "
+                   "or avoid using a relative path");
+    }
+
+    path = path::join(frameworksHome.get(), path);
+    LOG(INFO) << "Prepended Mesos frameworks home to relative path, "
+              << "making it: '" << path << "'";
+  }
+
+  return path;
+}
+
+
+// Tells whether 'uri' starts with one of the HTTP(S)/FTP(S) schemes;
+// fetchSize() uses this to decide whether net::contentLength() applies.
+bool Fetcher::isNetUri(const std::string& uri)
+{
+  static const char* const NET_URI_PREFIXES[] = {
+    "http://", "https://", "ftp://", "ftps://"
+  };
+
+  for (const char* prefix : NET_URI_PREFIXES) {
+    if (strings::startsWith(uri, prefix)) {
+      return true;
+    }
+  }
+
+  return false;
}
+// Hands the fetch request over to the FetcherProcess actor; commands
+// that carry no URIs are checked for before dispatching.
Future<Nothing> Fetcher::fetch(
const ContainerID& containerId,
const CommandInfo& commandInfo,
- const string& directory,
+ const string& sandboxDirectory,
const Option<string>& user,
+ const SlaveID& slaveId,
const Flags& flags)
{
if (commandInfo.uris().size() == 0) {
@@ -123,8 +219,9 @@ Future<Nothing> Fetcher::fetch(
&FetcherProcess::fetch,
containerId,
commandInfo,
- directory,
+ sandboxDirectory,
user,
+ slaveId,
flags);
}
@@ -143,124 +240,459 @@ FetcherProcess::~FetcherProcess()
}
-Future<Nothing> FetcherProcess::fetch(
- const ContainerID& containerId,
- const CommandInfo& commandInfo,
- const string& directory,
- const Option<string>& user,
- const Flags& flags,
- const Option<int>& stdout,
- const Option<int>& stderr)
+// Find out how large a potential download from the given URI is.
+//
+// Local paths (including file:// URIs) are stat()ed, HTTP(S)/FTP(S)
+// URIs are asked for their content length, and anything else is
+// handed to the Hadoop client. Returns an Error if the size cannot be
+// determined, or if a network URI reports a content length of zero.
+static Try<Bytes> fetchSize(
+    const string& uri,
+    const Option<string>& frameworksHome)
{
- VLOG(1) << "Starting to fetch URIs for container: " << containerId
- << ", directory: " << directory;
+  VLOG(1) << "Fetching size for URI: " << uri;
- Try<Subprocess> subprocess =
- run(commandInfo, directory, user, flags, stdout, stderr);
+  Result<string> path = Fetcher::uriToLocalPath(uri, frameworksHome);
+  if (path.isError()) {
+    return Error(path.error());
+  }
+  if (path.isSome()) {
+    Try<Bytes> size = os::stat::size(path.get(), os::stat::FOLLOW_SYMLINK);
+    if (size.isError()) {
+      return Error("Could not determine file size for: '" + path.get() +
+                   "', error: " + size.error());
+    }
+    return size.get();
+  }
- if (subprocess.isError()) {
- return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+  if (Fetcher::isNetUri(uri)) {
+    Try<Bytes> size = net::contentLength(uri);
+    if (size.isError()) {
+      return Error(size.error());
+    }
+    if (size.get() == 0) {
+      return Error("URI reported content-length 0: " + uri);
+    }
+
+    return size.get();
}
- subprocessPids[containerId] = subprocess.get().pid();
+  HDFS hdfs;
- return subprocess.get().status()
- .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+  // NOTE: Try::error() may only be called on an Error. A successful
+  // probe that reports the client as unavailable carries no error
+  // message, so the two cases must be handled separately.
+  Try<bool> available = hdfs.available();
+  if (available.isError()) {
+    return Error("Hadoop client not available: " + available.error());
+  }
+  if (!available.get()) {
+    return Error("Hadoop client not available");
+  }
+
+  Try<Bytes> size = hdfs.du(uri);
+  if (size.isError()) {
+    return Error("Hadoop client could not determine size: " + size.error());
+  }
+
+  return size.get();
}
+// Fetches all URIs of 'commandInfo' into 'sandboxDirectory' on behalf
+// of 'containerId'. URIs marked for caching are looked up in, or added
+// to, the per-slave cache directory (segregated per user when a user
+// is known); the actual fetching is carried out by _fetch() and its
+// continuations once all pending cache entries have been resolved.
Future<Nothing> FetcherProcess::fetch(
const ContainerID& containerId,
const CommandInfo& commandInfo,
- const string& directory,
+ const string& sandboxDirectory,
const Option<string>& user,
+ const SlaveID& slaveId,
const Flags& flags)
{
VLOG(1) << "Starting to fetch URIs for container: " << containerId
- << ", directory: " << directory;
+ << ", directory: " << sandboxDirectory;
+
+ // TODO(bernd-mesos): This will disappear once we inject flags at
+ // Fetcher/FetcherProcess creation time. For now we trust this is
+ // always the exact same value.
+ cache.setSpace(flags.fetcher_cache_size);
+
+ Try<Nothing> validated = validateUris(commandInfo);
+ if (validated.isError()) {
+ return Failure("Could not fetch: " + validated.error());
+ }
+
+ // A user given in the CommandInfo takes precedence over 'user'.
+ Option<string> commandUser = user;
+ if (commandInfo.has_user()) {
+ commandUser = commandInfo.user();
+ }
+
+ string cacheDirectory = paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
+ if (commandUser.isSome()) {
+ // Segregating per-user cache directories.
+ cacheDirectory = path::join(cacheDirectory, commandUser.get());
+ }
- Try<Subprocess> subprocess = run(commandInfo, directory, user, flags);
+ if (commandUser.isSome()) {
+ // First assure that we are working for a valid user.
+ // TODO(bernd-mesos): This should be asynchronous.
+ Try<Nothing> chown = os::chown(commandUser.get(), sandboxDirectory);
+ if (chown.isError()) {
+ return Failure("Failed to chown directory: " + sandboxDirectory +
+ " to user: " + commandUser.get() +
+ " with error: " + chown.error());
+ }
+ }
- if (subprocess.isError()) {
- return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+ // For each URI we determine if we should use the cache and if so we
+ // try and either get the cache entry or create a cache entry. If
+ // we're getting the cache entry then we might need to wait for that
+ // cache entry to be downloaded. If we're creating a new cache entry
+ // then we need to properly reserve the cache space (and perform any
+ // evictions). Thus, there are three possibilities for each URI:
+ //
+ // (1) We are not using the cache.
+ // (2) We are using the cache but need to wait for an entry to be
+ // downloaded.
+ // (3) We are using the cache and need to create a new entry.
+ //
+ // We capture whether or not we're using the cache using an Option
+ // as a value in a map, i.e., if we are not trying to use the cache
+ // as in (1) above then the Option is None otherwise as in (2) and
+ // (3) the Option is Some. And to capture the asynchronous nature of
+ // both (2) and (3) that Option holds a Future to the actual cache
+ // entry.
+ hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>> entries;
+
+ foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
+ if (!uri.cache()) {
+ entries[uri] = None();
+ continue;
+ }
+
+ // Check if this is already in the cache (but not necessarily
+ // downloaded).
+ const Option<shared_ptr<Cache::Entry>> entry =
+ cache.get(commandUser, uri.value());
+
+ if (entry.isSome()) {
+ // Hold a reference so that selectVictims() does not pick this
+ // entry for eviction while this fetch depends on it.
+ entry.get()->reference();
+
+ // Wait for the URI to be downloaded into the cache (or fail)
+ entries[uri] = entry.get()->completion()
+ .then(defer(self(), [=]() {
+ return Future<shared_ptr<Cache::Entry>>(entry.get());
+ }));
+ } else {
+ shared_ptr<Cache::Entry> newEntry =
+ cache.create(cacheDirectory, commandUser, uri);
+
+ newEntry->reference();
+
+ // Determine the download size asynchronously (it may involve
+ // network or HDFS access), then reserve cache space for it back
+ // on this actor.
+ entries[uri] = async(&fetchSize, uri.value(), flags.frameworks_home)
+ .then(defer(self(),
+ &FetcherProcess::reserveCacheSpace,
+ lambda::_1,
+ newEntry));
+ }
}
- subprocessPids[containerId] = subprocess.get().pid();
+ list<Future<shared_ptr<Cache::Entry>>> futures;
- return subprocess.get().status()
- .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+ // Get out all of the futures we need to wait for so we can wait on them
+ // together via 'await'.
+ foreachvalue (const Option<Future<shared_ptr<Cache::Entry>>>& entry,
+ entries) {
+ if (entry.isSome()) {
+ futures.push_back(entry.get());
+ }
+ }
+
+ return _fetch(futures,
+ entries,
+ containerId,
+ sandboxDirectory,
+ cacheDirectory,
+ commandUser,
+ flags);
}
+// Waits (via await()) until none of 'futures' is pending any more,
+// then resolves 'entries' into cache entries with __fetch() and runs
+// mesos-fetcher for them with ___fetch().
Future<Nothing> FetcherProcess::_fetch(
+ const list<Future<shared_ptr<Cache::Entry>>> futures,
+ const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>&
+ entries,
const ContainerID& containerId,
- const Option<int>& status)
+ const string& sandboxDirectory,
+ const string& cacheDirectory,
+ const Option<string>& user,
+ const Flags& flags)
{
- subprocessPids.erase(containerId);
+ return await(futures)
+ .then(defer(self(),
+ &FetcherProcess::__fetch,
+ entries))
+ .then(defer(self(),
+ &FetcherProcess::___fetch,
+ lambda::_1,
+ containerId,
+ sandboxDirectory,
+ cacheDirectory,
+ user,
+ flags));
+}
+
- if (status.isNone()) {
- return Failure("No status available from fetcher");
- } else if (status.get() != 0) {
- return Failure("Failed to fetch URIs for container '" +
- stringify(containerId) + "'with exit status: " +
- stringify(status.get()));
+// For each URI, if there is a cache entry and waiting for it was
+// successful, extract it and add it to the resulting map. Otherwise
+// we'll assume we are not using or cannot use the cache for this URI,
+// which is recorded as None().
+Future<hashmap<CommandInfo::URI,
+               Option<shared_ptr<FetcherProcess::Cache::Entry>>>>
+FetcherProcess::__fetch(
+    const hashmap<CommandInfo::URI,
+                  Option<Future<shared_ptr<Cache::Entry>>>>& entries)
+{
+  hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>> result;
+
+  foreachpair (const CommandInfo::URI& uri,
+               const Option<Future<shared_ptr<Cache::Entry>>>& entry,
+               entries) {
+    if (entry.isNone()) {
+      // No entry means bypassing the cache.
+      result[uri] = None();
+    } else if (entry.get().isReady()) {
+      result[uri] = entry.get().get();
+    } else {
+      // await() only completes once no future is pending any more, so
+      // a non-ready future here is either failed or discarded. Only a
+      // failed one may be asked for its failure() message.
+      LOG(WARNING) << "Reverting to fetching directly into the sandbox for '"
+                   << uri.value()
+                   << "', due to failure to fetch through the cache, "
+                   << "with error: "
+                   << (entry.get().isFailed() ? entry.get().failure()
+                                              : "discarded");
+
+      result[uri] = None();
+    }
}
- return Nothing();
+  return result;
}
-Try<Subprocess> FetcherProcess::run(
- const CommandInfo& commandInfo,
- const string& directory,
+// Builds the FetcherInfo telling mesos-fetcher, per URI, whether to
+// bypass the cache, download into it, or retrieve from it, and runs
+// mesos-fetcher with it. The references taken on the cache entries in
+// fetch() are released by __runSucceed() on success or __runFail() on
+// failure.
+Future<Nothing> FetcherProcess::___fetch(
+ const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries,
+ const ContainerID& containerId,
+ const string& sandboxDirectory,
+ const string& cacheDirectory,
const Option<string>& user,
- const Flags& flags,
- const Option<int>& stdout,
- const Option<int>& stderr)
+ const Flags& flags)
{
- // Determine path for mesos-fetcher.
- Result<string> realpath = os::realpath(
- path::join(flags.launcher_dir, "mesos-fetcher"));
+ // Now construct the FetcherInfo based on which URIs we're using
+ // the cache for and which ones we are bypassing the cache.
+ FetcherInfo info;
+
+ foreachpair (const CommandInfo::URI& uri,
+ const Option<shared_ptr<Cache::Entry>>& entry,
+ entries) {
+ FetcherInfo::Item* item = info.add_items();
+
+ item->mutable_uri()->CopyFrom(uri);
+
+ if (entry.isSome()) {
+ if (entry.get()->completion().isPending()) {
+ // Since the entry is not yet "complete", i.e.,
+ // 'completion().isPending()', it must be the case that we created
+ // the entry in FetcherProcess::fetch(). Otherwise the entry should
+ // have been in the cache already and we would have waited for its
+ // completion in FetcherProcess::fetch().
+ item->set_action(FetcherInfo::Item::DOWNLOAD_AND_CACHE);
+ item->set_cache_filename(entry.get()->filename);
+ } else {
+ CHECK_READY(entry.get()->completion());
+ item->set_action(FetcherInfo::Item::RETRIEVE_FROM_CACHE);
+ item->set_cache_filename(entry.get()->filename);
+ }
+ } else {
+ item->set_action(FetcherInfo::Item::BYPASS_CACHE);
+ }
+ }
- if (!realpath.isSome()) {
- LOG(ERROR) << "Failed to determine the canonical path "
- << "for the mesos-fetcher '"
- << path::join(flags.launcher_dir, "mesos-fetcher")
- << "': "
- << (realpath.isError() ? realpath.error()
- : "No such file or directory");
- return Error("Could not fetch URIs: failed to find mesos-fetcher");
+ info.set_sandbox_directory(sandboxDirectory);
+ info.set_cache_directory(cacheDirectory);
+
+ if (user.isSome()) {
+ info.set_user(user.get());
}
- // Now the actual mesos-fetcher command.
- string command = realpath.get();
+ if (!flags.frameworks_home.empty()) {
+ info.set_frameworks_home(flags.frameworks_home);
+ }
- LOG(INFO) << "Fetching URIs using command '" << command << "'";
+ return run(containerId, info, flags)
+ .repair(defer(self(), &FetcherProcess::__runFail, lambda::_1, entries))
+ .then(defer(self(), &FetcherProcess::__runSucceed, entries));
+}
- Try<Subprocess> fetcherSubprocess = subprocess(
- command,
- Subprocess::PIPE(),
- stdout.isSome()
- ? Subprocess::FD(stdout.get())
- : Subprocess::PIPE(),
- stderr.isSome()
- ? Subprocess::FD(stderr.get())
- : Subprocess::PIPE(),
- Fetcher::environment(commandInfo, directory, user, flags));
- if (fetcherSubprocess.isError()) {
- return Error(
- "Failed to execute mesos-fetcher: " + fetcherSubprocess.error());
+// Failure path after running mesos-fetcher: releases this fetch's
+// references on its cache entries, and fails and removes every entry
+// whose completion is still pending (i.e. the entries this fetch was
+// supposed to download), since their cache files are missing or only
+// partially written. The original failure is always propagated.
+Future<Nothing> FetcherProcess::__runFail(
+    const Future<Nothing>& future,
+    const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries)
+{
+  LOG(ERROR) << "Failed to run mesos-fetcher: " << future.failure();
+
+  foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
+    if (entry.isSome()) {
+      entry.get()->unreference();
+
+      if (entry.get()->completion().isPending()) {
+        // Unsuccessfully (or partially) downloaded! Remove from the cache.
+        entry.get()->fail();
+
+        // Might delete partial download. Failing to do so does not
+        // change the outcome of this fetch, but it leaks cache space
+        // and therefore must not go unnoticed.
+        Try<Nothing> removal = cache.remove(entry.get());
+        if (removal.isError()) {
+          LOG(WARNING) << removal.error();
+        }
+      }
+    }
}
- return fetcherSubprocess;
+  return future; // Always propagate the failure!
}
-Try<Subprocess> FetcherProcess::run(
- const CommandInfo& commandInfo,
- const string& directory,
- const Option<string>& user,
+// Success path after running mesos-fetcher: releases this fetch's
+// references on its cache entries and, for every entry it downloaded
+// (completion still pending), reconciles the reserved cache space with
+// the actual file size and marks the entry complete so that waiting
+// fetches can use it. If that reconciliation fails, the entry is
+// failed and removed instead; the fetch itself still succeeds.
+Future<Nothing> FetcherProcess::__runSucceed(
+    const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries)
+{
+  foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
+    if (entry.isSome()) {
+      entry.get()->unreference();
+
+      if (entry.get()->completion().isPending()) {
+        // Successfully downloaded and cached!
+
+        Try<Nothing> adjust = cache.adjust(entry.get());
+        if (adjust.isSome()) {
+          entry.get()->complete();
+        } else {
+          LOG(WARNING) << "Failed to adjust the cache size for entry '"
+                       << entry.get()->key << "' with error: "
+                       << adjust.error();
+
+          // Successfully fetched, but not reusable from the cache,
+          // because we are deleting the entry now.
+          entry.get()->fail();
+
+          // A failed removal leaks cache space; make it visible.
+          Try<Nothing> removal = cache.remove(entry.get());
+          if (removal.isError()) {
+            LOG(WARNING) << removal.error();
+          }
+        }
+      }
+    }
+  }
+
+  return Nothing();
+}
+
+
+// Signed difference, in bytes, between the size a cache file actually
+// has and the size recorded for its entry: positive when the download
+// came out larger than expected, negative when smaller, zero when they
+// agree. Any mismatch is logged as a warning.
+static off_t delta(
+    const Bytes& actualSize,
+    const shared_ptr<FetcherProcess::Cache::Entry>& entry)
+{
+  const Bytes& expectedSize = entry->size;
+
+  if (actualSize > expectedSize) {
+    const Bytes excess = actualSize - expectedSize;
+    LOG(WARNING) << "URI download result for '" << entry->key
+                 << "' is larger than expected by " << stringify(excess)
+                 << " at: " << entry->path();
+
+    return off_t(excess.bytes());
+  }
+
+  if (actualSize < expectedSize) {
+    const Bytes shortfall = expectedSize - actualSize;
+    LOG(WARNING) << "URI download result for '" << entry->key
+                 << "' is smaller than expected by " << stringify(shortfall)
+                 << " at: " << entry->path();
+
+    return -off_t(shortfall.bytes());
+  }
+
+  return 0;
+}
+
+
+// For testing only.
+// TODO(bernd-mesos): After refactoring slave/containerizer/fetcher so
+// that flags and slave ID get injected, replace this with two functions
+// one of which returns a list of cache file paths, the other the number
+// of entries in the cache table.
+//
+// Lists the files that os::find() reports for CACHE_FILE_NAME_PREFIX
+// below this slave's fetcher cache directory. A missing cache
+// directory yields an empty list.
+Try<list<Path>> FetcherProcess::cacheFiles(
+    const SlaveID& slaveId,
+    const Flags& flags)
+{
+  const string cacheDirectory =
+    slave::paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
+
+  if (!os::exists(cacheDirectory)) {
+    return list<Path>();
+  }
+
+  const Try<list<string>> found =
+    os::find(cacheDirectory, CACHE_FILE_NAME_PREFIX);
+
+  if (found.isError()) {
+    return Error("Could not access cache directory '" +
+                 cacheDirectory + "' with error: " + found.error());
+  }
+
+  list<Path> paths;
+  foreach (const string& file, found.get()) {
+    paths.push_back(Path(file));
+  }
+
+  return paths;
+}
+
+
+// For testing only.
+// Returns the number of entries currently in the cache table.
+size_t FetcherProcess::cacheSize()
+{
+ return cache.size();
+}
+
+
+// Returns the configured cache space not claimed by any entry
+// (0 when the claimed space exceeds the configured space).
+Bytes FetcherProcess::availableCacheSpace()
+{
+ return cache.availableSpace();
+}
+
+
+// Continuation of fetchSize() for a newly created cache entry: makes
+// room for 'requestedSpace' in the cache (evicting unreferenced entries
+// if needed), claims it, and records it as the entry's size.
+//
+// If the size is unknown or the space cannot be reserved, the entry is
+// failed and removed from the cache, so that anyone waiting on it
+// bypasses the cache (any new requests will try again), and a Failure
+// is returned.
+Future<shared_ptr<FetcherProcess::Cache::Entry>>
+FetcherProcess::reserveCacheSpace(
+    const Future<Try<Bytes>>& requestedSpace,
+    const shared_ptr<FetcherProcess::Cache::Entry>& entry)
+{
+  CHECK_READY(requestedSpace);
+
+  // Fails 'entry', drops it from the cache and returns 'message' as a
+  // Failure. A failed removal leaks cache space, so it is logged.
+  auto abandon = [this, &entry](const string& message)
+      -> Future<shared_ptr<Cache::Entry>> {
+    entry->fail();
+
+    Try<Nothing> removal = cache.remove(entry);
+    if (removal.isError()) {
+      LOG(WARNING) << removal.error();
+    }
+
+    return Failure(message);
+  };
+
+  if (requestedSpace.get().isError()) {
+    return abandon("Could not determine size of cache file for '" +
+                   entry->key + "' with error: " +
+                   requestedSpace.get().error());
+  }
+
+  const Bytes requested = requestedSpace.get().get();
+
+  Try<Nothing> reservation = cache.reserve(requested);
+  if (reservation.isError()) {
+    return abandon("Failed to reserve space in the cache: " +
+                   reservation.error());
+  }
+
+  VLOG(1) << "Claiming fetcher cache space for: " << entry->key;
+
+  cache.claimSpace(requested);
+
+  // NOTE: We must set the entry size only when we are also claiming the
+  // space! Other functions rely on this dependency (see
+  // Cache::remove()).
+  entry->size = requested;
+
+  return entry;
+}
+
+
+// Runs the mesos-fetcher program for 'containerId'. 'info' is passed
+// to it through the MESOS_FETCHER_INFO environment variable, and its
+// output goes to the "stdout" and "stderr" files in the sandbox
+// directory. The returned future fails if those files cannot be
+// created, if mesos-fetcher cannot be found or started, or if it exits
+// without a status or with a non-zero one.
+Future<Nothing> FetcherProcess::run(
+    const ContainerID& containerId,
+    const FetcherInfo& info,
const Flags& flags)
{
// Before we fetch let's make sure we create 'stdout' and 'stderr'
@@ -273,47 +705,92 @@ Try<Subprocess> FetcherProcess::run(
// today is because we not only need to open the files but also
// chown them.
Try<int> out = os::open(
- path::join(directory, "stdout"),
+      path::join(info.sandbox_directory(), "stdout"),
O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK | O_CLOEXEC,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (out.isError()) {
- return Error("Failed to create 'stdout' file: " + out.error());
+    return Failure("Failed to create 'stdout' file: " + out.error());
}
// Repeat for stderr.
Try<int> err = os::open(
- path::join(directory, "stderr"),
+      path::join(info.sandbox_directory(), "stderr"),
O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK | O_CLOEXEC,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (err.isError()) {
os::close(out.get());
- return Error("Failed to create 'stderr' file: " + err.error());
+    return Failure("Failed to create 'stderr' file: " + err.error());
}
- if (user.isSome()) {
- Try<Nothing> chown = os::chown(user.get(), directory);
- if (chown.isError()) {
- os::close(out.get());
- os::close(err.get());
- return Error("Failed to chown work directory");
- }
+  string fetcherPath = path::join(flags.launcher_dir, "mesos-fetcher");
+  Result<string> realpath = os::realpath(fetcherPath);
+
+  if (!realpath.isSome()) {
+    LOG(ERROR) << "Failed to determine the canonical path "
+               << "for the mesos-fetcher '"
+               << fetcherPath
+               << "': "
+               << (realpath.isError() ? realpath.error()
+                                      : "No such file or directory");
+
+    // The descriptors opened above are only closed by the subprocess
+    // completion callback below; close them here so they do not leak.
+    os::close(out.get());
+    os::close(err.get());
+
+    return Failure("Could not fetch URIs: failed to find mesos-fetcher");
}
- Try<Subprocess> subprocess = run(
- commandInfo,
- directory,
- user,
- flags,
- out.get(),
- err.get());
+  // Now the actual mesos-fetcher command.
+  string command = realpath.get();
+
+  // We pass arguments to the fetcher program by means of an
+  // environment variable.
+  map<string, string> environment;
+
+  environment["MESOS_FETCHER_INFO"] = stringify(JSON::Protobuf(info));
- subprocess.get().status()
- .onAny(lambda::bind(&os::close, out.get()))
- .onAny(lambda::bind(&os::close, err.get()));
+  if (!flags.hadoop_home.empty()) {
+    environment["HADOOP_HOME"] = flags.hadoop_home;
+  }
+
+  VLOG(1) << "Fetching URIs using command '" << command << "'";
+
+  Try<Subprocess> fetcherSubprocess = subprocess(
+      command,
+      Subprocess::PIPE(),
+      Subprocess::FD(out.get()),
+      Subprocess::FD(err.get()),
+      environment);
- return subprocess;
+  if (fetcherSubprocess.isError()) {
+    // No subprocess means no completion callback to close these.
+    os::close(out.get());
+    os::close(err.get());
+
+    return Failure("Failed to execute mesos-fetcher: " +
+                   fetcherSubprocess.error());
+  }
+
+  // Remember this PID in case we need to kill the subprocess. See kill().
+  // The PID is forgotten again in the onAny() callback below, once the
+  // subprocess has terminated.
+  subprocessPids[containerId] = fetcherSubprocess.get().pid();
+
+  return fetcherSubprocess.get().status()
+    .then(defer(self(), [=](const Option<int>& status) -> Future<Nothing> {
+      if (status.isNone()) {
+        return Failure("No status available from mesos-fetcher");
+      }
+
+      if (status.get() != 0) {
+        return Failure("Failed to fetch all URIs for container '" +
+                       stringify(containerId) +
+                       "' with exit status: " +
+                       stringify(status.get()));
+      }
+
+      return Nothing();
+    }))
+    .onAny(defer(self(), [=](const Future<Nothing>& result) {
+      // Clear the subprocess PID remembered from running mesos-fetcher.
+      subprocessPids.erase(containerId);
+
+      os::close(out.get());
+      os::close(err.get());
+    }));
}
@@ -328,6 +805,348 @@ void FetcherProcess::kill(const ContainerID& containerId)
}
}
+
+// Produces a new cache file name for 'uri' of the form
+// "<CACHE_FILE_NAME_PREFIX><serial>-<abbreviated basename>", where the
+// serial number is incremented on every call.
+//
+// Rationale:
+// - Different URIs may share a base name, so downloads must be kept
+//   apart. A per-download file name is used rather than a
+//   per-download directory, since file systems tend to limit the
+//   number of sub-directories more tightly than the number of files.
+// - The fixed prefix lets os::find() locate cache files (for tests).
+// - The file extension is kept so that extraction from the cache file
+//   behaves as it would for a direct download of the original URI.
+// - A bit of the URI's base name is kept so that human operators can
+//   relate cache files to URIs, while staying short enough not to hit
+//   file name length limits.
+string FetcherProcess::Cache::nextFilename(const CommandInfo::URI& uri)
+{
+  Try<string> base = Fetcher::basename(uri.value());
+  CHECK_SOME(base);
+
+  const string& name = base.get();
+
+  // Keep at most the first and the last ten characters, joined by '_',
+  // which always preserves the file extension.
+  const string abbreviated = name.size() <= 20
+    ? name
+    : name.substr(0, 10) + "_" + name.substr(name.size() - 10);
+
+  ++filenameSerial;
+
+  return CACHE_FILE_NAME_PREFIX + stringify(filenameSerial) + "-" +
+         abbreviated;
+}
+
+
+// Builds the cache table key for 'uri': the URI itself, prefixed with
+// "<user>@" when a user is given, which keeps entries apart per user.
+static string cacheKey(const Option<string>& user, const string& uri)
+{
+  if (user.isSome()) {
+    return user.get() + "@" + uri;
+  }
+
+  return uri;
+}
+
+
+// Creates a new entry for 'uri' on behalf of 'user' with a fresh file
+// name from nextFilename(), stores it in the cache table under its
+// cache key and returns it. The entry is constructed with
+// 'cacheDirectory' (presumably its cache file lives there - see
+// Entry::path()).
+shared_ptr<FetcherProcess::Cache::Entry> FetcherProcess::Cache::create(
+ const string& cacheDirectory,
+ const Option<string>& user,
+ const CommandInfo::URI& uri)
+{
+ const string key = cacheKey(user, uri.value());
+ const string filename = nextFilename(uri);
+
+ auto entry = shared_ptr<Cache::Entry>(
+ new Cache::Entry(key, cacheDirectory, filename));
+
+ table.put(key, entry);
+
+ VLOG(1) << "Created cache entry '" << key << "' with file: " << filename;
+
+ return entry;
+}
+
+
+// Looks up the entry for 'uri' on behalf of 'user', if there is one.
+Option<shared_ptr<FetcherProcess::Cache::Entry>>
+FetcherProcess::Cache::get(
+    const Option<string>& user,
+    const string& uri)
+{
+  return table.get(cacheKey(user, uri));
+}
+
+
+// Tells whether there is an entry for 'uri' on behalf of 'user'.
+bool FetcherProcess::Cache::contains(
+    const Option<string>& user,
+    const string& uri)
+{
+  const Option<shared_ptr<Cache::Entry>> entry = get(user, uri);
+
+  return entry.isSome();
+}
+
+
+// Tells whether 'entry' itself (not merely some entry with the same
+// key) is the one currently stored in the table.
+bool FetcherProcess::Cache::contains(const shared_ptr<Cache::Entry>& entry)
+{
+  const Option<shared_ptr<Cache::Entry>> found = table.get(entry->key);
+
+  return found.isSome() && found.get() == entry;
+}
+
+
+// We are removing an entry if:
+//
+// (1) We failed to determine its prospective cache file size.
+// (2) We failed to download it when invoking the mesos-fetcher.
+// (3) We're evicting it to make room for another entry.
+//
+// In (1) and (2) the contract is that we'll have failed the entry's
+// future before we call remove, so the entry's future should no
+// longer be pending.
+//
+// In (3) it should be the case that the future is no longer pending,
+// because we shouldn't be able to evict something if we're
+// currently downloading it, because it should have a non-zero
+// reference count and therefore the future must either be ready or
+// failed in which case this is just case (1) above.
+//
+// NOTE: It is not necessarily the case that this cache entry has
+// zero references because there might be some waiters on the
+// downloading of this entry which haven't been able to run and find
+// out that the downloading failed.
+//
+// We want to attempt to delete the file regardless of whether it was
+// fully downloaded, since it might have been downloaded partially! Deleting
+// this file should not be racing with any other downloading or
+// deleting because all calls into the cache are serialized by the
+// FetcherProcess and since this entry is already in the cache there
+// should not be any other conflicting entries or files representing
+// this entry. Furthermore every cache file has a unique name. Thus
+// no new download conflicts with the manipulation of any pre-existing
+// cache content.
+//
+// Returns an Error if an existing cache file cannot be deleted. The
+// entry has already been erased from the table at that point, but its
+// claimed cache space is not released.
+Try<Nothing> FetcherProcess::Cache::remove(
+ const shared_ptr<Cache::Entry>& entry)
+{
+ VLOG(1) << "Removing cache entry '" << entry->key
+ << "' with filename: " << entry->filename;
+
+ CHECK(!entry->completion().isPending());
+
+ CHECK(contains(entry));
+
+ table.erase(entry->key);
+
+ // We may or may not have started downloading. The download may or may
+ // not have been partial. In any case, clean up whatever is there.
+ if (os::exists(entry->path().value)) {
+ Try<Nothing> rm = os::rm(entry->path().value);
+ if (rm.isError()) {
+ return Error("Could not delete fetcher cache file '" +
+ entry->path().value + "' with error: " + rm.error() +
+ " for entry '" + entry->key +
+ "', leaking cache space: " + stringify(entry->size));
+ }
+ }
+
+ // NOTE: There is an assumption that if and only if 'entry->size > 0'
+ // then we've claimed cache space for this entry! This currently only
+ // gets set in reserveCacheSpace().
+ if (entry->size > 0) {
+ releaseSpace(entry->size);
+
+ entry->size = 0;
+ }
+
+ return Nothing();
+}
+
+
+// Collects unreferenced entries, in table iteration order, until their
+// combined size reaches 'requiredSpace'. Returns an Error if all
+// unreferenced entries together do not free enough space.
+// TODO(bernd-mesos): Implement more elaborate selection criteria
+// (LRU/MRU, etc.).
+Try<list<shared_ptr<FetcherProcess::Cache::Entry>>>
+FetcherProcess::Cache::selectVictims(const Bytes& requiredSpace)
+{
+  list<shared_ptr<FetcherProcess::Cache::Entry>> victims;
+  Bytes freed = 0;
+
+  foreachvalue (const shared_ptr<Cache::Entry>& candidate, table) {
+    // Referenced entries are in use by an ongoing fetch.
+    const bool evictable = !candidate->isReferenced();
+
+    if (evictable) {
+      victims.push_back(candidate);
+      freed += candidate->size;
+
+      if (freed >= requiredSpace) {
+        return victims;
+      }
+    }
+  }
+
+  return Error("Could not find enough cache files to evict");
+}
+
+
+// Ensures that at least 'requestedSpace' is available, evicting entries
+// chosen by selectVictims() if necessary. The space itself is not
+// claimed here (see claimSpace()).
+Try<Nothing> FetcherProcess::Cache::reserve(
+    const Bytes& requestedSpace)
+{
+  if (availableSpace() >= requestedSpace) {
+    return Nothing();
+  }
+
+  const Bytes missingSpace = requestedSpace - availableSpace();
+
+  VLOG(1) << "Freeing up fetcher cache space for: " << missingSpace;
+
+  const Try<list<shared_ptr<Cache::Entry>>> victims =
+    selectVictims(missingSpace);
+
+  if (victims.isError()) {
+    return Error("Could not free up enough fetcher cache space");
+  }
+
+  foreach (const shared_ptr<Cache::Entry>& victim, victims.get()) {
+    Try<Nothing> removal = remove(victim);
+    if (removal.isError()) {
+      return Error(removal.error());
+    }
+  }
+
+  return Nothing();
+}
+
+
+// Reconciles the space claimed for 'entry' with the actual size of its
+// cache file after a successful download. If the file is not larger
+// than the claimed size, the entry's size is updated to the actual size
+// and the surplus is released. Returns an Error if the file is larger
+// than claimed (the entry is left untouched) or if its size cannot be
+// determined.
+Try<Nothing> FetcherProcess::Cache::adjust(
+    const shared_ptr<FetcherProcess::Cache::Entry>& entry)
+{
+  CHECK(contains(entry));
+
+  Try<Bytes> size =
+    os::stat::size(entry.get()->path().value, os::stat::DO_NOT_FOLLOW_SYMLINK);
+
+  if (size.isError()) {
+    // This should never be caused by Mesos itself, but cannot be excluded.
+    return Error("Fetcher cache file for '" + entry->key +
+                 "' disappeared from: " + entry->path().value);
+  }
+
+  off_t d = delta(size.get(), entry);
+  if (d > 0) {
+    return Error("More cache size now necessary, not adjusting " +
+                 entry->key);
+  }
+
+  entry->size = size.get();
+
+  // 'd' is zero or negative here; release its magnitude. Passing the
+  // negative off_t to Bytes (an unsigned byte count) would wrap around
+  // to a huge value and trip the CHECK in releaseSpace().
+  releaseSpace(Bytes(static_cast<uint64_t>(-d)));
+
+  return Nothing();
+}
+
+
+// Number of entries currently in the cache table.
+size_t FetcherProcess::Cache::size()
+{
+  return table.size();
+}
+
+
+// Sets the total cache capacity. Once a non-zero capacity is in place,
+// every later call must pass that same value.
+void FetcherProcess::Cache::setSpace(const Bytes& bytes)
+{
+  if (space == 0) {
+    space = bytes;
+    return;
+  }
+
+  // Dynamic cache size changes not supported.
+  CHECK(space == bytes);
+}
+
+
+// Adds 'bytes' to the space in use ('tally'). Exceeding the configured
+// cache space is tolerated, but logged as a warning.
+void FetcherProcess::Cache::claimSpace(const Bytes& bytes)
+{
+ tally += bytes;
+
+ if (tally > space) {
+ // Used cache volume space exceeds the maximum amount set by
+ // flags.fetcher_cache_size. This may be tolerated temporarily,
+ // if there is sufficient physical space available. But it can
+ // otherwise cause unspecified system behavior at any moment.
+ LOG(WARNING) << "Fetcher cache space overflow - space used: " << tally
+ << ", exceeds total fetcher cache space: " << space;
+ }
+
+ VLOG(1) << "Claimed cache space: " << bytes << ", now using: " << tally;
+}
+
+
+// Subtracts 'bytes' from the space in use. Releasing more than is in
+// use is a programming error and aborts via CHECK.
+void FetcherProcess::Cache::releaseSpace(const Bytes& bytes)
+{
+ CHECK(bytes <= tally) << "Attempt to release more cache space than in use - "
+ << " requested: " << bytes << ", in use: " << tally;
+
+
+ tally -= bytes;
+
+ VLOG(1) << "Released cache space: " << bytes << ", now using: " << tally;
+}
+
+
+// Returns the configured space minus the space in use, or 0 (with a
+// warning) when more space is in use than configured.
+Bytes FetcherProcess::Cache::availableSpace()
+{
+ if (tally > space) {
+ LOG(WARNING) << "Fetcher cache space overflow - space used: " << tally
+ << ", exceeds total fetcher cache space: " << space;
+ return 0;
+ }
+
+ return space - tally;
+}
+
+
+// Marks the entry's download as successfully finished. Must only be
+// called while completion() is still pending.
+void FetcherProcess::Cache::Entry::complete()
+{
+ CHECK_PENDING(promise.future());
+
+ promise.set(Nothing());
+}
+
+
+// Future that is satisfied by complete() or failed by fail().
+Future<Nothing> FetcherProcess::Cache::Entry::completion()
+{
+ return promise.future();
+}
+
+
+// Marks the entry's download as failed. Must only be called while
+// completion() is still pending.
+void FetcherProcess::Cache::Entry::fail()
+{
+ CHECK_PENDING(promise.future());
+
+ promise.fail("Could not download to fetcher cache: " + key);
+}
+
+
+// Reference counting: referenced entries are never picked for
+// eviction by Cache::selectVictims().
+void FetcherProcess::Cache::Entry::reference()
+{
+ referenceCount++;
+}
+
+
+void FetcherProcess::Cache::Entry::unreference()
+{
+ CHECK(referenceCount > 0);
+
+ referenceCount--;
+}
+
+
+bool FetcherProcess::Cache::Entry::isReferenced()
+{
+ return referenceCount > 0;
+}
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 1db0eaf..3b63711 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -16,14 +16,16 @@
* limitations under the License.
*/
-#ifndef __SLAVE_FETCHER_HPP__
-#define __SLAVE_FETCHER_HPP__
+#ifndef __SLAVE_CONTAINERIZER_FETCHER_HPP__
+#define __SLAVE_CONTAINERIZER_FETCHER_HPP__
#include <string>
-#include <vector>
#include <mesos/mesos.hpp>
+#include <mesos/fetcher/fetcher.hpp>
+
+#include <process/id.hpp>
#include <process/future.hpp>
#include <process/process.hpp>
#include <process/subprocess.hpp>
@@ -32,6 +34,8 @@
#include "slave/flags.hpp"
+using mesos::fetcher::FetcherInfo;
+
namespace mesos {
namespace internal {
namespace slave {
@@ -40,48 +44,64 @@ namespace slave {
class FetcherProcess;
// Argument passing to and invocation of the external fetcher program.
-// TODO(bernd-mesos) : Orchestration and synchronization of fetching
-// phases. Bookkeeping of executor files that are cached after
-// downloading from a URI by the fetcher program. Cache eviction.
-// There has to be exactly one fetcher with a distinct cache dir per
-// active slave. This means that the cache dir can only be fixed
-// after the slave ID has been determined by registration or recovery.
+// Also keeps track of executor files that are cached after downloading
+// from a URI by the fetcher program, and evicts them when needed. There
+// has to be exactly one fetcher with a distinct cache directory per
+// active slave. This means that the cache directory can only be fixed
+// after the slave ID has been determined by registration or recovery.
+// Downloads to cache files are separated on a per-user basis. The cache
+// must only be used for URIs for which the expected download size can
+// be determined and trusted before downloading. If there is any problem
+// using the cache for any given URI, the fetch procedure automatically
+// reverts to fetching directly into the sandbox directory.
class Fetcher
{
public:
- // Builds the environment used to run mesos-fetcher. This
- // environment contains one variable with the name
- // "MESOS_FETCHER_INFO", and its value is a protobuf of type
- // mesos::fetcher::FetcherInfo.
- static std::map<std::string, std::string> environment(
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const Flags& flags);
+  // Extracts the basename from a URI. For example, "d.txt" from
+  // "http://1.2.3.4:5050/a/b/c/d.txt". The current implementation
+  // only works for fairly regular-shaped URIs with a "/" and a proper
+  // file name at the end.
+ static Try<std::string> basename(const std::string& uri);
+
+ // Some checks to make sure using the URI value in shell commands
+ // is safe.
+ // TODO(benh): These should be pushed into the scheduler driver and
+ // reported to the user.
+ static Try<Nothing> validateUri(const std::string& uri);
+
+ // Determines if the given URI refers to a local file system path
+ // and prepends frameworksHome if it is a relative path. Fails if
+ // frameworksHome is empty and a local path is indicated.
+ static Result<std::string> uriToLocalPath(
+ const std::string& uri,
+ const Option<std::string>& frameworksHome);
+
+ static bool isNetUri(const std::string& uri);
Fetcher();
+ // This is only public for tests.
+ Fetcher(const process::Owned<FetcherProcess>& process);
+
virtual ~Fetcher();
- // Download the URIs specified in the command info and place the
- // resulting files into the given work directory. Chmod said files
- // to the user if given.
- process::Future<Nothing> fetch(
- const ContainerID& containerId,
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const Flags& flags,
- const Option<int>& stdout,
- const Option<int>& stderr);
+ // TODO(bernd-mesos): Inject these parameters at Fetcher creation time.
+ // Then also inject the fetcher into the slave at creation time. Then
+ // it will be possible to make this an instance method instead of a
+ // static one for the slave to call during startup or recovery.
+ static Try<Nothing> recover(const SlaveID& slaveId, const Flags& flags);
- // Same as above, but send stdout and stderr to the files 'stdout'
- // and 'stderr' in the specified directory.
+ // Download the URIs specified in the command info and place the
+ // resulting files into the given sandbox directory. Chmod said files
+ // to the user if given. Send stdout and stderr output to files
+ // "stdout" and "stderr" in the given directory. Extract archives and/or
+ // use the cache if so instructed by the given CommandInfo::URI items.
process::Future<Nothing> fetch(
const ContainerID& containerId,
const CommandInfo& commandInfo,
- const std::string& directory,
+ const std::string& sandboxDirectory,
const Option<std::string>& user,
+ const SlaveID& slaveId,
const Flags& flags);
// Best effort to kill the fetcher subprocess associated with the
@@ -96,58 +116,241 @@ private:
class FetcherProcess : public process::Process<FetcherProcess>
{
public:
- FetcherProcess() : ProcessBase("__fetcher__") {}
+ FetcherProcess() : ProcessBase(process::ID::generate("fetcher")) {}
virtual ~FetcherProcess();
- // Fetcher implementation.
process::Future<Nothing> fetch(
const ContainerID& containerId,
const CommandInfo& commandInfo,
- const std::string& directory,
+ const std::string& sandboxDirectory,
const Option<std::string>& user,
- const Flags& flags,
- const Option<int>& stdout,
- const Option<int>& stderr);
+ const SlaveID& slaveId,
+ const Flags& flags);
- process::Future<Nothing> fetch(
+ // Runs the mesos-fetcher, creating a "stdout" and "stderr" file
+ // in the given directory, using these for trace output.
+ virtual process::Future<Nothing> run(
const ContainerID& containerId,
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user,
+ const FetcherInfo& info,
const Flags& flags);
+ // Best effort attempt to kill the external mesos-fetcher process
+ // running on behalf of the given container ID, if any.
void kill(const ContainerID& containerId);
-private:
- // Check status and return an error if any.
- process::Future<Nothing> _fetch(
+  // Representation of the fetcher cache and its contents. There is
+  // exactly one instance per instance of FetcherProcess. All methods
+  // of Cache are to be executed by the owning FetcherProcess to ensure
+  // atomicity of cache operations.
+  class Cache
+  {
+  public:
+    class Entry
+    {
+    public:
+      Entry(
+          const std::string& key,
+          const std::string& directory,
+          const std::string& filename)
+        : key(key),
+          directory(directory),
+          filename(filename),
+          size(0),
+          referenceCount(0) {}
+
+      ~Entry() {}
+
+      // Marks this file's download as successful by fulfilling its promise,
+      // which satisfies the future returned by completion().
+      void complete();
+
+      // Returns a future that is satisfied once this file's download into
+      // the cache succeeds, and that fails if fail() is called instead.
+      process::Future<Nothing> completion();
+
+      // Marks this download as failed, notifying concurrent fetch attempts
+      // waiting for this result, by setting the promise to failed.
+      void fail();
+
+      // While an entry is "referenced" it cannot be evicted from the
+      // cache.
+      void reference();
+      void unreference();
+      bool isReferenced();
+
+      // Returns the path in the filesystem where this cache entry resides.
+      // TODO(bernd-mesos): Remove this construct after refactoring so
+      // that the slave flags get injected into the fetcher.
+      Path path() { return Path(path::join(directory, filename)); }
+
+      // Uniquely identifies a user/URI combination.
+      const std::string key;
+
+      // Cache directory where this entry is stored.
+      // TODO(bernd-mesos): Remove this construct after refactoring so
+      // that the slave flags get injected into the fetcher.
+      const std::string directory;
+
+      // The unique name of the file held in the cache on behalf of a
+      // URI.
+      const std::string filename;
+
+      // The expected size of the cache file. This field is set before
+      // downloading. If the actual size of the downloaded file is
+      // different a warning is logged and the field's value adjusted.
+      Bytes size;
+
+    private:
+      // Concurrent fetch attempts can reference the same entry multiple
+      // times.
+      unsigned long referenceCount;
+
+      // Settled by complete() on success or by fail() on failure.
+      process::Promise<Nothing> promise;
+    };
+
+    Cache() : space(0), tally(0), filenameSerial(0) {}
+    virtual ~Cache() {}
+
+    // Registers the maximum usable space in the cache directory.
+    // TODO(bernd-mesos): This method will disappear when injecting 'flags'
+    // into the fetcher instead of passing 'flags' around as a parameter.
+    void setSpace(const Bytes& bytes);
+
+    void claimSpace(const Bytes& bytes);
+    void releaseSpace(const Bytes& bytes);
+    Bytes availableSpace();
+
+    // Invents a new, distinct base name for a cache file, using the same
+    // filename extension as the URI.
+    std::string nextFilename(const CommandInfo::URI& uri);
+
+    // Creates a new entry and inserts it into the cache table. Also
+    // sets its reference count to 1. Returns the entry.
+    std::shared_ptr<Entry> create(
+        const std::string& cacheDirectory,
+        const Option<std::string>& user,
+        const CommandInfo::URI& uri);
+
+    // Retrieves the cache entry indexed by the parameters, without
+    // changing its reference count.
+    Option<std::shared_ptr<Entry>> get(
+        const Option<std::string>& user,
+        const std::string& uri);
+
+    // Returns whether an entry for this user and URI is in the cache.
+    bool contains(const Option<std::string>& user, const std::string& uri);
+
+    // Returns whether this identical entry is in the cache.
+    bool contains(const std::shared_ptr<Cache::Entry>& entry);
+
+    // Completely deletes a cache entry and its file. Warns on failure.
+    // Virtual for mock testing.
+    virtual Try<Nothing> remove(const std::shared_ptr<Entry>& entry);
+
+    // Determines a list of cache entries to remove (and thus cache files
+    // to delete) so that at least the required amount of space would become
+    // available.
+    Try<std::list<std::shared_ptr<Cache::Entry>>>
+    selectVictims(const Bytes& requiredSpace);
+
+    // Ensures that the requested amount of space is available.
+    // Evicts other files as necessary to make it so.
+    Try<Nothing> reserve(const Bytes& requestedSpace);
+
+    // Finds out if any predictions about cache file sizes have been
+    // inaccurate, logs this if so, and records the cache files' actual
+    // sizes and adjusts the cache's total amount of space in use.
+    Try<Nothing> adjust(const std::shared_ptr<Cache::Entry>& entry);
+
+    // Number of entries.
+    size_t size();
+
+  private:
+    // Maximum storable number of bytes in the cache directory.
+    Bytes space;
+
+    // How much space has been reserved to be occupied by cache files.
+    Bytes tally;
+
+    // Used to generate distinct cache file names simply by counting.
+    unsigned long filenameSerial;
+
+    // Maps keys (user / URI combinations, see Entry::key) to cache file
+    // entries.
+    hashmap<std::string, std::shared_ptr<Entry>> table;
+  };
+
+ // Public and virtual for mock testing.
+ virtual process::Future<Nothing> _fetch(
+ const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
+ futures,
+ const hashmap<
+ CommandInfo::URI,
+ Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
+ entries,
const ContainerID& containerId,
- const Option<int>& status);
-
- // Run the mesos-fetcher with custom output redirection. If
- // 'stdout' and 'stderr' file descriptors are provided then respective
- // output from the mesos-fetcher will be redirected to the file
- // descriptors. The file descriptors are duplicated (via dup) because
- // redirecting might still be occuring even after the mesos-fetcher has
- // terminated since there still might be data to be read.
- // This method is only "public" for test purposes.
- Try<process::Subprocess> run(
- const CommandInfo& commandInfo,
- const std::string& directory,
+ const std::string& sandboxDirectory,
+ const std::string& cacheDirectory,
const Option<std::string>& user,
- const Flags& flags,
- const Option<int>& stdout,
- const Option<int>& stderr);
+ const Flags& flags);
- // Run the mesos-fetcher, creating a "stdout" and "stderr" file
- // in the given directory and using these for output.
- Try<process::Subprocess> run(
- const CommandInfo& commandInfo,
- const std::string& directory,
+ // Returns a list of cache files on disk for the given slave
+ // (for all users combined). For testing.
+ // TODO(bernd-mesos): Remove the parameters after slave/containerizer
+ // refactoring for injection of these.
+ Try<std::list<Path>> cacheFiles(const SlaveID& slaveId, const Flags& flags);
+
+ // Returns the number of cache entries for the given slave (for all
+ // users combined). For testing.
+ size_t cacheSize();
+
+ // Returns the amount of remaining cache space that is not occupied
+ // by cache entries. For testing.
+ Bytes availableCacheSpace();
+
+private:
+ process::Future<hashmap<
+ CommandInfo::URI,
+ Option<std::shared_ptr<Cache::Entry>>>>
+ __fetch(const hashmap<
+ CommandInfo::URI,
+ Option<process::Future<std::shared_ptr<Cache::Entry>>>>& entries);
+
+ process::Future<Nothing> ___fetch(
+ const hashmap<CommandInfo::URI,
+ Option<std::shared_ptr<Cache::Entry>>>& entries,
+ const ContainerID& containerId,
+ const std::string& sandboxDirectory,
+ const std::string& cacheDirectory,
const Option<std::string>& user,
const Flags& flags);
+ process::Future<Nothing> _run(
+ const Option<int>& status,
+ const ContainerID& containerId);
+
+ void __run(const ContainerID& containerId, const int out, const int err);
+
+ process::Future<Nothing> __runFail(
+ const process::Future<Nothing>& future,
+ const hashmap<CommandInfo::URI,
+ Option<std::shared_ptr<Cache::Entry>>>& entries);
+
+ process::Future<Nothing> __runSucceed(
+ const hashmap<CommandInfo::URI,
+ Option<std::shared_ptr<Cache::Entry>>>& entries);
+
+ // Calls Cache::reserve() and returns a ready entry future if successful,
+ // else Failure. Claims the space and assigns the entry's size to this
+ // amount if and only if successful.
+ process::Future<std::shared_ptr<Cache::Entry>> reserveCacheSpace(
+ const process::Future<Try<Bytes>>& requestedSpace,
+ const std::shared_ptr<Cache::Entry>& entry);
+
+ Cache cache;
+
hashmap<ContainerID, pid_t> subprocessPids;
};
@@ -155,4 +358,4 @@ private:
} // namespace internal {
} // namespace mesos {
-#endif // __SLAVE_FETCHER_HPP__
+#endif // __SLAVE_CONTAINERIZER_FETCHER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index 396e5fb..c363605 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -650,7 +650,8 @@ Future<Nothing> MesosContainerizerProcess::fetch(
const ContainerID& containerId,
const CommandInfo& commandInfo,
const string& directory,
- const Option<string>& user)
+ const Option<string>& user,
+ const SlaveID& slaveId)
{
if (!containers_.contains(containerId)) {
return Failure("Container is already destroyed");
@@ -661,6 +662,7 @@ Future<Nothing> MesosContainerizerProcess::fetch(
commandInfo,
directory,
user,
+ slaveId,
flags);
}
@@ -785,7 +787,8 @@ Future<bool> MesosContainerizerProcess::_launch(
containerId,
executorInfo.command(),
directory,
- user))
+ user,
+ slaveId))
.then(defer(self(), &Self::exec, containerId, pipes[1]))
.onAny(lambda::bind(&os::close, pipes[0]))
.onAny(lambda::bind(&os::close, pipes[1]));
http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 31f5051..3ac2387 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -186,7 +186,8 @@ private:
const ContainerID& containerId,
const CommandInfo& commandInfo,
const std::string& directory,
- const Option<std::string>& user);
+ const Option<std::string>& user,
+ const SlaveID& slaveId);
process::Future<bool> _launch(
const ContainerID& containerId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 7f2e1e8..ab87098 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -67,6 +67,23 @@ mesos::internal::slave::Flags::Flags()
"Attributes of machine, in the form:\n"
"rack:2 or 'rack:2;u:1'");
+ add(&Flags::fetcher_cache_size, "fetcher_cache_size",
+ "Size of the fetcher cache in Bytes.",
+ DEFAULT_FETCHER_CACHE_SIZE);
+
+  // By default the fetcher cache directory is held inside the default
+  // work directory ("/tmp/mesos"), so everything can be deleted or archived
+  // in one swoop, in particular during testing. However, a typical
+  // production scenario is to use a separate cache volume. First, it is
+  // not meant to be backed up. Second, you want to prevent sandbox
+  // directories and the cache directory from interfering with each other
+  // in unpredictable ways by occupying shared space. So it is recommended
+  // to set the cache directory explicitly.
+ add(&Flags::fetcher_cache_dir, "fetcher_cache_dir",
+ "Parent directory for fetcher cache directories\n"
+ "(one subdirectory per slave).",
+ "/tmp/mesos/fetch");
+
add(&Flags::work_dir,
"work_dir",
"Directory path to place framework work directories\n", "/tmp/mesos");
http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index e84efc1..84dbb8a 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -44,6 +44,8 @@ public:
std::string isolation;
std::string default_role;
Option<std::string> attributes;
+ Bytes fetcher_cache_size;
+ std::string fetcher_cache_dir;
std::string work_dir;
std::string launcher_dir;
std::string hadoop_home; // TODO(benh): Make an Option.
http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index fdaaea4..271cb03 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -835,6 +835,14 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
LOG(INFO) << "Registered with master " << master.get()
<< "; given slave ID " << slaveId;
+  // TODO(bernd-mesos): Make this an instance method call, see comment
+  // in "fetcher.hpp".
+ Try<Nothing> recovered = Fetcher::recover(slaveId, flags);
+ if (recovered.isError()) {
+ LOG(FATAL) << "Could not initialize fetcher cache: "
+ << recovered.error();
+ }
+
state = RUNNING;
statusUpdateManager->resume(); // Resume status updates.
@@ -3758,7 +3766,6 @@ void Slave::_checkDiskUsage(const Future<double>& usage)
}
-
Future<Nothing> Slave::recover(const Result<state::State>& state)
{
if (state.isError()) {
@@ -3830,6 +3837,13 @@ Future<Nothing> Slave::recover(const Result<state::State>& state)
metrics.recovery_errors += slaveState.get().errors;
}
+  // TODO(bernd-mesos): Make this an instance method call, see comment
+  // in "fetcher.hpp".
+ Try<Nothing> recovered = Fetcher::recover(slaveState.get().id, flags);
+ if (recovered.isError()) {
+ return Failure(recovered.error());
+ }
+
// Recover the frameworks.
foreachvalue (const FrameworkState& frameworkState,
slaveState.get().frameworks) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index 8d3e605..3a983c6 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -379,24 +379,28 @@ public:
const Shared<Docker>& docker)
: DockerContainerizerProcess(flags, fetcher, docker)
{
- EXPECT_CALL(*this, fetch(_))
+ EXPECT_CALL(*this, fetch(_, _))
.WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_fetch));
EXPECT_CALL(*this, pull(_))
.WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_pull));
}
- MOCK_METHOD1(
+ MOCK_METHOD2(
fetch,
- process::Future<Nothing>(const ContainerID& containerId));
+ process::Future<Nothing>(
+ const ContainerID& containerId,
+ const SlaveID& slaveId));
MOCK_METHOD1(
pull,
process::Future<Nothing>(const ContainerID& containerId));
- process::Future<Nothing> _fetch(const ContainerID& containerId)
+ process::Future<Nothing> _fetch(
+ const ContainerID& containerId,
+ const SlaveID& slaveId)
{
- return DockerContainerizerProcess::fetch(containerId);
+ return DockerContainerizerProcess::fetch(containerId, slaveId);
}
process::Future<Nothing> _pull(const ContainerID& containerId)
@@ -2381,7 +2385,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
Future<Nothing> fetch;
// We want to pause the fetch call to simulate a long fetch time.
- EXPECT_CALL(*process, fetch(_))
+ EXPECT_CALL(*process, fetch(_, _))
.WillOnce(DoAll(FutureSatisfy(&fetch),
Return(promise.future())));
@@ -2486,7 +2490,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
(Owned<DockerContainerizerProcess>(process)));
Future<Nothing> fetch;
- EXPECT_CALL(*process, fetch(_))
+ EXPECT_CALL(*process, fetch(_, _))
.WillOnce(DoAll(FutureSatisfy(&fetch),
Return(Nothing())));
@@ -2754,7 +2758,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_FetchFailure)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
- EXPECT_CALL(*process, fetch(_))
+ EXPECT_CALL(*process, fetch(_, _))
.WillOnce(Return(Failure("some error from fetch")));
vector<TaskInfo> tasks;