Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/01 15:46:01 UTC

[3/4] mesos git commit: Added a cache to the Fetcher.

Added a cache to the Fetcher.

Almost all of the functionality in epic MESOS-336. Downloaded files
from CommandInfo::URIs can now be cached in a cache directory
designated by a slave flag. This only happens when asked for by an
extra flag in the URI and is thus backwards-compatible. The cache has
a size limit also given by a new slave flag. Cache-resident files are
evicted as necessary to make space for newly fetched ones. Concurrent
attempts to cache the same URI lead to only one download. The fetcher
program remains external for safety reasons, but is now augmented with
more elaborate parameters packed into a JSON object to implement
specific fetch actions for all of the above. Additional testing
includes fetching from (mock) HDFS and coverage of the new features.

Review: https://reviews.apache.org/r/30774


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/edd35b05
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/edd35b05
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/edd35b05

Branch: refs/heads/master
Commit: edd35b050a736aa6f7e3e6939a6ee074df66954d
Parents: b16999a
Author: Bernd Mathiske <be...@mesosphere.io>
Authored: Thu May 21 19:34:46 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Jun 1 02:27:43 2015 -0700

----------------------------------------------------------------------
 docs/configuration.md                           |   18 +
 docs/fetcher-cache-internals.md                 |  115 ++
 docs/fetcher.md                                 |  255 ++++
 include/mesos/fetcher/fetcher.proto             |   43 +-
 include/mesos/mesos.proto                       |   17 +
 include/mesos/type_utils.hpp                    |   17 +
 src/Makefile.am                                 |    1 +
 src/hdfs/hdfs.hpp                               |   35 +
 src/launcher/fetcher.cpp                        |  500 ++++---
 src/slave/constants.hpp                         |    3 +
 src/slave/containerizer/docker.cpp              |    9 +-
 src/slave/containerizer/docker.hpp              |    4 +-
 src/slave/containerizer/fetcher.cpp             | 1091 ++++++++++++--
 src/slave/containerizer/fetcher.hpp             |  335 ++++-
 src/slave/containerizer/mesos/containerizer.cpp |    7 +-
 src/slave/containerizer/mesos/containerizer.hpp |    3 +-
 src/slave/flags.cpp                             |   17 +
 src/slave/flags.hpp                             |    2 +
 src/slave/slave.cpp                             |   16 +-
 src/tests/docker_containerizer_tests.cpp        |   20 +-
 src/tests/fetcher_cache_tests.cpp               | 1359 ++++++++++++++++++
 src/tests/fetcher_tests.cpp                     |  487 ++-----
 src/tests/mesos.cpp                             |   43 +
 src/tests/mesos.hpp                             |   49 +
 24 files changed, 3665 insertions(+), 781 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 4aeb4ad..4e20913 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1231,6 +1231,24 @@ file:///path/to/file (where file contains one of the above)</code></pre>
   </tr>
   <tr>
     <td>
+      --fetcher_cache_size=VALUE
+    </td>
+    <td>
+      Size of the fetcher cache in bytes.
+      (default: 2 GB)
+    </td>
+  </tr>
+  <tr>
+    <td>
+      --fetcher_cache_dir=VALUE
+    </td>
+    <td>
+      Parent directory for fetcher cache directories (one subdirectory per slave). By default this directory is held inside the work directory, so everything can be deleted or archived in one swoop, in particular during testing. However, a typical production scenario is to use a separate cache volume: first, the cache is not meant to be backed up; second, you want to avoid sandbox directories and the cache directory interfering with each other in unpredictable ways by occupying shared space. It is therefore recommended to set the cache directory explicitly.
+      (default: /tmp/mesos/fetch)
+    </td>
+  </tr>
+  <tr>
+    <td>
       --work_dir=VALUE
     </td>
     <td>

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/docs/fetcher-cache-internals.md
----------------------------------------------------------------------
diff --git a/docs/fetcher-cache-internals.md b/docs/fetcher-cache-internals.md
new file mode 100644
index 0000000..c696f15
--- /dev/null
+++ b/docs/fetcher-cache-internals.md
@@ -0,0 +1,115 @@
+---
+layout: documentation
+---
+
+# Mesos Fetcher Cache Internals
+
+It is assumed that readers of this document are thoroughly familiar with the contents of the overview and user guide of the Mesos fetcher in "fetcher.md". The present document makes direct references to notions defined there.
+
+## Design goals for the initial fetcher cache prototype:
+
+0. Direct fetching: Provide the pre-existing fetcher functionality (as in Mesos 0.22 and before) when caching is not explicitly requested.
+1. Program isolation: Preserve the approach of employing an external "mesos-fetcher" program to handle all (potentially very lengthy or blocking) content download operations.
+2. Cache effect: Significantly lessen the overall time spent on fetching when requests for the same URI are repeated. This holds for both sequential and concurrent repetition. The latter occurs when concurrently launched tasks on the same slave require overlapping URI sets.
+3. Cache space limit: Use a user-specified directory for cache storage and maintain a user-specified physical storage space limit for it. Evict older cache files as needed to fetch new cache content.
+4. Fallback strategy: Whenever downloading to or from the cache fails for any reason, fetching into the sandbox should still succeed if at all possible.
+5. Slave recovery: Support slave recovery.
+
+For future releases, we foresee additional features:
+1. Automatic refreshing of cache content when a URI's content has changed.
+2. Prefetching URIs for subsequent tasks. Prefetching can run in parallel with task execution.
+
+## How the fetcher cache works
+
+In this section we look deeper into the implementation of design goals #1, #2, #3. The others are sufficiently covered in the user guide.
+
+### Fetcher process and mesos-fetcher
+
+The fetcher mechanism consists of two separate entities:
+
+1. The fetcher process included in the slave program. There is exactly one instance of this per slave.
+2. The separate mesos-fetcher program. There is one invocation of this per fetch request from the slave to the fetcher process.
+
+The fetcher process performs internal bookkeeping of what is in the cache and what is not. As needed, it invokes the mesos-fetcher program to download resources from URIs to the cache or directly to sandbox directories, and to copy resources from the cache to a sandbox directory.
+
+All decision-making "intelligence" is situated in the fetcher process, and the mesos-fetcher program is a rather simple helper program. Except for cache files, there is no persistent state at all in the entire fetcher system. This greatly simplifies dealing with all the inherent intricacies and races involved in concurrent fetching with caching.
+
+The mesos-fetcher program takes straightforward per-URI commands and executes them. It has three possible modes of operation for any given URI:
+
+1. Bypass the cache and fetch directly into the specified sandbox directory.
+2. Fetch into the cache and then copy the resulting cache file into the sandbox directory.
+3. Do not download anything. Copy (or extract) a resource from the cache into the sandbox directory.
+
+Aside from minor complications such as archive extraction and setting execution rights, this sums up everything it does.
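+
+For illustration, here is a minimal sketch of this per-URI dispatch. The "Action" values come from "fetcher.proto"; the helper names are hypothetical:
+
+    switch (item.action()) {
+      case FetcherInfo::Item::BYPASS_CACHE:
+        // Download straight into the sandbox directory.
+        return fetchBypassingCache(item.uri(), sandboxDirectory);
+      case FetcherInfo::Item::DOWNLOAD_AND_CACHE:
+        // Download into the cache first, then retrieve from there.
+        return downloadAndRetrieve(item, cacheDirectory, sandboxDirectory);
+      case FetcherInfo::Item::RETRIEVE_FROM_CACHE:
+        // Copy or extract the resident cache file into the sandbox.
+        return retrieveFromCache(item, cacheDirectory, sandboxDirectory);
+    }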
+
+Based on this setup, the main program flow in the fetcher process is concerned with assembling a list of parameters to the mesos-fetcher program that describe items to be fetched. This figure illustrates the high-level collaboration of the fetcher process with mesos-fetcher program runs. It also depicts the next level of detail of the fetcher process, which will be described in the following section.
+
+![Fetcher Separation of Labor](images/fetch_components.jpg?raw=true)
+
+
+### Cache state representation and manipulation
+
+The fetcher process uses a private instance of class Cache to represent what URIs are cached, where the respective cache files are, what stage of processing they are in, and so on.
+
+The main data structure to hold all this information is a hashmap from URI/user combinations to Cache::Entry objects, which each contain information about an individual cache file on disk. These objects are referenced by shared_ptr, because they can be addressed by multiple callbacks on behalf of concurrent fetch attempts while also being held in the hashmap.
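+
+A hedged sketch of this structure (the exact declarations in "fetcher.hpp" may differ):
+
+    class Cache
+    {
+      struct Entry;  // Bookkeeping for one cache file on disk.
+
+      // Keyed by the URI/user combination; entries are shared with
+      // callbacks of concurrent fetch runs via shared_ptr.
+      hashmap<std::pair<std::string, std::string>,
+              std::shared_ptr<Entry>> table;
+    };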
+
+A cache entry corresponds directly to a cache file on disk throughout the entire lifetime of the latter, including before and after its existence. It holds all pertinent state describing the phase and results of fetching the corresponding URI.
+
+This figure illustrates the different states which a cache entry can be in.
+
+![Fetcher Cache State](images/fetch_state.jpg?raw=true)
+
+While a cache entry is referenced, it cannot be evicted by the current or any other concurrent fetch attempt in order to make space for a new cache file download.
+
+The two blue states are essentially the same: no cache file exists. The two green disk states on the right are also the same.
+
+The figure only depicts what happens from the point of view of one isolated fetch run. Any given cache entry can be referenced simultaneously by another concurrent fetch run. It must not be evicted as long as it is referenced by any fetching activity. We implement this by reference counting. Every cache entry has a reference count field that gets incremented at the beginning of its use by a fetch run and decremented at its end. The latter must happen no matter whether the run has been successful or whether there has been an error. Increments happen when:
+- A new cache entry is created. It is immediately referenced.
+- An existing cache entry's file download is going to be waited for.
+- An existing cache entry has a resident cache file that is going to be retrieved.
+
+Every increment is recorded in a list. At the very end of the fetch procedure, no matter what its outcome is, each entry in the list gets its reference count decremented.
+
+(Currently, even for cache entries for which we fall back to bypassing the cache, we leave the reference counts untouched until the end of the fetch procedure. This may be unnecessary, but it is safe. It is also supposedly rare, because fallbacks only occur to mitigate unexpected error situations. A future version may optimize this behavior.)
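+
+A minimal sketch of this reference-counting discipline, with assumed names:
+
+    std::list<std::shared_ptr<Cache::Entry>> referencedEntries;
+
+    void reference(const std::shared_ptr<Cache::Entry>& entry)
+    {
+      entry->referenceCount++;             // Now safe from eviction.
+      referencedEntries.push_back(entry);  // Remember for release.
+    }
+
+    // At the very end of the fetch run, successful or not:
+    for (auto& entry : referencedEntries) {
+      entry->referenceCount--;
+    }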
+
+### The per-URI control flow
+
+As mentioned above, the fetcher process' main control flow concerns sorting out what to do with each URI presented to it in a fetch request. An overview of the ensuing control flow for a given URI is depicted in this figure.
+
+![Determining Fetcher Actions](images/fetch_flow.jpg?raw=true)
+
+After going through this procedure for each URI, the fetcher process assembles the gathered list of per-URI actions into a JSON object (FetcherInfo), which is passed to the mesos-fetcher program in an environment variable. The possible fetch actions for a URI are shown at the bottom of the flow chart. After they are determined, the fetcher process invokes mesos-fetcher.
+
+The implementation follows this control flow, but its code structure cannot match it directly, because some of these branches must span multiple libprocess continuations. There are two layers of futures, one for each of the following phases.
+
+1. Before making fetcher cache items:
+- a) Wait for concurrent downloads for pre-existing cache entries.
+- b) Wait for combined size fetching and then space reservation for new cache entries.
+
+2. After making fetcher cache items and running mesos-fetcher.
+- Complete new cache items with success/failure, which as an important side-effect informs concurrent fetch runs’ futures in phase 1/a.
+
+The futures for phase 1 are not shared outside one fetch run; they exclusively guard asynchronous operations for the same fetch run. Their type parameter does not really matter, but each needs to correspond to one URI and its eventual fetch item. Multiple variants have been proposed for this; the complexity remains about the same.
+
+The futures for phase 2 need to be part of the cache entries, because they are shared between concurrent fetch runs.
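+
+A rough shape of these two layers in libprocess terms (illustrative only, not the actual declarations):
+
+    // Phase 1: futures private to one fetch run, one per URI.
+    std::list<process::Future<std::shared_ptr<Cache::Entry>>> phase1;
+
+    // Phase 2: shared between concurrent fetch runs as part of the
+    // cache entry. Completing it informs waiters in phase 1/a.
+    struct Entry
+    {
+      process::Promise<Nothing> completion;
+    };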
+
+Some time between phases 1 and 2, the fallback strategy needs to be applied where indicated: when a future from phase 1 has failed for any reason, we fall back on bypassing the cache.
+
+In addition, everything touched in 1/a and 1/b needs to be protected from cache eviction until the end. One can in principle release cache entries right after they fail, but this requires more complexity and is harder to prove correct.
+
+
+### Cache eviction
+
+![Before eviction](images/fetch_evict1.jpg?raw=true)
+
+The resources named "A" and "B" have been fetched with caching into sandboxes 1 and 2 below. In the course of this, two cache entries have been created and two files have been downloaded into the cache and named "1" and "2". (Cache files have unique names that include serial numbers.)
+
+The next figure illustrates the state after fetching a different cached URI into sandbox 3, which in this case requires evicting a cache-resident file and its entry. Steps:
+1. Remove the cache entry for "A" from the fetcher process' cache entry table. Its faded depiction is supposed to indicate this. This immediately makes it appear as if the URI has never been cached, even though the cache file is still around.
+2. Proceed with fetching "C". This creates a new cache file, which has a different unique name. (The fetcher process remembers in its cache entry which file name belongs to which URI.)
+
+![After eviction](images/fetch_evict2.jpg?raw=true)
+
+The next figure then shows what happens if the first URI is fetched once again. Here we also assume that the cache is so full that eviction is necessary; this time, the entry and file for "B" are the victims.
+
+![After another eviction](images/fetch_evict3.jpg?raw=true)

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/docs/fetcher.md
----------------------------------------------------------------------
diff --git a/docs/fetcher.md b/docs/fetcher.md
new file mode 100644
index 0000000..638f530
--- /dev/null
+++ b/docs/fetcher.md
@@ -0,0 +1,255 @@
+---
+layout: documentation
+---
+
+# Mesos Fetcher
+
+Experimental support for the Mesos fetcher _cache_ is introduced in
+Mesos 0.23.0.
+
+In this context we use the term "downloading" loosely to include copying
+from local file systems.
+
+## What is the Mesos fetcher?
+
+The Mesos fetcher is a mechanism to download resources into the sandbox
+directory of a task in preparation for running the task. As part of a TaskInfo
+message, the framework ordering the task's execution provides a list of
+CommandInfo::URI protobuf values, which becomes the input to the Mesos fetcher.
+
+By default, each requested URI is downloaded directly into the sandbox directory
+and repeated requests for the same URI lead to downloading another copy of the
+same resource. Alternatively, the fetcher can be instructed to cache URI
+downloads in a dedicated directory for reuse by subsequent downloads.
+
+The Mesos fetcher mechanism comprises these two parts:
+
+1. The slave-internal Fetcher Process (in terms of libprocess) that controls and
+coordinates all fetch actions. Every slave instance has exactly one internal
+fetcher instance that is used by every kind of containerizer (except the
+external containerizer variant, which is responsible for its own approach to
+fetching).
+
+2. The external program "mesos-fetcher" that is invoked by the former. It
+performs all network and disk operations except file deletions and file size
+queries for cache-internal bookkeeping. It is run as an external OS process in
+order to shield the slave process from I/O-related hazards. It takes
+instructions in the form of an environment variable containing a JSON object with
+detailed fetch action descriptions.
+
+## The fetch procedure
+
+Frameworks launch tasks by calling the scheduler driver method launchTasks(),
+passing CommandInfo protobuf structures as arguments. This type of structure
+specifies (among other things) a command and a list of URIs that need to be
+"fetched" into the sandbox directory on the slave node as a precondition for
+task execution. Hence, when the slave receives a request to launch a task, it
+first calls upon its fetcher to provision the specified resources into the
+sandbox directory. If fetching fails, the task is not started and the reported
+task status is TASK_FAILED.
+
+All URIs requested for a given task are fetched sequentially in a single
+invocation of mesos-fetcher. Here, avoiding download concurrency reduces the
+risk of bandwidth issues somewhat. However, multiple fetch operations can be
+active concurrently due to multiple task launch requests.
+
+### The URI protobuf structure
+
+Before mesos-fetcher is started, the specific fetch actions to be performed for
+each URI are determined based on the following protobuf structure. (See
+"include/mesos/mesos.proto" for more details.)
+
+    message CommandInfo {
+      message URI {
+        required string value = 1;
+        optional bool executable = 2;
+        optional bool extract = 3 [default = true];
+        optional bool cache = 4;
+      }
+      ...
+      optional string user = 5;
+    }
+
+The field "value" contains the URI.
+
+If the "executable" field is "true", the "extract" field is ignored and
+has no effect.
+
+If the "cache" field is true, the fetcher cache is to be used for the URI.
+
+### Specifying a user name
+
+The framework may pass along a user name under which its executors and tasks
+run; this user name then also becomes a fetch parameter. However, if the
+"user" field in the CommandInfo structure is specified, it takes precedence for
+the affected task.
+
+If a user name is specified either way, the fetcher first validates that it is
+in fact a valid user name on the slave. If it is not, fetching fails right here.
+Otherwise, the sandbox directory is assigned to the specified user as owner
+(using chown) at the end of the fetch procedure, before task execution begins.
+
+The user name in play has an important effect on caching. Caching is managed on
+a per-user basis, i.e. the combination of user name and URI uniquely
+identifies a cacheable fetch result. If no user name has been specified, this
+counts for the cache as a separate user, too. Thus cache files for each valid
+user are segregated from all others, including those without a specified user.
+
+This means that the exact same URI will be downloaded and cached multiple times
+if different users are indicated.
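+
+As a sketch, one can think of the cache key like this (a hypothetical helper; the real implementation keys its internal hashmap on the user/URI combination):
+
+    // The absence of a user name yields its own, distinct key.
+    static std::string cacheKey(
+        const Option<std::string>& user,
+        const std::string& uri)
+    {
+      return (user.isSome() ? user.get() : "") + "@" + uri;
+    }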
+
+### Executable fetch results
+
+By default, fetched files are not executable.
+
+If the field "executable" is set to "true", the fetch result will be changed to
+be executable (by "chmod") for every user. This happens at the end of the fetch
+procedure, in the sandbox directory only. It does not affect any cache file.
+
+### Archive extraction
+
+If the "extract" field is "true", which is the default, then files with
+extensions that hint at packed or compressed archives (".zip", ".tar", etc.)
+are unpacked in the sandbox directory.
+
+In case the cache is bypassed, both the archive and the unpacked results will be
+found together in the sandbox. In case a cache file is unpacked, only the
+extraction result will be found in the sandbox.
+
+### Bypassing the cache
+
+By default, the URI field "cache" is not present. If this is the case, or if
+its value is "false", the fetcher downloads directly into the sandbox directory.
+
+The same also happens dynamically as a fallback strategy if anything goes wrong
+when preparing a fetch operation that involves the cache. In this case, a
+warning message is logged. Possible fallback conditions are:
+
+- The server offering the URI does not respond or reports an error.
+- The URI's download size could not be determined.
+- There is not enough space in the cache, even after attempting to evict files.
+
+### Fetching through the cache
+
+If the URI's "cache" field has the value "true", then the fetcher cache is in
+effect. If a URI is encountered for the first time (for the same user), it is
+first downloaded into the cache, then copied to the sandbox directory from
+there. If the same URI is encountered again, and a corresponding cache file is
+resident in the cache or still en route into the cache, then downloading is
+omitted and the fetcher proceeds directly to copying from the cache. Competing
+requests for the same URI simply wait upon completion of the first request that
+occurs. Thus every URI is downloaded at most once (per user) as long as it is
+cached.
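+
+For example, a framework using the C++ API might request a cached, executable
+download like this (a sketch; the URI itself is made up):
+
+    CommandInfo command;
+    command.set_value("./myscript.sh");
+
+    CommandInfo::URI* uri = command.add_uris();
+    uri->set_value("http://example.org/myscript.sh");
+    uri->set_executable(true);  // "extract" is then ignored.
+    uri->set_cache(true);       // Opt into the fetcher cache.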
+
+Every cache file stays resident for an unspecified amount of time and can be
+removed at the fetcher's discretion at any moment, except while it is in direct
+use:
+
+- It is still being downloaded by this fetch procedure.
+- It is still being downloaded by a concurrent fetch procedure for a different
+  task.
+- It is being copied or extracted from the cache.
+
+Once a cache file has been removed, the related URI will thereafter be treated
+as described above for the first encounter.
+
+Unfortunately, there is no mechanism to refresh a cache entry in the current
+experimental version of the fetcher cache. A future feature may force updates
+based on checksum queries to the URI.
+
+Recommended practice for now:
+
+The framework should start using a fresh unique URI whenever the resource's
+content has changed.
+
+### Determining resource sizes
+
+Before downloading a resource to the cache, the fetcher first determines the
+size of the expected resource. It uses one of the following methods, depending
+on the nature of the URI:
+
+- Local file sizes are probed with system calls (that follow symbolic links).
+- HTTP/HTTPS URIs are queried for the "content-length" field in the header. This
+  is performed by libcurl. The reported asset size must be greater than zero or
+  the URI is deemed invalid.
+- FTP/FTPS is not supported at the time of writing.
+- Everything else is queried by the local HDFS client.
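+
+A sketch of this dispatch, with assumed helper names ("du" is the HDFS client
+call added in this commit; "localFilePath" and "contentLength" are hypothetical):
+
+    static Try<Bytes> fetchSize(const std::string& uri)
+    {
+      Option<std::string> path = localFilePath(uri);
+      if (path.isSome()) {
+        // Probe the file size, following symbolic links.
+        return os::stat::size(path.get(), os::stat::FOLLOW_SYMLINK);
+      }
+
+      if (Fetcher::isNetUri(uri)) {
+        // Query the "content-length" header via libcurl; values of
+        // zero or less make the URI ineligible for caching.
+        return contentLength(uri);
+      }
+
+      HDFS hdfs;
+      return hdfs.du(uri);  // Everything else goes to the HDFS client.
+    }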
+
+If any of this reports an error, the fetcher then falls back on bypassing the
+cache as described above.
+
+WARNING: Only URIs for which download sizes can be queried up front and for
+which accurate sizes are reported reliably are eligible for any fetcher cache
+involvement. If actual cache file sizes exceed the physical capacity of the
+cache directory in any way, all further slave behavior is completely
+unspecified. Do not use any cache feature with any URI for which you have any
+doubts!
+
+To mitigate this problem, cache files that have been found to be larger than
+expected are deleted immediately after downloading and delivering the
+requested content to the sandbox. Thus exceeding total capacity at least
+does not accumulate over subsequent fetcher runs.
+
+If you know for sure that size aberrations are within certain limits, you can
+specify a cache directory size that is sufficiently smaller than your actual
+physical volume and fetching should work.
+
+In case of cache files that are smaller than expected, the cache will
+dynamically adjust its own bookkeeping according to actual sizes.
+
+### Cache eviction
+
+After determining the prospective size of a cache file and before downloading
+it, the cache attempts to ensure that at least as much space as is needed for
+this file is available and can be written into. If this is immediately the case,
+the requested amount of space is simply marked as reserved. Otherwise, missing
+space is freed up by "cache eviction". This means that the cache removes files
+at its own discretion until the given space target is met or exceeded.
+
+The eviction process fails if too many files are in use and therefore not
+evictable or if the cache is simply too small. Either way, the fetcher then
+falls back on bypassing the cache for the given URI as described above.
+
+If multiple evictions happen concurrently, each of them is pursuing its own
+separate space goals. However, leftover freed-up space from one effort is
+automatically awarded to others.
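+
+A sketch of this space-reservation logic, with assumed names:
+
+    Try<Nothing> Cache::reserve(const Bytes& requestedSpace)
+    {
+      // Evict unreferenced entries until enough space is free.
+      while (availableSpace() < requestedSpace) {
+        Option<std::shared_ptr<Entry>> victim = selectVictim();
+        if (victim.isNone()) {
+          return Error("Could not free up enough fetcher cache space");
+        }
+        evict(victim.get());  // Deletes the file, adjusts bookkeeping.
+      }
+
+      // Mark the space as reserved so that concurrent fetch runs
+      // cannot claim it for their own downloads. Leftover freed-up
+      // space remains available to them.
+      reservedSpace += requestedSpace;
+      return Nothing();
+    }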
+
+## Slave flags
+
+It is highly recommended to set these flags explicitly to values other than
+their defaults or to not use the fetcher cache in production.
+
+- "fetcher_cache_size", default value: enough for testing.
+- "fetcher_cache_dir", default value: somewhere inside the directory specified
+  by the "work_dir" flag, which is OK for testing.
+
+Recommended practice:
+
+- Use a separate volume as fetcher cache. Do not specify a directory as fetcher
+  cache directory that competes with any other contributor for the underlying
+  volume's space.
+- Set the cache directory size flag of the slave to less than your actual cache
+  volume's physical size. Use a safety margin, especially if you do not know
+  for sure if all frameworks are going to be compliant.
+
+Ultimate remedy:
+
+You can disable the fetcher cache entirely on each slave by setting its
+"fetcher_cache_size" flag to zero bytes.
+
+## Future Features
+
+The following additional features would be relatively easy to implement.
+
+- Perform cache updates based on resource check sums. For example, query the md5
+  field in HTTP headers to determine when a resource at a URL has changed.
+- Respect HTTP cache-control directives.
+- Enable caching for ftp/ftps.
+- Use symbolic links or bind mounts to project cached resources into the
+  sandbox, read-only.
+- Have a choice whether to copy the extracted archive into the sandbox.
+- Have a choice whether to delete the archive after extraction when bypassing
+  the cache.
+- Make the segregation of cache files by user optional.
+- Extract content while downloading when bypassing the cache.
+- Prefetch resources for subsequent tasks. This can happen concurrently with
+  running the present task, right after fetching its own resources.

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/include/mesos/fetcher/fetcher.proto
----------------------------------------------------------------------
diff --git a/include/mesos/fetcher/fetcher.proto b/include/mesos/fetcher/fetcher.proto
index 311af9a..1b2f493 100644
--- a/include/mesos/fetcher/fetcher.proto
+++ b/include/mesos/fetcher/fetcher.proto
@@ -23,14 +23,45 @@ package mesos.fetcher;
 option java_package = "org.apache.mesos.fetcher";
 option java_outer_classname = "Protos";
 
-
 /**
  * Encodes the fetcher environment variable sent to the external fetcher
- * program.
+ * program. See also "docs/fetcher.md" and
+ * "docs/fetcher-cache-internals.md". Note that parts of these
+ * definitions are quoted verbatim in "docs/fetcher.md" and must be
+ * updated there whenever they change here.
  */
 message FetcherInfo {
-  required CommandInfo command_info = 1;
-  required string work_directory = 2;
-  optional string user = 3;
-  optional string frameworks_home = 4;
+  message Item
+  {
+    // What action the fetcher program is supposed to perform for a
+    // given URI.
+    enum Action
+    {
+      // Bypass the cache, download directly into the sandbox directory.
+      BYPASS_CACHE = 0;
+
+      // Download a resource at the given URI to the fetcher's file cache.
+      // Then retrieve the resource from the cache into the sandbox
+      // directory.
+      DOWNLOAD_AND_CACHE = 1;
+
+      // Copy or extract the resource from the cache, without downloading.
+      RETRIEVE_FROM_CACHE = 2;
+    }
+
+    required CommandInfo.URI uri = 1;
+    required Action action = 2;
+    optional string cache_filename = 3;
+  }
+
+  // Must be present when fetching into the sandbox in any way.
+  required string sandbox_directory = 1;
+
+  // Optional so that fetch requests that only use BYPASS_CACHE do not
+  // need to specify an unused cache directory.
+  optional string cache_directory = 2;
+
+  repeated Item items = 3;
+  optional string user = 4;
+  optional string frameworks_home = 5;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index a668889..5cf81e2 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -232,7 +232,24 @@ message CommandInfo {
   message URI {
     required string value = 1;
     optional bool executable = 2;
+
+    // In case the fetched file is recognized as an archive, extract
+    // its contents into the sandbox. Note that when extraction
+    // originates from an archive in the cache, the archive itself is
+    // not also copied from the cache to the sandbox.
     optional bool extract = 3 [default = true];
+
+    // If this field is "true", the fetcher cache will be used: the
+    // fetcher downloads to the file cache, then copies from there to
+    // the sandbox. Subsequent fetch attempts with the same URI will
+    // omit downloading and copy from the cache as long as the file is
+    // resident there. If not "true", fetching bypasses the cache and
+    // downloads directly into the sandbox directory, no matter whether
+    // a suitable cache file is available or not. Cache files may get
+    // evicted at any time, which then leads to renewed downloading.
+    // See also "docs/fetcher.md" and
+    // "docs/fetcher-cache-internals.md".
+    optional bool cache = 4;
   }
 
   // Describes a container.

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index 837be6f..52380c2 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -188,6 +188,23 @@ inline bool operator < (const TaskID& left, const TaskID& right)
 }
 
 
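+// Hashes a URI by combining its string value with salts for its
+// "extract" and "executable" flags, so that URIs differing only in
+// these flags hash differently.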
+inline std::size_t hash_value(const CommandInfo::URI& uri)
+{
+  size_t seed = 0;
+
+  if (uri.extract()) {
+    seed += 11;
+  }
+
+  if (uri.executable()) {
+    seed += 2003;
+  }
+
+  boost::hash_combine(seed, uri.value());
+  return seed;
+}
+
+
 inline std::size_t hash_value(const ContainerID& containerId)
 {
   size_t seed = 0;

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 9d1f0d5..a5a7306 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1418,6 +1418,7 @@ mesos_tests_SOURCES =				\
   tests/external_containerizer_test.cpp		\
   tests/health_check_tests.cpp                  \
   tests/fault_tolerance_tests.cpp		\
+  tests/fetcher_cache_tests.cpp            \
   tests/fetcher_tests.cpp                       \
   tests/files_tests.cpp				\
   tests/flags.cpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index 968545d..0b0312c 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -2,6 +2,7 @@
 #define __HDFS_HPP__
 
 #include <sstream>
+#include <vector>
 
 #include <stout/check.hpp>
 #include <stout/error.hpp>
@@ -78,6 +79,40 @@ struct HDFS
     return status.get() == 0;
   }
 
+  Try<Bytes> du(std::string path)
+  {
+    // Make sure 'path' starts with a '/'.
+    path = path::join("", path);
+
+    // Note: no "-h" flag, so that the size is reported in plain bytes
+    // and can be parsed below.
+    Try<std::string> command = strings::format(
+        "%s fs -du '%s'", hadoop, path);
+
+    CHECK_SOME(command);
+
+    std::ostringstream output;
+
+    Try<int> status = os::shell(&output, command.get() + " 2>&1");
+
+    if (status.isError()) {
+      return Error("HDFS du failed: " + status.error());
+    }
+
+    const std::vector<std::string>& s = strings::tokenize(output.str(), " ");
+    if (s.size() != 2) {
+      return Error("HDFS du returned an unexpected number of results: '" +
+                   output.str() + "'");
+    }
+
+    Result<size_t> size = numify<size_t>(s[0]);
+    if (size.isError()) {
+      return Error("HDFS du returned unexpected format: " + size.error());
+    } else if (size.isNone()) {
+      return Error("HDFS du returned unexpected format");
+    }
+
+    return Bytes(size.get());
+  }
+
   Try<Nothing> rm(std::string path)
   {
     // Make sure 'to' starts with a '/'.

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/launcher/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index 796526f..c32106f 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -34,226 +34,344 @@
 #include "logging/flags.hpp"
 #include "logging/logging.hpp"
 
+#include "slave/slave.hpp"
+
+#include "slave/containerizer/fetcher.hpp"
+
 using namespace mesos;
 using namespace mesos::internal;
 
 using mesos::fetcher::FetcherInfo;
 
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::string;
+using mesos::internal::slave::Fetcher;
 
+using std::string;
 
-const char FILE_URI_PREFIX[] = "file://";
-const char FILE_URI_LOCALHOST[] = "file://localhost";
 
-// Try to extract filename into directory. If filename is recognized as an
-// archive it will be extracted and true returned; if not recognized then false
-// will be returned. An Error is returned if the extraction command fails.
-Try<bool> extract(const string& filename, const string& directory)
+// Try to extract sourcePath into destinationDirectory. If sourcePath is
+// recognized as an archive it will be extracted and true returned;
+// if not recognized then false will be returned. An Error is
+// returned if the extraction command fails.
+static Try<bool> extract(
+    const string& sourcePath,
+    const string& destinationDirectory)
 {
   string command;
   // Extract any .tgz, tar.gz, tar.bz2 or zip files.
-  if (strings::endsWith(filename, ".tgz") ||
-      strings::endsWith(filename, ".tar.gz") ||
-      strings::endsWith(filename, ".tbz2") ||
-      strings::endsWith(filename, ".tar.bz2") ||
-      strings::endsWith(filename, ".txz") ||
-      strings::endsWith(filename, ".tar.xz")) {
-    command = "tar -C '" + directory + "' -xf";
-  } else if (strings::endsWith(filename, ".zip")) {
-    command = "unzip -d '" + directory + "'";
+  if (strings::endsWith(sourcePath, ".tgz") ||
+      strings::endsWith(sourcePath, ".tar.gz") ||
+      strings::endsWith(sourcePath, ".tbz2") ||
+      strings::endsWith(sourcePath, ".tar.bz2") ||
+      strings::endsWith(sourcePath, ".txz") ||
+      strings::endsWith(sourcePath, ".tar.xz")) {
+    command = "tar -C '" + destinationDirectory + "' -xf";
+  } else if (strings::endsWith(sourcePath, ".zip")) {
+    command = "unzip -d '" + destinationDirectory + "'";
   } else {
     return false;
   }
 
-  command += " '" + filename + "'";
+  command += " '" + sourcePath + "'";
+
+  LOG(INFO) << "Extracting with command: " << command;
+
   int status = os::system(command);
   if (status != 0) {
     return Error("Failed to extract: command " + command +
                  " exited with status: " + stringify(status));
   }
 
-  LOG(INFO) << "Extracted resource '" << filename
-            << "' into '" << directory << "'";
+  LOG(INFO) << "Extracted '" << sourcePath << "' into '"
+            << destinationDirectory << "'";
 
   return true;
 }
 
 
 // Attempt to get the uri using the hadoop client.
-Try<string> fetchWithHadoopClient(
-    const string& uri,
-    const string& directory)
+static Try<string> downloadWithHadoopClient(
+    const string& sourceUri,
+    const string& destinationPath)
 {
   HDFS hdfs;
   Try<bool> available = hdfs.available();
 
   if (available.isError() || !available.get()) {
-    LOG(INFO) << "Hadoop Client not available, "
-              << "skipping fetch with Hadoop Client";
-    return Error("Hadoop Client unavailable");
+    return Error("Skipping fetch with Hadoop Client as"
+                 " Hadoop Client not available: " + available.error());
   }
 
-  LOG(INFO) << "Fetching URI '" << uri << "' using Hadoop Client";
+  LOG(INFO) << "Downloading resource with Hadoop client from '" << sourceUri
+            << "' to '" << destinationPath << "'";
 
-  Try<string> base = os::basename(uri);
-  if (base.isError()) {
-    LOG(ERROR) << "Invalid basename for URI: " << base.error();
-    return Error("Invalid basename for URI");
+  Try<Nothing> result = hdfs.copyToLocal(sourceUri, destinationPath);
+  if (result.isError()) {
+    return Error("HDFS copyToLocal failed: " + result.error());
   }
 
-  string path = path::join(directory, base.get());
+  return destinationPath;
+}
 
-  LOG(INFO) << "Downloading resource from '" << uri  << "' to '" << path << "'";
 
-  Try<Nothing> result = hdfs.copyToLocal(uri, path);
-  if (result.isError()) {
-    LOG(ERROR) << "HDFS copyToLocal failed: " << result.error();
-    return Error(result.error());
+static Try<string> downloadWithNet(
+    const string& sourceUri,
+    const string& destinationPath)
+{
+  LOG(INFO) <<  "Downloading resource from '" << sourceUri
+            << "' to '" << destinationPath << "'";
+
+  Try<int> code = net::download(sourceUri, destinationPath);
+  if (code.isError()) {
+    return Error("Error downloading resource: " + code.error());
+  } else if (code.get() != 200) {
+    return Error("Error downloading resource, received HTTP/FTP return code " +
+                 stringify(code.get()));
   }
 
-  return path;
+  return destinationPath;
 }
 
 
-Try<string> fetchWithNet(
-    const string& uri,
-    const string& directory)
+static Try<string> copyFile(
+    const string& sourcePath,
+    const string& destinationPath)
 {
-  LOG(INFO) << "Fetching URI '" << uri << "' with os::net";
+  const string command = "cp '" + sourcePath + "' '" + destinationPath + "'";
 
-  string path = uri.substr(uri.find("://") + 3);
-  if (path.find("/") == string::npos ||
-      path.size() <= path.find("/") + 1) {
-    LOG(ERROR) << "Malformed URL (missing path)";
-    return Error("Malformed URI");
+  LOG(INFO) << "Copying resource with command: " << command;
+
+  int status = os::system(command);
+  if (status != 0) {
+    return Error("Failed to copy with command '" + command +
+                 "', exit status: " + stringify(status));
   }
 
-  path = path::join(directory, path.substr(path.find_last_of("/") + 1));
-  LOG(INFO) << "Downloading '" << uri << "' to '" << path << "'";
-  Try<int> code = net::download(uri, path);
-  if (code.isError()) {
-    LOG(ERROR) << "Error downloading resource: " << code.error().c_str();
-    return Error("Fetch of URI failed (" + code.error() + ")");
-  } else if (code.get() != 200) {
-    LOG(ERROR) << "Error downloading resource, received HTTP/FTP return code "
-    << code.get();
-    return Error("HTTP/FTP error (" + stringify(code.get()) + ")");
+  return destinationPath;
+}
+
+
+static Try<string> download(
+    const string& sourceUri,
+    const string& destinationPath,
+    const Option<string>& frameworksHome)
+{
+  LOG(INFO) << "Fetching URI '" << sourceUri << "'";
+  Try<Nothing> validation = Fetcher::validateUri(sourceUri);
+  if (validation.isError()) {
+    return Error(validation.error());
+  }
+
+  // 1. Try to fetch using a local copy.
+  // We regard as local: "file://" or the absence of any URI scheme.
+  Result<string> sourcePath =
+    Fetcher::uriToLocalPath(sourceUri, frameworksHome);
+
+  if (sourcePath.isError()) {
+    return Error(sourcePath.error());
+  } else if (sourcePath.isSome()) {
+    return copyFile(sourcePath.get(), destinationPath);
   }
 
-  return path;
+  // 2. Try to fetch URI using os::net / libcurl implementation.
+  // We consider http, https, ftp, ftps compatible with libcurl.
+  if (Fetcher::isNetUri(sourceUri)) {
+    return downloadWithNet(sourceUri, destinationPath);
+  }
+
+  // 3. Try to fetch the URI using hadoop client.
+  // We use the hadoop client to fetch any URIs that are not
+  // handled by other fetchers (local / os::net). These URIs may be
+  // `hdfs://` URIs or any other URI that has been configured (and
+  // hence handled) in the hadoop client. This allows mesos to
+  // externalize the handling of previously unknown resource
+  // endpoints without the need to support them natively.
+  // Note: Hadoop Client is not a hard dependency for running mesos.
+  // This allows users to get mesos up and running without a
+  // hadoop_home or the hadoop client setup but in case we reach
+  // this part and don't have one configured, the fetch would fail
+  // and log an appropriate error.
+  return downloadWithHadoopClient(sourceUri, destinationPath);
 }
 
 
-Try<string> fetchWithLocalCopy(
-    const string& uri,
-    const string& directory,
-    const Option<std::string>& frameworksHome)
+// TODO(bernd-mesos): Refactor this into stout so that we can more easily
+// chmod an executable. For example, we could define some static flags
+// so that someone can do: os::chmod(path, EXECUTABLE_CHMOD_FLAGS).
+static Try<string> chmodExecutable(const string& filePath)
 {
-    string local = uri;
-    bool fileUri = false;
-    if (strings::startsWith(local, string(FILE_URI_LOCALHOST))) {
-        local = local.substr(sizeof(FILE_URI_LOCALHOST) - 1);
-        fileUri = true;
-    } else if (strings::startsWith(local, string(FILE_URI_PREFIX))) {
-        local = local.substr(sizeof(FILE_URI_PREFIX) - 1);
-        fileUri = true;
-    }
+  Try<Nothing> chmod = os::chmod(
+      filePath, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
+  if (chmod.isError()) {
+    return Error("Failed to chmod executable '" +
+                 filePath + "': " + chmod.error());
+  }
 
-    if (fileUri && !strings::startsWith(local, "/")) {
-        return Error("File URI only supports absolute paths");
-    }
+  return filePath;
+}
+
+
+// Returns the resulting file or in case of extraction the destination
+// directory (for logging).
+static Try<string> fetchBypassingCache(
+    const CommandInfo::URI& uri,
+    const string& sandboxDirectory,
+    const Option<string>& frameworksHome)
+{
+  LOG(INFO) << "Fetching directly into the sandbox directory";
+
+  Try<string> basename = Fetcher::basename(uri.value());
+  if (basename.isError()) {
+    return Error("Failed to determine the basename of the URI '" +
+                 uri.value() + "' with error: " + basename.error());
+  }
+
+  string path = path::join(sandboxDirectory, basename.get());
+
+  Try<string> downloaded = download(uri.value(), path, frameworksHome);
+  if (downloaded.isError()) {
+    return Error(downloaded.error());
+  }
 
-    if (local.find_first_of("/") != 0) {
-        // We got a non-Hadoop and non-absolute path.
-        if (frameworksHome.isSome()) {
-            local = path::join(frameworksHome.get(), local);
-            LOG(INFO) << "Prepended the slave's frameworks_home flag value "
-                      << " to relative path, making it: '" << local << "'";
-        } else {
-            LOG(ERROR) << "A relative path was passed for the resource but the "
-                       << "slave's frameworks_home flag is not set; "
-                       << "please either specify this slave configuration "
-                       << "option or avoid using a relative path";
-            return Error("Could not resolve relative URI");
-        }
+  if (uri.executable()) {
+    return chmodExecutable(downloaded.get());
+  } else if (uri.extract()) {
+    Try<bool> extracted = extract(path, sandboxDirectory);
+    if (extracted.isError()) {
+      return Error(extracted.error());
+    } else {
+      LOG(WARNING) << "Copying instead of extracting resource from URI with "
+                   << "'extract' flag, because it does not seem to be an "
+                   << "archive: " << uri.value();
     }
+  }
+
+  return downloaded;
+}
+
+
+// Returns the resulting file or in case of extraction the destination
+// directory (for logging).
+static Try<string> fetchFromCache(
+    const FetcherInfo::Item& item,
+    const string& cacheDirectory,
+    const string& sandboxDirectory)
+{
+  LOG(INFO) << "Fetching from cache";
+
+  Try<string> basename = Fetcher::basename(item.uri().value());
+  if (basename.isError()) {
+    return Error(basename.error());
+  }
+
+  string destinationPath = path::join(sandboxDirectory, basename.get());
 
-    Try<string> base = os::basename(local);
-    if (base.isError()) {
-        LOG(ERROR) << base.error();
-        return Error("Fetch of URI failed");
+  string sourcePath = path::join(cacheDirectory, item.cache_filename());
+
+  if (item.uri().executable()) {
+    Try<string> copied = copyFile(sourcePath, destinationPath);
+    if (copied.isError()) {
+      return Error(copied.error());
     }
 
-    // Copy the resource to the directory.
-    string path = path::join(directory, base.get());
-    std::ostringstream command;
-    command << "cp '" << local << "' '" << path << "'";
-    LOG(INFO) << "Copying resource from '" << local
-              << "' to '" << directory << "'";
-
-    int status = os::system(command.str());
-    if (status != 0) {
-        LOG(ERROR) << "Failed to copy '" << local
-                   << "' : Exit status " << status;
-        return Error("Local copy failed");
+    return chmodExecutable(copied.get());
+  } else if (item.uri().extract()) {
+    Try<bool> extracted = extract(sourcePath, sandboxDirectory);
+    if (extracted.isError()) {
+      return Error(extracted.error());
+    } else if (extracted.get()) {
+      return sandboxDirectory;
+    } else {
+      LOG(WARNING) << "Copying instead of extracting resource from URI with "
+                   << "'extract' flag, because it does not seem to be an "
+                   << "archive: " << item.uri().value();
     }
+  }
 
-    return path;
+  return copyFile(sourcePath, destinationPath);
 }
 
 
-// Fetch URI into directory.
-Try<string> fetch(
-    const string& uri,
-    const string& directory,
-    const Option<std::string>& frameworksHome)
+// Returns the resulting file or in case of extraction the destination
+// directory (for logging).
+static Try<string> fetchThroughCache(
+    const FetcherInfo::Item& item,
+    const Option<string>& cacheDirectory,
+    const string& sandboxDirectory,
+    const Option<string>& frameworksHome)
 {
-    LOG(INFO) << "Fetching URI '" << uri << "'";
-    // Some checks to make sure using the URI value in shell commands
-    // is safe. TODO(benh): These should be pushed into the scheduler
-    // driver and reported to the user.
-    if (uri.find_first_of('\\') != string::npos ||
-        uri.find_first_of('\'') != string::npos ||
-        uri.find_first_of('\0') != string::npos) {
-        LOG(ERROR) << "URI contains illegal characters, refusing to fetch";
-        return Error("Illegal characters in URI");
-    }
+  if (cacheDirectory.isNone() || cacheDirectory.get().empty()) {
+    return Error("Cache directory not specified");
+  }
 
-    // 1. Try to fetch using a local copy.
-    // We consider file:// or no scheme uri are considered local uri.
-    if (strings::startsWith(uri, "file://") ||
-        uri.find("://") == string::npos) {
-      return fetchWithLocalCopy(uri, directory, frameworksHome);
+  if (!item.has_cache_filename() || item.cache_filename().empty()) {
+    // This should never happen if this program is used by the Mesos
+    // slave and could then be a CHECK. But other uses are possible.
+    return Error("No cache file name for: " + item.uri().value());
+  }
+
+  if (item.action() != FetcherInfo::Item::RETRIEVE_FROM_CACHE) {
+    CHECK_EQ(FetcherInfo::Item::DOWNLOAD_AND_CACHE, item.action())
+      << "Unexpected fetcher action selector";
+
+    LOG(INFO) << "Downloading into cache";
+
+    Try<Nothing> mkdir = os::mkdir(cacheDirectory.get());
+    if (mkdir.isError()) {
+      return Error("Failed to create fetcher cache directory '" +
+                   cacheDirectory.get() + "': " + mkdir.error());
     }
 
-    // 2. Try to fetch URI using os::net / libcurl implementation.
-    // We consider http, https, ftp, ftps compatible with libcurl
-    if (strings::startsWith(uri, "http://") ||
-               strings::startsWith(uri, "https://") ||
-               strings::startsWith(uri, "ftp://") ||
-               strings::startsWith(uri, "ftps://")) {
-      return fetchWithNet(uri, directory);
+    Try<string> downloaded = download(
+        item.uri().value(),
+        path::join(cacheDirectory.get(), item.cache_filename()),
+        frameworksHome);
+
+    if (downloaded.isError()) {
+      return Error(downloaded.error());
     }
+  }
+
+  return fetchFromCache(item, cacheDirectory.get(), sandboxDirectory);
+}
+
+
+// Returns the resulting file or in case of extraction the destination
+// directory (for logging).
+static Try<string> fetch(
+    const FetcherInfo::Item& item,
+    const Option<string>& cacheDirectory,
+    const string& sandboxDirectory,
+    const Option<string>& frameworksHome)
+{
+  LOG(INFO) << "Fetching URI '" << item.uri().value() << "'";
+
+  if (item.action() == FetcherInfo::Item::BYPASS_CACHE) {
+    return fetchBypassingCache(
+        item.uri(),
+        sandboxDirectory,
+        frameworksHome);
+  }
 
-    // 3. Try to fetch the URI using hadoop client.
-    // We use the hadoop client to fetch any URIs that are not
-    // handled by other fetchers(local / os::net). These URIs may be
-    // `hdfs://` URIs or any other URI that has been configured (and
-    // hence handled) in the hadoop client. This allows mesos to
-    // externalize the handling of previously unknown resource
-    // endpoints without the need to support them natively.
-    // Note: Hadoop Client is not a hard dependency for running mesos.
-    // This allows users to get mesos up and running without a
-    // hadoop_home or the hadoop client setup but in case we reach
-    // this part and don't have one configured, the fetch would fail
-    // and log an appropriate error.
-    return fetchWithHadoopClient(uri, directory);
+  return fetchThroughCache(
+      item,
+      cacheDirectory,
+      sandboxDirectory,
+      frameworksHome);
 }
 
 
+// This "fetcher program" is invoked by the slave's fetcher actor
+// (Fetcher, FetcherProcess) to "fetch" URIs into the sandbox directory
+// of a given task. Its parameters are provided in the form of the env
+// var MESOS_FETCHER_INFO which contains a FetcherInfo (see
+// fetcher.proto) object formatted in JSON. These are set by the actor
+// to indicate what set of URIs to process and how to proceed with
+// each one. A URI can be downloaded directly to the task's sandbox
+// directory or it can be copied to a cache first or it can be reused
+// from the cache, avoiding downloading. All cache management and
+// bookkeeping is centralized in the slave's fetcher actor, which can
+// have multiple instances of this fetcher program running at any
+// given time. Exit code: 0 if entirely successful, otherwise 1.
 int main(int argc, char* argv[])
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
@@ -262,79 +380,59 @@ int main(int argc, char* argv[])
 
   Try<Nothing> load = flags.load("MESOS_", argc, argv);
 
-  if (load.isError()) {
-    cerr << load.error() << endl;
-    exit(1);
-  }
+  CHECK_SOME(load) << "Could not load flags: " << load.error();
 
   logging::initialize(argv[0], flags, true); // Catch signals.
 
   CHECK(os::hasenv("MESOS_FETCHER_INFO"))
     << "Missing MESOS_FETCHER_INFO environment variable";
 
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(os::getenv("MESOS_FETCHER_INFO"));
+  string jsonFetcherInfo = os::getenv("MESOS_FETCHER_INFO");
+  LOG(INFO) << "Fetcher Info: " << jsonFetcherInfo;
 
-  if (parse.isError()) {
-    EXIT(1) << "Failed to parse MESOS_FETCHER_INFO: " << parse.error();
-  }
+  Try<JSON::Object> parse = JSON::parse<JSON::Object>(jsonFetcherInfo);
+  CHECK_SOME(parse) << "Failed to parse MESOS_FETCHER_INFO: " << parse.error();
 
-  Try<FetcherInfo> fetcherInfo = protobuf::parse<FetcherInfo>(parse.get());
-  if (fetcherInfo.isError()) {
-    EXIT(1) << "Failed to parse FetcherInfo: " << fetcherInfo.error();
-  }
+  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+  CHECK_SOME(fetcherInfo)
+    << "Failed to parse FetcherInfo: " << fetcherInfo.error();
 
-  const CommandInfo& commandInfo = fetcherInfo.get().command_info();
+  CHECK(!fetcherInfo.get().sandbox_directory().empty())
+    << "Missing sandbox directory";
 
-  const string& directory = fetcherInfo.get().work_directory();
-  if (directory.empty()) {
-    EXIT(1) << "Missing work directory";
-  }
+  const string sandboxDirectory = fetcherInfo.get().sandbox_directory();
 
-  Option<std::string> user = None();
-  if (fetcherInfo.get().has_user()) {
-    user = fetcherInfo.get().user();
-  }
+  const Option<string> cacheDirectory =
+    fetcherInfo.get().has_cache_directory() ?
+      Option<string>::some(fetcherInfo.get().cache_directory()) :
+        Option<string>::none();
 
-  Option<std::string> frameworksHome = None();
-  if (fetcherInfo.get().has_frameworks_home()) {
-    frameworksHome = fetcherInfo.get().frameworks_home();
-  }
+  const Option<string> frameworksHome =
+    fetcherInfo.get().has_frameworks_home() ?
+      Option<string>::some(fetcherInfo.get().frameworks_home()) :
+        Option<string>::none();
 
   // Fetch each URI to a local file, chmod, then chown if a user is provided.
-  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
-    // Fetch the URI to a local file.
-    Try<string> fetched = fetch(uri.value(), directory, frameworksHome);
+  foreach (const FetcherInfo::Item& item, fetcherInfo.get().items()) {
+    Try<string> fetched =
+      fetch(item, cacheDirectory, sandboxDirectory, frameworksHome);
     if (fetched.isError()) {
-      EXIT(1) << "Failed to fetch: " << uri.value();
-    }
-
-    // Chmod the fetched URI if it's executable, else assume it's an archive
-    // that should be extracted.
-    if (uri.executable()) {
-      Try<Nothing> chmod = os::chmod(
-          fetched.get(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
-      if (chmod.isError()) {
-        EXIT(1) << "Failed to chmod " << fetched.get() << ": " << chmod.error();
-      }
-    } else if (uri.extract()) {
-      // TODO(idownes): Consider removing the archive once extracted.
-      // Try to extract the file if it's recognized as an archive.
-      Try<bool> extracted = extract(fetched.get(), directory);
-      if (extracted.isError()) {
-        EXIT(1) << "Failed to extract "
-                << fetched.get() << ":" << extracted.error();
-      }
+      EXIT(1) << "Failed to fetch '" << item.uri().value()
+              << "': " + fetched.error();
     } else {
-      LOG(INFO) << "Skipped extracting path '" << fetched.get() << "'";
+      LOG(INFO) << "Fetched '" << item.uri().value()
+                << "' to '" << fetched.get() << "'";
     }
+  }
 
-    // Recursively chown the directory if a user is provided.
-    if (user.isSome()) {
-      Try<Nothing> chowned = os::chown(user.get(), directory);
-      if (chowned.isError()) {
-        EXIT(1) << "Failed to chown " << directory << ": " << chowned.error();
-      }
+  // Recursively chown the sandbox directory if a user is provided.
+  if (fetcherInfo.get().has_user()) {
+    Try<Nothing> chowned = os::chown(
+        fetcherInfo.get().user(),
+        sandboxDirectory);
+    if (chowned.isError()) {
+      EXIT(1) << "Failed to chown " << sandboxDirectory
+              << ": " << chowned.error();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index 206d439..84927e5 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -108,6 +108,9 @@ extern const Duration DOCKER_VERSION_WAIT_TIMEOUT;
 // Name of the default, CRAM-MD5 authenticatee.
 extern const std::string DEFAULT_AUTHENTICATEE;
 
+// Default maximum storage space to be used by the fetcher cache.
+const Bytes DEFAULT_FETCHER_CACHE_SIZE = Gigabytes(2);
+
 // If no pings received within this timeout, then the slave will
 // trigger a re-detection of the master to cause a re-registration.
 Duration MASTER_PING_TIMEOUT();

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index bd58f94..41e0b98 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -325,7 +325,8 @@ DockerContainerizerProcess::Container::create(
 
 
 Future<Nothing> DockerContainerizerProcess::fetch(
-    const ContainerID& containerId)
+    const ContainerID& containerId,
+    const SlaveID& slaveId)
 {
   CHECK(containers_.contains(containerId));
   Container* container = containers_[containerId];
@@ -335,6 +336,7 @@ Future<Nothing> DockerContainerizerProcess::fetch(
       container->command,
       container->directory,
       None(),
+      slaveId,
       flags);
 }
 
@@ -772,7 +774,7 @@ Future<bool> DockerContainerizerProcess::launch(
 
   if (taskInfo.isSome() && flags.docker_mesos_image.isNone()) {
     // Launching task by forking a subprocess to run docker executor.
-    return container.get()->launch = fetch(containerId)
+    return container.get()->launch = fetch(containerId, slaveId)
       .then(defer(self(), [=]() { return pull(containerId); }))
       .then(defer(self(), [=]() { return launchExecutorProcess(containerId); }))
       .then(defer(self(), [=](pid_t pid) {
@@ -794,7 +796,7 @@ Future<bool> DockerContainerizerProcess::launch(
   // is running in a container (via docker_mesos_image flag)
   // we want the executor to keep running when the slave container
   // dies.
-  return container.get()->launch = fetch(containerId)
+  return container.get()->launch = fetch(containerId, slaveId)
     .then(defer(self(), [=]() { return pull(containerId); }))
     .then(defer(self(), [=]() {
       return launchExecutorContainer(containerId, containerName);
@@ -936,7 +938,6 @@ Future<pid_t> DockerContainerizerProcess::launchExecutorProcess(
 }
 
 
-
 Future<pid_t> DockerContainerizerProcess::checkpointExecutor(
     const ContainerID& containerId,
     const Docker::Container& dockerContainer)

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index d2cca4b..395d535 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -152,7 +152,9 @@ public:
       const ContainerID& containerId,
       bool killed = true); // process is either killed or reaped.
 
-  virtual process::Future<Nothing> fetch(const ContainerID& containerId);
+  virtual process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const SlaveID& slaveId);
 
   virtual process::Future<Nothing> pull(const ContainerID& containerId);