You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/01 15:46:00 UTC

[2/4] mesos git commit: Added a cache to the Fetcher.

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index 9e9e9d0..c519bff 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -16,27 +16,40 @@
  * limitations under the License.
  */
 
-#include <mesos/fetcher/fetcher.hpp>
+#include <unordered_map>
 
+#include <process/async.hpp>
+#include <process/check.hpp>
+#include <process/collect.hpp>
 #include <process/dispatch.hpp>
-#include <process/process.hpp>
+
+#include <stout/net.hpp>
+#include <stout/path.hpp>
+
+#include "hdfs/hdfs.hpp"
 
 #include "slave/slave.hpp"
 
 #include "slave/containerizer/fetcher.hpp"
 
+using std::list;
 using std::map;
+using std::shared_ptr;
 using std::string;
+using std::transform;
 using std::vector;
 
 using process::Future;
 
-using mesos::fetcher::FetcherInfo;
-
 namespace mesos {
 namespace internal {
 namespace slave {
 
+static const string FILE_URI_PREFIX = "file://";
+static const string FILE_URI_LOCALHOST = "file://localhost";
+
+static const string CACHE_FILE_NAME_PREFIX = "c";
+
 
 Fetcher::Fetcher() : process(new FetcherProcess())
 {
@@ -44,6 +57,13 @@ Fetcher::Fetcher() : process(new FetcherProcess())
 }
 
 
+// Constructs a Fetcher around an externally created FetcherProcess
+// and spawns that process.
+Fetcher::Fetcher(const process::Owned<FetcherProcess>& process)
+  : process(process)
+{
+  spawn(process.get());
+}
+
+
 Fetcher::~Fetcher()
 {
   terminate(process.get());
@@ -51,68 +71,144 @@ Fetcher::~Fetcher()
 }
 
 
-map<string, string> Fetcher::environment(
-    const CommandInfo& commandInfo,
-    const string& directory,
-    const Option<string>& user,
-    const Flags& flags)
+// Recovers the fetcher by discarding all cached state: the fetcher
+// cache directory for the given slave is resolved and, if it exists,
+// deleted. Returns an Error if the directory path cannot be resolved
+// or the directory cannot be removed.
+Try<Nothing> Fetcher::recover(const SlaveID& slaveId, const Flags& flags)
 {
-  FetcherInfo fetcherInfo;
+  // Good enough for now, simple, least-effort recovery.
+  VLOG(1) << "Clearing fetcher cache";
+
+  string cacheDirectory = paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
+  Result<string> path = os::realpath(cacheDirectory);
+  if (path.isError()) {
+    LOG(ERROR) << "Malformed fetcher cache directory path '" << cacheDirectory
+               << "', error: " + path.error();
 
-  fetcherInfo.mutable_command_info()->CopyFrom(commandInfo);
+    return Error(path.error());
+  }
 
-  fetcherInfo.set_work_directory(directory);
+  // A None result (no resolved path) leaves nothing to clean up.
+  if (path.isSome() && os::exists(path.get())) {
+    Try<Nothing> rmdir = os::rmdir(path.get(), true);
+    if (rmdir.isError()) {
+      LOG(ERROR) << "Could not delete fetcher cache directory '"
+                 << cacheDirectory << "', error: " + rmdir.error();
 
-  if (user.isSome()) {
-    fetcherInfo.set_user(user.get());
+      return rmdir;
+    }
   }
 
-  if (!flags.frameworks_home.empty()) {
-    fetcherInfo.set_frameworks_home(flags.frameworks_home);
+  return Nothing();
+}
+
+
+// Derives the base name (the part after the last "/") of a URI or file
+// path. Returns an Error if the URI contains a backslash, a single
+// quote or a NUL character, or if a URI with a protocol specifier has
+// no path component.
+Try<string> Fetcher::basename(const string& uri)
+{
+  // TODO(bernd-mesos): full URI parsing, then move this to stout.
+  // There is a bug (or is it a feature?) in the original fetcher
+  // code without caching that remains in effect here. URIs are
+  // treated like file paths, looking for occurrences of "/",
+  // but ignoring other separators that can show up
+  // (e.g. "?", "=" in HTTP URLs).
+
+  if (uri.find_first_of('\\') != string::npos ||
+      uri.find_first_of('\'') != string::npos ||
+      uri.find_first_of('\0') != string::npos) {
+      return Error("Illegal characters in URI");
   }
 
-  map<string, string> result;
+  // Only treat "://" as a protocol separator when at least two
+  // characters precede it.
+  size_t index = uri.find("://");
+  if (index != string::npos && 1 < index) {
+    // URI starts with protocol specifier, e.g., http://, https://,
+    // ftp://, ftps://, hdfs://, hftp://, s3://, s3n://.
 
-  if (!flags.hadoop_home.empty()) {
-    result["HADOOP_HOME"] = flags.hadoop_home;
+    // The remainder must contain a "/" that is followed by at least
+    // one more character.
+    string path = uri.substr(index + 3);
+    if (!strings::contains(path, "/") || path.size() <= path.find("/") + 1) {
+      return Error("Malformed URI (missing path): " + uri);
+    }
+
+    return path.substr(path.find_last_of("/") + 1);
   }
+  // No protocol specifier: treat the value as a plain file path.
+  return os::basename(uri);
+}
 
-  result["MESOS_FETCHER_INFO"] = stringify(JSON::Protobuf(fetcherInfo));
 
-  return result;
+// A URI is accepted exactly when a base name can be derived from it;
+// otherwise the error reported by basename() is passed on.
+Try<Nothing> Fetcher::validateUri(const string& uri)
+{
+  const Try<string> name = basename(uri);
+  if (!name.isError()) {
+    return Nothing();
+  }
+
+  return Error(name.error());
 }
 
 
-Future<Nothing> Fetcher::fetch(
-    const ContainerID& containerId,
-    const CommandInfo& commandInfo,
-    const string& directory,
-    const Option<string>& user,
-    const Flags& flags,
-    const Option<int>& stdout,
-    const Option<int>& stderr)
+// Validates every URI listed in the given CommandInfo. Returns the
+// error of the first URI that fails Fetcher::validateUri(), or Nothing
+// if all URIs pass (including when there are none).
+static Try<Nothing> validateUris(const CommandInfo& commandInfo)
 {
-  if (commandInfo.uris().size() == 0) {
-    return Nothing();
+  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
+    Try<Nothing> validation = Fetcher::validateUri(uri.value());
+    if (validation.isError()) {
+      return Error(validation.error());
+    }
   }
 
-  return dispatch(process.get(),
-                  &FetcherProcess::fetch,
-                  containerId,
-                  commandInfo,
-                  directory,
-                  user,
-                  flags,
-                  stdout,
-                  stderr);
+  return Nothing();
+}
+
+
+// Maps a URI onto a path in the local file system.
+//
+// Returns None if the URI carries a protocol specifier other than
+// "file://", the resulting path if the URI is a "file://" URI or a
+// plain path, and an Error if a "file://" URI is not absolute or if a
+// relative path is given without a usable frameworks home.
+Result<string> Fetcher::uriToLocalPath(
+    const string& uri,
+    const Option<string>& frameworksHome)
+{
+  const bool hasFileScheme = strings::startsWith(uri, "file://");
+
+  // Any other "<scheme>://" form does not denote a local path.
+  if (!hasFileScheme && strings::contains(uri, "://")) {
+    return None();
+  }
+
+  string localPath = uri;
+
+  if (hasFileScheme) {
+    // Strip the scheme, preferring the longer "file://localhost" form.
+    const string& prefix = strings::startsWith(uri, FILE_URI_LOCALHOST)
+      ? FILE_URI_LOCALHOST
+      : FILE_URI_PREFIX;
+
+    localPath = uri.substr(prefix.size());
+
+    if (!strings::startsWith(localPath, "/")) {
+      return Error("File URI only supports absolute paths");
+    }
+  }
+
+  // Absolute paths are used as they are.
+  if (strings::startsWith(localPath, "/")) {
+    return localPath;
+  }
+
+  // A relative path can only be resolved against the frameworks home.
+  if (frameworksHome.isNone() || frameworksHome.get().empty()) {
+    return Error("A relative path was passed for the resource but the "
+                 "Mesos framework home was not specified. "
+                 "Please either provide this config option "
+                 "or avoid using a relative path");
+  }
+
+  localPath = path::join(frameworksHome.get(), localPath);
+  LOG(INFO) << "Prepended Mesos frameworks home to relative path, "
+            << "making it: '" << localPath << "'";
+
+  return localPath;
+}
+
+
+// Tells whether the URI starts with one of the protocol specifiers
+// that this function classifies as a network URI.
+bool Fetcher::isNetUri(const std::string& uri)
+{
+  static const char* const SCHEMES[] = {
+    "http://", "https://", "ftp://", "ftps://"
+  };
+
+  for (const char* scheme : SCHEMES) {
+    if (strings::startsWith(uri, scheme)) {
+      return true;
+    }
+  }
+
+  return false;
 }
 
 
 Future<Nothing> Fetcher::fetch(
     const ContainerID& containerId,
     const CommandInfo& commandInfo,
-    const string& directory,
+    const string& sandboxDirectory,
     const Option<string>& user,
+    const SlaveID& slaveId,
     const Flags& flags)
 {
   if (commandInfo.uris().size() == 0) {
@@ -123,8 +219,9 @@ Future<Nothing> Fetcher::fetch(
                   &FetcherProcess::fetch,
                   containerId,
                   commandInfo,
-                  directory,
+                  sandboxDirectory,
                   user,
+                  slaveId,
                   flags);
 }
 
@@ -143,124 +240,459 @@ FetcherProcess::~FetcherProcess()
 }
 
 
-Future<Nothing> FetcherProcess::fetch(
-    const ContainerID& containerId,
-    const CommandInfo& commandInfo,
-    const string& directory,
-    const Option<string>& user,
-    const Flags& flags,
-    const Option<int>& stdout,
-    const Option<int>& stderr)
+// Find out how large a potential download from the given URI is.
+//
+// The size is determined, in this order, from the local file system
+// (if the URI maps to a local path), from the content length reported
+// for a network URI, or from the Hadoop client. Returns an Error if
+// the size cannot be determined by the applicable method.
+static Try<Bytes> fetchSize(
+    const string& uri,
+    const Option<string>& frameworksHome)
 {
-  VLOG(1) << "Starting to fetch URIs for container: " << containerId
-        << ", directory: " << directory;
+  VLOG(1) << "Fetching size for URI: " << uri;
 
-  Try<Subprocess> subprocess =
-    run(commandInfo, directory, user, flags, stdout, stderr);
+  Result<string> path = Fetcher::uriToLocalPath(uri, frameworksHome);
+  if (path.isError()) {
+    return Error(path.error());
+  }
+  if (path.isSome()) {
+    Try<Bytes> size = os::stat::size(path.get(), os::stat::FOLLOW_SYMLINK);
+    if (size.isError()) {
+      return Error("Could not determine file size for: '" + path.get() +
+                     "', error: " + size.error());
+    }
+    return size.get();
+  }
 
-  if (subprocess.isError()) {
-    return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+  if (Fetcher::isNetUri(uri)) {
+    Try<Bytes> size = net::contentLength(uri);
+    if (size.isError()) {
+      return Error(size.error());
+    }
+    // A reported length of zero is treated as unusable.
+    if (size.get() == 0) {
+      return Error("URI reported content-length 0: " + uri);
+    }
+
+    return size.get();
   }
 
-  subprocessPids[containerId] = subprocess.get().pid();
+  HDFS hdfs;
 
-  return subprocess.get().status()
-    .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+  // Keep the two "not available" cases apart: error() may only be
+  // called on a Try that actually holds an error.
+  Try<bool> available = hdfs.available();
+  if (available.isError()) {
+    return Error("Hadoop client not available: " + available.error());
+  }
+  if (!available.get()) {
+    return Error("Hadoop client not available");
+  }
+
+  Try<Bytes> size = hdfs.du(uri);
+  if (size.isError()) {
+    return Error("Hadoop client could not determine size: " + size.error());
+  }
+
+  return size.get();
 }
 
 
+// Entry point for fetching all URIs of a CommandInfo into the sandbox.
+// Validates the URIs, chowns the sandbox directory to the command's
+// user (if any), looks up or creates a cache entry for every URI that
+// asks for caching, and then continues in _fetch(). Returns a Failure
+// if URI validation or the chown fails.
 Future<Nothing> FetcherProcess::fetch(
     const ContainerID& containerId,
     const CommandInfo& commandInfo,
-    const string& directory,
+    const string& sandboxDirectory,
     const Option<string>& user,
+    const SlaveID& slaveId,
     const Flags& flags)
 {
   VLOG(1) << "Starting to fetch URIs for container: " << containerId
-        << ", directory: " << directory;
+          << ", directory: " << sandboxDirectory;
+
+  // TODO(bernd-mesos): This will disappear once we inject flags at
+  // Fetcher/FetcherProcess creation time. For now we trust this is
+  // always the exact same value.
+  cache.setSpace(flags.fetcher_cache_size);
+
+  Try<Nothing> validated = validateUris(commandInfo);
+  if (validated.isError()) {
+    return Failure("Could not fetch: " + validated.error());
+  }
+
+  // A user named in the CommandInfo takes precedence over 'user'.
+  Option<string> commandUser = user;
+  if (commandInfo.has_user()) {
+    commandUser = commandInfo.user();
+  }
+
+  string cacheDirectory = paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
+  if (commandUser.isSome()) {
+    // Segregating per-user cache directories.
+    cacheDirectory = path::join(cacheDirectory, commandUser.get());
+  }
 
-  Try<Subprocess> subprocess = run(commandInfo, directory, user, flags);
+  if (commandUser.isSome()) {
+    // First assure that we are working for a valid user.
+    // TODO(bernd-mesos): This should be asynchronous.
+    Try<Nothing> chown = os::chown(commandUser.get(), sandboxDirectory);
+    if (chown.isError()) {
+      return Failure("Failed to chown directory: " + sandboxDirectory +
+                     " to user: " + commandUser.get() +
+                     " with error: " + chown.error());
+    }
+  }
 
-  if (subprocess.isError()) {
-    return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+  // For each URI we determine if we should use the cache and if so we
+  // try and either get the cache entry or create a cache entry. If
+  // we're getting the cache entry then we might need to wait for that
+  // cache entry to be downloaded. If we're creating a new cache entry
+  // then we need to properly reserve the cache space (and perform any
+  // evictions). Thus, there are three possibilities for each URI:
+  //
+  //   (1) We are not using the cache.
+  //   (2) We are using the cache but need to wait for an entry to be
+  //       downloaded.
+  //   (3) We are using the cache and need to create a new entry.
+  //
+  // We capture whether or not we're using the cache using an Option
+  // as a value in a map, i.e., if we are not trying to use the cache
+  // as in (1) above then the Option is None otherwise as in (2) and
+  // (3) the Option is Some. And to capture the asynchronous nature of
+  // both (2) and (3) that Option holds a Future to the actual cache
+  // entry.
+  hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>> entries;
+
+  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
+    // Case (1): this URI does not ask for caching.
+    if (!uri.cache()) {
+      entries[uri] = None();
+      continue;
+    }
+
+    // Check if this is already in the cache (but not necessarily
+    // downloaded).
+    const Option<shared_ptr<Cache::Entry>> entry =
+      cache.get(commandUser, uri.value());
+
+    if (entry.isSome()) {
+      // Case (2): an entry exists already.
+      entry.get()->reference();
+
+      // Wait for the URI to be downloaded into the cache (or fail)
+      entries[uri] = entry.get()->completion()
+        .then(defer(self(), [=]() {
+          return Future<shared_ptr<Cache::Entry>>(entry.get());
+        }));
+    } else {
+      // Case (3): create a new entry, then determine the download
+      // size asynchronously and reserve cache space for it.
+      shared_ptr<Cache::Entry> newEntry =
+        cache.create(cacheDirectory, commandUser, uri);
+
+      newEntry->reference();
+
+      entries[uri] = async(&fetchSize, uri.value(), flags.frameworks_home)
+        .then(defer(self(),
+                    &FetcherProcess::reserveCacheSpace,
+                    lambda::_1,
+                    newEntry));
+    }
   }
 
-  subprocessPids[containerId] = subprocess.get().pid();
+  list<Future<shared_ptr<Cache::Entry>>> futures;
 
-  return subprocess.get().status()
-    .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+  // Get out all of the futures we need to wait for so we can wait on them
+  // together via 'await'.
+  foreachvalue (const Option<Future<shared_ptr<Cache::Entry>>>& entry,
+                entries) {
+    if (entry.isSome()) {
+      futures.push_back(entry.get());
+    }
+  }
+
+  return _fetch(futures,
+                entries,
+                containerId,
+                sandboxDirectory,
+                cacheDirectory,
+                commandUser,
+                flags);
 }
 
 
+// Waits for all cache entry futures and then chains the remaining
+// fetch steps: __fetch() resolves 'entries' into settled cache entries
+// and ___fetch() receives that result together with the other
+// arguments passed in here.
 Future<Nothing> FetcherProcess::_fetch(
+    const list<Future<shared_ptr<Cache::Entry>>> futures,
+    const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>&
+      entries,
     const ContainerID& containerId,
-    const Option<int>& status)
+    const string& sandboxDirectory,
+    const string& cacheDirectory,
+    const Option<string>& user,
+    const Flags& flags)
 {
-  subprocessPids.erase(containerId);
+  // The value produced by await() is not passed on; __fetch() inspects
+  // the futures stored in 'entries' instead.
+  return await(futures)
+    .then(defer(self(),
+                &FetcherProcess::__fetch,
+                entries))
+    .then(defer(self(),
+                &FetcherProcess::___fetch,
+                lambda::_1,
+                containerId,
+                sandboxDirectory,
+                cacheDirectory,
+                user,
+                flags));
+}
+
 
-  if (status.isNone()) {
-    return Failure("No status available from fetcher");
-  } else if (status.get() != 0) {
-    return Failure("Failed to fetch URIs for container '" +
-                   stringify(containerId) + "'with exit status: " +
-                   stringify(status.get()));
+// For each URI, if there is a cache entry and waiting for it was
+// successful, extract it and add it to the resulting map. Otherwise
+// we'll assume we are not using or cannot use the cache for this URI.
+//
+// The result maps every URI in 'entries' to either Some(entry) (use
+// the cache) or None (bypass the cache).
+Future<hashmap<CommandInfo::URI,
+               Option<shared_ptr<FetcherProcess::Cache::Entry>>>>
+FetcherProcess::__fetch(
+    const hashmap<CommandInfo::URI,
+                  Option<Future<shared_ptr<Cache::Entry>>>>& entries)
+{
+  hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>> result;
+
+  foreachpair (const CommandInfo::URI& uri,
+               const Option<Future<shared_ptr<Cache::Entry>>>& entry,
+               entries) {
+    if (entry.isSome()) {
+      if (entry.get().isReady()) {
+        result[uri] = entry.get().get();
+      } else {
+        // A future that is not ready may have been discarded rather
+        // than failed, so only ask for the failure message when there
+        // is one.
+        LOG(WARNING) << "Reverting to fetching directly into the sandbox for '"
+                     << uri.value()
+                     << "', due to failure to fetch through the cache, "
+                     << "with error: "
+                     << (entry.get().isFailed() ? entry.get().failure()
+                                                : "future discarded");
+
+        result[uri] = None();
+      }
+    } else {
+      // No entry means bypassing the cache.
+      result[uri] = None();
+    }
   }
 
-  return Nothing();
+  return result;
 }
 
 
-Try<Subprocess> FetcherProcess::run(
-    const CommandInfo& commandInfo,
-    const string& directory,
+// Builds the FetcherInfo message from the settled cache entries (one
+// item per URI, each tagged with the action to take for it), runs
+// mesos-fetcher with it via run(), and then updates the cache entries
+// through __runFail() or __runSucceed().
+Future<Nothing> FetcherProcess::___fetch(
+    const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries,
+    const ContainerID& containerId,
+    const string& sandboxDirectory,
+    const string& cacheDirectory,
     const Option<string>& user,
-    const Flags& flags,
-    const Option<int>& stdout,
-    const Option<int>& stderr)
+    const Flags& flags)
 {
-  // Determine path for mesos-fetcher.
-  Result<string> realpath = os::realpath(
-      path::join(flags.launcher_dir, "mesos-fetcher"));
+  // Now construct the FetcherInfo based on which URIs we're using
+  // the cache for and which ones we are bypassing the cache.
+  FetcherInfo info;
+
+  foreachpair (const CommandInfo::URI& uri,
+               const Option<shared_ptr<Cache::Entry>>& entry,
+               entries) {
+    FetcherInfo::Item* item = info.add_items();
+
+    item->mutable_uri()->CopyFrom(uri);
+
+    if (entry.isSome()) {
+      if (entry.get()->completion().isPending()) {
+        // Since the entry is not yet "complete", i.e.,
+        // 'completion().isPending()', it must be the case that we created
+        // the entry in FetcherProcess::fetch(). Otherwise the entry should
+        // have been in the cache already and we would have waited for its
+        // completion in FetcherProcess::fetch().
+        item->set_action(FetcherInfo::Item::DOWNLOAD_AND_CACHE);
+        item->set_cache_filename(entry.get()->filename);
+      } else {
+        // The entry was downloaded earlier; reuse the cached file.
+        CHECK_READY(entry.get()->completion());
+        item->set_action(FetcherInfo::Item::RETRIEVE_FROM_CACHE);
+        item->set_cache_filename(entry.get()->filename);
+      }
+    } else {
+      item->set_action(FetcherInfo::Item::BYPASS_CACHE);
+    }
+  }
 
-  if (!realpath.isSome()) {
-    LOG(ERROR) << "Failed to determine the canonical path "
-                << "for the mesos-fetcher '"
-                << path::join(flags.launcher_dir, "mesos-fetcher")
-                << "': "
-                << (realpath.isError() ? realpath.error()
-                                       : "No such file or directory");
-    return Error("Could not fetch URIs: failed to find mesos-fetcher");
+  info.set_sandbox_directory(sandboxDirectory);
+  info.set_cache_directory(cacheDirectory);
+
+  if (user.isSome()) {
+    info.set_user(user.get());
   }
 
-  // Now the actual mesos-fetcher command.
-  string command = realpath.get();
+  if (!flags.frameworks_home.empty()) {
+    info.set_frameworks_home(flags.frameworks_home);
+  }
 
-  LOG(INFO) << "Fetching URIs using command '" << command << "'";
+  // __runFail() hands back the failed future it was given, so the
+  // failure is passed on to the caller after the cache cleanup.
+  return run(containerId, info, flags)
+    .repair(defer(self(), &FetcherProcess::__runFail, lambda::_1, entries))
+    .then(defer(self(), &FetcherProcess::__runSucceed, entries));
+}
 
-  Try<Subprocess> fetcherSubprocess = subprocess(
-    command,
-    Subprocess::PIPE(),
-    stdout.isSome()
-      ? Subprocess::FD(stdout.get())
-      : Subprocess::PIPE(),
-    stderr.isSome()
-      ? Subprocess::FD(stderr.get())
-      : Subprocess::PIPE(),
-    Fetcher::environment(commandInfo, directory, user, flags));
 
-  if (fetcherSubprocess.isError()) {
-    return Error(
-        "Failed to execute mesos-fetcher: " +  fetcherSubprocess.error());
+// Cleans up after a failed mesos-fetcher run: every cache entry in
+// 'entries' is unreferenced, and entries whose download had not yet
+// completed are marked as failed and removed from the cache. Returns
+// the failed future that was passed in.
+Future<Nothing> FetcherProcess::__runFail(
+    const Future<Nothing>& future,
+    const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries)
+{
+  LOG(ERROR) << "Failed to run mesos-fetcher: " << future.failure();
+
+  foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
+    if (entry.isSome()) {
+      entry.get()->unreference();
+
+      if (entry.get()->completion().isPending()) {
+        // Unsuccessfully (or partially) downloaded! Remove from the cache.
+        entry.get()->fail();
+        cache.remove(entry.get()); // Might delete partial download.
+      }
+    }
   }
 
-  return fetcherSubprocess;
+  return future; // Always propagate the failure!
 }
 
 
-Try<Subprocess> FetcherProcess::run(
-    const CommandInfo& commandInfo,
-    const string& directory,
-    const Option<string>& user,
+// Finalizes the cache entries after a successful mesos-fetcher run.
+// Every entry is unreferenced. An entry that is still pending was
+// downloaded by this run: it is completed if the cache can adjust to
+// it, and otherwise marked as failed and removed from the cache.
+Future<Nothing> FetcherProcess::__runSucceed(
+    const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries)
+{
+  foreachvalue (const Option<shared_ptr<Cache::Entry>>& cached, entries) {
+    if (cached.isSome()) {
+      const shared_ptr<Cache::Entry>& entry = cached.get();
+
+      entry->unreference();
+
+      if (entry->completion().isPending()) {
+        // Successfully downloaded and cached!
+        const Try<Nothing> adjusted = cache.adjust(entry);
+
+        if (adjusted.isError()) {
+          LOG(WARNING) << "Failed to adjust the cache size for entry '"
+                       << entry->key << "' with error: "
+                       << adjusted.error();
+
+          // The fetch itself succeeded, but the entry is deleted now
+          // and can therefore not be reused from the cache.
+          entry->fail();
+          cache.remove(entry);
+        } else {
+          entry->complete();
+        }
+      }
+    }
+  }
+
+  return Nothing();
+}
+
+
+// Computes the signed difference in bytes between the actual size of
+// a download and the size recorded in its cache entry: negative if the
+// download is smaller, positive if it is larger, zero otherwise. A
+// warning is logged whenever the two sizes differ.
+static off_t delta(
+    const Bytes& actualSize,
+    const shared_ptr<FetcherProcess::Cache::Entry>& entry)
+{
+  const bool smaller = actualSize < entry->size;
+  const bool larger = !smaller && actualSize > entry->size;
+
+  if (!smaller && !larger) {
+    return 0;
+  }
+
+  const Bytes difference =
+    smaller ? entry->size - actualSize : actualSize - entry->size;
+
+  LOG(WARNING) << "URI download result for '" << entry->key
+               << (smaller ? "' is smaller than expected by "
+                           : "' is larger than expected by ")
+               << stringify(difference)
+               << " at: " << entry->path();
+
+  return smaller ? -off_t(difference.bytes()) : off_t(difference.bytes());
+}
+
+
+// For testing only.
+// Lists the paths that os::find() reports for CACHE_FILE_NAME_PREFIX
+// in the slave's cache directory; the list is empty if that directory
+// does not exist.
+// TODO(bernd-mesos): After refactoring slave/containerizer,fetcher so
+// that flags and slave ID get injected, replace this with two functions
+// one of which returns a list of cache file paths, the other the number
+// of entries in the cache table.
+Try<list<Path>> FetcherProcess::cacheFiles(
+    const SlaveID& slaveId,
+    const Flags& flags)
+{
+  const string cacheDirectory =
+    slave::paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
+
+  list<Path> files;
+
+  if (!os::exists(cacheDirectory)) {
+    return files;
+  }
+
+  const Try<list<string>> found =
+    os::find(cacheDirectory, CACHE_FILE_NAME_PREFIX);
+
+  if (found.isError()) {
+    return Error("Could not access cache directory '" +
+                 cacheDirectory + "' with error: " + found.error());
+  }
+
+  foreach (const string& file, found.get()) {
+    files.push_back(Path(file));
+  }
+
+  return files;
+}
+
+
+// For testing only. Exposes the value of cache.size().
+size_t FetcherProcess::cacheSize()
+{
+  return cache.size();
+}
+
+
+// Exposes the value of cache.availableSpace().
+// NOTE(review): presumably intended for testing only - confirm.
+Bytes FetcherProcess::availableCacheSpace()
+{
+  return cache.availableSpace();
+}
+
+
+// Reserves and claims cache space for a newly created entry, given the
+// outcome of determining the download size. On success the entry's
+// size is set and the entry is returned. If the size is unknown or the
+// reservation fails, the entry is marked as failed, removed from the
+// cache, and a Failure is returned.
+Future<shared_ptr<FetcherProcess::Cache::Entry>>
+FetcherProcess::reserveCacheSpace(
+    const Future<Try<Bytes>>& requestedSpace,
+    const shared_ptr<FetcherProcess::Cache::Entry>& entry)
+{
+  CHECK_READY(requestedSpace);
+
+  const Try<Bytes>& size = requestedSpace.get();
+
+  // Find out why, if at all, no space can be given to the entry.
+  Option<string> failure = None();
+
+  if (size.isError()) {
+    failure = "Could not determine size of cache file for '" +
+              entry->key + "' with error: " +
+              size.error();
+  } else {
+    const Try<Nothing> reservation = cache.reserve(size.get());
+    if (reservation.isError()) {
+      failure = "Failed to reserve space in the cache: " +
+                reservation.error();
+    }
+  }
+
+  if (failure.isSome()) {
+    // Let anyone waiting on this entry know that the download did not
+    // happen and that they should bypass the cache (any new requests
+    // will try again).
+    entry->fail();
+    cache.remove(entry);
+
+    return Failure(failure.get());
+  }
+
+  VLOG(1) << "Claiming fetcher cache space for: " << entry->key;
+
+  cache.claimSpace(size.get());
+
+  // NOTE: The entry size must only be set when we are also claiming
+  // the space! Other functions rely on this dependency (see
+  // Cache::remove()).
+  entry->size = size.get();
+
+  return entry;
+}
+
+
+Future<Nothing> FetcherProcess::run(
+    const ContainerID& containerId,
+    const FetcherInfo& info,
     const Flags& flags)
 {
   // Before we fetch let's make sure we create 'stdout' and 'stderr'
@@ -273,47 +705,92 @@ Try<Subprocess> FetcherProcess::run(
   // today is because we not only need to open the files but also
   // chown them.
   Try<int> out = os::open(
-      path::join(directory, "stdout"),
+      path::join(info.sandbox_directory(), "stdout"),
       O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK | O_CLOEXEC,
       S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
 
   if (out.isError()) {
-    return Error("Failed to create 'stdout' file: " + out.error());
+    return Failure("Failed to create 'stdout' file: " + out.error());
   }
 
   // Repeat for stderr.
   Try<int> err = os::open(
-      path::join(directory, "stderr"),
+      path::join(info.sandbox_directory(), "stderr"),
       O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK | O_CLOEXEC,
       S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
 
   if (err.isError()) {
     os::close(out.get());
-    return Error("Failed to create 'stderr' file: " + err.error());
+    return Failure("Failed to create 'stderr' file: " + err.error());
   }
 
-  if (user.isSome()) {
-    Try<Nothing> chown = os::chown(user.get(), directory);
-    if (chown.isError()) {
-      os::close(out.get());
-      os::close(err.get());
-      return Error("Failed to chown work directory");
-    }
+  string fetcherPath = path::join(flags.launcher_dir, "mesos-fetcher");
+  Result<string> realpath = os::realpath(fetcherPath);
+
+  if (!realpath.isSome()) {
+    LOG(ERROR) << "Failed to determine the canonical path "
+               << "for the mesos-fetcher '"
+               << fetcherPath
+               << "': "
+               << (realpath.isError() ? realpath.error()
+                                      : "No such file or directory");
+
+    return Failure("Could not fetch URIs: failed to find mesos-fetcher");
   }
 
-  Try<Subprocess> subprocess = run(
-      commandInfo,
-      directory,
-      user,
-      flags,
-      out.get(),
-      err.get());
+  // Now the actual mesos-fetcher command.
+  string command = realpath.get();
+
+  // We pass arguments to the fetcher program by means of an
+  // environment variable.
+  map<string, string> environment;
+
+  environment["MESOS_FETCHER_INFO"] = stringify(JSON::Protobuf(info));
 
-  subprocess.get().status()
-    .onAny(lambda::bind(&os::close, out.get()))
-    .onAny(lambda::bind(&os::close, err.get()));
+  if (!flags.hadoop_home.empty()) {
+    environment["HADOOP_HOME"] = flags.hadoop_home;
+  }
+
+  VLOG(1) << "Fetching URIs using command '" << command << "'";
+
+  Try<Subprocess> fetcherSubprocess = subprocess(
+      command,
+      Subprocess::PIPE(),
+      Subprocess::FD(out.get()),
+      Subprocess::FD(err.get()),
+      environment);
 
-  return subprocess;
+  if (fetcherSubprocess.isError()) {
+    return Failure("Failed to execute mesos-fetcher: " +
+                   fetcherSubprocess.error());
+  }
+
+  // Remember this PID in case we need to kill the subprocess. See kill().
+  // This value gets erased again in the onAny() callback below.
+  subprocessPids[containerId] = fetcherSubprocess.get().pid();
+
+  return fetcherSubprocess.get().status()
+    .then(defer(self(), [=](const Option<int>& status) -> Future<Nothing> {
+      if (status.isNone()) {
+        return Failure("No status available from mesos-fetcher");
+      }
+
+      if (status.get() != 0) {
+        return Failure("Failed to fetch all URIs for container '" +
+                       stringify(containerId) +
+                       "' with exit status: " +
+                       stringify(status.get()));
+      }
+
+      return Nothing();
+    }))
+    .onAny(defer(self(), [=](const Future<Nothing>& result) {
+      // Clear the subprocess PID remembered from running mesos-fetcher.
+      subprocessPids.erase(containerId);
+
+      os::close(out.get());
+      os::close(err.get());
+    }));
 }
 
 
@@ -328,6 +805,348 @@ void FetcherProcess::kill(const ContainerID& containerId)
   }
 }
 
+
+string FetcherProcess::Cache::nextFilename(const CommandInfo::URI& uri)
+{
+  // Different URIs may have the same base name, so we need to
+  // segregate the download results. This can be done by separate
+  // directories or by different file names. We opt for the latter
+  // since there may be tighter limits on how many sub-directories a
+  // file system can bear than on how many files can be in a directory.
+
+  // We put a fixed prefix upfront before the serial number so we can
+  // later easily find cache files with os::find() to support testing.
+
+  // Why we keep the file extension here: When fetching from cache, if
+  // extraction is enabled, the extraction algorithm can look at the
+  // extension of the cache file the same way as it would at a
+  // download of the original URI, and external commands performing
+  // the extraction do not get confused by their source file
+  // missing an expected form of extension. This is included in the
+  // following.
+
+  // Just for human operators who want to take a look at the cache
+  // and relate cache files to URIs, we also add some of the URI's
+  // basename, but not too much so we do not exceed file name size
+  // limits.
+
+  Try<string> base = Fetcher::basename(uri.value());
+  CHECK_SOME(base);
+
+  string s = base.get();
+  if (s.size() > 20) {
+    // Grab only a prefix and a suffix, but for sure including the
+    // file extension.
+    s = s.substr(0, 10) + "_" + s.substr(s.size() - 10, string::npos);
+  }
+
+  ++filenameSerial;
+
+  return CACHE_FILE_NAME_PREFIX + stringify(filenameSerial) + "-" + s;
+}
+
+
+static string cacheKey(const Option<string>& user, const string& uri)
+{
+  return user.isNone() ? uri : user.get() + "@" + uri;
+}
+
+
+shared_ptr<FetcherProcess::Cache::Entry> FetcherProcess::Cache::create(
+    const string& cacheDirectory,
+    const Option<string>& user,
+    const CommandInfo::URI& uri)
+{
+  const string key = cacheKey(user, uri.value());
+  const string filename = nextFilename(uri);
+
+  auto entry = shared_ptr<Cache::Entry>(
+      new Cache::Entry(key, cacheDirectory, filename));
+
+  table.put(key, entry);
+
+  VLOG(1) << "Created cache entry '" << key << "' with file: " << filename;
+
+  return entry;
+}
+
+
+Option<shared_ptr<FetcherProcess::Cache::Entry>>
+FetcherProcess::Cache::get(
+    const Option<string>& user,
+    const string& uri)
+{
+  const string key = cacheKey(user, uri);
+
+  return table.get(key);
+}
+
+
+bool FetcherProcess::Cache::contains(
+    const Option<string>& user,
+    const string& uri)
+{
+  return get(user, uri).isSome();
+}
+
+
+bool FetcherProcess::Cache::contains(const shared_ptr<Cache::Entry>& entry)
+{
+  Option<shared_ptr<Cache::Entry>> found = table.get(entry->key);
+  if (found.isNone()) {
+    return false;
+  }
+
+  return found == entry;
+}
+
+
+// We are removing an entry if:
+//
+//   (1) We failed to determine its prospective cache file size.
+//   (2) We failed to download it when invoking the mesos-fetcher.
+//   (3) We're evicting it to make room for another entry.
+//
+// In (1) and (2) the contract is that we'll have failed the entry's
+// future before we call remove, so the entry's future should no
+// longer be pending.
+//
+// In (3) it should be the case that the future is no longer pending,
+// because we shouldn't be able to evict something if we're
+// currently downloading it, because it should have a non-zero
+// reference count and therefore the future must either be ready or
+// failed in which case this is just case (1) above.
+//
+// NOTE: It is not necessarily the case that this cache entry has
+// zero references because there might be some waiters on the
+// downloading of this entry which haven't been able to run and find
+// out that the downloading failed.
+//
+// We want to attempt to delete the file regardless of whether it has been
+// downloaded, since it might have been downloaded partially! Deleting
+// this file should not be racing with any other downloading or
+// deleting because all calls into the cache are serialized by the
+// FetcherProcess and since this entry is already in the cache there
+// should not be any other conflicting entries or files representing
+// this entry. Furthermore every cache file has a unique name. Thus
+// no new download conflicts with the manipulation of any pre-existing
+// cache content.
+Try<Nothing> FetcherProcess::Cache::remove(
+    const shared_ptr<Cache::Entry>& entry)
+{
+  VLOG(1) << "Removing cache entry '" << entry->key
+          << "' with filename: " << entry->filename;
+
+  CHECK(!entry->completion().isPending());
+
+  CHECK(contains(entry));
+
+  table.erase(entry->key);
+
+  // We may or may not have started downloading. The download may or may
+  // not have been partial. In any case, clean up whatever is there.
+  if (os::exists(entry->path().value)) {
+    Try<Nothing> rm = os::rm(entry->path().value);
+    if (rm.isError()) {
+      return Error("Could not delete fetcher cache file '" +
+                   entry->path().value + "' with error: " + rm.error() +
+                   " for entry '" + entry->key +
+                   "', leaking cache space: " + stringify(entry->size));
+    }
+  }
+
+  // NOTE: There is an assumption that if and only if 'entry->size > 0'
+  // then we've claimed cache space for this entry! This currently only
+  // gets set in reserveCacheSpace().
+  if (entry->size > 0) {
+    releaseSpace(entry->size);
+
+    entry->size = 0;
+  }
+
+  return Nothing();
+}
+
+
+Try<list<shared_ptr<FetcherProcess::Cache::Entry>>>
+FetcherProcess::Cache::selectVictims(const Bytes& requiredSpace)
+{
+  // TODO(bernd-mesos): Implement more elaborate selection criteria
+  // (LRU/MRU, etc.).
+
+  list<shared_ptr<FetcherProcess::Cache::Entry>> result;
+
+  Bytes space = 0;
+
+  foreachvalue (const shared_ptr<Cache::Entry>& entry, table) {
+    if (!entry->isReferenced()) {
+      result.push_back(entry);
+
+      space += entry->size;
+      if (space >= requiredSpace) {
+        return result;
+      }
+    }
+  }
+
+  return Error("Could not find enough cache files to evict");
+}
+
+
+Try<Nothing> FetcherProcess::Cache::reserve(
+    const Bytes& requestedSpace)
+{
+  if (availableSpace() < requestedSpace) {
+    Bytes missingSpace = requestedSpace - availableSpace();
+
+    VLOG(1) << "Freeing up fetcher cache space for: " << missingSpace;
+
+    const Try<list<shared_ptr<Cache::Entry>>> victims =
+      selectVictims(missingSpace);
+
+    if (victims.isError()) {
+      return Error("Could not free up enough fetcher cache space");
+    }
+
+    foreach (const shared_ptr<Cache::Entry>& entry, victims.get()) {
+      Try<Nothing> removal = remove(entry);
+      if (removal.isError()) {
+        return Error(removal.error());
+      }
+    }
+  }
+
+  return Nothing();
+}
+
+
+Try<Nothing> FetcherProcess::Cache::adjust(
+    const shared_ptr<FetcherProcess::Cache::Entry>& entry)
+{
+  CHECK(contains(entry));
+
+  Try<Bytes> size =
+    os::stat::size(entry.get()->path().value, os::stat::DO_NOT_FOLLOW_SYMLINK);
+
+  if (size.isSome()) {
+    off_t d = delta(size.get(), entry);
+    if (d <= 0) {
+      entry->size = size.get();
+
+      releaseSpace(Bytes(d));
+    } else {
+      return Error("More cache size now necessary, not adjusting " +
+                   entry->key);
+    }
+  } else {
+    // This should never be caused by Mesos itself, but cannot be excluded.
+    return Error("Fetcher cache file for '" + entry->key +
+                 "' disappeared from: " + entry->path().value);
+  }
+
+  return Nothing();
+}
+
+
+size_t FetcherProcess::Cache::size()
+{
+  return table.size();
+}
+
+
+void FetcherProcess::Cache::setSpace(const Bytes& bytes)
+{
+  if (space > 0) {
+    // Dynamic cache size changes not supported.
+    CHECK(space == bytes);
+  } else {
+    space = bytes;
+  }
+}
+
+
+void FetcherProcess::Cache::claimSpace(const Bytes& bytes)
+{
+  tally += bytes;
+
+  if (tally > space) {
+    // Used cache volume space exceeds the maximum amount set by
+    // flags.fetcher_cache_size. This may be tolerated temporarily,
+    // if there is sufficient physical space available. But it can
+    // otherwise cause unspecified system behavior at any moment.
+    LOG(WARNING) << "Fetcher cache space overflow - space used: " << tally
+                 << ", exceeds total fetcher cache space: " << space;
+  }
+
+  VLOG(1) << "Claimed cache space: " << bytes << ", now using: " << tally;
+}
+
+
+void FetcherProcess::Cache::releaseSpace(const Bytes& bytes)
+{
+  CHECK(bytes <= tally) << "Attempt to release more cache space than in use - "
+                        << " requested: " << bytes << ", in use: " << tally;
+
+
+  tally -= bytes;
+
+  VLOG(1) << "Released cache space: " << bytes << ", now using: " << tally;
+}
+
+
+Bytes FetcherProcess::Cache::availableSpace()
+{
+  if (tally > space) {
+    LOG(WARNING) << "Fetcher cache space overflow - space used: " << tally
+                 << ", exceeds total fetcher cache space: " << space;
+    return 0;
+  }
+
+  return space - tally;
+}
+
+
+void FetcherProcess::Cache::Entry::complete()
+{
+  CHECK_PENDING(promise.future());
+
+  promise.set(Nothing());
+}
+
+
+Future<Nothing> FetcherProcess::Cache::Entry::completion()
+{
+  return promise.future();
+}
+
+
+void FetcherProcess::Cache::Entry::fail()
+{
+  CHECK_PENDING(promise.future());
+
+  promise.fail("Could not download to fetcher cache: " + key);
+}
+
+
+void FetcherProcess::Cache::Entry::reference()
+{
+  referenceCount++;
+}
+
+
+void FetcherProcess::Cache::Entry::unreference()
+{
+  CHECK(referenceCount > 0);
+
+  referenceCount--;
+}
+
+
+bool FetcherProcess::Cache::Entry::isReferenced()
+{
+  return referenceCount > 0;
+}
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 1db0eaf..3b63711 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -16,14 +16,16 @@
  * limitations under the License.
  */
 
-#ifndef __SLAVE_FETCHER_HPP__
-#define __SLAVE_FETCHER_HPP__
+#ifndef __SLAVE_CONTAINERIZER_FETCHER_HPP__
+#define __SLAVE_CONTAINERIZER_FETCHER_HPP__
 
 #include <string>
-#include <vector>
 
 #include <mesos/mesos.hpp>
 
+#include <mesos/fetcher/fetcher.hpp>
+
+#include <process/id.hpp>
 #include <process/future.hpp>
 #include <process/process.hpp>
 #include <process/subprocess.hpp>
@@ -32,6 +34,8 @@
 
 #include "slave/flags.hpp"
 
+using mesos::fetcher::FetcherInfo;
+
 namespace mesos {
 namespace internal {
 namespace slave {
@@ -40,48 +44,64 @@ namespace slave {
 class FetcherProcess;
 
 // Argument passing to and invocation of the external fetcher program.
-// TODO(bernd-mesos) : Orchestration and synchronization of fetching
-// phases. Bookkeeping of executor files that are cached after
-// downloading from a URI by the fetcher program. Cache eviction.
-// There has to be exactly one fetcher with a distinct cache dir per
-// active slave. This means that the cache dir can only be fixed
-// after the slave ID has been determined by registration or recovery.
+// Bookkeeping of executor files that are cached after downloading from
+// a URI by the fetcher program. Cache eviction. There has to be exactly
+// one fetcher with a distinct cache directory per active slave. This
+// means that the cache directory can only be fixed after the slave ID
+// has been determined by registration or recovery. Downloads to cache
+// files are separated on a per-user basis. The cache must only be used
+// for URIs for which the expected download size can be determined and
+// trusted before downloading. If there is any problem using the cache
+// for any given URI, the fetch procedure automatically reverts to
+// fetching directly into the sandbox directory.
 class Fetcher
 {
 public:
-  // Builds the environment used to run mesos-fetcher. This
-  // environment contains one variable with the name
-  // "MESOS_FETCHER_INFO", and its value is a protobuf of type
-  // mesos::fetcher::FetcherInfo.
-  static std::map<std::string, std::string> environment(
-      const CommandInfo& commandInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const Flags& flags);
+  // Extracts the basename from a URI. For example, "d.txt" from
+  // "http://1.2.3.4:5050/a/b/c/d.txt". The current implementation
+  // only works for fairly regular-shaped URIs with a "/" and a proper
+  // file name at the end.
+  static Try<std::string> basename(const std::string& uri);
+
+  // Some checks to make sure using the URI value in shell commands
+  // is safe.
+  // TODO(benh): These should be pushed into the scheduler driver and
+  // reported to the user.
+  static Try<Nothing> validateUri(const std::string& uri);
+
+  // Determines if the given URI refers to a local file system path
+  // and prepends frameworksHome if it is a relative path. Fails if
+  // frameworksHome is empty and a local path is indicated.
+  static Result<std::string> uriToLocalPath(
+      const std::string& uri,
+      const Option<std::string>& frameworksHome);
+
+  static bool isNetUri(const std::string& uri);
 
   Fetcher();
 
+  // This is only public for tests.
+  Fetcher(const process::Owned<FetcherProcess>& process);
+
   virtual ~Fetcher();
 
-  // Download the URIs specified in the command info and place the
-  // resulting files into the given work directory. Chmod said files
-  // to the user if given.
-  process::Future<Nothing> fetch(
-      const ContainerID& containerId,
-      const CommandInfo& commandInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const Flags& flags,
-      const Option<int>& stdout,
-      const Option<int>& stderr);
+  // TODO(bernd-mesos): Inject these parameters at Fetcher creation time.
+  // Then also inject the fetcher into the slave at creation time. Then
+  // it will be possible to make this an instance method instead of a
+  // static one for the slave to call during startup or recovery.
+  static Try<Nothing> recover(const SlaveID& slaveId, const Flags& flags);
 
-  // Same as above, but send stdout and stderr to the files 'stdout'
-  // and 'stderr' in the specified directory.
+  // Download the URIs specified in the command info and place the
+  // resulting files into the given sandbox directory. Chmod said files
+  // to the user if given. Send stdout and stderr output to files
+  // "stdout" and "stderr" in the given directory. Extract archives and/or
+  // use the cache if so instructed by the given CommandInfo::URI items.
   process::Future<Nothing> fetch(
       const ContainerID& containerId,
       const CommandInfo& commandInfo,
-      const std::string& directory,
+      const std::string& sandboxDirectory,
       const Option<std::string>& user,
+      const SlaveID& slaveId,
       const Flags& flags);
 
   // Best effort to kill the fetcher subprocess associated with the
@@ -96,58 +116,241 @@ private:
 class FetcherProcess : public process::Process<FetcherProcess>
 {
 public:
-  FetcherProcess() : ProcessBase("__fetcher__") {}
+  FetcherProcess() : ProcessBase(process::ID::generate("fetcher")) {}
 
   virtual ~FetcherProcess();
 
-  // Fetcher implementation.
   process::Future<Nothing> fetch(
       const ContainerID& containerId,
       const CommandInfo& commandInfo,
-      const std::string& directory,
+      const std::string& sandboxDirectory,
       const Option<std::string>& user,
-      const Flags& flags,
-      const Option<int>& stdout,
-      const Option<int>& stderr);
+      const SlaveID& slaveId,
+      const Flags& flags);
 
-  process::Future<Nothing> fetch(
+  // Runs the mesos-fetcher, creating a "stdout" and "stderr" file
+  // in the given directory, using these for trace output.
+  virtual process::Future<Nothing> run(
       const ContainerID& containerId,
-      const CommandInfo& commandInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
+      const FetcherInfo& info,
       const Flags& flags);
 
+  // Best effort attempt to kill the external mesos-fetcher process
+  // running on behalf of the given container ID, if any.
   void kill(const ContainerID& containerId);
 
-private:
-  // Check status and return an error if any.
-  process::Future<Nothing> _fetch(
+  // Representation of the fetcher cache and its contents. There is
+  // exactly one instance per instance of FetcherProcess. All methods
+  // of Cache are to be executed on the latter to ensure atomicity of
+  // cache operations.
+  class Cache
+  {
+  public:
+    class Entry
+    {
+    public:
+      Entry(
+          const std::string& key,
+          const std::string& directory,
+          const std::string& filename)
+        : key(key),
+          directory(directory),
+          filename(filename),
+          size(0),
+          referenceCount(0) {}
+
+      ~Entry() {}
+
+      // Marks this file's download as successful by setting its promise,
+      // which makes the future returned by completion() ready.
+      void complete();
+
+      // Indicates whether this file's download into the cache is
+      // successfully completed.
+      process::Future<Nothing> completion();
+
+      // Marks this download as failed, notifying concurrent fetch attempts
+      // waiting for this result, by setting the promise to failed.
+      void fail();
+
+      // While an entry is "referenced" it cannot be evicted from the
+      // cache.
+      void reference();
+      void unreference();
+      bool isReferenced();
+
+      // Returns the path in the filesystem where cache entry resides.
+      // TODO(bernd-mesos): Remove this construct after refactoring so
+      // that the slave flags get injected into the fetcher.
+      Path path() { return Path(path::join(directory, filename)); }
+
+      // Uniquely identifies a user/URI combination.
+      const std::string key;
+
+      // Cache directory where this entry is stored.
+      // TODO(bernd-mesos): Remove this construct after refactoring so
+      // that the slave flags get injected into the fetcher.
+      const std::string directory;
+
+      // The unique name of the file held in the cache on behalf of a
+      // URI.
+      const std::string filename;
+
+      // The expected size of the cache file. This field is set before
+      // downloading. If the actual size of the downloaded file is
+      // different a warning is logged and the field's value adjusted.
+      Bytes size;
+
+    private:
+      // Concurrent fetch attempts can reference the same entry multiple
+      // times.
+      unsigned long referenceCount;
+
+      // Indicates successful downloading to the cache.
+      process::Promise<Nothing> promise;
+    };
+
+    Cache() : space(0), tally(0), filenameSerial(0) {}
+    virtual ~Cache() {}
+
+    // Registers the maximum usable space in the cache directory.
+    // TODO(bernd-mesos): This method will disappear when injecting 'flags'
+    // into the fetcher instead of passing 'flags' around as parameter.
+    void setSpace(const Bytes& bytes);
+
+    void claimSpace(const Bytes& bytes);
+    void releaseSpace(const Bytes& bytes);
+    Bytes availableSpace();
+
+    // Invents a new, distinct base name for a cache file, using the same
+    // filename extension as the URI.
+    std::string nextFilename(const CommandInfo::URI& uri);
+
+    // Creates a new entry and inserts it into the cache table. The new
+    // entry starts with a reference count of 0. Returns the entry.
+    std::shared_ptr<Entry> create(
+        const std::string& cacheDirectory,
+        const Option<std::string>& user,
+        const CommandInfo::URI& uri);
+
+    // Retrieves the cache entry indexed by the parameters, without
+    // changing its reference count.
+    Option<std::shared_ptr<Entry>> get(
+        const Option<std::string>& user,
+        const std::string& uri);
+
+    // Returns whether an entry for this user and URI is in the cache.
+    bool contains(const Option<std::string>& user, const std::string& uri);
+
+    // Returns whether this identical entry is in the cache.
+    bool contains(const std::shared_ptr<Cache::Entry>& entry);
+
+    // Completely deletes a cache entry and its file. Returns an Error if
+    // the file cannot be deleted. Virtual for mock testing.
+    virtual Try<Nothing> remove(const std::shared_ptr<Entry>& entry);
+
+    // Determines a list of cache entries to remove, respectively cache files
+    // to delete, so that at least the required amount of space would become
+    // available.
+    Try<std::list<std::shared_ptr<Cache::Entry>>>
+        selectVictims(const Bytes& requiredSpace);
+
+    // Ensures that the requested amount of space is available.
+    // Evicts other files as necessary to make it so.
+    Try<Nothing> reserve(const Bytes& requestedSpace);
+
+    // Finds out if any predictions about cache file sizes have been
+    // inaccurate, logs this if so, and records the cache files' actual
+    // sizes and adjusts the cache's total amount of space in use.
+    Try<Nothing> adjust(const std::shared_ptr<Cache::Entry>& entry);
+
+    // Number of entries.
+    size_t size();
+
+  private:
+    // Maximum storable number of bytes in the cache directory.
+    Bytes space;
+
+    // How much space has been reserved to be occupied by cache files.
+    Bytes tally;
+
+    // Used to generate distinct cache file names simply by counting.
+    unsigned long filenameSerial;
+
+    // Maps keys (user / URI combinations) to cache file
+    // entries.
+    hashmap<std::string, std::shared_ptr<Entry>> table;
+  };
+
+  // Public and virtual for mock testing.
+  virtual process::Future<Nothing> _fetch(
+      const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
+        futures,
+      const hashmap<
+          CommandInfo::URI,
+          Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
+        entries,
       const ContainerID& containerId,
-      const Option<int>& status);
-
-  // Run the mesos-fetcher with custom output redirection. If
-  // 'stdout' and 'stderr' file descriptors are provided then respective
-  // output from the mesos-fetcher will be redirected to the file
-  // descriptors. The file descriptors are duplicated (via dup) because
-  // redirecting might still be occuring even after the mesos-fetcher has
-  // terminated since there still might be data to be read.
-  // This method is only "public" for test purposes.
-  Try<process::Subprocess> run(
-      const CommandInfo& commandInfo,
-      const std::string& directory,
+      const std::string& sandboxDirectory,
+      const std::string& cacheDirectory,
       const Option<std::string>& user,
-      const Flags& flags,
-      const Option<int>& stdout,
-      const Option<int>& stderr);
+      const Flags& flags);
 
-  // Run the mesos-fetcher, creating a "stdout" and "stderr" file
-  // in the given directory and using these for output.
-  Try<process::Subprocess> run(
-      const CommandInfo& commandInfo,
-      const std::string& directory,
+  // Returns a list of cache files on disk for the given slave
+  // (for all users combined). For testing.
+  // TODO(bernd-mesos): Remove the parameters after slave/containerizer
+  // refactoring for injection of these.
+  Try<std::list<Path>> cacheFiles(const SlaveID& slaveId, const Flags& flags);
+
+  // Returns the number of cache entries for the given slave (for all
+  // users combined). For testing.
+  size_t cacheSize();
+
+  // Returns the amount of remaining cache space that is not occupied
+  // by cache entries. For testing.
+  Bytes availableCacheSpace();
+
+private:
+  process::Future<hashmap<
+      CommandInfo::URI,
+      Option<std::shared_ptr<Cache::Entry>>>>
+  __fetch(const hashmap<
+      CommandInfo::URI,
+      Option<process::Future<std::shared_ptr<Cache::Entry>>>>& entries);
+
+  process::Future<Nothing> ___fetch(
+      const hashmap<CommandInfo::URI,
+      Option<std::shared_ptr<Cache::Entry>>>& entries,
+      const ContainerID& containerId,
+      const std::string& sandboxDirectory,
+      const std::string& cacheDirectory,
       const Option<std::string>& user,
       const Flags& flags);
 
+  process::Future<Nothing> _run(
+      const Option<int>& status,
+      const ContainerID& containerId);
+
+  void __run(const ContainerID& containerId, const int out, const int err);
+
+  process::Future<Nothing> __runFail(
+      const process::Future<Nothing>& future,
+      const hashmap<CommandInfo::URI,
+                    Option<std::shared_ptr<Cache::Entry>>>& entries);
+
+  process::Future<Nothing> __runSucceed(
+      const hashmap<CommandInfo::URI,
+                    Option<std::shared_ptr<Cache::Entry>>>& entries);
+
+  // Calls Cache::reserve() and returns a ready entry future if successful,
+  // else Failure. Claims the space and assigns the entry's size to this
+  // amount if and only if successful.
+  process::Future<std::shared_ptr<Cache::Entry>> reserveCacheSpace(
+      const process::Future<Try<Bytes>>& requestedSpace,
+      const std::shared_ptr<Cache::Entry>& entry);
+
+  Cache cache;
+
   hashmap<ContainerID, pid_t> subprocessPids;
 };
 
@@ -155,4 +358,4 @@ private:
 } // namespace internal {
 } // namespace mesos {
 
-#endif // __SLAVE_FETCHER_HPP__
+#endif // __SLAVE_CONTAINERIZER_FETCHER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index 396e5fb..c363605 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -650,7 +650,8 @@ Future<Nothing> MesosContainerizerProcess::fetch(
     const ContainerID& containerId,
     const CommandInfo& commandInfo,
     const string& directory,
-    const Option<string>& user)
+    const Option<string>& user,
+    const SlaveID& slaveId)
 {
   if (!containers_.contains(containerId)) {
     return Failure("Container is already destroyed");
@@ -661,6 +662,7 @@ Future<Nothing> MesosContainerizerProcess::fetch(
       commandInfo,
       directory,
       user,
+      slaveId,
       flags);
 }
 
@@ -785,7 +787,8 @@ Future<bool> MesosContainerizerProcess::_launch(
                 containerId,
                 executorInfo.command(),
                 directory,
-                user))
+                user,
+                slaveId))
     .then(defer(self(), &Self::exec, containerId, pipes[1]))
     .onAny(lambda::bind(&os::close, pipes[0]))
     .onAny(lambda::bind(&os::close, pipes[1]));

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 31f5051..3ac2387 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -186,7 +186,8 @@ private:
       const ContainerID& containerId,
       const CommandInfo& commandInfo,
       const std::string& directory,
-      const Option<std::string>& user);
+      const Option<std::string>& user,
+      const SlaveID& slaveId);
 
   process::Future<bool> _launch(
       const ContainerID& containerId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 7f2e1e8..ab87098 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -67,6 +67,23 @@ mesos::internal::slave::Flags::Flags()
       "Attributes of machine, in the form:\n"
       "rack:2 or 'rack:2;u:1'");
 
+  add(&Flags::fetcher_cache_size, "fetcher_cache_size",
+      "Size of the fetcher cache in Bytes.",
+      DEFAULT_FETCHER_CACHE_SIZE);
+
+  // By default the fetcher cache directory is held inside the work
+  // directory, so everything can be deleted or archived in one swoop,
+  // in particular during testing. However, a typical production
+  // scenario is to use a separate cache volume. First, it is not meant
+  // to be backed up. Second, you want to avoid that sandbox directories
+  // and the cache directory can interfere with each other in
+  // unpredictable ways by occupying shared space. So it is recommended
+  // to set the cache directory explicitly.
+  add(&Flags::fetcher_cache_dir, "fetcher_cache_dir",
+      "Parent directory for fetcher cache directories\n"
+      "(one subdirectory per slave).",
+      "/tmp/mesos/fetch");
+
   add(&Flags::work_dir,
       "work_dir",
       "Directory path to place framework work directories\n", "/tmp/mesos");

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index e84efc1..84dbb8a 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -44,6 +44,8 @@ public:
   std::string isolation;
   std::string default_role;
   Option<std::string> attributes;
+  Bytes fetcher_cache_size;
+  std::string fetcher_cache_dir;
   std::string work_dir;
   std::string launcher_dir;
   std::string hadoop_home; // TODO(benh): Make an Option.

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index fdaaea4..271cb03 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -835,6 +835,14 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
       LOG(INFO) << "Registered with master " << master.get()
                 << "; given slave ID " << slaveId;
 
+      // TODO(bernd-mesos): Make this an instance method call, see comment
+      // in "fetcher.hpp".
+      Try<Nothing> recovered = Fetcher::recover(slaveId, flags);
+      if (recovered.isError()) {
+          LOG(FATAL) << "Could not initialize fetcher cache: "
+                     << recovered.error();
+      }
+
       state = RUNNING;
 
       statusUpdateManager->resume(); // Resume status updates.
@@ -3758,7 +3766,6 @@ void Slave::_checkDiskUsage(const Future<double>& usage)
 }
 
 
-
 Future<Nothing> Slave::recover(const Result<state::State>& state)
 {
   if (state.isError()) {
@@ -3830,6 +3837,13 @@ Future<Nothing> Slave::recover(const Result<state::State>& state)
       metrics.recovery_errors += slaveState.get().errors;
     }
 
+    // TODO(bernd-mesos): Make this an instance method call, see comment
+    // in "fetcher.hpp".
+    Try<Nothing> recovered = Fetcher::recover(slaveState.get().id, flags);
+    if (recovered.isError()) {
+      return Failure(recovered.error());
+    }
+
     // Recover the frameworks.
     foreachvalue (const FrameworkState& frameworkState,
                   slaveState.get().frameworks) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index 8d3e605..3a983c6 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -379,24 +379,28 @@ public:
       const Shared<Docker>& docker)
     : DockerContainerizerProcess(flags, fetcher, docker)
   {
-    EXPECT_CALL(*this, fetch(_))
+    EXPECT_CALL(*this, fetch(_, _))
       .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_fetch));
 
     EXPECT_CALL(*this, pull(_))
       .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_pull));
   }
 
-  MOCK_METHOD1(
+  MOCK_METHOD2(
       fetch,
-      process::Future<Nothing>(const ContainerID& containerId));
+      process::Future<Nothing>(
+          const ContainerID& containerId,
+          const SlaveID& slaveId));
 
   MOCK_METHOD1(
       pull,
       process::Future<Nothing>(const ContainerID& containerId));
 
-  process::Future<Nothing> _fetch(const ContainerID& containerId)
+  process::Future<Nothing> _fetch(
+      const ContainerID& containerId,
+      const SlaveID& slaveId)
   {
-    return DockerContainerizerProcess::fetch(containerId);
+    return DockerContainerizerProcess::fetch(containerId, slaveId);
   }
 
   process::Future<Nothing> _pull(const ContainerID& containerId)
@@ -2381,7 +2385,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
   Future<Nothing> fetch;
 
   // We want to pause the fetch call to simulate a long fetch time.
-  EXPECT_CALL(*process, fetch(_))
+  EXPECT_CALL(*process, fetch(_, _))
     .WillOnce(DoAll(FutureSatisfy(&fetch),
                     Return(promise.future())));
 
@@ -2486,7 +2490,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
       (Owned<DockerContainerizerProcess>(process)));
 
   Future<Nothing> fetch;
-  EXPECT_CALL(*process, fetch(_))
+  EXPECT_CALL(*process, fetch(_, _))
     .WillOnce(DoAll(FutureSatisfy(&fetch),
                     Return(Nothing())));
 
@@ -2754,7 +2758,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_FetchFailure)
                     Invoke(&dockerContainerizer,
                            &MockDockerContainerizer::_launch)));
 
-  EXPECT_CALL(*process, fetch(_))
+  EXPECT_CALL(*process, fetch(_, _))
     .WillOnce(Return(Failure("some error from fetch")));
 
   vector<TaskInfo> tasks;