You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/01 15:45:59 UTC

[1/4] mesos git commit: Added a cache to the Fetcher.

Repository: mesos
Updated Branches:
  refs/heads/master b16999a4c -> 7aede4ad4


http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/fetcher_cache_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_cache_tests.cpp b/src/tests/fetcher_cache_tests.cpp
new file mode 100644
index 0000000..99777f8
--- /dev/null
+++ b/src/tests/fetcher_cache_tests.cpp
@@ -0,0 +1,1359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <unistd.h>
+
+#include <gmock/gmock.h>
+
+#include <list>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/check.hpp>
+#include <process/clock.hpp>
+#include <process/collect.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/message.hpp>
+#include <process/owned.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+#include <process/queue.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/try.hpp>
+
+#include "common/lock.hpp"
+
+#include "master/flags.hpp"
+#include "master/master.hpp"
+
+#include "slave/constants.hpp"
+#include "slave/gc.hpp"
+#include "slave/flags.hpp"
+#include "slave/paths.hpp"
+#include "slave/slave.hpp"
+
+#include "slave/containerizer/fetcher.hpp"
+
+#include "tests/containerizer.hpp"
+#include "tests/flags.hpp"
+#include "tests/mesos.hpp"
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::MesosContainerizer;
+using mesos::internal::slave::MesosContainerizerProcess;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::FetcherProcess;
+
+using process::Future;
+using process::HttpEvent;
+using process::Owned;
+using process::PID;
+using process::Promise;
+using process::Process;
+using process::Queue;
+using process::Subprocess;
+
+using std::list;
+using std::string;
+using std::vector;
+
+using testing::_;
+using testing::DoAll;
+using testing::DoDefault;
+using testing::Eq;
+using testing::Invoke;
+using testing::InvokeWithoutArgs;
+using testing::Return;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+static const string ASSETS_DIRECTORY_NAME = "mesos-fetcher-test-assets";
+static const string COMMAND_NAME = "mesos-fetcher-test-cmd";
+static const string ARCHIVE_NAME = "mesos-fetcher-test-archive.tgz";
+static const string ARCHIVED_COMMAND_NAME = "mesos-fetcher-test-acmd";
+
+// Every task executes one of these shell scripts, which create a
+// file that includes the current task name in its name. The latter
+// is expected to be passed in as a script argument. The existence
+// of the file with that name is then used as proof that the task
+// ran successfully.
+static const string COMMAND_SCRIPT = "touch " + COMMAND_NAME + "$1";
+static const string ARCHIVED_COMMAND_SCRIPT =
+  "touch " + ARCHIVED_COMMAND_NAME + "$1";
+
+
+class FetcherCacheTest : public MesosTest
+{
+public:
+  struct Task {
+    Path runDirectory;
+    Queue<TaskStatus> statusQueue;
+  };
+
+  void setupCommandFileAsset();
+
+protected:
+  void setupArchiveAsset();
+
+  virtual void SetUp();
+  virtual void TearDown();
+
+  // Sets up the slave and starts it. Calling this late in the test
+  // instead of having it included in SetUp() gives us the opportunity
+  // to manipulate values in 'flags', first.
+  void startSlave();
+
+  // Stops the slave, deleting the containerizer, for subsequent
+  // recovery testing.
+  void stopSlave();
+
+  Task launchTask(const CommandInfo& commandInfo, const size_t taskIndex);
+
+  vector<Task> launchTasks(const vector<CommandInfo>& commandInfos);
+
+  // Waits until FetcherProcess::run() has been called for all tasks.
+  void awaitFetchContention();
+
+  string assetsDirectory;
+  string commandPath;
+  string archivePath;
+
+  slave::Flags flags;
+  MesosContainerizer* containerizer;
+  PID<Slave> slavePid;
+  SlaveID slaveId;
+  string cacheDirectory;
+  MockFetcherProcess* fetcherProcess;
+  MockScheduler scheduler;
+  MesosSchedulerDriver* driver;
+
+private:
+  Fetcher* fetcher;
+
+  FrameworkID frameworkId;
+
+  // Promises whose futures indicate that FetcherProcess::_fetch() has been
+  // called for a task with a given index.
+  vector<Owned<Promise<Nothing>>> fetchContentionWaypoints;
+};
+
+
+void FetcherCacheTest::SetUp()
+{
+  MesosTest::SetUp();
+
+  flags = CreateSlaveFlags();
+  flags.resources =
+    Some(stringify(Resources::parse("cpus:1000;mem:1000").get()));
+
+  assetsDirectory = path::join(flags.work_dir, ASSETS_DIRECTORY_NAME);
+  ASSERT_SOME(os::mkdir(assetsDirectory));
+
+  setupCommandFileAsset();
+  setupArchiveAsset();
+
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  fetcherProcess = new MockFetcherProcess();
+  fetcher = new Fetcher(Owned<FetcherProcess>(fetcherProcess));
+
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.set_name("default");
+  frameworkInfo.set_checkpoint(true);
+
+  driver = new MesosSchedulerDriver(
+    &scheduler, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(scheduler, registered(driver, _, _))
+    .Times(1);
+}
+
+
+void FetcherCacheTest::TearDown()
+{
+  driver->stop();
+  driver->join();
+  delete driver;
+
+  delete fetcher;
+
+  MesosTest::TearDown();
+}
+
+
+// TODO(bernd-mesos): Make this abstractions as generic and generally
+// available for all testing as possible.
+void FetcherCacheTest::startSlave()
+{
+  Try<MesosContainerizer*> create = MesosContainerizer::create(
+      flags, true, fetcher);
+  ASSERT_SOME(create);
+  containerizer = create.get();
+
+  Try<PID<Slave>> pid = StartSlave(containerizer, flags);
+  ASSERT_SOME(pid);
+  slavePid = pid.get();
+
+  // Obtain the slave ID.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+  AWAIT_READY(slaveRegisteredMessage);
+  slaveId = slaveRegisteredMessage.get().slave_id();
+
+  cacheDirectory =
+    slave::paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
+}
+
+
+void FetcherCacheTest::stopSlave()
+{
+  Stop(slavePid);
+  delete containerizer;
+}
+
+
+void FetcherCacheTest::setupCommandFileAsset()
+{
+  commandPath = path::join(assetsDirectory, COMMAND_NAME);
+  ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT));
+
+  // Make the command file read-only, so we can discern the URI
+  // executable flag.
+  ASSERT_SOME(os::chmod(commandPath, S_IRUSR | S_IRGRP | S_IROTH));
+}
+
+
+void FetcherCacheTest::setupArchiveAsset()
+{
+  string path = path::join(assetsDirectory, ARCHIVED_COMMAND_NAME);
+  ASSERT_SOME(os::write(path, ARCHIVED_COMMAND_SCRIPT));
+
+  // Make the archived command file executable before archiving it,
+  // since the executable flag for CommandInfo::URI has no effect on
+  // what comes out of an archive.
+  ASSERT_SOME(os::chmod(path, S_IRWXU | S_IRWXG | S_IRWXO));
+
+  const string cwd = os::getcwd();
+  ASSERT_SOME(os::chdir(assetsDirectory));
+  ASSERT_SOME(os::tar(ARCHIVED_COMMAND_NAME, ARCHIVE_NAME));
+  ASSERT_SOME(os::chdir(cwd));
+  archivePath = path::join(assetsDirectory, ARCHIVE_NAME);
+
+  // Make the archive file read-only, so we can tell if it becomes
+  // executable by acccident.
+  ASSERT_SOME(os::chmod(archivePath, S_IRUSR | S_IRGRP | S_IROTH));
+}
+
+
+static string taskName(int taskIndex)
+{
+  return stringify(taskIndex);
+}
+
+
+// TODO(bernd-mesos): Use Path, not string, create Path::executable().
+static bool isExecutable(const string& path)
+{
+  Try<bool> access = os::access(path, X_OK);
+  EXPECT_SOME(access);
+  return access.isSome() && access.get();
+}
+
+
+// Create a future that indicates that the task observed by the given
+// status queue is finished.
+static Future<Nothing> awaitFinished(FetcherCacheTest::Task task)
+{
+  return task.statusQueue.get()
+    .then([=](const TaskStatus& status) -> Future<Nothing> {
+      if (status.state() == TASK_FINISHED) {
+        return Nothing();
+      }
+      return awaitFinished(task);
+  });
+}
+
+
+// Create a future that indicates that all tasks are finished.
+// TODO(bernd-mesos): Make this abstractions as generic and generally
+// available for all testing as possible.
+static Future<list<Nothing>> awaitFinished(
+    vector<FetcherCacheTest::Task> tasks)
+{
+  list<Future<Nothing>> futures;
+
+  foreach (FetcherCacheTest::Task task, tasks) {
+    futures.push_back(awaitFinished(task));
+  }
+
+  return collect(futures);
+}
+
+
+// Pushes the TaskStatus value in mock call argument #1 into the
+// given queue, which later on shall be queried by awaitFinished().
+ACTION_P(PushTaskStatus, taskStatusQueue)
+{
+  TaskStatus taskStatus = arg1;
+
+  // Input parameters of ACTION_P are const. We make a mutable copy
+  // so that we can use put().
+  Queue<TaskStatus> queue = taskStatusQueue;
+
+  queue.put(taskStatus);
+}
+
+
+// Launches a task as described by its CommandInfo and returns its sandbox
+// run directory path. Its completion will be indicated by the result of
+// awaitFinished(task), where `task` is the return value of this method..
+// TODO(bernd-mesos): Make this abstractions as generic and generally
+// available for all testing as possible.
+FetcherCacheTest::Task FetcherCacheTest::launchTask(
+    const CommandInfo& commandInfo,
+    const size_t taskIndex)
+{
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(scheduler, resourceOffers(driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(DeclineOffers());
+
+  offers.await(Seconds(15));
+  CHECK_READY(offers) << "Failed to wait for resource offers";
+
+  EXPECT_NE(0u, offers.get().size());
+  const Offer offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name(taskName(taskIndex));
+  task.mutable_task_id()->set_value(taskName(taskIndex));
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+
+  // We don't care about resources in these tests. This small amount
+  // will always succeed.
+  task.mutable_resources()->CopyFrom(
+      Resources::parse("cpus:1;mem:1").get());
+
+  task.mutable_command()->CopyFrom(commandInfo);
+
+  // Since we are always using a command executor here, the executor
+  // ID can be determined by copying the task ID.
+  ExecutorID executorId;
+  executorId.set_value(task.task_id().value());
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Queue<TaskStatus> taskStatusQueue;
+
+  EXPECT_CALL(scheduler, statusUpdate(driver, _))
+    .WillRepeatedly(PushTaskStatus(taskStatusQueue));
+
+  driver->launchTasks(offer.id(), tasks);
+
+  const Path path = Path(slave::paths::getExecutorLatestRunPath(
+      flags.work_dir,
+      slaveId,
+      offer.framework_id(),
+      executorId));
+
+  return Task {path, taskStatusQueue};
+}
+
+
+// Pushes the task status value of a task status update callback
+// into the task status queue that corresponds to the task index/ID
+// for which the status update is being reported. 'tasks' must be a
+// 'vector<Task>>', where every slot index corresponds to a task
+// index/ID.
+// TODO(bernd-mesos): Make this abstractions as generic and generally
+// available for all testing as possible.
+ACTION_TEMPLATE(PushIndexedTaskStatus,
+                HAS_1_TEMPLATE_PARAMS(int, k),
+                AND_1_VALUE_PARAMS(tasks))
+{
+  TaskStatus taskStatus = ::std::tr1::get<k>(args);
+  Try<int> taskId = numify<int>(taskStatus.task_id().value());
+  ASSERT_SOME(taskId);
+  Queue<TaskStatus> queue = (tasks)[taskId.get()].statusQueue;
+  queue.put(taskStatus);
+}
+
+
+// Satisfies the first promise in the list that is not satisfied yet.
+ACTION_P(SatisfyOne, promises)
+{
+  foreach (const Owned<Promise<Nothing>>& promise, *promises) {
+    if (promise->future().isPending()) {
+      promise->set(Nothing());
+      return;
+    }
+  }
+
+  FAIL() << "Tried to call FetcherProcess::_fetch() "
+         << "for more tasks than launched";
+}
+
+
+// Launches the tasks described by the given CommandInfo and returns a
+// vector holding the run directory paths. All these tasks run
+// concurrently. Their completion will be indicated by the result of
+// awaitFinished(tasks), where `tasks` is the return value of this
+// method.
+// TODO(bernd-mesos): Make this abstractions as generic and generally
+// available for all testing as possible.
+vector<FetcherCacheTest::Task> FetcherCacheTest::launchTasks(
+    const vector<CommandInfo>& commandInfos)
+{
+  vector<FetcherCacheTest::Task> result;
+
+  // When _fetch() is called, notify us by satisfying a promise that
+  // a task has passed the code stretch in which it competes for cache
+  // entries.
+  EXPECT_CALL(*fetcherProcess, _fetch(_, _, _, _, _, _, _))
+    .WillRepeatedly(
+        DoAll(SatisfyOne(&fetchContentionWaypoints),
+              Invoke(fetcherProcess, &MockFetcherProcess::unmocked__fetch)));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(scheduler, resourceOffers(driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(DeclineOffers());
+
+  offers.await(Seconds(15));
+  CHECK_READY(offers) << "Failed to wait for resource offers";
+
+  EXPECT_NE(0u, offers.get().size());
+  const Offer offer = offers.get()[0];
+
+  vector<TaskInfo> tasks;
+  foreach (const CommandInfo& commandInfo, commandInfos) {
+    size_t taskIndex = tasks.size();
+
+    // Grabbing the framework ID from somewhere. It should not matter
+    // if this happens several times, as we expect the framework ID to
+    // remain the same.
+    frameworkId = offer.framework_id();
+
+    TaskInfo task;
+    task.set_name(taskName(taskIndex));
+    task.mutable_task_id()->set_value(taskName(taskIndex));
+    task.mutable_slave_id()->CopyFrom(offer.slave_id());
+
+    // We don't care about resources in these tests. This small amount
+    // will always succeed.
+    task.mutable_resources()->CopyFrom(
+        Resources::parse("cpus:1;mem:1").get());
+
+    task.mutable_command()->CopyFrom(commandInfo);
+
+    tasks.push_back(task);
+
+    // Since we are always using a command executor here, the executor
+    // ID can be determined by copying the task ID.
+    ExecutorID executorId;
+    executorId.set_value(task.task_id().value());
+
+    Path runDirectory = Path(slave::paths::getExecutorLatestRunPath(
+        flags.work_dir,
+        slaveId,
+        frameworkId,
+        executorId));
+
+    // Grabbing task status futures to wait for. We make a queue of futures
+    // for each task. We can then wait until the front element indicates
+    // status TASK_FINISHED. We use a queue, because we never know which
+    // status update will be the one we have been waiting for.
+    Queue<TaskStatus> taskStatusQueue;
+
+    result.push_back(Task {runDirectory, taskStatusQueue});
+
+    EXPECT_CALL(scheduler, statusUpdate(driver, _))
+      .WillRepeatedly(PushIndexedTaskStatus<1>(result));
+
+    auto waypoint = Owned<Promise<Nothing>>(new Promise<Nothing>());
+    fetchContentionWaypoints.push_back(waypoint);
+  }
+
+  driver->launchTasks(offer.id(), tasks);
+
+  return result;
+}
+
+
+// Ensure that FetcherProcess::_fetch() has been called for each task,
+// which means that all tasks are competing for downloading the same URIs.
+void FetcherCacheTest::awaitFetchContention()
+{
+  foreach (const Owned<Promise<Nothing>>& waypoint, fetchContentionWaypoints) {
+    AWAIT(waypoint->future());
+  }
+}
+
+
+// Tests fetching from the local asset directory without cache. This
+// gives us a baseline for the following tests and lets us debug our
+// test infrastructure without extra complications.
+TEST_F(FetcherCacheTest, LocalUncached)
+{
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(commandPath);
+    uri.set_executable(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    EXPECT_EQ(0u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(0u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+    const string path = path::join(task.runDirectory.value, COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+  }
+}
+
+
+// Tests fetching from the local asset directory with simple caching.
+// Only one download must occur. Fetching is serialized, to cover
+// code areas without overlapping/concurrent fetch attempts.
+TEST_F(FetcherCacheTest, LocalCached)
+{
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(commandPath);
+    uri.set_executable(true);
+    uri.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    const string path = path::join(task.runDirectory.value, COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+
+    EXPECT_EQ(1u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+  }
+}
+
+
+// Tests falling back on bypassing the cache when fetching the download
+// size of a URI that is supposed to be cached fails.
+TEST_F(FetcherCacheTest, CachedFallback)
+{
+  startSlave();
+  driver->start();
+
+  // Make sure the content-length request fails.
+  ASSERT_SOME(os::rm(commandPath));
+
+  CommandInfo::URI uri;
+  uri.set_value(commandPath);
+  uri.set_executable(true);
+  uri.set_cache(true);
+
+  CommandInfo commandInfo;
+  commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(0));
+  commandInfo.add_uris()->CopyFrom(uri);
+
+  // Bring back the asset just before running mesos-fetcher to fetch it.
+  Future<FetcherInfo> fetcherInfo;
+  EXPECT_CALL(*fetcherProcess, run(_, _, _))
+    .WillOnce(DoAll(FutureArg<1>(&fetcherInfo),
+                    InvokeWithoutArgs(this,
+                                      &FetcherCacheTest::setupCommandFileAsset),
+                    Invoke(fetcherProcess,
+                           &MockFetcherProcess::unmocked_run)));
+
+  const Task task = launchTask(commandInfo, 0);
+
+  AWAIT_READY(awaitFinished(task));
+
+  const string path = path::join(task.runDirectory.value, COMMAND_NAME);
+  EXPECT_TRUE(isExecutable(path));
+  EXPECT_TRUE(os::exists(path + taskName(0)));
+
+  AWAIT_READY(fetcherInfo);
+
+  EXPECT_EQ(1, fetcherInfo.get().items_size());
+  EXPECT_EQ(FetcherInfo::Item::BYPASS_CACHE,
+            fetcherInfo.get().items(0).action());
+
+  EXPECT_EQ(0u, fetcherProcess->cacheSize());
+  EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+  EXPECT_EQ(0u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+}
+
+
+// Tests archive extraction without caching as a baseline for the
+// subsequent test below.
+TEST_F(FetcherCacheTest, LocalUncachedExtract)
+{
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(archivePath);
+    uri.set_extract(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    EXPECT_TRUE(os::exists(
+        path::join(task.runDirectory.value, ARCHIVE_NAME)));
+    EXPECT_FALSE(isExecutable(
+        path::join(task.runDirectory.value, ARCHIVE_NAME)));
+
+    const string path =
+      path::join(task.runDirectory.value, ARCHIVED_COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+
+    EXPECT_EQ(0u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(0u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+  }
+}
+
+
+// Tests archive extraction in combination with caching.
+TEST_F(FetcherCacheTest, LocalCachedExtract)
+{
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(archivePath);
+    uri.set_extract(true);
+    uri.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    EXPECT_FALSE(os::exists(
+        path::join(task.runDirectory.value, ARCHIVE_NAME)));
+
+    const string path =
+      path::join(task.runDirectory.value, ARCHIVED_COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+
+    EXPECT_EQ(1u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+  }
+}
+
+
+class FetcherCacheHttpTest : public FetcherCacheTest
+{
+public:
+  // A minimal HTTP server (not intended as an actor) just reusing what
+  // is already implemented somewhere to serve some HTTP requests for
+  // file downloads. Plus counting how many requests are made. Plus the
+  // ability to pause answering requests, stalling them.
+  class HttpServer : public Process<HttpServer>
+  {
+  public:
+    HttpServer(FetcherCacheHttpTest* test)
+      : countRequests(0),
+        countCommandRequests(0),
+        countArchiveRequests(0)
+    {
+      provide(COMMAND_NAME, test->commandPath);
+      provide(ARCHIVE_NAME, test->archivePath);
+
+      spawn(this);
+    }
+
+    string url()
+    {
+      return "http://127.0.0.1:" +
+             stringify(self().address.port) +
+             "/" + self().id + "/";
+    }
+
+    // Stalls the execution of HTTP requests inside visit().
+    void pause()
+    {
+      mutex.lock();
+    }
+
+    void resume()
+    {
+      mutex.unlock();
+    }
+
+    virtual void visit(const HttpEvent& event)
+    {
+      std::lock_guard<std::mutex> lock(mutex);
+
+      countRequests++;
+
+      if (strings::contains(event.request->path, COMMAND_NAME)) {
+        countCommandRequests++;
+      }
+
+      if (strings::contains(event.request->path, ARCHIVE_NAME)) {
+        countArchiveRequests++;
+      }
+
+      ProcessBase::visit(event);
+    }
+
+    void resetCounts()
+    {
+      countRequests = 0;
+      countCommandRequests = 0;
+      countArchiveRequests = 0;
+    }
+
+    size_t countRequests;
+    size_t countCommandRequests;
+    size_t countArchiveRequests;
+
+  private:
+    std::mutex mutex;
+  };
+
+
+  virtual void SetUp()
+  {
+    FetcherCacheTest::SetUp();
+
+    httpServer = new HttpServer(this);
+  }
+
+  virtual void TearDown()
+  {
+    terminate(httpServer);
+    wait(httpServer);
+    delete httpServer;
+
+    FetcherCacheTest::TearDown();
+  }
+
+  HttpServer* httpServer;
+};
+
+
+// Tests fetching via HTTP with caching. Only one download must
+// occur. Fetching is serialized, to cover code areas without
+// overlapping/concurrent fetch attempts.
+TEST_F(FetcherCacheHttpTest, HttpCachedSerialized)
+{
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(httpServer->url() + COMMAND_NAME);
+    uri.set_executable(true);
+    uri.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    const string path =
+      path::join(task.runDirectory.value, COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+
+    EXPECT_EQ(1u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+    // 2 requests: 1 for content-length, 1 for download.
+    EXPECT_EQ(2u, httpServer->countCommandRequests);
+  }
+}
+
+
+// Tests multiple concurrent fetching efforts that require some
+// concurrency control. One task must "win" and perform the size
+// and download request for the URI alone. The others must reuse
+// the result.
+TEST_F(FetcherCacheHttpTest, HttpCachedConcurrent)
+{
+  startSlave();
+  driver->start();
+
+  // Causes fetch contention. No task can run yet until resume().
+  httpServer->pause();
+
+  vector<CommandInfo> commandInfos;
+  const size_t countTasks = 5;
+
+  for (size_t i = 0; i < countTasks; i++) {
+    CommandInfo::URI uri0;
+    uri0.set_value(httpServer->url() + COMMAND_NAME);
+    uri0.set_executable(true);
+    uri0.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri0);
+
+    // Not always caching this URI causes that it will be downloaded
+    // some of the time. Thus we exercise code paths that eagerly fetch
+    // new assets while waiting for pending downloads of cached assets
+    // as well as code paths where no downloading occurs at all.
+    if (i % 2 == 1) {
+      CommandInfo::URI uri1;
+      uri1.set_value(httpServer->url() + ARCHIVE_NAME);
+      commandInfo.add_uris()->CopyFrom(uri1);
+    }
+
+    commandInfos.push_back(commandInfo);
+  }
+
+  vector<Task> tasks = launchTasks(commandInfos);
+
+  CHECK_EQ(countTasks, tasks.size());
+
+  // Given pausing the HTTP server, this proves that fetch contention
+  // has happened. All tasks have passed the point where it occurs,
+  // but they are not running yet.
+  awaitFetchContention();
+
+  // Now let the tasks run.
+  httpServer->resume();
+
+  AWAIT_READY(awaitFinished(tasks));
+
+  EXPECT_EQ(1u, fetcherProcess->cacheSize());
+  EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+  EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+  // command content-length requests: 1
+  // command downloads: 1
+  // archive downloads: 2
+  EXPECT_EQ(2u, httpServer->countCommandRequests);
+  EXPECT_EQ(2u, httpServer->countArchiveRequests);
+
+  for (size_t i = 0; i < countTasks; i++) {
+    EXPECT_EQ(i % 2 == 1, os::exists(
+        path::join(tasks[i].runDirectory.value, ARCHIVE_NAME)));
+    EXPECT_TRUE(isExecutable(
+        path::join(tasks[i].runDirectory.value, COMMAND_NAME)));
+    EXPECT_TRUE(os::exists(
+        path::join(tasks[i].runDirectory.value, COMMAND_NAME + taskName(i))));
+  }
+}
+
+
+// Tests using multiple URIs per command, variations of caching,
+// setting the executable flag, and archive extraction.
+TEST_F(FetcherCacheHttpTest, HttpMixed)
+{
+  startSlave();
+  driver->start();
+
+  // Causes fetch contention. No task can run yet until resume().
+  httpServer->pause();
+
+  vector<CommandInfo> commandInfos;
+
+  // Task 0.
+
+  CommandInfo::URI uri00;
+  uri00.set_value(httpServer->url() + ARCHIVE_NAME);
+  uri00.set_cache(true);
+  uri00.set_extract(false);
+  uri00.set_executable(false);
+
+  CommandInfo::URI uri01;
+  uri01.set_value(httpServer->url() + COMMAND_NAME);
+  uri01.set_extract(false);
+  uri01.set_executable(true);
+
+  CommandInfo commandInfo0;
+  commandInfo0.set_value("./" + COMMAND_NAME + " " + taskName(0));
+  commandInfo0.add_uris()->CopyFrom(uri00);
+  commandInfo0.add_uris()->CopyFrom(uri01);
+  commandInfos.push_back(commandInfo0);
+
+  // Task 1.
+
+  CommandInfo::URI uri10;
+  uri10.set_value(httpServer->url() + ARCHIVE_NAME);
+  uri10.set_extract(true);
+  uri10.set_executable(false);
+
+  CommandInfo::URI uri11;
+  uri11.set_value(httpServer->url() + COMMAND_NAME);
+  uri11.set_extract(true);
+  uri11.set_executable(false);
+
+  CommandInfo commandInfo1;
+  commandInfo1.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(1));
+  commandInfo1.add_uris()->CopyFrom(uri10);
+  commandInfo1.add_uris()->CopyFrom(uri11);
+  commandInfos.push_back(commandInfo1);
+
+  // Task 2.
+
+  CommandInfo::URI uri20;
+  uri20.set_value(httpServer->url() + ARCHIVE_NAME);
+  uri20.set_cache(true);
+  uri20.set_extract(true);
+  uri20.set_executable(false);
+
+  CommandInfo::URI uri21;
+  uri21.set_value(httpServer->url() + COMMAND_NAME);
+  uri21.set_extract(false);
+  uri21.set_executable(false);
+
+  CommandInfo commandInfo2;
+  commandInfo2.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(2));
+  commandInfo2.add_uris()->CopyFrom(uri20);
+  commandInfo2.add_uris()->CopyFrom(uri21);
+  commandInfos.push_back(commandInfo2);
+
+  vector<Task> tasks = launchTasks(commandInfos);
+
+  CHECK_EQ(3u, tasks.size());
+
+  // Given pausing the HTTP server, this proves that fetch contention
+  // has happened. All tasks have passed the point where it occurs,
+  // but they are not running yet.
+  awaitFetchContention();
+
+  // Now let the tasks run.
+  httpServer->resume();
+
+  AWAIT_READY(awaitFinished(tasks));
+
+  EXPECT_EQ(1u, fetcherProcess->cacheSize());
+  EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+  EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+  // command content-length requests: 0
+  // command downloads: 3
+  // archive content-length requests: 1
+  // archive downloads: 2
+  EXPECT_EQ(3u, httpServer->countCommandRequests);
+  EXPECT_EQ(3u, httpServer->countArchiveRequests);
+
+  // Task 0.
+
+  EXPECT_FALSE(isExecutable(
+      path::join(tasks[0].runDirectory.value, ARCHIVE_NAME)));
+  EXPECT_FALSE(os::exists(
+      path::join(tasks[0].runDirectory.value, ARCHIVED_COMMAND_NAME)));
+
+  EXPECT_TRUE(isExecutable(
+      path::join(tasks[0].runDirectory.value, COMMAND_NAME)));
+  EXPECT_TRUE(os::exists(
+      path::join(tasks[0].runDirectory.value, COMMAND_NAME + taskName(0))));
+
+  // Task 1.
+
+  EXPECT_FALSE(isExecutable(
+      path::join(tasks[1].runDirectory.value, ARCHIVE_NAME)));
+  EXPECT_TRUE(isExecutable(
+      path::join(tasks[1].runDirectory.value, ARCHIVED_COMMAND_NAME)));
+  EXPECT_TRUE(os::exists(path::join(
+      tasks[1].runDirectory.value, ARCHIVED_COMMAND_NAME + taskName(1))));
+
+  EXPECT_FALSE(isExecutable(
+      path::join(tasks[1].runDirectory.value, COMMAND_NAME)));
+
+  // Task 2.
+
+  EXPECT_FALSE(os::exists(
+      path::join(tasks[2].runDirectory.value, ARCHIVE_NAME)));
+  EXPECT_TRUE(isExecutable(
+      path::join(tasks[2].runDirectory.value, ARCHIVED_COMMAND_NAME)));
+  EXPECT_TRUE(os::exists(path::join(
+      tasks[2].runDirectory.value, ARCHIVED_COMMAND_NAME + taskName(2))));
+
+  EXPECT_FALSE(isExecutable(
+      path::join(tasks[2].runDirectory.value, COMMAND_NAME)));
+}
+
+
+// Tests slave recovery of the fetcher cache. The cache must be
+// wiped clean on recovery, causing renewed downloads.
+TEST_F(FetcherCacheHttpTest, HttpCachedRecovery)
+{
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(httpServer->url() + COMMAND_NAME);
+    uri.set_executable(true);
+    uri.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    const string path = path::join(task.runDirectory.value, COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+
+    EXPECT_EQ(1u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+    // content-length requests: 1
+    // downloads: 1
+    EXPECT_EQ(2u, httpServer->countCommandRequests);
+  }
+
+  stopSlave();
+
+  // Start over.
+  httpServer->resetCounts();
+
+  // Don't reuse the old fetcher, which has stale state after
+  // stopping the slave.
+  Fetcher fetcher2;
+
+  Try<MesosContainerizer*> c =
+    MesosContainerizer::create(flags, true, &fetcher2);
+  CHECK_SOME(c);
+  containerizer = c.get();
+
+  // Set up so we can wait until the new slave updates the container's
+  // resources (this occurs after the executor has re-registered).
+  Future<Nothing> update =
+    FUTURE_DISPATCH(_, &MesosContainerizerProcess::update);
+
+  Try<PID<Slave>> pid = StartSlave(containerizer, flags);
+  CHECK_SOME(pid);
+  slavePid = pid.get();
+
+  // Wait until the containerizer is updated.
+  AWAIT_READY(update);
+
+  // Recovery must have cleaned the cache by now.
+  EXPECT_FALSE(os::exists(cacheDirectory));
+
+  // Repeat of the above to see if it works the same.
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(httpServer->url() + COMMAND_NAME);
+    uri.set_executable(true);
+    uri.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    const string path =
+      path::join(task.runDirectory.value, COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+
+    EXPECT_EQ(1u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+    // content-length requests: 1
+    // downloads: 1
+    EXPECT_EQ(2u, httpServer->countCommandRequests);
+  }
+}
+
+
+// Tests cache eviction. Limits the available cache space then fetches
+// more task scripts than fit into the cache and runs them all. We
+// observe how the number of cache files rises and then stays constant.
+TEST_F(FetcherCacheTest, SimpleEviction)
+{
+  const size_t countCacheEntries = 3;
+
+  // Let only the first 'countCacheEntries' downloads fit in the cache.
+  flags.fetcher_cache_size = COMMAND_SCRIPT.size() * countCacheEntries;
+
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < countCacheEntries + 2; i++) {
+    string commandFilename = "cmd" + stringify(i);
+    string command = commandFilename + " " + taskName(i);
+
+    commandPath = path::join(assetsDirectory, commandFilename);
+    ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT));
+
+    CommandInfo::URI uri;
+    uri.set_value(commandPath);
+    uri.set_executable(true);
+    uri.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + command);
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    // Check that the task succeeded.
+    EXPECT_TRUE(isExecutable(
+        path::join(task.runDirectory.value, commandFilename)));
+    EXPECT_TRUE(os::exists(
+        path::join(task.runDirectory.value, COMMAND_NAME + taskName(i))));
+
+    if (i < countCacheEntries) {
+      EXPECT_EQ(i + 1, fetcherProcess->cacheSize());
+      EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+      EXPECT_EQ(i+1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+    } else {
+      EXPECT_EQ(countCacheEntries, fetcherProcess->cacheSize());
+      EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+      EXPECT_EQ(countCacheEntries,
+                fetcherProcess->cacheFiles(slaveId, flags).get().size());
+    }
+  }
+}
+
+
+// Tests cache eviction fallback to bypassing the cache. A first task
+// runs normally. Then a second succeeds using eviction. Then a third
+// task fails to evict, but still gets executed bypassing the cache.
+TEST_F(FetcherCacheTest, FallbackFromEviction)
+{
+  // The size by which every task's URI download is going to be larger
+  // than the previous one.
+  const size_t growth = 10;
+
+  // Let only the first two downloads fit into the cache, one at a time,
+  // the second evicting the first. The third file won't fit any more,
+  // being larger than the entire cache.
+  flags.fetcher_cache_size = COMMAND_SCRIPT.size() + growth;
+
+  startSlave();
+  driver->start();
+
+  // We'll run 3 tasks and these are the task completion futures to wait
+  // for each time.
+  Future<FetcherInfo> fetcherInfo0;
+  Future<FetcherInfo> fetcherInfo1;
+  Future<FetcherInfo> fetcherInfo2;
+  EXPECT_CALL(*fetcherProcess, run(_, _, _))
+    .WillOnce(DoAll(FutureArg<1>(&fetcherInfo0),
+                    Invoke(fetcherProcess,
+                           &MockFetcherProcess::unmocked_run)))
+    .WillOnce(DoAll(FutureArg<1>(&fetcherInfo1),
+                    Invoke(fetcherProcess,
+                           &MockFetcherProcess::unmocked_run)))
+    .WillOnce(DoAll(FutureArg<1>(&fetcherInfo2),
+                    Invoke(fetcherProcess,
+                           &MockFetcherProcess::unmocked_run)));
+
+
+  // Task 0:
+
+  const string commandFilename0 = "cmd0";
+  const string command0 = commandFilename0 + " " + taskName(0);
+
+  commandPath = path::join(assetsDirectory, commandFilename0);
+
+  // Write the command into the script that gets fetched.
+  ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT));
+
+  CommandInfo::URI uri0;
+  uri0.set_value(commandPath);
+  uri0.set_executable(true);
+  uri0.set_cache(true);
+
+  CommandInfo commandInfo0;
+  commandInfo0.set_value("./" + command0);
+  commandInfo0.add_uris()->CopyFrom(uri0);
+
+  const Task task0 = launchTask(commandInfo0, 0);
+
+  AWAIT_READY(awaitFinished(task0));
+
+  // Check that the task succeeded.
+  EXPECT_TRUE(isExecutable(
+      path::join(task0.runDirectory.value, commandFilename0)));
+  EXPECT_TRUE(os::exists(
+      path::join(task0.runDirectory.value, COMMAND_NAME + taskName(0))));
+
+  AWAIT_READY(fetcherInfo0);
+
+  EXPECT_EQ(1, fetcherInfo0.get().items_size());
+  EXPECT_EQ(FetcherInfo::Item::DOWNLOAD_AND_CACHE,
+            fetcherInfo0.get().items(0).action());
+
+  // We have put a file of size 'COMMAND_SCRIPT.size()' in the cache
+  // with space 'COMMAND_SCRIPT.size() + growth'. So we must have 'growth'
+  // space left.
+  CHECK_EQ(Bytes(growth), fetcherProcess->availableCacheSpace());
+
+  EXPECT_EQ(1u, fetcherProcess->cacheSize());
+  EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+  EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+
+  // Task 1:
+
+  const string commandFilename1 = "cmd1";
+  const string command1 = commandFilename1 + " " + taskName(1);
+
+  commandPath = path::join(assetsDirectory, commandFilename1);
+
+  // Write the command into the script that gets fetched. Add 'growth'
+  // extra characters so the cache will fill up to the last byte.
+  ASSERT_SOME(os::write(
+      commandPath,
+      COMMAND_SCRIPT + std::string(growth, '\n')));
+
+  CommandInfo::URI uri1;
+  uri1.set_value(commandPath);
+  uri1.set_executable(true);
+  uri1.set_cache(true);
+
+  CommandInfo commandInfo1;
+  commandInfo1.set_value("./" + command1);
+  commandInfo1.add_uris()->CopyFrom(uri1);
+
+  const Task task1 = launchTask(commandInfo1, 1);
+
+  AWAIT_READY(awaitFinished(task1));
+
+  // Check that the task succeeded.
+  EXPECT_TRUE(isExecutable(
+      path::join(task1.runDirectory.value, commandFilename1)));
+  EXPECT_TRUE(os::exists(
+      path::join(task1.runDirectory.value, COMMAND_NAME + taskName(1))));
+
+  AWAIT_READY(fetcherInfo1);
+
+  EXPECT_EQ(1, fetcherInfo1.get().items_size());
+  EXPECT_EQ(FetcherInfo::Item::DOWNLOAD_AND_CACHE,
+            fetcherInfo1.get().items(0).action());
+
+  // The cache must now be full.
+  CHECK_EQ(Bytes(0u), fetcherProcess->availableCacheSpace());
+
+  EXPECT_EQ(1u, fetcherProcess->cacheSize());
+  EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+  EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+
+  // Task 2:
+
+  const string commandFilename2 = "cmd2";
+  const string command2 = commandFilename2 + " " + taskName(2);
+
+  commandPath = path::join(assetsDirectory, commandFilename2);
+
+  // Write the command into the script that gets fetched. Add
+  // '2 * growth' now. Thus the file will be so big that it will not
+  // fit into the cache any more.
+  ASSERT_SOME(os::write(
+      commandPath,
+      COMMAND_SCRIPT + std::string(2 * growth, '\n')));
+
+  CommandInfo::URI uri2;
+  uri2.set_value(commandPath);
+  uri2.set_executable(true);
+  uri2.set_cache(true);
+
+  CommandInfo commandInfo2;
+  commandInfo2.set_value("./" + command2);
+  commandInfo2.add_uris()->CopyFrom(uri2);
+
+  const Task task2 = launchTask(commandInfo2, 2);
+
+  AWAIT_READY(awaitFinished(task2));
+
+  // Check that the task succeeded.
+  EXPECT_TRUE(isExecutable(
+      path::join(task2.runDirectory.value, commandFilename2)));
+  EXPECT_TRUE(os::exists(
+      path::join(task2.runDirectory.value, COMMAND_NAME + taskName(2))));
+
+  AWAIT_READY(fetcherInfo2);
+
+  EXPECT_EQ(1, fetcherInfo2.get().items_size());
+  EXPECT_EQ(FetcherInfo::Item::BYPASS_CACHE,
+            fetcherInfo2.get().items(0).action());
+
+  EXPECT_EQ(1u, fetcherProcess->cacheSize());
+  EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+  EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index 4549e6a..361d918 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -48,314 +48,154 @@
 #include "tests/mesos.hpp"
 #include "tests/utils.hpp"
 
+using namespace mesos::slave;
+
 using namespace process;
 
-using process::Subprocess;
-using process::Future;
+using mesos::fetcher::FetcherInfo;
 
 using mesos::internal::slave::Fetcher;
 
-using std::string;
+using process::Subprocess;
+using process::Future;
+
 using std::map;
+using std::string;
 
-using mesos::fetcher::FetcherInfo;
 
 namespace mesos {
 namespace internal {
 namespace tests {
 
-class FetcherEnvironmentTest : public ::testing::Test {};
+class FetcherTest : public TemporaryDirectoryTest {};
 
 
-TEST_F(FetcherEnvironmentTest, Simple)
+TEST_F(FetcherTest, FileURI)
 {
-  CommandInfo commandInfo;
-  CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("hdfs:///uri");
-  uri->set_executable(false);
+  string fromDir = path::join(os::getcwd(), "from");
+  ASSERT_SOME(os::mkdir(fromDir));
+  string testFile = path::join(fromDir, "test");
+  EXPECT_SOME(os::write(testFile, "data"));
 
-  string directory = "/tmp/directory";
-  Option<string> user = "user";
+  string localFile = path::join(os::getcwd(), "test");
+  EXPECT_FALSE(os::exists(localFile));
 
   slave::Flags flags;
-  flags.frameworks_home = "/tmp/frameworks";
-  flags.hadoop_home = "/tmp/hadoop";
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, user, flags);
-
-  EXPECT_EQ(2u, environment.size());
-
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
-
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
-
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
-
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_EQ(user.get(), fetcherInfo.get().user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
-}
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
-TEST_F(FetcherEnvironmentTest, MultipleURIs)
-{
   CommandInfo commandInfo;
-  CommandInfo::URI uri;
-  uri.set_value("hdfs:///uri1");
-  uri.set_executable(false);
-  commandInfo.add_uris()->MergeFrom(uri);
-  uri.set_value("hdfs:///uri2");
-  uri.set_executable(true);
-  commandInfo.add_uris()->MergeFrom(uri);
-
-  string directory = "/tmp/directory";
-  Option<string> user("user");
-
-  slave::Flags flags;
-  flags.frameworks_home = "/tmp/frameworks";
-  flags.hadoop_home = "/tmp/hadoop";
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, user, flags);
-
-  EXPECT_EQ(2u, environment.size());
-
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+  CommandInfo::URI* uri = commandInfo.add_uris();
+  uri->set_value("file://" + testFile);
 
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_READY(fetch);
 
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_EQ(user.get(), fetcherInfo.get().user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  EXPECT_TRUE(os::exists(localFile));
 }
 
 
-TEST_F(FetcherEnvironmentTest, NoUser)
+// Negative test: invalid user name. Copied from FileTest, so this
+// normally would succeed, but here a bogus user name is specified.
+// So we check for fetch failure.
+TEST_F(FetcherTest, InvalidUser)
 {
-  CommandInfo commandInfo;
-  CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("hdfs:///uri");
-  uri->set_executable(false);
+  string fromDir = path::join(os::getcwd(), "from");
+  ASSERT_SOME(os::mkdir(fromDir));
+  string testFile = path::join(fromDir, "test");
+  EXPECT_SOME(os::write(testFile, "data"));
 
-  string directory = "/tmp/directory";
+  string localFile = path::join(os::getcwd(), "test");
+  EXPECT_FALSE(os::exists(localFile));
 
   slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
   flags.frameworks_home = "/tmp/frameworks";
-  flags.hadoop_home = "/tmp/hadoop";
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, None(), flags);
-
-  EXPECT_EQ(2u, environment.size());
-
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
-
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
-
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
-
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_FALSE(fetcherInfo.get().has_user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
-}
 
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
-TEST_F(FetcherEnvironmentTest, EmptyHadoop)
-{
   CommandInfo commandInfo;
-  CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("hdfs:///uri");
-  uri->set_executable(false);
+  commandInfo.set_user(UUID::random().toString());
 
-  string directory = "/tmp/directory";
-  Option<string> user = "user";
-
-  slave::Flags flags;
-  flags.frameworks_home = "/tmp/frameworks";
-  flags.hadoop_home = "";
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, user, flags);
+  CommandInfo::URI* uri = commandInfo.add_uris();
+  uri->set_value("file://" + testFile);
 
-  EXPECT_EQ(0u, environment.count("HADOOP_HOME"));
-  EXPECT_EQ(1u, environment.size());
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_FAILED(fetch);
 
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
+  // See FetcherProcess::fetch(), the message must mention "chown" in
+  // this case.
+  EXPECT_TRUE(strings::contains(fetch.failure(), "chown"));
 
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_EQ(user.get(), fetcherInfo.get().user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  EXPECT_FALSE(os::exists(localFile));
 }
 
 
-TEST_F(FetcherEnvironmentTest, NoHadoop)
+// Negative test: URI leading to non-existing file. Copied from FileTest,
+// but here the resource is missing. So we check for fetch failure.
+TEST_F(FetcherTest, NonExistingFile)
 {
-  CommandInfo commandInfo;
-  CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("hdfs:///uri");
-  uri->set_executable(false);
-
-  string directory = "/tmp/directory";
-  Option<string> user = "user";
+  string fromDir = path::join(os::getcwd(), "from");
+  ASSERT_SOME(os::mkdir(fromDir));
+  string testFile = path::join(fromDir, "nonExistingFile");
 
   slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
   flags.frameworks_home = "/tmp/frameworks";
 
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, user, flags);
-
-  EXPECT_EQ(0u, environment.count("HADOOP_HOME"));
-  EXPECT_EQ(1u, environment.size());
-
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
-
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
-
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_EQ(user.get(), fetcherInfo.get().user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
-}
-
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
-TEST_F(FetcherEnvironmentTest, NoExtractNoExecutable)
-{
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("hdfs:///uri");
-  uri->set_executable(false);
-  uri->set_extract(false);
-
-  string directory = "/tmp/directory";
-  Option<string> user = "user";
-
-  slave::Flags flags;
-  flags.frameworks_home = "/tmp/frameworks";
-  flags.hadoop_home = "/tmp/hadoop";
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, user, flags);
-
-  EXPECT_EQ(2u, environment.size());
-
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+  uri->set_value("file://" + testFile);
 
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_FAILED(fetch);
 
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_EQ(user.get(), fetcherInfo.get().user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  // See FetcherProcess::run().
+  EXPECT_TRUE(strings::contains(fetch.failure(), "Failed to fetch"));
 }
 
 
-TEST_F(FetcherEnvironmentTest, NoExtractExecutable)
+// Negative test: malformed URI, missing path.
+TEST_F(FetcherTest, MalformedURI)
 {
-  CommandInfo commandInfo;
-  CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("hdfs:///uri");
-  uri->set_executable(true);
-  uri->set_extract(false);
-
-  string directory = "/tmp/directory";
-  Option<string> user = "user";
-
   slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
   flags.frameworks_home = "/tmp/frameworks";
-  flags.hadoop_home = "/tmp/hadoop";
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, user, flags);
-
-  EXPECT_EQ(2u, environment.size());
-
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
-
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
-
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
-
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_EQ(user.get(), fetcherInfo.get().user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
-}
-
-
-class FetcherTest : public TemporaryDirectoryTest {};
-
 
-TEST_F(FetcherTest, FileURI)
-{
-  string fromDir = path::join(os::getcwd(), "from");
-  ASSERT_SOME(os::mkdir(fromDir));
-  string testFile = path::join(fromDir, "test");
-  EXPECT_FALSE(os::write(testFile, "data").isError());
-
-  string localFile = path::join(os::getcwd(), "test");
-  EXPECT_FALSE(os::exists(localFile));
-
-  slave::Flags flags;
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("file://" + testFile);
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
-
-  Try<Subprocess> fetcherSubprocess =
-    process::subprocess(
-      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      environment);
+  uri->set_value("lala://nopath");
 
-  ASSERT_SOME(fetcherSubprocess);
-  Future<Option<int>> status = fetcherSubprocess.get().status();
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_FAILED(fetch);
 
-  EXPECT_TRUE(os::exists(localFile));
+  // See Fetcher::basename().
+  EXPECT_TRUE(strings::contains(fetch.failure(), "Malformed"));
 }
 
 
@@ -364,31 +204,27 @@ TEST_F(FetcherTest, AbsoluteFilePath)
   string fromDir = path::join(os::getcwd(), "from");
   ASSERT_SOME(os::mkdir(fromDir));
   string testPath = path::join(fromDir, "test");
-  EXPECT_FALSE(os::write(testPath, "data").isError());
+  EXPECT_SOME(os::write(testPath, "data"));
 
   string localFile = path::join(os::getcwd(), "test");
   EXPECT_FALSE(os::exists(localFile));
 
   slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(testPath);
 
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
-
-  Try<Subprocess> fetcherSubprocess =
-    process::subprocess(
-      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      environment);
-
-  ASSERT_SOME(fetcherSubprocess);
-  Future<Option<int>> status = fetcherSubprocess.get().status();
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_READY(fetch);
 
   EXPECT_TRUE(os::exists(localFile));
 }
@@ -399,55 +235,38 @@ TEST_F(FetcherTest, RelativeFilePath)
   string fromDir = path::join(os::getcwd(), "from");
   ASSERT_SOME(os::mkdir(fromDir));
   string testPath = path::join(fromDir, "test");
-  EXPECT_FALSE(os::write(testPath, "data").isError());
+  EXPECT_SOME(os::write(testPath, "data"));
 
   string localFile = path::join(os::getcwd(), "test");
   EXPECT_FALSE(os::exists(localFile));
 
   slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value("test");
 
-  // The first run must fail, because we have not set frameworks_home yet.
-
-  map<string, string> environment1 =
-    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
-
-  Try<Subprocess> fetcherSubprocess1 =
-    process::subprocess(
-      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      environment1);
-
-  ASSERT_SOME(fetcherSubprocess1);
-  Future<Option<int>> status1 = fetcherSubprocess1.get().status();
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  AWAIT_READY(status1);
-  ASSERT_SOME(status1.get());
+  // The first run must fail, because we have not set frameworks_home yet.
 
-  // mesos-fetcher always exits with EXIT(1) on failure.
-  EXPECT_EQ(1, WIFEXITED(status1.get().get()));
+  Future<Nothing> fetch1 = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_FAILED(fetch1);
 
   EXPECT_FALSE(os::exists(localFile));
 
   // The next run must succeed due to this flag.
   flags.frameworks_home = fromDir;
 
-  map<string, string> environment2 =
-    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
-
-  Try<Subprocess> fetcherSubprocess2 =
-    process::subprocess(
-      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      environment2);
-
-  ASSERT_SOME(fetcherSubprocess2);
-  Future<Option<int>> status2 = fetcherSubprocess2.get().status();
-
-  AWAIT_READY(status2);
-  ASSERT_SOME(status2.get());
-  EXPECT_EQ(0, status2.get().get());
+  Future<Nothing> fetch2 = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_READY(fetch2);
 
   EXPECT_TRUE(os::exists(localFile));
 }
@@ -481,26 +300,22 @@ TEST_F(FetcherTest, OSNetUriTest)
   EXPECT_FALSE(os::exists(localFile));
 
   slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
   flags.frameworks_home = "/tmp/frameworks";
 
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
+
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(url);
 
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
-
-  Try<Subprocess> fetcherSubprocess =
-    process::subprocess(
-      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      environment);
-
-  ASSERT_SOME(fetcherSubprocess);
-  Future<Option<int>> status = fetcherSubprocess.get().status();
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_READY(fetch);
 
   EXPECT_TRUE(os::exists(localFile));
 }
@@ -511,32 +326,27 @@ TEST_F(FetcherTest, FileLocalhostURI)
   string fromDir = path::join(os::getcwd(), "from");
   ASSERT_SOME(os::mkdir(fromDir));
   string testFile = path::join(fromDir, "test");
-  EXPECT_FALSE(os::write(testFile, "data").isError());
+  EXPECT_SOME(os::write(testFile, "data"));
 
   string localFile = path::join(os::getcwd(), "test");
   EXPECT_FALSE(os::exists(localFile));
 
   slave::Flags flags;
-  flags.frameworks_home = "/tmp/frameworks";
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(path::join("file://localhost", testFile));
 
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
-
-  Try<Subprocess> fetcherSubprocess =
-    process::subprocess(
-      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      environment);
-
-  ASSERT_SOME(fetcherSubprocess);
-  Future<Option<int>> status = fetcherSubprocess.get().status();
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_READY(fetch);
 
   EXPECT_TRUE(os::exists(localFile));
 }
@@ -561,20 +371,11 @@ TEST_F(FetcherTest, NoExtractNotExecutable)
   slave::Flags flags;
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
-  Option<int> stdout = None();
-  Option<int> stderr = None();
-
-  // Redirect mesos-fetcher output if running the tests verbosely.
-  if (tests::flags.verbose) {
-    stdout = STDOUT_FILENO;
-    stderr = STDERR_FILENO;
-  }
-
   Fetcher fetcher;
+  SlaveID slaveId;
 
   Future<Nothing> fetch = fetcher.fetch(
-      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
-
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
   AWAIT_READY(fetch);
 
   Try<string> basename = os::basename(path.get());
@@ -611,19 +412,11 @@ TEST_F(FetcherTest, NoExtractExecutable)
   slave::Flags flags;
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
-  Option<int> stdout = None();
-  Option<int> stderr = None();
-
-  // Redirect mesos-fetcher output if running the tests verbosely.
-  if (tests::flags.verbose) {
-    stdout = STDOUT_FILENO;
-    stderr = STDERR_FILENO;
-  }
-
   Fetcher fetcher;
+  SlaveID slaveId;
 
   Future<Nothing> fetch = fetcher.fetch(
-      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
 
   AWAIT_READY(fetch);
 
@@ -669,19 +462,11 @@ TEST_F(FetcherTest, ExtractNotExecutable)
   slave::Flags flags;
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
-  Option<int> stdout = None();
-  Option<int> stderr = None();
-
-  // Redirect mesos-fetcher output if running the tests verbosely.
-  if (tests::flags.verbose) {
-    stdout = STDOUT_FILENO;
-    stderr = STDERR_FILENO;
-  }
-
   Fetcher fetcher;
+  SlaveID slaveId;
 
   Future<Nothing> fetch = fetcher.fetch(
-      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
 
   AWAIT_READY(fetch);
 
@@ -768,19 +553,11 @@ TEST_F(FetcherTest, HdfsURI)
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(path::join("hdfs://localhost", testFile));
 
-  Option<int> stdout = None();
-  Option<int> stderr = None();
-
-  // Redirect mesos-fetcher output if running the tests verbosely.
-  if (tests::flags.verbose) {
-    stdout = STDOUT_FILENO;
-    stderr = STDERR_FILENO;
-  }
-
   Fetcher fetcher;
+  SlaveID slaveId;
 
   Future<Nothing> fetch = fetcher.fetch(
-      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
 
   AWAIT_READY(fetch);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 1d5639c..d7a3c06 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -48,6 +48,7 @@
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 
+using std::list;
 using std::shared_ptr;
 using std::string;
 using testing::_;
@@ -140,6 +141,7 @@ slave::Flags MesosTest::CreateSlaveFlags()
   CHECK_SOME(directory) << "Failed to create temporary directory";
 
   flags.work_dir = directory.get();
+  flags.fetcher_cache_dir = path::join(directory.get(), "fetch");
 
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
@@ -445,6 +447,47 @@ void MockSlave::unmocked___recover(const Future<Nothing>& future)
 }
 
 
+MockFetcherProcess::MockFetcherProcess()
+{
+  // Set up default behaviors, calling the original methods.
+  EXPECT_CALL(*this, _fetch(_, _, _, _, _, _, _)).
+    WillRepeatedly(
+        Invoke(this, &MockFetcherProcess::unmocked__fetch));
+  EXPECT_CALL(*this, run(_, _, _)).
+    WillRepeatedly(Invoke(this, &MockFetcherProcess::unmocked_run));
+}
+
+
+process::Future<Nothing> MockFetcherProcess::unmocked__fetch(
+  const list<Future<shared_ptr<Cache::Entry>>> futures,
+  const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>&
+    entries,
+  const ContainerID& containerId,
+  const string& sandboxDirectory,
+  const string& cacheDirectory,
+  const Option<string>& user,
+  const slave::Flags& flags)
+{
+  return slave::FetcherProcess::_fetch(
+      futures,
+      entries,
+      containerId,
+      sandboxDirectory,
+      cacheDirectory,
+      user,
+      flags);
+}
+
+
+process::Future<Nothing> MockFetcherProcess::unmocked_run(
+    const ContainerID& containerId,
+    const FetcherInfo& info,
+    const slave::Flags& flags)
+{
+  return slave::FetcherProcess::run(containerId, info, flags);
+}
+
+
 slave::Flags ContainerizerTest<slave::MesosContainerizer>::CreateSlaveFlags()
 {
   slave::Flags flags = MesosTest::CreateSlaveFlags();

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index ac986a0..a1c6ae4 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -30,6 +30,8 @@
 
 #include <mesos/master/allocator.hpp>
 
+#include <mesos/fetcher/fetcher.hpp>
+
 #include <mesos/slave/resource_estimator.hpp>
 
 #include <process/future.hpp>
@@ -44,6 +46,7 @@
 #include <stout/foreach.hpp>
 #include <stout/gtest.hpp>
 #include <stout/lambda.hpp>
+#include <stout/memory.hpp>
 #include <stout/none.hpp>
 #include <stout/option.hpp>
 #include <stout/stringify.hpp>
@@ -799,6 +802,52 @@ private:
 };
 
 
+// Definition of a mock FetcherProcess to be used in tests with gmock.
+class MockFetcherProcess : public slave::FetcherProcess
+{
+public:
+  MockFetcherProcess();
+
+  virtual ~MockFetcherProcess() {}
+
+  MOCK_METHOD7(_fetch, process::Future<Nothing>(
+      const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
+        futures,
+      const hashmap<
+          CommandInfo::URI,
+          Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
+        entries,
+      const ContainerID& containerId,
+      const std::string& sandboxDirectory,
+      const std::string& cacheDirectory,
+      const Option<std::string>& user,
+      const slave::Flags& flags));
+
+  process::Future<Nothing> unmocked__fetch(
+      const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
+        futures,
+      const hashmap<
+          CommandInfo::URI,
+          Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
+        entries,
+      const ContainerID& containerId,
+      const std::string& sandboxDirectory,
+      const std::string& cacheDirectory,
+      const Option<std::string>& user,
+      const slave::Flags& flags);
+
+  MOCK_METHOD3(run, process::Future<Nothing>(
+      const ContainerID& containerId,
+      const FetcherInfo& info,
+      const slave::Flags& flags));
+
+  process::Future<Nothing> unmocked_run(
+      const ContainerID& containerId,
+      const FetcherInfo& info,
+      const slave::Flags& flags);
+};
+
+
 // Definition of a MockAuthozier that can be used in tests with gmock.
 class MockAuthorizer : public Authorizer
 {


[3/4] mesos git commit: Added a cache to the Fetcher.

Posted by be...@apache.org.
Added a cache to the Fetcher.

Almost all of the functionality in epic MESOS-336. Downloaded files
from CommandInfo::URIs can now be cached in a cache directory
designated by a slave flag. This only happens when asked for by an
extra flag in the URI and is thus backwards-compatible. The cache has
a size limit also given by a new slave flag. Cache-resident files are
evicted as necessary to make space for newly fetched ones. Concurrent
attempts to cache the same URI leads to only one download. The fetcher
program remains external for safety reasons, but is now augmented with
more elaborate parameters packed into a JSON object to implement
specific fetch actions for all of the above. Additional testing
includes fetching from (mock) HDFS and coverage of the new features.

Review: https://reviews.apache.org/r/30774


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/edd35b05
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/edd35b05
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/edd35b05

Branch: refs/heads/master
Commit: edd35b050a736aa6f7e3e6939a6ee074df66954d
Parents: b16999a
Author: Bernd Mathiske <be...@mesosphere.io>
Authored: Thu May 21 19:34:46 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Jun 1 02:27:43 2015 -0700

----------------------------------------------------------------------
 docs/configuration.md                           |   18 +
 docs/fetcher-cache-internals.md                 |  115 ++
 docs/fetcher.md                                 |  255 ++++
 include/mesos/fetcher/fetcher.proto             |   43 +-
 include/mesos/mesos.proto                       |   17 +
 include/mesos/type_utils.hpp                    |   17 +
 src/Makefile.am                                 |    1 +
 src/hdfs/hdfs.hpp                               |   35 +
 src/launcher/fetcher.cpp                        |  500 ++++---
 src/slave/constants.hpp                         |    3 +
 src/slave/containerizer/docker.cpp              |    9 +-
 src/slave/containerizer/docker.hpp              |    4 +-
 src/slave/containerizer/fetcher.cpp             | 1091 ++++++++++++--
 src/slave/containerizer/fetcher.hpp             |  335 ++++-
 src/slave/containerizer/mesos/containerizer.cpp |    7 +-
 src/slave/containerizer/mesos/containerizer.hpp |    3 +-
 src/slave/flags.cpp                             |   17 +
 src/slave/flags.hpp                             |    2 +
 src/slave/slave.cpp                             |   16 +-
 src/tests/docker_containerizer_tests.cpp        |   20 +-
 src/tests/fetcher_cache_tests.cpp               | 1359 ++++++++++++++++++
 src/tests/fetcher_tests.cpp                     |  487 ++-----
 src/tests/mesos.cpp                             |   43 +
 src/tests/mesos.hpp                             |   49 +
 24 files changed, 3665 insertions(+), 781 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 4aeb4ad..4e20913 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1231,6 +1231,24 @@ file:///path/to/file (where file contains one of the above)</code></pre>
   </tr>
   <tr>
     <td>
+      --fetcher_cache_size=VALUE
+    </td>
+    <td>
+      Size of the fetcher cache in Bytes.
+      (default: 2 GB)
+    </td>
+  </tr>
+  <tr>
+    <td>
+      --fetcher_cache_dir=VALUE
+    </td>
+    <td>
+      Parent directory for fetcher cache directories (one subdirectory per slave). By default this directory is held inside the work directory, so everything can be deleted or archived in one swoop, in particular during testing. However, a typical production scenario is to use a separate cache volume. First, it is not meant to be backed up. Second, you want to avoid that sandbox directories and the cache directory can interfere with each other in unpredictable ways by occupying shared space. So it is recommended to set the cache directory explicitly.
+      (default: /tmp/mesos/fetch)
+    </td>
+  </tr>
+  <tr>
+    <td>
       --work_dir=VALUE
     </td>
     <td>

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/docs/fetcher-cache-internals.md
----------------------------------------------------------------------
diff --git a/docs/fetcher-cache-internals.md b/docs/fetcher-cache-internals.md
new file mode 100644
index 0000000..c696f15
--- /dev/null
+++ b/docs/fetcher-cache-internals.md
@@ -0,0 +1,115 @@
+---
+layout: documentation
+---
+
+# Mesos Fetcher Cache Internals
+
+It assumed that readers of this document are well familiar with the contents of the overview and user guide of the Mesos fetcher in "fetcher.md". The present document makes direct references to notions defined in the former.
+
+## Design goals for the initial fetcher cache prototype:
+
+0. Direct fetching: Provide the pre-existing fetcher functionality (as in Mesos 0.22 and before) when caching is not explicitly requested.
+1. Program isolation: Preserve the approach to employ an external "mesos-fetcher" program to handle all (potentially very lengthy or blocking) content download operations.
+2. Cache effect: Significant lessen overall time spent on fetching in case of repetition of requests for the same URI. This holds for both sequential and concurrent repetition. The latter is the case when concurrently launched tasks on the same slave require overlapping URI sets.
+3. Cache space limit: Use a user-specified directory for cache storage and maintain a user-specified physical storage space limit for it. Evict older cache files as needed to fetch new cache content.
+4. Fallback strategy: Whenever downloading to or from the cache fails for any reason, fetching into the sandbox should still succeed somehow if at all possible.
+5. Slave recovery: Support slave recovery.
+
+For future releases, we foresee additional features:
+1. Automatic refreshing of cache content when a URI's content has changed.
+2. Prefetching URIs for subsequent tasks. Prefetching can run in parallel with task execution.
+
+## How the fetcher cache works
+
+In this section we look deeper into the implementation of design goals #1, #2, #3. The others are sufficiently covered in the user guide.
+
+### Fetcher process and mesos-fetcher
+
+The fetcher mechanism consists of two separate entities:
+
+1. The fetcher process included in the slave program. There is exactly one instance of this per slave.
+2. The separate mesos-fetcher program. There is one invocation of this per fetch request from the slave to the fetcher process.
+
+The fetcher process performs internal bookkeeping of what is in the cache and what is not. As needed, it invokes the mesos-fetcher program to download resources from URIs to the cache or directly to sandbox directories, and to copy resources from the cache to a sandbox directory.
+
+All decision making "intelligence" is situated in the fetcher process and the mesos-fetcher program is a rather simple helper program. Except for cache files, there is no persistent state at all in the entire fetcher system. This greatly simplifies dealing with all the inherent intricacies and races involved in concurrent fetching with caching.
+
+The mesos-fetcher program takes straight forward per-URI commands and executes these. It has three possible modes of operation for any given URI:
+
+1. Bypass the cache and fetch directly into the specified sandbox directory.
+2. Fetch into the cache and then copy the resulting cache file into the sandbox directory.
+3. Do not download anything. Copy (or extract) a resource from the cache into the sandbox directory.
+
+Besides minor complications such as archive extraction and execution rights settings, this already sums up all it does.
+
+Based on this setup, the main program flow in the fetcher process is concerned with assembling a list of parameters to the mesos-fetcher program that describe items to be fetched. This figure illustrates the high-level collaboration of the fetcher process with mesos-fetcher program runs. It also depicts the next level of detail of the fetcher process, which will be described in the following section.
+
+![Fetcher Separation of Labor](images/fetch_components.jpg?raw=true)
+
+
+### Cache state representation and manipulation
+
+The fetcher process uses a private instance of class Cache to represent what URIs are cached, where the respective cache files are, what stage of processing they are in, and so on.
+
+The main data structure to hold all this information is a hashmap from URI/user combinations to Cache::Entry objects, which each contain information about an individual cache file on disk. These objects are referenced by shared_ptr, because they can be addressed by multiple callbacks on behalf of concurrent fetch attempts while also being held in the hashmap.
+
+A cache entry corresponds directly to a cache file on disk throughout the entire life time of the latter, including before and after its existence. It holds all pertinent state to inform about the phase and results of fetching the corresponding URI.
+
+This figure illustrates the different states which a cache entry can be in.
+
+![Fetcher Cache State](images/fetch_state.jpg?raw=true)
+
+While a cache entry is referenced it cannot be evicted by a the current or any other concurrent fetch attempt in order to make space for a download of a new cache file.
+
+The two blue states are essentially the same: no cache file exists. The two green disk states on the right are also the same.
+
+The figure only depicts what happens from the point of view of one isolated fetch run. Any given cache entry can be referenced simultaniously by another concurrent fetch run. It must not be evicted as long as it is referenced by any fetching activity. We implement this by reference counting. Every cache entry has a reference count field that gets incremented at the beginning of its use by a fetch run and decremented at its end. The latter must happen no matter whether the run has been successful or whether there has been an error. Increments happen when:
+- A new cache entry is created. It is immediately referenced.
+- An existing cache entry's file download is going to be waited for.
+- An existing cache entry has a resident cache file that is going to be retrieved.
+
+Every increment is recorded in a list. At the very end of the fetch procedure, no matter what its outcome is, each entry in the list gets its reference count decremented.
+
+(Currently, we are even leaving reference counts for cache entries for which we fall back to bypassing the cache untouched until the end of the fetch procedure. This may be unnecessary, but it is safe. It is also supposedly rare, because fallbacks only occur to mitigate unexpected error situations. A future version may optimize this behavior.)
+
+### The per-URI control flow
+
+As menitoned above, the fetcher process' main control flow concerns sorting out what to do with each URI presented to it in a fetch request. An overview of the ensuing control flow for a given URI is depicted in this figure.
+
+![Determining Fetcher Actions](images/fetch_flow.jpg?raw=true)
+
+After going through this procedure for each URI, the fetcher process assembles the gathered list of per-URI actions into a JSON object (FetcherInfo), which is passed to the mesos-fetcher program in an environment variable. The possible fetch actions for a URI are shown at the bottom of the flow chart. After they are determined, the fetcher process invokes mesos-fetcher.
+
+The implementation is oriented at this control flow but its code structure cannot match it directly, because some of these branches must span multiple libprocess continuations. There are two layers of futures, one for each of these phases.
+
+1.  Before making fetcher cache items.
+- a) Wait for concurrent downloads for pre-existing cache entries
+- b) Wait for size fetching combined and then space reservation for new cache entries.
+
+2. After making fetcher cache items and running mesos-fetcher.
+- Complete new cache items with success/failure, which as an important side-effect informs concurrent fetch runs’ futures in phase 1/a.
+
+The futures for phase 1 are not shared outside one fetch run. They exclusively guard asynchronous operations for the same fetch run. Their type parameter does not really matter. But each needs to correspond to one URI and eventual fetch item somehow. Multiple variants have been proposed for this. The complexity remains about the same.
+
+The futures for phase 2 need to be part of the cache entries, because they are shared between concurrent fetch runs.
+
+Some time between phase 1 and 2, the fallback strategy needs to be applied where indicated: when a future from phase 1 has failed for any reason, we fall back on bypassing the cache.
+
+Besides, everything touched in 1/a and 1/b needs to be prevented from being cache-evicted until the end. One can in principle release cache entries right after they fail, but this requires more complexity and is harder to prove correct.
+
+
+### Cache eviction
+
+![Before eviction](images/fetch_evict1.jpg?raw=true)
+
+The resources named "A" and "B" have been fetched with caching into sandbox 1 and 2 below. In the course of this, two cache entries have been created and two files have been downloaded into the cache and named "1" and "2". (Cache file names have unique names that comprise serial numbers.)
+
+The next figure illustrates the state after fetching a different cached URI into sandbox 3, which in this case requires evicting a cache-resident file and its entry. Steps:
+1. Remove the cache entry for "A" from the fetcher process' cache entry table. Its faded depiction is supposed to indicate this. This immediately makes it appear as if the URI has never been cached, even though the cache file is still around.
+2. Proceed with fetching "C". This creates a new cache file, which has a different unique name. (The fetcher process remembers in its cache entry which file name belongs to which URI.)
+
+![After eviction](images/fetch_evict2.jpg?raw=true)
+
+The next figure then shows what happens if the first URI is fetched once again. Here we also assume the cache being so filled up that eviction is necessary and this time the entry and file for "B" are the victims.
+
+![After another eviction](images/fetch_evict3.jpg?raw=true)

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/docs/fetcher.md
----------------------------------------------------------------------
diff --git a/docs/fetcher.md b/docs/fetcher.md
new file mode 100644
index 0000000..638f530
--- /dev/null
+++ b/docs/fetcher.md
@@ -0,0 +1,255 @@
+---
+layout: documentation
+---
+
+# Mesos Fetcher
+
+Experimental support for the Mesos fetcher _cache_ is introduced in
+Mesos 0.23.0.
+
+In this context we loosely regard the term "downloading" as to include copying
+from local file systems.
+
+## What is the Mesos fetcher?
+
+The Mesos fetcher is a mechanism to download resources into the sandbox
+directory of a task in preparation of running the task. As part of a TaskInfo
+message, the framework ordering the task's execution provides a list of
+CommandInfo::URI protobuf values, which becomes the input to the Mesos fetcher.
+
+By default, each requested URI is downloaded directly into the sandbox directory
+and repeated requests for the same URI leads to downloading another copy of the
+same resource. Alternatively, the fetcher can be instructed to cache URI
+downloads in a dedicated directory for reuse by subsequent downloads.
+
+The Mesos fetcher mechanism comprises of these two parts:
+
+1. The slave-internal Fetcher Process (in terms of libprocess) that controls and
+coordinates all fetch actions. Every slave instance has exactly one internal
+fetcher instance that is used by every kind of containerizer (except the
+external containerizer variant, which is responsible for its own approach to
+fetching).
+
+2. The external program "mesos-fetcher" that is invoked by the former. It
+performs all network and disk operations except file deletions and file size
+queries for cache-internal bookkeeping. It is run as an external OS process in
+order to shield the slave process from I/O-related hazards. It takes
+instructions in form of an environment variable containing a JSON object with
+detailed fetch action descriptions.
+
+## The fetch procedure
+
+Frameworks launch tasks by calling the scheduler driver method launchTasks(),
+passing CommandInfo protobuf structures as arguments. This type of structure
+specifies (among other things) a command and a list of URIs that need to be
+"fetched" into the sandbox directory on the the slave node as a precondition for
+task execution. Hence, when the slave receives a request go launch a task, it
+calls upon its fetcher, first, to provision the specified resources into the
+sandbox directory. If fetching fails, the task is not started and the reported
+task status is TASK_FAILED.
+
+All URIs requested for a given task are fetched sequentially in a single
+invocation of mesos-fetcher. Here, avoiding download concurrency reduces the
+risk of bandwidth issues somewhat. However, multiple fetch operations can be
+active concurrently due to multiple task launch requests.
+
+### The URI protobuf structure
+
+Before mesos-fetcher is started, the specific fetch actions to be performed for
+each URI are determined based on the following protobuf structure. (See
+"include/mesos/mesos.proto" for more details.)
+
+    message CommandInfo {
+      message URI {
+        required string value = 1;
+        optional bool executable = 2;
+        optional bool extract = 3 [default = true];
+        optional bool cache = 4;
+      }
+      ...
+      optional string user = 5;
+    }
+
+The field "value" contains the URI.
+
+If the "executable" field is "true", the "extract" field is ignored and
+has no effect.
+
+If the "cache" field is true, the fetcher cache is to be used for the URI.
+
+### Specifying a user name
+
+The framework may pass along a user name that becomes a fetch parameter. This
+causes its executors and tasks to run under a specific user. However, if the
+"user" field in the CommandInfo structure is specified, it takes precedence for
+the affected task.
+
+If a user name is specified either way, the fetcher first validates that it is
+in fact a valid user name on the slave. If it is not, fetching fails right here.
+Otherwise, the sandbox directory is assigned to the specified user as owner
+(using chown) at the end of the fetch procedure, before task execution begins.
+
+The user name in play has an important effect on caching.  Caching is managed on
+a per-user base, i.e. the combination of user name and "uri" uniquely
+identifies a cacheable fetch result. If no user name has been specified, this
+counts for the cache as a separate user, too. Thus cache files for each valid
+user are segregated from all others, including those without a specified user.
+
+This means that the exact same URI will be downloaded and cached multiple times
+if different users are indicated.
+
+### Executable fetch results
+
+By default, fetched files are not executable.
+
+If the field "executable" is set to "true", the fetch result will be changed to
+be executable (by "chmod") for every user. This happens at the end of the fetch
+procedure, in the sandbox directory only. It does not affect any cache file.
+
+### Archive extraction
+
+If the "extract" field is "true", which is the default, then files with
+extensions that hint at packed or compressed archives (".zip", ".tar", et.al.)
+are unpacked in the sandbox directory.
+
+In case the cache is bypassed, both the archive and the unpacked results will be
+found together in the sandbox. In case a cache file is unpacked, only the
+extraction result will be found in the sandbox.
+
+### Bypassing the cache
+
+By default, the URI field "cache" is not present. If this is the case or its
+value is "false" the fetcher downloads directly into the sandbox directory.
+
+The same also happens dynamically as a fallback strategy if anything goes wrong
+when preparing a fetch operation that involves the cache. In this case, a
+warning message is logged. Possible fallback conditions are:
+
+- The server offering the URI does not respond or reports an error.
+- The URI's download size could not be determined.
+- There is not enough space in the cache, even after attempting to evict files.
+
+### Fetching through the cache
+
+If the URI's "cache" field has the value "true", then the fetcher cache is in
+effect. If a URI is encountered for the first time (for the same user), it is
+first downloaded into the cache, then copied to the sandbox directory from
+there. If the same URI is encountered again, and a corresponding cache file is
+resident in the cache or still en route into the cache, then downloading is
+omitted and the fetcher proceeds directly to copying from the cache. Competing
+requests for the same URI simply wait upon completion of the first request that
+occurs. Thus every URI is downloaded at most once (per user) as long as it is
+cached.
+
+Every cache file stays resident for an unspecified amount of time and can be
+removed at the fetcher's discretion at any moment, except while it is in direct
+use:
+
+- It is still being downloaded by this fetch procedure.
+- It is still being downloaded by a concurrent fetch procedure for a different
+  task.
+- It is being copied or extracted from the cache.
+
+Once a cache file has been removed, the related URI will thereafter be treated
+as described above for the first encounter.
+
+Unfortunately, there is no mechanism to refresh a cache entry in the current
+experimental version of the fetcher cache. A future feature may force updates
+based on checksum queries to the URI.
+
+Recommended practice for now:
+
+The framework should start using a fresh unique URI whenever the resource's
+content has changed.
+
+### Determining resource sizes
+
+Before downloading a resource to the cache, the fetcher first determines the
+size of the expected resource. It uses these methods depending on the nature of
+the URI.
+
+- Local file sizes are probed with systems calls (that follow symbolic links).
+- HTTP/HTTPS URIs are queried for the "content-length" field in the header. This
+  is performed by CURL. The reported asset size must be greater than zero or
+  the URI is deemed invalid.
+- FTP/FTPS is not supported at the time of writing.
+- Everything else is queried by the local HDFS client.
+
+If any of this reports an error, the fetcher then falls back on bypassing the
+cache as described above.
+
+WARNING: Only URIs for which download sizes can be queried up front and for
+which accurate sizes are reported reliably are eligible for any fetcher cache
+involvement. If actual cache file sizes exceed the physical capacity of the
+cache directory in any way, all further slave behavior is completely
+unspecified. Do not use any cache feature with any URI for which you have any
+doubts!
+
+To mitigate this problem, cache files that have been found to be larger than
+expected are deleted immediately after downloading and and delivering the
+requested content to the sandbox. Thus exceeding total capacity at least
+does not accumulate over subsequent fetcher runs.
+
+If you know for sure that size aberrations are within certain limits you can
+specify a cache directory size that is sufficiently smaller than your actual
+physical volume and fetching should work.
+
+In case of cache files that are smaller then expected, the cache will
+dynamically adjust its own bookkeeping according to actual sizes.
+
+### Cache eviction
+
+After determining the prospective size of a cache file and before downloading
+it, the cache attempts to ensure that at least as much space as is needed for
+this file is available and can be written into. If this is immediately the case,
+the requested amount of space is simply marked as reserved. Otherwise, missing
+space is freed up by "cache eviction". This means that the cache removes files
+at its own discretion until the given space target is met or exceeded.
+
+The eviction process fails if too many files are in use and therefore not
+evictable or if the cache is simply too small. Either way, the fetcher then
+falls back on bypassing the cache for the given URI as described above.
+
+If multiple evictions happen concurrently, each of them is pursuing its own
+separate space goals. However, leftover freed up space from one effort is
+automatically awarded to others.
+
+## Slave flags
+
+It is highly recommended to set these flags explicitly to values other than
+their defaults or to not use the fetcher cache in production.
+
+- "fetcher_cache_size", default value: enough for testing.
+- "fetcher_cache_dir", default value: somewhere inside the directory specified
+  by the "work_dir" flag, which is OK for testing.
+
+Recommended practice:
+
+- Use a separate volume as fetcher cache. Do not specify a directory as fetcher
+  cache directory that competes with any other contributor for the underlying
+  volume's space.
+- Set the cache directory size flag of the slave to less than your actual cache
+  volume's physical size. Use a safety margin, especially if you do not know
+  for sure if all frameworks are going to be compliant.
+
+Ultimate remedy:
+
+You can disable the fetcher cache entirely on each slave by setting its
+"fetcher_cache_size" flag to zero bytes.
+
+## Future Features
+The following features would be relatively easy to implement additionally.
+
+- Perform cache updates based on resource check sums. For example, query the md5
+  field in HTTP headers to determine when a resource at a URL has changed.
+- Respect HTTP cache-control directives.
+- Enable caching for ftp/ftps.
+- Use symbolic links or bind mounts to project cached resources into the
+  sandbox, read-only.
+- Have a choice whether to copy the extracted archive into the sandbox.
+- Have a choice whether to delete the archive after extraction bypassing the
+  cache.
+- Make the segregation of cache files by user optional.
+- Extract content while downloading when bypassing the cache.
+- Prefetch resources for subsequent tasks. This can happen concurrently with
+  running the present task, right after fetching its own resources.

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/include/mesos/fetcher/fetcher.proto
----------------------------------------------------------------------
diff --git a/include/mesos/fetcher/fetcher.proto b/include/mesos/fetcher/fetcher.proto
index 311af9a..1b2f493 100644
--- a/include/mesos/fetcher/fetcher.proto
+++ b/include/mesos/fetcher/fetcher.proto
@@ -23,14 +23,45 @@ package mesos.fetcher;
 option java_package = "org.apache.mesos.fetcher";
 option java_outer_classname = "Protos";
 
-
 /**
  * Encodes the fetcher environment variable sent to the external fetcher
- * program.
+ * program. See also "docs/fetcher.md" and
+ * "docs/fetcher-cache-internals.md". Note that part of these
+ * definitions are quoted verbatim in "docs/fetcher.md" and must be
+ * updated there whenever they change here.
  */
 message FetcherInfo {
-  required CommandInfo command_info = 1;
-  required string work_directory = 2;
-  optional string user = 3;
-  optional string frameworks_home = 4;
+  message Item
+  {
+    // What action the fetcher program is supposed to perform for a
+    // given URI.
+    enum Action
+    {
+      // Bypass the cache, download directly into the sandbox directory.
+      BYPASS_CACHE = 0;
+
+      // Download a resource at the given URI to the fetcher's file cache.
+      // Then retrieve the resource from the cache into the sandbox
+      // directory.
+      DOWNLOAD_AND_CACHE = 1;
+
+      // Copy or extract the resource from the cache, without downloading.
+      RETRIEVE_FROM_CACHE = 2;
+    }
+
+    required CommandInfo.URI uri = 1;
+    required Action action = 2;
+    optional string cache_filename = 3;
+  }
+
+  // Must be present when fetching into the sandbox in any way.
+  required string sandbox_directory = 1;
+
+  // Optional so that fetch requests that only use BYPASS_CACHE do not
+  // need to specify an unused cache directory.
+  optional string cache_directory = 2;
+
+  repeated Item items = 3;
+  optional string user = 4;
+  optional string frameworks_home = 5;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index a668889..5cf81e2 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -232,7 +232,24 @@ message CommandInfo {
   message URI {
     required string value = 1;
     optional bool executable = 2;
+
+    // In case the fetched file is recognized as an archive, extract
+    // its contents into the sandbox. Note that a cached archive is
+    // not copied from the cache to the sandbox in case extraction
+    // originates from an archive in the cache.
     optional bool extract = 3 [default = true];
+
+    // If this field is "true", the fetcher cache will be used. If not,
+    // fetching bypasses the cache and downloads directly into the
+    // sandbox directory, no matter whether a suitable cache file is
+    // available or not. The former directs the fetcher to download to
+    // the file cache, then copy from there to the sandbox. Subsequent
+    // fetch attempts with the same URI will omit downloading and copy
+    // from the cache as long as the file is resident there. Cache files
+    // may get evicted at any time, which then leads to renewed
+    // downloading. See also "docs/fetcher.md" and
+    // "docs/fetcher-cache-internals.md".
+    optional bool cache = 4;
   }
 
   // Describes a container.

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index 837be6f..52380c2 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -188,6 +188,23 @@ inline bool operator < (const TaskID& left, const TaskID& right)
 }
 
 
+inline std::size_t hash_value(const CommandInfo::URI& uri)
+{
+  size_t seed = 0;
+
+  if (uri.extract()) {
+    seed += 11;
+  }
+
+  if (uri.executable()) {
+    seed += 2003;
+  }
+
+  boost::hash_combine(seed, uri.value());
+  return seed;
+}
+
+
 inline std::size_t hash_value(const ContainerID& containerId)
 {
   size_t seed = 0;

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 9d1f0d5..a5a7306 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1418,6 +1418,7 @@ mesos_tests_SOURCES =				\
   tests/external_containerizer_test.cpp		\
   tests/health_check_tests.cpp                  \
   tests/fault_tolerance_tests.cpp		\
+  tests/fetcher_cache_tests.cpp            \
   tests/fetcher_tests.cpp                       \
   tests/files_tests.cpp				\
   tests/flags.cpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index 968545d..0b0312c 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -2,6 +2,7 @@
 #define __HDFS_HPP__
 
 #include <sstream>
+#include <vector>
 
 #include <stout/check.hpp>
 #include <stout/error.hpp>
@@ -78,6 +79,40 @@ struct HDFS
     return status.get() == 0;
   }
 
+  Try<Bytes> du(std::string path)
+  {
+    // Make sure 'path' starts with a '/'.
+    path = path::join("", path);
+
+    Try<std::string> command = strings::format(
+        "%s fs -du -h '%s'", hadoop, path);
+
+    CHECK_SOME(command);
+
+    std::ostringstream output;
+
+    Try<int> status = os::shell(&output, command.get() + " 2>&1");
+
+    if (status.isError()) {
+      return Error("HDFS du failed: " + status.error());
+    }
+
+    const std::vector<std::string>& s = strings::split(output.str(), " ");
+    if (s.size() != 2) {
+      return Error("HDFS du returned an unexpected number of results: '" +
+                   output.str() + "'");
+    }
+
+    Result<size_t> size = numify<size_t>(s[0]);
+    if (size.isError()) {
+      return Error("HDFS du returned unexpected format: " + size.error());
+    } else if (size.isNone()) {
+      return Error("HDFS du returned unexpected format");
+    }
+
+    return Bytes(size.get());
+  }
+
   Try<Nothing> rm(std::string path)
   {
     // Make sure 'to' starts with a '/'.

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/launcher/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index 796526f..c32106f 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -34,226 +34,344 @@
 #include "logging/flags.hpp"
 #include "logging/logging.hpp"
 
+#include "slave/slave.hpp"
+
+#include "slave/containerizer/fetcher.hpp"
+
 using namespace mesos;
 using namespace mesos::internal;
 
 using mesos::fetcher::FetcherInfo;
 
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::string;
+using mesos::internal::slave::Fetcher;
 
+using std::string;
 
-const char FILE_URI_PREFIX[] = "file://";
-const char FILE_URI_LOCALHOST[] = "file://localhost";
 
-// Try to extract filename into directory. If filename is recognized as an
-// archive it will be extracted and true returned; if not recognized then false
-// will be returned. An Error is returned if the extraction command fails.
-Try<bool> extract(const string& filename, const string& directory)
+// Try to extract sourcePath into directory. If sourcePath is
+// recognized as an archive it will be extracted and true returned;
+// if not recognized then false will be returned. An Error is
+// returned if the extraction command fails.
+static Try<bool> extract(
+    const string& sourcePath,
+    const string& destinationDirectory)
 {
   string command;
   // Extract any .tgz, tar.gz, tar.bz2 or zip files.
-  if (strings::endsWith(filename, ".tgz") ||
-      strings::endsWith(filename, ".tar.gz") ||
-      strings::endsWith(filename, ".tbz2") ||
-      strings::endsWith(filename, ".tar.bz2") ||
-      strings::endsWith(filename, ".txz") ||
-      strings::endsWith(filename, ".tar.xz")) {
-    command = "tar -C '" + directory + "' -xf";
-  } else if (strings::endsWith(filename, ".zip")) {
-    command = "unzip -d '" + directory + "'";
+  if (strings::endsWith(sourcePath, ".tgz") ||
+      strings::endsWith(sourcePath, ".tar.gz") ||
+      strings::endsWith(sourcePath, ".tbz2") ||
+      strings::endsWith(sourcePath, ".tar.bz2") ||
+      strings::endsWith(sourcePath, ".txz") ||
+      strings::endsWith(sourcePath, ".tar.xz")) {
+    command = "tar -C '" + destinationDirectory + "' -xf";
+  } else if (strings::endsWith(sourcePath, ".zip")) {
+    command = "unzip -d '" + destinationDirectory + "'";
   } else {
     return false;
   }
 
-  command += " '" + filename + "'";
+  command += " '" + sourcePath + "'";
+
+  LOG(INFO) << "Extracting with command: " << command;
+
   int status = os::system(command);
   if (status != 0) {
     return Error("Failed to extract: command " + command +
                  " exited with status: " + stringify(status));
   }
 
-  LOG(INFO) << "Extracted resource '" << filename
-            << "' into '" << directory << "'";
+  LOG(INFO) << "Extracted '" << sourcePath << "' into '"
+            << destinationDirectory << "'";
 
   return true;
 }
 
 
 // Attempt to get the uri using the hadoop client.
-Try<string> fetchWithHadoopClient(
-    const string& uri,
-    const string& directory)
+static Try<string> downloadWithHadoopClient(
+    const string& sourceUri,
+    const string& destinationPath)
 {
   HDFS hdfs;
   Try<bool> available = hdfs.available();
 
   if (available.isError() || !available.get()) {
-    LOG(INFO) << "Hadoop Client not available, "
-              << "skipping fetch with Hadoop Client";
-    return Error("Hadoop Client unavailable");
+    return Error("Skipping fetch with Hadoop Client as"
+                 " Hadoop Client not available: " + available.error());
   }
 
-  LOG(INFO) << "Fetching URI '" << uri << "' using Hadoop Client";
+  LOG(INFO) << "Downloading resource with Hadoop client from '" << sourceUri
+            << "' to '" << destinationPath << "'";
 
-  Try<string> base = os::basename(uri);
-  if (base.isError()) {
-    LOG(ERROR) << "Invalid basename for URI: " << base.error();
-    return Error("Invalid basename for URI");
+  Try<Nothing> result = hdfs.copyToLocal(sourceUri, destinationPath);
+  if (result.isError()) {
+    return Error("HDFS copyToLocal failed: " + result.error());
   }
 
-  string path = path::join(directory, base.get());
+  return destinationPath;
+}
 
-  LOG(INFO) << "Downloading resource from '" << uri  << "' to '" << path << "'";
 
-  Try<Nothing> result = hdfs.copyToLocal(uri, path);
-  if (result.isError()) {
-    LOG(ERROR) << "HDFS copyToLocal failed: " << result.error();
-    return Error(result.error());
+static Try<string> downloadWithNet(
+    const string& sourceUri,
+    const string& destinationPath)
+{
+  LOG(INFO) <<  "Downloading resource from '" << sourceUri
+            << "' to '" << destinationPath << "'";
+
+  Try<int> code = net::download(sourceUri, destinationPath);
+  if (code.isError()) {
+    return Error("Error downloading resource: " + code.error());
+  } else if (code.get() != 200) {
+    return Error("Error downloading resource, received HTTP/FTP return code " +
+                 stringify(code.get()));
   }
 
-  return path;
+  return destinationPath;
 }
 
 
-Try<string> fetchWithNet(
-    const string& uri,
-    const string& directory)
+static Try<string> copyFile(
+    const string& sourcePath,
+    const string& destinationPath)
 {
-  LOG(INFO) << "Fetching URI '" << uri << "' with os::net";
+  const string command = "cp '" + sourcePath + "' '" + destinationPath + "'";
 
-  string path = uri.substr(uri.find("://") + 3);
-  if (path.find("/") == string::npos ||
-      path.size() <= path.find("/") + 1) {
-    LOG(ERROR) << "Malformed URL (missing path)";
-    return Error("Malformed URI");
+  LOG(INFO) << "Copying resource with command:" << command;
+
+  int status = os::system(command);
+  if (status != 0) {
+    return Error("Failed to copy with command '" + command +
+                 "', exit status: " + stringify(status));
   }
 
-  path = path::join(directory, path.substr(path.find_last_of("/") + 1));
-  LOG(INFO) << "Downloading '" << uri << "' to '" << path << "'";
-  Try<int> code = net::download(uri, path);
-  if (code.isError()) {
-    LOG(ERROR) << "Error downloading resource: " << code.error().c_str();
-    return Error("Fetch of URI failed (" + code.error() + ")");
-  } else if (code.get() != 200) {
-    LOG(ERROR) << "Error downloading resource, received HTTP/FTP return code "
-    << code.get();
-    return Error("HTTP/FTP error (" + stringify(code.get()) + ")");
+  return destinationPath;
+}
+
+
+static Try<string> download(
+    const string& sourceUri,
+    const string& destinationPath,
+    const Option<string>& frameworksHome)
+{
+  LOG(INFO) << "Fetching URI '" << sourceUri << "'";
+  Try<Nothing> validation = Fetcher::validateUri(sourceUri);
+  if (validation.isError()) {
+    return Error(validation.error());
+  }
+
+  // 1. Try to fetch using a local copy.
+  // We regard as local: "file://" or the absense of any URI scheme.
+  Result<string> sourcePath =
+    Fetcher::uriToLocalPath(sourceUri, frameworksHome);
+
+  if (sourcePath.isError()) {
+    return Error(sourcePath.error());
+  } else if (sourcePath.isSome()) {
+    return copyFile(sourcePath.get(), destinationPath);
   }
 
-  return path;
+  // 2. Try to fetch URI using os::net / libcurl implementation.
+  // We consider http, https, ftp, ftps compatible with libcurl.
+  if (Fetcher::isNetUri(sourceUri)) {
+     return downloadWithNet(sourceUri, destinationPath);
+  }
+
+  // 3. Try to fetch the URI using hadoop client.
+  // We use the hadoop client to fetch any URIs that are not
+  // handled by other fetchers(local / os::net). These URIs may be
+  // `hdfs://` URIs or any other URI that has been configured (and
+  // hence handled) in the hadoop client. This allows mesos to
+  // externalize the handling of previously unknown resource
+  // endpoints without the need to support them natively.
+  // Note: Hadoop Client is not a hard dependency for running mesos.
+  // This allows users to get mesos up and running without a
+  // hadoop_home or the hadoop client setup but in case we reach
+  // this part and don't have one configured, the fetch would fail
+  // and log an appropriate error.
+  return downloadWithHadoopClient(sourceUri, destinationPath);
 }
 
 
-Try<string> fetchWithLocalCopy(
-    const string& uri,
-    const string& directory,
-    const Option<std::string>& frameworksHome)
+// TODO(bernd-mesos): Refactor this into stout so that we can more easily
+// chmod an exectuable. For example, we could define some static flags
+// so that someone can do: os::chmod(path, EXECUTABLE_CHMOD_FLAGS).
+static Try<string> chmodExecutable(const string& filePath)
 {
-    string local = uri;
-    bool fileUri = false;
-    if (strings::startsWith(local, string(FILE_URI_LOCALHOST))) {
-        local = local.substr(sizeof(FILE_URI_LOCALHOST) - 1);
-        fileUri = true;
-    } else if (strings::startsWith(local, string(FILE_URI_PREFIX))) {
-        local = local.substr(sizeof(FILE_URI_PREFIX) - 1);
-        fileUri = true;
-    }
+  Try<Nothing> chmod = os::chmod(
+      filePath, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
+  if (chmod.isError()) {
+    return Error("Failed to chmod executable '" +
+                 filePath + "': " + chmod.error());
+  }
 
-    if (fileUri && !strings::startsWith(local, "/")) {
-        return Error("File URI only supports absolute paths");
-    }
+  return filePath;
+}
+
+
+// Returns the resulting file or in case of extraction the destination
+// directory (for logging).
+static Try<string> fetchBypassingCache(
+    const CommandInfo::URI& uri,
+    const string& sandboxDirectory,
+    const Option<string>& frameworksHome)
+{
+  LOG(INFO) << "Fetching directly into the sandbox directory";
+
+  Try<string> basename = Fetcher::basename(uri.value());
+  if (basename.isError()) {
+    return Error("Failed to determine the basename of the URI '" +
+                 uri.value() + "' with error: " + basename.error());
+  }
+
+  string path = path::join(sandboxDirectory, basename.get());
+
+  Try<string> downloaded = download(uri.value(), path, frameworksHome);
+  if (downloaded.isError()) {
+    return Error(downloaded.error());
+  }
 
-    if (local.find_first_of("/") != 0) {
-        // We got a non-Hadoop and non-absolute path.
-        if (frameworksHome.isSome()) {
-            local = path::join(frameworksHome.get(), local);
-            LOG(INFO) << "Prepended the slave's frameworks_home flag value "
-                      << " to relative path, making it: '" << local << "'";
-        } else {
-            LOG(ERROR) << "A relative path was passed for the resource but the "
-                       << "slave's frameworks_home flag is not set; "
-                       << "please either specify this slave configuration "
-                       << "option or avoid using a relative path";
-            return Error("Could not resolve relative URI");
-        }
+  if (uri.executable()) {
+    return chmodExecutable(downloaded.get());
+  } else if (uri.extract()) {
+    Try<bool> extracted = extract(path, sandboxDirectory);
+    if (extracted.isError()) {
+      return Error(extracted.error());
+    } else {
+      LOG(WARNING) << "Copying instead of extracting resource from URI with "
+                   << "'extract' flag, because it does not seem to be an "
+                   << "archive: " << uri.value();
     }
+  }
+
+  return downloaded;
+}
+
+
+// Returns the resulting file or in case of extraction the destination
+// directory (for logging).
+static Try<string> fetchFromCache(
+    const FetcherInfo::Item& item,
+    const string& cacheDirectory,
+    const string& sandboxDirectory)
+{
+  LOG(INFO) << "Fetching from cache";
+
+  Try<string> basename = Fetcher::basename(item.uri().value());
+  if (basename.isError()) {
+    return Error(basename.error());
+  }
+
+  string destinationPath = path::join(sandboxDirectory, basename.get());
 
-    Try<string> base = os::basename(local);
-    if (base.isError()) {
-        LOG(ERROR) << base.error();
-        return Error("Fetch of URI failed");
+  string sourcePath = path::join(cacheDirectory, item.cache_filename());
+
+  if (item.uri().executable()) {
+    Try<string> copied = copyFile(sourcePath, destinationPath);
+    if (copied.isError()) {
+      return Error(copied.error());
     }
 
-    // Copy the resource to the directory.
-    string path = path::join(directory, base.get());
-    std::ostringstream command;
-    command << "cp '" << local << "' '" << path << "'";
-    LOG(INFO) << "Copying resource from '" << local
-              << "' to '" << directory << "'";
-
-    int status = os::system(command.str());
-    if (status != 0) {
-        LOG(ERROR) << "Failed to copy '" << local
-                   << "' : Exit status " << status;
-        return Error("Local copy failed");
+    return chmodExecutable(copied.get());
+  } else if (item.uri().extract()) {
+    Try<bool> extracted = extract(sourcePath, sandboxDirectory);
+    if (extracted.isError()) {
+      return Error(extracted.error());
+    } else if (extracted.get()) {
+      return sandboxDirectory;
+    } else {
+      LOG(WARNING) << "Copying instead of extracting resource from URI with "
+                   << "'extract' flag, because it does not seem to be an "
+                   << "archive: " << item.uri().value();
     }
+  }
 
-    return path;
+  return copyFile(sourcePath, destinationPath);
 }
 
 
-// Fetch URI into directory.
-Try<string> fetch(
-    const string& uri,
-    const string& directory,
-    const Option<std::string>& frameworksHome)
+// Returns the resulting file or in case of extraction the destination
+// directory (for logging).
+static Try<string> fetchThroughCache(
+    const FetcherInfo::Item& item,
+    const Option<string>& cacheDirectory,
+    const string& sandboxDirectory,
+    const Option<string>& frameworksHome)
 {
-    LOG(INFO) << "Fetching URI '" << uri << "'";
-    // Some checks to make sure using the URI value in shell commands
-    // is safe. TODO(benh): These should be pushed into the scheduler
-    // driver and reported to the user.
-    if (uri.find_first_of('\\') != string::npos ||
-        uri.find_first_of('\'') != string::npos ||
-        uri.find_first_of('\0') != string::npos) {
-        LOG(ERROR) << "URI contains illegal characters, refusing to fetch";
-        return Error("Illegal characters in URI");
-    }
+  if (cacheDirectory.isNone() || cacheDirectory.get().empty()) {
+    return Error("Cache directory not specified");
+  }
 
-    // 1. Try to fetch using a local copy.
-    // We consider file:// or no scheme uri are considered local uri.
-    if (strings::startsWith(uri, "file://") ||
-        uri.find("://") == string::npos) {
-      return fetchWithLocalCopy(uri, directory, frameworksHome);
+  if (!item.has_cache_filename() || item.cache_filename().empty()) {
+    // This should never happen if this program is used by the Mesos
+    // slave and could then be a CHECK. But other uses are possible.
+    return Error("No cache file name for: " + item.uri().value());
+  }
+
+  if (item.action() != FetcherInfo::Item::RETRIEVE_FROM_CACHE) {
+    CHECK_EQ(FetcherInfo::Item::DOWNLOAD_AND_CACHE, item.action())
+      << "Unexpected fetcher action selector";
+
+    LOG(INFO) << "Downloading into cache";
+
+    Try<Nothing> mkdir = os::mkdir(cacheDirectory.get());
+    if (mkdir.isError()) {
+      return Error("Failed to create fetcher cache directory '" +
+                   cacheDirectory.get() + "': " + mkdir.error());
     }
 
-    // 2. Try to fetch URI using os::net / libcurl implementation.
-    // We consider http, https, ftp, ftps compatible with libcurl
-    if (strings::startsWith(uri, "http://") ||
-               strings::startsWith(uri, "https://") ||
-               strings::startsWith(uri, "ftp://") ||
-               strings::startsWith(uri, "ftps://")) {
-      return fetchWithNet(uri, directory);
+    Try<string> downloaded = download(
+        item.uri().value(),
+        path::join(cacheDirectory.get(), item.cache_filename()),
+        frameworksHome);
+
+    if (downloaded.isError()) {
+      return Error(downloaded.error());
     }
+  }
+
+  return fetchFromCache(item, cacheDirectory.get(), sandboxDirectory);
+}
+
+
+// Returns the resulting file or in case of extraction the destination
+// directory (for logging).
+static Try<string> fetch(
+    const FetcherInfo::Item& item,
+    const Option<string>& cacheDirectory,
+    const string& sandboxDirectory,
+    const Option<string>& frameworksHome)
+{
+  LOG(INFO) << "Fetching URI '" << item.uri().value() << "'";
+
+  if (item.action() == FetcherInfo::Item::BYPASS_CACHE) {
+    return fetchBypassingCache(
+        item.uri(),
+        sandboxDirectory,
+        frameworksHome);
+  }
 
-    // 3. Try to fetch the URI using hadoop client.
-    // We use the hadoop client to fetch any URIs that are not
-    // handled by other fetchers(local / os::net). These URIs may be
-    // `hdfs://` URIs or any other URI that has been configured (and
-    // hence handled) in the hadoop client. This allows mesos to
-    // externalize the handling of previously unknown resource
-    // endpoints without the need to support them natively.
-    // Note: Hadoop Client is not a hard dependency for running mesos.
-    // This allows users to get mesos up and running without a
-    // hadoop_home or the hadoop client setup but in case we reach
-    // this part and don't have one configured, the fetch would fail
-    // and log an appropriate error.
-    return fetchWithHadoopClient(uri, directory);
+  return fetchThroughCache(
+      item,
+      cacheDirectory,
+      sandboxDirectory,
+      frameworksHome);
 }
 
 
+// This "fetcher program" is invoked by the slave's fetcher actor
+// (Fetcher, FetcherProcess) to "fetch" URIs into the sandbox directory
+// of a given task. Its parameters are provided in the form of the env
+// var MESOS_FETCHER_INFO which contains a FetcherInfo (see
+// fetcher.proto) object formatted in JSON. These are set by the actor
+// to indicate what set of URIs to process and how to proceed with
+// each one. A URI can be downloaded directly to the task's sandbox
+// directory or it can be copied to a cache first or it can be reused
+// from the cache, avoiding downloading. All cache management and
+// bookkeeping is centralized in the slave's fetcher actor, which can
+// have multiple instances of this fetcher program running at any
+// given time. Exit code: 0 if entirely successful, otherwise 1.
 int main(int argc, char* argv[])
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
@@ -262,79 +380,59 @@ int main(int argc, char* argv[])
 
   Try<Nothing> load = flags.load("MESOS_", argc, argv);
 
-  if (load.isError()) {
-    cerr << load.error() << endl;
-    exit(1);
-  }
+  CHECK_SOME(load) << "Could not load flags: " << load.error();
 
   logging::initialize(argv[0], flags, true); // Catch signals.
 
   CHECK(os::hasenv("MESOS_FETCHER_INFO"))
     << "Missing MESOS_FETCHER_INFO environment variable";
 
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(os::getenv("MESOS_FETCHER_INFO"));
+  string jsonFetcherInfo = os::getenv("MESOS_FETCHER_INFO");
+  LOG(INFO) << "Fetcher Info: " << jsonFetcherInfo;
 
-  if (parse.isError()) {
-    EXIT(1) << "Failed to parse MESOS_FETCHER_INFO: " << parse.error();
-  }
+  Try<JSON::Object> parse = JSON::parse<JSON::Object>(jsonFetcherInfo);
+  CHECK_SOME(parse) << "Failed to parse MESOS_FETCHER_INFO: " << parse.error();
 
-  Try<FetcherInfo> fetcherInfo = protobuf::parse<FetcherInfo>(parse.get());
-  if (fetcherInfo.isError()) {
-    EXIT(1) << "Failed to parse FetcherInfo: " << fetcherInfo.error();
-  }
+  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
+  CHECK_SOME(fetcherInfo)
+    << "Failed to parse FetcherInfo: " << fetcherInfo.error();
 
-  const CommandInfo& commandInfo = fetcherInfo.get().command_info();
+  CHECK(!fetcherInfo.get().sandbox_directory().empty())
+    << "Missing sandbox directory";
 
-  const string& directory = fetcherInfo.get().work_directory();
-  if (directory.empty()) {
-    EXIT(1) << "Missing work directory";
-  }
+  const string sandboxDirectory = fetcherInfo.get().sandbox_directory();
 
-  Option<std::string> user = None();
-  if (fetcherInfo.get().has_user()) {
-    user = fetcherInfo.get().user();
-  }
+  const Option<string> cacheDirectory =
+    fetcherInfo.get().has_cache_directory() ?
+      Option<string>::some(fetcherInfo.get().cache_directory()) :
+        Option<string>::none();
 
-  Option<std::string> frameworksHome = None();
-  if (fetcherInfo.get().has_frameworks_home()) {
-    frameworksHome = fetcherInfo.get().frameworks_home();
-  }
+  const Option<string> frameworksHome =
+    fetcherInfo.get().has_frameworks_home() ?
+      Option<string>::some(fetcherInfo.get().frameworks_home()) :
+        Option<string>::none();
 
   // Fetch each URI to a local file, chmod, then chown if a user is provided.
-  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
-    // Fetch the URI to a local file.
-    Try<string> fetched = fetch(uri.value(), directory, frameworksHome);
+  foreach (const FetcherInfo::Item& item, fetcherInfo.get().items()) {
+    Try<string> fetched =
+      fetch(item, cacheDirectory, sandboxDirectory, frameworksHome);
     if (fetched.isError()) {
-      EXIT(1) << "Failed to fetch: " << uri.value();
-    }
-
-    // Chmod the fetched URI if it's executable, else assume it's an archive
-    // that should be extracted.
-    if (uri.executable()) {
-      Try<Nothing> chmod = os::chmod(
-          fetched.get(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
-      if (chmod.isError()) {
-        EXIT(1) << "Failed to chmod " << fetched.get() << ": " << chmod.error();
-      }
-    } else if (uri.extract()) {
-      // TODO(idownes): Consider removing the archive once extracted.
-      // Try to extract the file if it's recognized as an archive.
-      Try<bool> extracted = extract(fetched.get(), directory);
-      if (extracted.isError()) {
-        EXIT(1) << "Failed to extract "
-                << fetched.get() << ":" << extracted.error();
-      }
+      EXIT(1) << "Failed to fetch '" << item.uri().value()
+              << "': " + fetched.error();
     } else {
-      LOG(INFO) << "Skipped extracting path '" << fetched.get() << "'";
+      LOG(INFO) << "Fetched '" << item.uri().value()
+                << "' to '" << fetched.get() << "'";
     }
+  }
 
-    // Recursively chown the directory if a user is provided.
-    if (user.isSome()) {
-      Try<Nothing> chowned = os::chown(user.get(), directory);
-      if (chowned.isError()) {
-        EXIT(1) << "Failed to chown " << directory << ": " << chowned.error();
-      }
+  // Recursively chown the sandbox directory if a user is provided.
+  if (fetcherInfo.get().has_user()) {
+    Try<Nothing> chowned = os::chown(
+        fetcherInfo.get().user(),
+        sandboxDirectory);
+    if (chowned.isError()) {
+      EXIT(1) << "Failed to chown " << sandboxDirectory
+              << ": " << chowned.error();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index 206d439..84927e5 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -108,6 +108,9 @@ extern const Duration DOCKER_VERSION_WAIT_TIMEOUT;
 // Name of the default, CRAM-MD5 authenticatee.
 extern const std::string DEFAULT_AUTHENTICATEE;
 
+// Default maximum storage space to be used by the fetcher cache.
+const Bytes DEFAULT_FETCHER_CACHE_SIZE = Gigabytes(2);
+
 // If no pings received within this timeout, then the slave will
 // trigger a re-detection of the master to cause a re-registration.
 Duration MASTER_PING_TIMEOUT();

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index bd58f94..41e0b98 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -325,7 +325,8 @@ DockerContainerizerProcess::Container::create(
 
 
 Future<Nothing> DockerContainerizerProcess::fetch(
-    const ContainerID& containerId)
+    const ContainerID& containerId,
+    const SlaveID& slaveId)
 {
   CHECK(containers_.contains(containerId));
   Container* container = containers_[containerId];
@@ -335,6 +336,7 @@ Future<Nothing> DockerContainerizerProcess::fetch(
       container->command,
       container->directory,
       None(),
+      slaveId,
       flags);
 }
 
@@ -772,7 +774,7 @@ Future<bool> DockerContainerizerProcess::launch(
 
   if (taskInfo.isSome() && flags.docker_mesos_image.isNone()) {
     // Launching task by forking a subprocess to run docker executor.
-    return container.get()->launch = fetch(containerId)
+    return container.get()->launch = fetch(containerId, slaveId)
       .then(defer(self(), [=]() { return pull(containerId); }))
       .then(defer(self(), [=]() { return launchExecutorProcess(containerId); }))
       .then(defer(self(), [=](pid_t pid) {
@@ -794,7 +796,7 @@ Future<bool> DockerContainerizerProcess::launch(
   // is running in a container (via docker_mesos_image flag)
   // we want the executor to keep running when the slave container
   // dies.
-  return container.get()->launch = fetch(containerId)
+  return container.get()->launch = fetch(containerId, slaveId)
     .then(defer(self(), [=]() { return pull(containerId); }))
     .then(defer(self(), [=]() {
       return launchExecutorContainer(containerId, containerName);
@@ -936,7 +938,6 @@ Future<pid_t> DockerContainerizerProcess::launchExecutorProcess(
 }
 
 
-
 Future<pid_t> DockerContainerizerProcess::checkpointExecutor(
     const ContainerID& containerId,
     const Docker::Container& dockerContainer)

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index d2cca4b..395d535 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -152,7 +152,9 @@ public:
       const ContainerID& containerId,
       bool killed = true); // process is either killed or reaped.
 
-  virtual process::Future<Nothing> fetch(const ContainerID& containerId);
+  virtual process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const SlaveID& slaveId);
 
   virtual process::Future<Nothing> pull(const ContainerID& containerId);
 


[2/4] mesos git commit: Added a cache to the Fetcher.

Posted by be...@apache.org.
http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index 9e9e9d0..c519bff 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -16,27 +16,40 @@
  * limitations under the License.
  */
 
-#include <mesos/fetcher/fetcher.hpp>
+#include <unordered_map>
 
+#include <process/async.hpp>
+#include <process/check.hpp>
+#include <process/collect.hpp>
 #include <process/dispatch.hpp>
-#include <process/process.hpp>
+
+#include <stout/net.hpp>
+#include <stout/path.hpp>
+
+#include "hdfs/hdfs.hpp"
 
 #include "slave/slave.hpp"
 
 #include "slave/containerizer/fetcher.hpp"
 
+using std::list;
 using std::map;
+using std::shared_ptr;
 using std::string;
+using std::transform;
 using std::vector;
 
 using process::Future;
 
-using mesos::fetcher::FetcherInfo;
-
 namespace mesos {
 namespace internal {
 namespace slave {
 
+static const string FILE_URI_PREFIX = "file://";
+static const string FILE_URI_LOCALHOST = "file://localhost";
+
+static const string CACHE_FILE_NAME_PREFIX = "c";
+
 
 Fetcher::Fetcher() : process(new FetcherProcess())
 {
@@ -44,6 +57,13 @@ Fetcher::Fetcher() : process(new FetcherProcess())
 }
 
 
+Fetcher::Fetcher(const process::Owned<FetcherProcess>& process)
+  : process(process)
+{
+  spawn(process.get());
+}
+
+
 Fetcher::~Fetcher()
 {
   terminate(process.get());
@@ -51,68 +71,144 @@ Fetcher::~Fetcher()
 }
 
 
-map<string, string> Fetcher::environment(
-    const CommandInfo& commandInfo,
-    const string& directory,
-    const Option<string>& user,
-    const Flags& flags)
+Try<Nothing> Fetcher::recover(const SlaveID& slaveId, const Flags& flags)
 {
-  FetcherInfo fetcherInfo;
+  // Good enough for now, simple, least-effort recovery.
+  VLOG(1) << "Clearing fetcher cache";
+
+  string cacheDirectory = paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
+  Result<string> path = os::realpath(cacheDirectory);
+  if (path.isError()) {
+    LOG(ERROR) << "Malformed fetcher cache directory path '" << cacheDirectory
+               << "', error: " + path.error();
 
-  fetcherInfo.mutable_command_info()->CopyFrom(commandInfo);
+    return Error(path.error());
+  }
 
-  fetcherInfo.set_work_directory(directory);
+  if (path.isSome() && os::exists(path.get())) {
+    Try<Nothing> rmdir = os::rmdir(path.get(), true);
+    if (rmdir.isError()) {
+      LOG(ERROR) << "Could not delete fetcher cache directory '"
+                 << cacheDirectory << "', error: " + rmdir.error();
 
-  if (user.isSome()) {
-    fetcherInfo.set_user(user.get());
+      return rmdir;
+    }
   }
 
-  if (!flags.frameworks_home.empty()) {
-    fetcherInfo.set_frameworks_home(flags.frameworks_home);
+  return Nothing();
+}
+
+
+Try<string> Fetcher::basename(const string& uri)
+{
+  // TODO(bernd-mesos): full URI parsing, then move this to stout.
+  // There is a bug (or is it a feature?) in the original fetcher
+  // code without caching that remains in effect here. URIs are
+  // treated like file paths, looking for occurrences of "/",
+  // but ignoring other separators that can show up
+  // (e.g. "?", "=" in HTTP URLs).
+
+  if (uri.find_first_of('\\') != string::npos ||
+      uri.find_first_of('\'') != string::npos ||
+      uri.find_first_of('\0') != string::npos) {
+      return Error("Illegal characters in URI");
   }
 
-  map<string, string> result;
+  size_t index = uri.find("://");
+  if (index != string::npos && 1 < index) {
+    // URI starts with protocol specifier, e.g., http://, https://,
+    // ftp://, ftps://, hdfs://, hftp://, s3://, s3n://.
 
-  if (!flags.hadoop_home.empty()) {
-    result["HADOOP_HOME"] = flags.hadoop_home;
+    string path = uri.substr(index + 3);
+    if (!strings::contains(path, "/") || path.size() <= path.find("/") + 1) {
+      return Error("Malformed URI (missing path): " + uri);
+    }
+
+    return path.substr(path.find_last_of("/") + 1);
   }
+  return os::basename(uri);
+}
 
-  result["MESOS_FETCHER_INFO"] = stringify(JSON::Protobuf(fetcherInfo));
 
-  return result;
+Try<Nothing> Fetcher::validateUri(const string& uri)
+{
+  Try<string> result = basename(uri);
+  if (result.isError()) {
+    return Error(result.error());
+  }
+
+  return Nothing();
 }
 
 
-Future<Nothing> Fetcher::fetch(
-    const ContainerID& containerId,
-    const CommandInfo& commandInfo,
-    const string& directory,
-    const Option<string>& user,
-    const Flags& flags,
-    const Option<int>& stdout,
-    const Option<int>& stderr)
+static Try<Nothing> validateUris(const CommandInfo& commandInfo)
 {
-  if (commandInfo.uris().size() == 0) {
-    return Nothing();
+  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
+    Try<Nothing> validation = Fetcher::validateUri(uri.value());
+    if (validation.isError()) {
+      return Error(validation.error());
+    }
   }
 
-  return dispatch(process.get(),
-                  &FetcherProcess::fetch,
-                  containerId,
-                  commandInfo,
-                  directory,
-                  user,
-                  flags,
-                  stdout,
-                  stderr);
+  return Nothing();
+}
+
+
+Result<string> Fetcher::uriToLocalPath(
+    const string& uri,
+    const Option<string>& frameworksHome)
+{
+  if (!strings::startsWith(uri, "file://") && strings::contains(uri, "://")) {
+    return None();
+  }
+
+  string path = uri;
+  bool fileUri = false;
+
+  if (strings::startsWith(path, FILE_URI_LOCALHOST)) {
+    path = path.substr(FILE_URI_LOCALHOST.size());
+    fileUri = true;
+  } else if (strings::startsWith(path, FILE_URI_PREFIX)) {
+    path = path.substr(FILE_URI_PREFIX.size());
+    fileUri = true;
+  }
+
+  if (fileUri && !strings::startsWith(path, "/")) {
+    return Error("File URI only supports absolute paths");
+  }
+
+  if (path.find_first_of("/") != 0) {
+    if (frameworksHome.isNone() || frameworksHome.get().empty()) {
+      return Error("A relative path was passed for the resource but the "
+                   "Mesos framework home was not specified. "
+                   "Please either provide this config option "
+                   "or avoid using a relative path");
+    } else {
+      path = path::join(frameworksHome.get(), path);
+      LOG(INFO) << "Prepended Mesos frameworks home to relative path, "
+                << "making it: '" << path << "'";
+    }
+  }
+
+  return path;
+}
+
+
+bool Fetcher::isNetUri(const std::string& uri)
+{
+  return strings::startsWith(uri, "http://")  ||
+         strings::startsWith(uri, "https://") ||
+         strings::startsWith(uri, "ftp://")   ||
+         strings::startsWith(uri, "ftps://");
 }
 
 
 Future<Nothing> Fetcher::fetch(
     const ContainerID& containerId,
     const CommandInfo& commandInfo,
-    const string& directory,
+    const string& sandboxDirectory,
     const Option<string>& user,
+    const SlaveID& slaveId,
     const Flags& flags)
 {
   if (commandInfo.uris().size() == 0) {
@@ -123,8 +219,9 @@ Future<Nothing> Fetcher::fetch(
                   &FetcherProcess::fetch,
                   containerId,
                   commandInfo,
-                  directory,
+                  sandboxDirectory,
                   user,
+                  slaveId,
                   flags);
 }
 
@@ -143,124 +240,459 @@ FetcherProcess::~FetcherProcess()
 }
 
 
-Future<Nothing> FetcherProcess::fetch(
-    const ContainerID& containerId,
-    const CommandInfo& commandInfo,
-    const string& directory,
-    const Option<string>& user,
-    const Flags& flags,
-    const Option<int>& stdout,
-    const Option<int>& stderr)
+// Find out how large a potential download from the given URI is.
+static Try<Bytes> fetchSize(
+    const string& uri,
+    const Option<string>& frameworksHome)
 {
-  VLOG(1) << "Starting to fetch URIs for container: " << containerId
-        << ", directory: " << directory;
+  VLOG(1) << "Fetching size for URI: " << uri;
 
-  Try<Subprocess> subprocess =
-    run(commandInfo, directory, user, flags, stdout, stderr);
+  Result<string> path = Fetcher::uriToLocalPath(uri, frameworksHome);
+  if (path.isError()) {
+    return Error(path.error());
+  }
+  if (path.isSome()) {
+    Try<Bytes> size = os::stat::size(path.get(), os::stat::FOLLOW_SYMLINK);
+    if (size.isError()) {
+      return Error("Could not determine file size for: '" + path.get() +
+                     "', error: " + size.error());
+    }
+    return size.get();
+  }
 
-  if (subprocess.isError()) {
-    return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+  if (Fetcher::isNetUri(uri)) {
+    Try<Bytes> size = net::contentLength(uri);
+    if (size.isError()) {
+      return Error(size.error());
+    }
+    if (size.get() == 0) {
+      return Error("URI reported content-length 0: " + uri);
+    }
+
+    return size.get();
   }
 
-  subprocessPids[containerId] = subprocess.get().pid();
+  HDFS hdfs;
 
-  return subprocess.get().status()
-    .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+  Try<bool> available = hdfs.available();
+  if (available.isError() || !available.get()) {
+    return Error("Hadoop client not available: " + available.error());
+  }
+
+  Try<Bytes> size = hdfs.du(uri);
+  if (size.isError()) {
+    return Error("Hadoop client could not determine size: " + size.error());
+  }
+
+  return size.get();
 }
 
 
 Future<Nothing> FetcherProcess::fetch(
     const ContainerID& containerId,
     const CommandInfo& commandInfo,
-    const string& directory,
+    const string& sandboxDirectory,
     const Option<string>& user,
+    const SlaveID& slaveId,
     const Flags& flags)
 {
   VLOG(1) << "Starting to fetch URIs for container: " << containerId
-        << ", directory: " << directory;
+          << ", directory: " << sandboxDirectory;
+
+  // TODO(bernd-mesos): This will disappear once we inject flags at
+  // Fetcher/FetcherProcess creation time. For now we trust this is
+  // always the exact same value.
+  cache.setSpace(flags.fetcher_cache_size);
+
+  Try<Nothing> validated = validateUris(commandInfo);
+  if (validated.isError()) {
+    return Failure("Could not fetch: " + validated.error());
+  }
+
+  Option<string> commandUser = user;
+  if (commandInfo.has_user()) {
+    commandUser = commandInfo.user();
+  }
+
+  string cacheDirectory = paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
+  if (commandUser.isSome()) {
+    // Segregating per-user cache directories.
+    cacheDirectory = path::join(cacheDirectory, commandUser.get());
+  }
 
-  Try<Subprocess> subprocess = run(commandInfo, directory, user, flags);
+  if (commandUser.isSome()) {
+    // First assure that we are working for a valid user.
+    // TODO(bernd-mesos): This should be asynchronous.
+    Try<Nothing> chown = os::chown(commandUser.get(), sandboxDirectory);
+    if (chown.isError()) {
+      return Failure("Failed to chown directory: " + sandboxDirectory +
+                     " to user: " + commandUser.get() +
+                     " with error: " + chown.error());
+    }
+  }
 
-  if (subprocess.isError()) {
-    return Failure("Failed to execute mesos-fetcher: " + subprocess.error());
+  // For each URI we determine if we should use the cache and if so we
+  // try and either get the cache entry or create a cache entry. If
+  // we're getting the cache entry then we might need to wait for that
+  // cache entry to be downloaded. If we're creating a new cache entry
+  // then we need to properly reserve the cache space (and perform any
+  // evictions). Thus, there are three possibilities for each URI:
+  //
+  //   (1) We are not using the cache.
+  //   (2) We are using the cache but need to wait for an entry to be
+  //       downloaded.
+  //   (3) We are using the cache and need to create a new entry.
+  //
+  // We capture whether or not we're using the cache using an Option
+  // as a value in a map, i.e., if we are not trying to use the cache
+  // as in (1) above then the Option is None otherwise as in (2) and
+  // (3) the Option is Some. And to capture the asynchronous nature of
+  // both (2) and (3) that Option holds a Future to the actual cache
+  // entry.
+  hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>> entries;
+
+  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
+    if (!uri.cache()) {
+      entries[uri] = None();
+      continue;
+    }
+
+    // Check if this is already in the cache (but not necessarily
+    // downloaded).
+    const Option<shared_ptr<Cache::Entry>> entry =
+      cache.get(commandUser, uri.value());
+
+    if (entry.isSome()) {
+      entry.get()->reference();
+
+      // Wait for the URI to be downloaded into the cache (or fail)
+      entries[uri] = entry.get()->completion()
+        .then(defer(self(), [=]() {
+          return Future<shared_ptr<Cache::Entry>>(entry.get());
+        }));
+    } else {
+      shared_ptr<Cache::Entry> newEntry =
+        cache.create(cacheDirectory, commandUser, uri);
+
+      newEntry->reference();
+
+      entries[uri] = async(&fetchSize, uri.value(), flags.frameworks_home)
+        .then(defer(self(),
+                    &FetcherProcess::reserveCacheSpace,
+                    lambda::_1,
+                    newEntry));
+    }
   }
 
-  subprocessPids[containerId] = subprocess.get().pid();
+  list<Future<shared_ptr<Cache::Entry>>> futures;
 
-  return subprocess.get().status()
-    .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
+  // Get out all of the futures we need to wait for so we can wait on them
+  // together via 'await'.
+  foreachvalue (const Option<Future<shared_ptr<Cache::Entry>>>& entry,
+                entries) {
+    if (entry.isSome()) {
+      futures.push_back(entry.get());
+    }
+  }
+
+  return _fetch(futures,
+                entries,
+                containerId,
+                sandboxDirectory,
+                cacheDirectory,
+                commandUser,
+                flags);
 }
 
 
 Future<Nothing> FetcherProcess::_fetch(
+    const list<Future<shared_ptr<Cache::Entry>>> futures,
+    const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>&
+      entries,
     const ContainerID& containerId,
-    const Option<int>& status)
+    const string& sandboxDirectory,
+    const string& cacheDirectory,
+    const Option<string>& user,
+    const Flags& flags)
 {
-  subprocessPids.erase(containerId);
+  return await(futures)
+    .then(defer(self(),
+                &FetcherProcess::__fetch,
+                entries))
+    .then(defer(self(),
+                &FetcherProcess::___fetch,
+                lambda::_1,
+                containerId,
+                sandboxDirectory,
+                cacheDirectory,
+                user,
+                flags));
+}
+
 
-  if (status.isNone()) {
-    return Failure("No status available from fetcher");
-  } else if (status.get() != 0) {
-    return Failure("Failed to fetch URIs for container '" +
-                   stringify(containerId) + "'with exit status: " +
-                   stringify(status.get()));
+// For each URI, if there is a cache entry and waiting for it was
+// successful, extract it and add it to the resulting map. Otherwise
+// we'll assume we are not using or cannot use the cache for this URI.
+Future<hashmap<CommandInfo::URI,
+               Option<shared_ptr<FetcherProcess::Cache::Entry>>>>
+FetcherProcess::__fetch(
+    const hashmap<CommandInfo::URI,
+                  Option<Future<shared_ptr<Cache::Entry>>>>& entries)
+{
+  hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>> result;
+
+  foreachpair (const CommandInfo::URI& uri,
+               const Option<Future<shared_ptr<Cache::Entry>>>& entry,
+               entries) {
+    if (entry.isSome()) {
+      if (entry.get().isReady()) {
+        result[uri] = entry.get().get();
+      } else {
+        LOG(WARNING) << "Reverting to fetching directly into the sandbox for '"
+                     << uri.value()
+                     << "', due to failure to fetch through the cache, "
+                     << "with error: " << entry.get().failure();
+
+        result[uri] = None();
+      }
+    } else {
+      // No entry means bypassing the cache.
+      result[uri] = None();
+    }
   }
 
-  return Nothing();
+  return result;
 }
 
 
-Try<Subprocess> FetcherProcess::run(
-    const CommandInfo& commandInfo,
-    const string& directory,
+Future<Nothing> FetcherProcess::___fetch(
+    const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries,
+    const ContainerID& containerId,
+    const string& sandboxDirectory,
+    const string& cacheDirectory,
     const Option<string>& user,
-    const Flags& flags,
-    const Option<int>& stdout,
-    const Option<int>& stderr)
+    const Flags& flags)
 {
-  // Determine path for mesos-fetcher.
-  Result<string> realpath = os::realpath(
-      path::join(flags.launcher_dir, "mesos-fetcher"));
+  // Now construct the FetcherInfo based on which URIs we're using
+  // the cache for and which ones we are bypassing the cache.
+  FetcherInfo info;
+
+  foreachpair (const CommandInfo::URI& uri,
+               const Option<shared_ptr<Cache::Entry>>& entry,
+               entries) {
+    FetcherInfo::Item* item = info.add_items();
+
+    item->mutable_uri()->CopyFrom(uri);
+
+    if (entry.isSome()) {
+      if (entry.get()->completion().isPending()) {
+        // Since the entry is not yet "complete", i.e.,
+        // 'completion().isPending()', it must be the case that we created
+        // the entry in FetcherProcess::fetch(). Otherwise the entry should
+        // have been in the cache already and we would have waited for its
+        // completion in FetcherProcess::fetch().
+        item->set_action(FetcherInfo::Item::DOWNLOAD_AND_CACHE);
+        item->set_cache_filename(entry.get()->filename);
+      } else {
+        CHECK_READY(entry.get()->completion());
+        item->set_action(FetcherInfo::Item::RETRIEVE_FROM_CACHE);
+        item->set_cache_filename(entry.get()->filename);
+      }
+    } else {
+      item->set_action(FetcherInfo::Item::BYPASS_CACHE);
+    }
+  }
 
-  if (!realpath.isSome()) {
-    LOG(ERROR) << "Failed to determine the canonical path "
-                << "for the mesos-fetcher '"
-                << path::join(flags.launcher_dir, "mesos-fetcher")
-                << "': "
-                << (realpath.isError() ? realpath.error()
-                                       : "No such file or directory");
-    return Error("Could not fetch URIs: failed to find mesos-fetcher");
+  info.set_sandbox_directory(sandboxDirectory);
+  info.set_cache_directory(cacheDirectory);
+
+  if (user.isSome()) {
+    info.set_user(user.get());
   }
 
-  // Now the actual mesos-fetcher command.
-  string command = realpath.get();
+  if (!flags.frameworks_home.empty()) {
+    info.set_frameworks_home(flags.frameworks_home);
+  }
 
-  LOG(INFO) << "Fetching URIs using command '" << command << "'";
+  return run(containerId, info, flags)
+    .repair(defer(self(), &FetcherProcess::__runFail, lambda::_1, entries))
+    .then(defer(self(), &FetcherProcess::__runSucceed, entries));
+}
 
-  Try<Subprocess> fetcherSubprocess = subprocess(
-    command,
-    Subprocess::PIPE(),
-    stdout.isSome()
-      ? Subprocess::FD(stdout.get())
-      : Subprocess::PIPE(),
-    stderr.isSome()
-      ? Subprocess::FD(stderr.get())
-      : Subprocess::PIPE(),
-    Fetcher::environment(commandInfo, directory, user, flags));
 
-  if (fetcherSubprocess.isError()) {
-    return Error(
-        "Failed to execute mesos-fetcher: " +  fetcherSubprocess.error());
+Future<Nothing> FetcherProcess::__runFail(
+    const Future<Nothing>& future,
+    const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries)
+{
+  LOG(ERROR) << "Failed to run mesos-fetcher: " << future.failure();
+
+  foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
+    if (entry.isSome()) {
+      entry.get()->unreference();
+
+      if (entry.get()->completion().isPending()) {
+        // Unsuccessfully (or partially) downloaded! Remove from the cache.
+        entry.get()->fail();
+        cache.remove(entry.get()); // Might delete partial download.
+      }
+    }
   }
 
-  return fetcherSubprocess;
+  return future; // Always propagate the failure!
 }
 
 
-Try<Subprocess> FetcherProcess::run(
-    const CommandInfo& commandInfo,
-    const string& directory,
-    const Option<string>& user,
+Future<Nothing> FetcherProcess::__runSucceed(
+    const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries)
+{
+  foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
+    if (entry.isSome()) {
+      entry.get()->unreference();
+
+      if (entry.get()->completion().isPending()) {
+        // Successfully downloaded and cached!
+
+        Try<Nothing> adjust = cache.adjust(entry.get());
+        if (adjust.isSome()) {
+          entry.get()->complete();
+        } else {
+          LOG(WARNING) << "Failed to adjust the cache size for entry '"
+                       << entry.get()->key << "' with error: "
+                       << adjust.error();
+
+          // Successfully fetched, but not reusable from the cache,
+          // because we are deleting the entry now.
+          entry.get()->fail();
+          cache.remove(entry.get());
+        }
+      }
+    }
+  }
+
+  return Nothing();
+}
+
+
+static off_t delta(
+    const Bytes& actualSize,
+    const shared_ptr<FetcherProcess::Cache::Entry>& entry)
+{
+  if (actualSize < entry->size) {
+    Bytes delta = entry->size - actualSize;
+    LOG(WARNING) << "URI download result for '" << entry->key
+                 << "' is smaller than expected by " << stringify(delta)
+                 << " at: " << entry->path();
+
+    return -off_t(delta.bytes());
+  } else if (actualSize > entry->size) {
+    Bytes delta = actualSize - entry->size;
+    LOG(WARNING) << "URI download result for '" << entry->key
+                 << "' is larger than expected by " << stringify(delta)
+                 << " at: " << entry->path();
+
+    return off_t(delta.bytes());
+  }
+
+  return 0;
+}
+
+
+// For testing only.
+// TODO(bernd-mesos): After refactoring slave/containerizer,fetcher so
+// that flags and slave ID get injected, replace this with two functions
+// one of which returns a list of cache file paths, the other the number
+// of entries in the cache table.
+Try<list<Path>> FetcherProcess::cacheFiles(
+    const SlaveID& slaveId,
+    const Flags& flags)
+{
+  list<Path> result;
+
+  const string cacheDirectory =
+    slave::paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
+
+  if (!os::exists(cacheDirectory)) {
+    return result;
+  }
+
+  const Try<list<string>> find =
+    os::find(cacheDirectory, CACHE_FILE_NAME_PREFIX);
+
+  if (find.isError()) {
+    return Error("Could not access cache directory '" +
+                 cacheDirectory + "' with error: " + find.error());
+  }
+
+  transform(find.get().begin(),
+            find.get().end(),
+            std::back_inserter(result),
+            [](const string& path) { return Path(path); });
+
+  return result;
+}
+
+
+// For testing only.
+size_t FetcherProcess::cacheSize()
+{
+  return cache.size();
+}
+
+
+Bytes FetcherProcess::availableCacheSpace()
+{
+  return cache.availableSpace();
+}
+
+
+Future<shared_ptr<FetcherProcess::Cache::Entry>>
+FetcherProcess::reserveCacheSpace(
+    const Future<Try<Bytes>>& requestedSpace,
+    const shared_ptr<FetcherProcess::Cache::Entry>& entry)
+{
+  CHECK_READY(requestedSpace);
+
+  if (requestedSpace.get().isError()) {
+    // Let anyone waiting on this future know that we've
+    // failed to download and they should bypass the cache
+    // (any new requests will try again).
+    entry->fail();
+    cache.remove(entry);
+
+    return Failure("Could not determine size of cache file for '" +
+                   entry->key + "' with error: " +
+                   requestedSpace.get().error());
+  }
+
+  Try<Nothing> reservation = cache.reserve(requestedSpace.get().get());
+
+  if (reservation.isError()) {
+    // Let anyone waiting on this future know that we've
+    // failed to download and they should bypass the cache
+    // (any new requests will try again).
+    entry->fail();
+    cache.remove(entry);
+
+    return Failure("Failed to reserve space in the cache: " +
+                   reservation.error());
+  }
+
+  VLOG(1) << "Claiming fetcher cache space for: " << entry->key;
+
+  cache.claimSpace(requestedSpace.get().get());
+
+  // NOTE: We must set the entry size only when are also claiming the
+  // space! Other functions rely on this dependency (see
+  // Cache::remove()).
+  entry->size = requestedSpace.get().get();
+
+  return entry;
+}
+
+
+Future<Nothing> FetcherProcess::run(
+    const ContainerID& containerId,
+    const FetcherInfo& info,
     const Flags& flags)
 {
   // Before we fetch let's make sure we create 'stdout' and 'stderr'
@@ -273,47 +705,92 @@ Try<Subprocess> FetcherProcess::run(
   // today is because we not only need to open the files but also
   // chown them.
   Try<int> out = os::open(
-      path::join(directory, "stdout"),
+      path::join(info.sandbox_directory(), "stdout"),
       O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK | O_CLOEXEC,
       S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
 
   if (out.isError()) {
-    return Error("Failed to create 'stdout' file: " + out.error());
+    return Failure("Failed to create 'stdout' file: " + out.error());
   }
 
   // Repeat for stderr.
   Try<int> err = os::open(
-      path::join(directory, "stderr"),
+      path::join(info.sandbox_directory(), "stderr"),
       O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK | O_CLOEXEC,
       S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
 
   if (err.isError()) {
     os::close(out.get());
-    return Error("Failed to create 'stderr' file: " + err.error());
+    return Failure("Failed to create 'stderr' file: " + err.error());
   }
 
-  if (user.isSome()) {
-    Try<Nothing> chown = os::chown(user.get(), directory);
-    if (chown.isError()) {
-      os::close(out.get());
-      os::close(err.get());
-      return Error("Failed to chown work directory");
-    }
+  string fetcherPath = path::join(flags.launcher_dir, "mesos-fetcher");
+  Result<string> realpath = os::realpath(fetcherPath);
+
+  if (!realpath.isSome()) {
+    LOG(ERROR) << "Failed to determine the canonical path "
+               << "for the mesos-fetcher '"
+               << fetcherPath
+               << "': "
+               << (realpath.isError() ? realpath.error()
+                                      : "No such file or directory");
+
+    return Failure("Could not fetch URIs: failed to find mesos-fetcher");
   }
 
-  Try<Subprocess> subprocess = run(
-      commandInfo,
-      directory,
-      user,
-      flags,
-      out.get(),
-      err.get());
+  // Now the actual mesos-fetcher command.
+  string command = realpath.get();
+
+  // We pass arguments to the fetcher program by means of an
+  // environment variable.
+  map<string, string> environment;
+
+  environment["MESOS_FETCHER_INFO"] = stringify(JSON::Protobuf(info));
 
-  subprocess.get().status()
-    .onAny(lambda::bind(&os::close, out.get()))
-    .onAny(lambda::bind(&os::close, err.get()));
+  if (!flags.hadoop_home.empty()) {
+    environment["HADOOP_HOME"] = flags.hadoop_home;
+  }
+
+  VLOG(1) << "Fetching URIs using command '" << command << "'";
+
+  Try<Subprocess> fetcherSubprocess = subprocess(
+      command,
+      Subprocess::PIPE(),
+      Subprocess::FD(out.get()),
+      Subprocess::FD(err.get()),
+      environment);
 
-  return subprocess;
+  if (fetcherSubprocess.isError()) {
+    return Failure("Failed to execute mesos-fetcher: " +
+                   fetcherSubprocess.error());
+  }
+
+  // Remember this PID in case we need to kill the subprocess. See kill().
+  // This value gets reset in __run().
+  subprocessPids[containerId] = fetcherSubprocess.get().pid();
+
+  return fetcherSubprocess.get().status()
+    .then(defer(self(), [=](const Option<int>& status) -> Future<Nothing> {
+      if (status.isNone()) {
+        return Failure("No status available from mesos-fetcher");
+      }
+
+      if (status.get() != 0) {
+        return Failure("Failed to fetch all URIs for container '" +
+                       stringify(containerId) +
+                       "' with exit status: " +
+                       stringify(status.get()));
+      }
+
+      return Nothing();
+    }))
+    .onAny(defer(self(), [=](const Future<Nothing>& result) {
+      // Clear the subprocess PID remembered from running mesos-fetcher.
+      subprocessPids.erase(containerId);
+
+      os::close(out.get());
+      os::close(err.get());
+    }));
 }
 
 
@@ -328,6 +805,348 @@ void FetcherProcess::kill(const ContainerID& containerId)
   }
 }
 
+
+string FetcherProcess::Cache::nextFilename(const CommandInfo::URI& uri)
+{
+  // Different URIs may have the same base name, so we need to
+  // segregate the download results. This can be done by separate
+  // directories or by different file names. We opt for the latter
+  // since there may be tighter limits on how many sub-directories a
+  // file system can bear than on how many files can be in a directory.
+
+  // We put a fixed prefix upfront before the serial number so we can
+  // later easily find cache files with os::find() to support testing.
+
+  // Why we keep the file extension here: When fetching from cache, if
+  // extraction is enabled, the extraction algorithm can look at the
+  // extension of the cache file the same way as it would at a
+  // download of the original URI, and external commands performing
+  // the extraction do not get confused by their source file
+  // missing an expected form of extension. This is included in the
+  // following.
+
+  // Just for human operators who want to take a look at the cache
+  // and relate cache files to URIs, we also add some of the URI's
+  // basename, but not too much so we do not exceed file name size
+  // limits.
+
+  Try<string> base = Fetcher::basename(uri.value());
+  CHECK_SOME(base);
+
+  string s = base.get();
+  if (s.size() > 20) {
+    // Grab only a prefix and a suffix, but for sure including the
+    // file extension.
+    s = s.substr(0, 10) + "_" + s.substr(s.size() - 10, string::npos);
+  }
+
+  ++filenameSerial;
+
+  return CACHE_FILE_NAME_PREFIX + stringify(filenameSerial) + "-" + s;
+}
+
+
+static string cacheKey(const Option<string>& user, const string& uri)
+{
+  return user.isNone() ? uri : user.get() + "@" + uri;
+}
+
+
+shared_ptr<FetcherProcess::Cache::Entry> FetcherProcess::Cache::create(
+    const string& cacheDirectory,
+    const Option<string>& user,
+    const CommandInfo::URI& uri)
+{
+  const string key = cacheKey(user, uri.value());
+  const string filename = nextFilename(uri);
+
+  auto entry = shared_ptr<Cache::Entry>(
+      new Cache::Entry(key, cacheDirectory, filename));
+
+  table.put(key, entry);
+
+  VLOG(1) << "Created cache entry '" << key << "' with file: " << filename;
+
+  return entry;
+}
+
+
+Option<shared_ptr<FetcherProcess::Cache::Entry>>
+FetcherProcess::Cache::get(
+    const Option<string>& user,
+    const string& uri)
+{
+  const string key = cacheKey(user, uri);
+
+  return table.get(key);
+}
+
+
+bool FetcherProcess::Cache::contains(
+    const Option<string>& user,
+    const string& uri)
+{
+  return get(user, uri).isSome();
+}
+
+
+bool FetcherProcess::Cache::contains(const shared_ptr<Cache::Entry>& entry)
+{
+  Option<shared_ptr<Cache::Entry>> found = table.get(entry->key);
+  if (found.isNone()) {
+    return false;
+  }
+
+  return found == entry;
+}
+
+
+// We are removing an entry if:
+//
+//   (1) We failed to determine its prospective cache file size.
+//   (2) We failed to download it when invoking the mesos-fetcher.
+//   (3) We're evicting it to make room for another entry.
+//
+// In (1) and (2) the contract is that we'll have failed the entry's
+// future before we call remove, so the entry's future should no
+// longer be pending.
+//
+// In (3) it should be the case that the future is no longer pending,
+// because we shouldn't be able to evict something if we're
+// currently downloading it, because it should have a non-zero
+// reference count and therefore the future must either be ready or
+// failed in which case this is just case (1) above.
+//
+// NOTE: It is not necessarily the case that this cache entry has
+// zero references because there might be some waiters on the
+// downloading of this entry which haven't been able to run and find
+// out that the downloading failed.
+//
+// We want to attempt to delete the file regardless of if it being
+// downloaded since it might have been downloaded partially! Deleting
+// this file should not be racing with any other downloading or
+// deleting because all calls into the cache are serialized by the
+// FetcherProcess and since this entry is already in the cache there
+// should not be any other conflicting entries or files representing
+// this entry. Furthermore every cache file has a unique name. Thus
+// no new download conflicts with the manipulation of any pre-existing
+// cache content.
+Try<Nothing> FetcherProcess::Cache::remove(
+    const shared_ptr<Cache::Entry>& entry)
+{
+  VLOG(1) << "Removing cache entry '" << entry->key
+          << "' with filename: " << entry->filename;
+
+  CHECK(!entry->completion().isPending());
+
+  CHECK(contains(entry));
+
+  table.erase(entry->key);
+
+  // We may or may not have started downloading. The download may or may
+  // not have been partial. In any case, clean up whatever is there.
+  if (os::exists(entry->path().value)) {
+    Try<Nothing> rm = os::rm(entry->path().value);
+    if (rm.isError()) {
+      return Error("Could not delete fetcher cache file '" +
+                   entry->path().value + "' with error: " + rm.error() +
+                   " for entry '" + entry->key +
+                   "', leaking cache space: " + stringify(entry->size));
+    }
+  }
+
+  // NOTE: There is an assumption that if and only if 'entry->size > 0'
+  // then we've claimed cache space for this entry! This currently only
+  // gets set in reserveCacheSpace().
+  if (entry->size > 0) {
+    releaseSpace(entry->size);
+
+    entry->size = 0;
+  }
+
+  return Nothing();
+}
+
+
+Try<list<shared_ptr<FetcherProcess::Cache::Entry>>>
+FetcherProcess::Cache::selectVictims(const Bytes& requiredSpace)
+{
+  // TODO(bernd-mesos): Implement more elaborate selection criteria
+  // (LRU/MRU, etc.).
+
+  list<shared_ptr<FetcherProcess::Cache::Entry>> result;
+
+  Bytes space = 0;
+
+  foreachvalue (const shared_ptr<Cache::Entry>& entry, table) {
+    if (!entry->isReferenced()) {
+      result.push_back(entry);
+
+      space += entry->size;
+      if (space >= requiredSpace) {
+        return result;
+      }
+    }
+  }
+
+  return Error("Could not find enough cache files to evict");
+}
+
+
+Try<Nothing> FetcherProcess::Cache::reserve(
+    const Bytes& requestedSpace)
+{
+  if (availableSpace() < requestedSpace) {
+    Bytes missingSpace = requestedSpace - availableSpace();
+
+    VLOG(1) << "Freeing up fetcher cache space for: " << missingSpace;
+
+    const Try<list<shared_ptr<Cache::Entry>>> victims =
+      selectVictims(missingSpace);
+
+    if (victims.isError()) {
+      return Error("Could not free up enough fetcher cache space");
+    }
+
+    foreach (const shared_ptr<Cache::Entry>& entry, victims.get()) {
+      Try<Nothing> removal = remove(entry);
+      if (removal.isError()) {
+        return Error(removal.error());
+      }
+    }
+  }
+
+  return Nothing();
+}
+
+
+Try<Nothing> FetcherProcess::Cache::adjust(
+    const shared_ptr<FetcherProcess::Cache::Entry>& entry)
+{
+  CHECK(contains(entry));
+
+  Try<Bytes> size =
+    os::stat::size(entry.get()->path().value, os::stat::DO_NOT_FOLLOW_SYMLINK);
+
+  if (size.isSome()) {
+    off_t d = delta(size.get(), entry);
+    if (d <= 0) {
+      entry->size = size.get();
+
+      releaseSpace(Bytes(d));
+    } else {
+      return Error("More cache size now necessary, not adjusting " +
+                   entry->key);
+    }
+  } else {
+    // This should never be caused by Mesos itself, but cannot be excluded.
+    return Error("Fetcher cache file for '" + entry->key +
+                 "' disappeared from: " + entry->path().value);
+  }
+
+  return Nothing();
+}
+
+
+size_t FetcherProcess::Cache::size()
+{
+  return table.size();
+}
+
+
+void FetcherProcess::Cache::setSpace(const Bytes& bytes)
+{
+  if (space > 0) {
+    // Dynamic cache size changes not supported.
+    CHECK(space == bytes);
+  } else {
+    space = bytes;
+  }
+}
+
+
+void FetcherProcess::Cache::claimSpace(const Bytes& bytes)
+{
+  tally += bytes;
+
+  if (tally > space) {
+    // Used cache volume space exceeds the maximum amount set by
+    // flags.fetcher_cache_size. This may be tolerated temporarily,
+    // if there is sufficient physical space available. But it can
+    // otherwise cause unspecified system behavior at any moment.
+    LOG(WARNING) << "Fetcher cache space overflow - space used: " << tally
+                 << ", exceeds total fetcher cache space: " << space;
+  }
+
+  VLOG(1) << "Claimed cache space: " << bytes << ", now using: " << tally;
+}
+
+
+void FetcherProcess::Cache::releaseSpace(const Bytes& bytes)
+{
+  CHECK(bytes <= tally) << "Attempt to release more cache space than in use - "
+                        << " requested: " << bytes << ", in use: " << tally;
+
+
+  tally -= bytes;
+
+  VLOG(1) << "Released cache space: " << bytes << ", now using: " << tally;
+}
+
+
+Bytes FetcherProcess::Cache::availableSpace()
+{
+  if (tally > space) {
+    LOG(WARNING) << "Fetcher cache space overflow - space used: " << tally
+                 << ", exceeds total fetcher cache space: " << space;
+    return 0;
+  }
+
+  return space - tally;
+}
+
+
+void FetcherProcess::Cache::Entry::complete()
+{
+  CHECK_PENDING(promise.future());
+
+  promise.set(Nothing());
+}
+
+
+Future<Nothing> FetcherProcess::Cache::Entry::completion()
+{
+  return promise.future();
+}
+
+
+void FetcherProcess::Cache::Entry::fail()
+{
+  CHECK_PENDING(promise.future());
+
+  promise.fail("Could not download to fetcher cache: " + key);
+}
+
+
+void FetcherProcess::Cache::Entry::reference()
+{
+  referenceCount++;
+}
+
+
+void FetcherProcess::Cache::Entry::unreference()
+{
+  CHECK(referenceCount > 0);
+
+  referenceCount--;
+}
+
+
+bool FetcherProcess::Cache::Entry::isReferenced()
+{
+  return referenceCount > 0;
+}
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 1db0eaf..3b63711 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -16,14 +16,16 @@
  * limitations under the License.
  */
 
-#ifndef __SLAVE_FETCHER_HPP__
-#define __SLAVE_FETCHER_HPP__
+#ifndef __SLAVE_CONTAINERIZER_FETCHER_HPP__
+#define __SLAVE_CONTAINERIZER_FETCHER_HPP__
 
 #include <string>
-#include <vector>
 
 #include <mesos/mesos.hpp>
 
+#include <mesos/fetcher/fetcher.hpp>
+
+#include <process/id.hpp>
 #include <process/future.hpp>
 #include <process/process.hpp>
 #include <process/subprocess.hpp>
@@ -32,6 +34,8 @@
 
 #include "slave/flags.hpp"
 
+using mesos::fetcher::FetcherInfo;
+
 namespace mesos {
 namespace internal {
 namespace slave {
@@ -40,48 +44,64 @@ namespace slave {
 class FetcherProcess;
 
 // Argument passing to and invocation of the external fetcher program.
-// TODO(bernd-mesos) : Orchestration and synchronization of fetching
-// phases. Bookkeeping of executor files that are cached after
-// downloading from a URI by the fetcher program. Cache eviction.
-// There has to be exactly one fetcher with a distinct cache dir per
-// active slave. This means that the cache dir can only be fixed
-// after the slave ID has been determined by registration or recovery.
+// Bookkeeping of executor files that are cached after downloading from
+// a URI by the fetcher program. Cache eviction. There has to be exactly
+// one fetcher with a distinct cache directory per active slave. This
+// means that the cache directory can only be fixed after the slave ID
+// has been determined by registration or recovery. Downloads to cache
+// files are separated on a per-user basis. The cache must only be used
+// for URIs for which the expected download size can be determined and
+// trusted before downloading. If there is any problem using the cache
+// for any given URI, the fetch procedure automatically reverts to
+// fetching directly into the sandbox directory.
 class Fetcher
 {
 public:
-  // Builds the environment used to run mesos-fetcher. This
-  // environment contains one variable with the name
-  // "MESOS_FETCHER_INFO", and its value is a protobuf of type
-  // mesos::fetcher::FetcherInfo.
-  static std::map<std::string, std::string> environment(
-      const CommandInfo& commandInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const Flags& flags);
+  // Extracts the basename from a URI. For example, "d.txt" from
+  // "htpp://1.2.3.4:5050/a/b/c/d.txt". The current implementation
+  // only works for fairly regular-shaped URIs with a "/" and a proper
+  // file name at the end.
+  static Try<std::string> basename(const std::string& uri);
+
+  // Some checks to make sure using the URI value in shell commands
+  // is safe.
+  // TODO(benh): These should be pushed into the scheduler driver and
+  // reported to the user.
+  static Try<Nothing> validateUri(const std::string& uri);
+
+  // Determines if the given URI refers to a local file system path
+  // and prepends frameworksHome if it is a relative path. Fails if
+  // frameworksHome is empty and a local path is indicated.
+  static Result<std::string> uriToLocalPath(
+      const std::string& uri,
+      const Option<std::string>& frameworksHome);
+
+  static bool isNetUri(const std::string& uri);
 
   Fetcher();
 
+  // This is only public for tests.
+  Fetcher(const process::Owned<FetcherProcess>& process);
+
   virtual ~Fetcher();
 
-  // Download the URIs specified in the command info and place the
-  // resulting files into the given work directory. Chmod said files
-  // to the user if given.
-  process::Future<Nothing> fetch(
-      const ContainerID& containerId,
-      const CommandInfo& commandInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const Flags& flags,
-      const Option<int>& stdout,
-      const Option<int>& stderr);
+  // TODO(bernd-mesos): Inject these parameters at Fetcher creation time.
+  // Then also inject the fetcher into the slave at creation time. Then
+  // it will be possible to make this an instance method instead of a
+  // static one for the slave to call during startup or recovery.
+  static Try<Nothing> recover(const SlaveID& slaveId, const Flags& flags);
 
-  // Same as above, but send stdout and stderr to the files 'stdout'
-  // and 'stderr' in the specified directory.
+  // Download the URIs specified in the command info and place the
+  // resulting files into the given sandbox directory. Chmod said files
+  // to the user if given. Send stdout and stderr output to files
+  // "stdout" and "stderr" in the given directory. Extract archives and/or
+  // use the cache if so instructed by the given CommandInfo::URI items.
   process::Future<Nothing> fetch(
       const ContainerID& containerId,
       const CommandInfo& commandInfo,
-      const std::string& directory,
+      const std::string& sandboxDirectory,
       const Option<std::string>& user,
+      const SlaveID& slaveId,
       const Flags& flags);
 
   // Best effort to kill the fetcher subprocess associated with the
@@ -96,58 +116,241 @@ private:
 class FetcherProcess : public process::Process<FetcherProcess>
 {
 public:
-  FetcherProcess() : ProcessBase("__fetcher__") {}
+  FetcherProcess() : ProcessBase(process::ID::generate("fetcher")) {}
 
   virtual ~FetcherProcess();
 
-  // Fetcher implementation.
   process::Future<Nothing> fetch(
       const ContainerID& containerId,
       const CommandInfo& commandInfo,
-      const std::string& directory,
+      const std::string& sandboxDirectory,
       const Option<std::string>& user,
-      const Flags& flags,
-      const Option<int>& stdout,
-      const Option<int>& stderr);
+      const SlaveID& slaveId,
+      const Flags& flags);
 
-  process::Future<Nothing> fetch(
+  // Runs the mesos-fetcher, creating a "stdout" and "stderr" file
+  // in the given directory, using these for trace output.
+  virtual process::Future<Nothing> run(
       const ContainerID& containerId,
-      const CommandInfo& commandInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
+      const FetcherInfo& info,
       const Flags& flags);
 
+  // Best effort attempt to kill the external mesos-fetcher process
+  // running on behalf of the given container ID, if any.
   void kill(const ContainerID& containerId);
 
-private:
-  // Check status and return an error if any.
-  process::Future<Nothing> _fetch(
+  // Representation of the fetcher cache and its contents. There is
+  // exactly one instance per instance of FetcherProcess. All methods
+  // of Cache are to be executed on the latter to ensure atomicity of
+  // cache operations.
+  class Cache
+  {
+  public:
+    class Entry
+    {
+    public:
+      Entry(
+          const std::string& key,
+          const std::string& directory,
+          const std::string& filename)
+        : key(key),
+          directory(directory),
+          filename(filename),
+          size(0),
+          referenceCount(0) {}
+
+      ~Entry() {}
+
+      // Marks this file's download as successful by setting its promise
+      // to the path of the file in the cache.
+      void complete();
+
+      // Indicates whether this file's download into the cache is
+      // successfully completed.
+      process::Future<Nothing> completion();
+
+      // Marks this download as failed, notifying concurrent fetch attempts
+      // waiting for this result, by setting the promise to failed.
+      void fail();
+
+      // While an entry is "referenced" it cannot be evicted from the
+      // cache.
+      void reference();
+      void unreference();
+      bool isReferenced();
+
+      // Returns the path in the filesystem where cache entry resides.
+      // TODO(bernd-mesos): Remove this construct after refactoring so
+      // that the slave flags get injected into the fetcher.
+      Path path() { return Path(path::join(directory, filename)); }
+
+      // Uniquely identifies a user/URI combination.
+      const std::string key;
+
+      // Cache directory where this entry is stored.
+      // TODO(bernd-mesos): Remove this construct after refactoring so
+      // that the slave flags get injected into the fetcher.
+      const std::string directory;
+
+      // The unique name of the file held in the cache on behalf of a
+      // URI.
+      const std::string filename;
+
+      // The expected size of the cache file. This field is set before
+      // downloading. If the actual size of the downloaded file is
+      // different a warning is logged and the field's value adjusted.
+      Bytes size;
+
+    private:
+      // Concurrent fetch attempts can reference the same entry multiple
+      // times.
+      unsigned long referenceCount;
+
+     // Indicates successful downloading to the cache.
+      process::Promise<Nothing> promise;
+    };
+
+    Cache() : space(0), tally(0), filenameSerial(0) {}
+    virtual ~Cache() {}
+
+    // Registers the maximum usable space in the cache directory.
+    // TODO(bernd-mesos): This method will disappear when injecting 'flags'
+    // into the fetcher instead of passing 'flags' around as parameter.
+    void setSpace(const Bytes& bytes);
+
+    void claimSpace(const Bytes& bytes);
+    void releaseSpace(const Bytes& bytes);
+    Bytes availableSpace();
+
+    // Invents a new, distinct base name for a cache file, using the same
+    // filename extension as the URI.
+    std::string nextFilename(const CommandInfo::URI& uri);
+
+    // Creates a new entry and inserts it into the cache table. Also
+    // sets its reference count to 1. Returns the entry.
+    std::shared_ptr<Entry> create(
+        const std::string& cacheDirectory,
+        const Option<std::string>& user,
+        const CommandInfo::URI& uri);
+
+    // Retrieves the cache entry indexed by the parameters, without
+    // changing its reference count.
+    Option<std::shared_ptr<Entry>> get(
+        const Option<std::string>& user,
+        const std::string& uri);
+
+    // Returns whether an entry for this user and URI is in the cache.
+    bool contains(const Option<std::string>& user, const std::string& uri);
+
+    // Returns whether this identical entry is in the cache.
+    bool contains(const std::shared_ptr<Cache::Entry>& entry);
+
+    // Completely deletes a cache entry and its file. Warns on failure.
+    // Virtual for mock testing.
+    virtual Try<Nothing> remove(const std::shared_ptr<Entry>& entry);
+
+    // Determines a list of cache entries to remove, respectively cache files
+    // to delete, so that at least the required amount of space would become
+    // available.
+    Try<std::list<std::shared_ptr<Cache::Entry>>>
+        selectVictims(const Bytes& requiredSpace);
+
+    // Ensures that there is the requested amount of space is available
+    // Evicts other files as necessary to make it so.
+    Try<Nothing> reserve(const Bytes& requestedSpace);
+
+    // Finds out if any predictions about cache file sizes have been
+    // inaccurate, logs this if so, and records the cache files' actual
+    // sizes and adjusts the cache's total amount of space in use.
+    Try<Nothing> adjust(const std::shared_ptr<Cache::Entry>& entry);
+
+    // Number of entries.
+    size_t size();
+
+  private:
+    // Maximum storable number of bytes in the cache directory.
+    Bytes space;
+
+    // How much space has been reserved to be occupied by cache files.
+    Bytes tally;
+
+    // Used to generate distinct cache file names simply by counting.
+    unsigned long filenameSerial;
+
+    // Maps keys (cache directory / URI combinations) to cache file
+    // entries.
+    hashmap<std::string, std::shared_ptr<Entry>> table;
+  };
+
+  // Public and virtual for mock testing.
+  virtual process::Future<Nothing> _fetch(
+      const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
+        futures,
+      const hashmap<
+          CommandInfo::URI,
+          Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
+        entries,
       const ContainerID& containerId,
-      const Option<int>& status);
-
-  // Run the mesos-fetcher with custom output redirection. If
-  // 'stdout' and 'stderr' file descriptors are provided then respective
-  // output from the mesos-fetcher will be redirected to the file
-  // descriptors. The file descriptors are duplicated (via dup) because
-  // redirecting might still be occuring even after the mesos-fetcher has
-  // terminated since there still might be data to be read.
-  // This method is only "public" for test purposes.
-  Try<process::Subprocess> run(
-      const CommandInfo& commandInfo,
-      const std::string& directory,
+      const std::string& sandboxDirectory,
+      const std::string& cacheDirectory,
       const Option<std::string>& user,
-      const Flags& flags,
-      const Option<int>& stdout,
-      const Option<int>& stderr);
+      const Flags& flags);
 
-  // Run the mesos-fetcher, creating a "stdout" and "stderr" file
-  // in the given directory and using these for output.
-  Try<process::Subprocess> run(
-      const CommandInfo& commandInfo,
-      const std::string& directory,
+  // Returns a list of cache files on disk for the given slave
+  // (for all users combined). For testing.
+  // TODO(bernd-mesos): Remove the parameters after slave/containerizer
+  // refactoring for injection of these.
+  Try<std::list<Path>> cacheFiles(const SlaveID& slaveId, const Flags& flags);
+
+  // Returns the number of cache entries for the given slave (for all
+  // users combined). For testing.
+  size_t cacheSize();
+
+  // Returns the amount of remaining cache space that is not occupied
+  // by cache entries. For testing.
+  Bytes availableCacheSpace();
+
+private:
+  process::Future<hashmap<
+      CommandInfo::URI,
+      Option<std::shared_ptr<Cache::Entry>>>>
+  __fetch(const hashmap<
+      CommandInfo::URI,
+      Option<process::Future<std::shared_ptr<Cache::Entry>>>>& entries);
+
+  process::Future<Nothing> ___fetch(
+      const hashmap<CommandInfo::URI,
+      Option<std::shared_ptr<Cache::Entry>>>& entries,
+      const ContainerID& containerId,
+      const std::string& sandboxDirectory,
+      const std::string& cacheDirectory,
       const Option<std::string>& user,
       const Flags& flags);
 
+  process::Future<Nothing> _run(
+      const Option<int>& status,
+      const ContainerID& containerId);
+
+  void __run(const ContainerID& containerId, const int out, const int err);
+
+  process::Future<Nothing> __runFail(
+      const process::Future<Nothing>& future,
+      const hashmap<CommandInfo::URI,
+                    Option<std::shared_ptr<Cache::Entry>>>& entries);
+
+  process::Future<Nothing> __runSucceed(
+      const hashmap<CommandInfo::URI,
+                    Option<std::shared_ptr<Cache::Entry>>>& entries);
+
+  // Calls Cache::reserve() and returns a ready entry future if successful,
+  // else Failure. Claims the space and assigns the entry's size to this
+  // amount if and only if successful.
+  process::Future<std::shared_ptr<Cache::Entry>> reserveCacheSpace(
+      const process::Future<Try<Bytes>>& requestedSpace,
+      const std::shared_ptr<Cache::Entry>& entry);
+
+  Cache cache;
+
   hashmap<ContainerID, pid_t> subprocessPids;
 };
 
@@ -155,4 +358,4 @@ private:
 } // namespace internal {
 } // namespace mesos {
 
-#endif // __SLAVE_FETCHER_HPP__
+#endif // __SLAVE_CONTAINERIZER_FETCHER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index 396e5fb..c363605 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -650,7 +650,8 @@ Future<Nothing> MesosContainerizerProcess::fetch(
     const ContainerID& containerId,
     const CommandInfo& commandInfo,
     const string& directory,
-    const Option<string>& user)
+    const Option<string>& user,
+    const SlaveID& slaveId)
 {
   if (!containers_.contains(containerId)) {
     return Failure("Container is already destroyed");
@@ -661,6 +662,7 @@ Future<Nothing> MesosContainerizerProcess::fetch(
       commandInfo,
       directory,
       user,
+      slaveId,
       flags);
 }
 
@@ -785,7 +787,8 @@ Future<bool> MesosContainerizerProcess::_launch(
                 containerId,
                 executorInfo.command(),
                 directory,
-                user))
+                user,
+                slaveId))
     .then(defer(self(), &Self::exec, containerId, pipes[1]))
     .onAny(lambda::bind(&os::close, pipes[0]))
     .onAny(lambda::bind(&os::close, pipes[1]));

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 31f5051..3ac2387 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -186,7 +186,8 @@ private:
       const ContainerID& containerId,
       const CommandInfo& commandInfo,
       const std::string& directory,
-      const Option<std::string>& user);
+      const Option<std::string>& user,
+      const SlaveID& slaveId);
 
   process::Future<bool> _launch(
       const ContainerID& containerId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 7f2e1e8..ab87098 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -67,6 +67,23 @@ mesos::internal::slave::Flags::Flags()
       "Attributes of machine, in the form:\n"
       "rack:2 or 'rack:2;u:1'");
 
+  add(&Flags::fetcher_cache_size, "fetcher_cache_size",
+      "Size of the fetcher cache in Bytes.",
+      DEFAULT_FETCHER_CACHE_SIZE);
+
+  // By default the fetcher cache directory is held inside the work
+  // directory, so everything can be deleted or archived in one swoop,
+  // in particular during testing. However, a typical production
+  // scenario is to use a separate cache volume. First, it is not meant
+  // to be backed up. Second, you want to avoid that sandbox directories
+  // and the cache directory can interfere with each other in
+  // unpredictable ways by occupying shared space. So it is recommended
+  // to set the cache directory explicitly.
+  add(&Flags::fetcher_cache_dir, "fetcher_cache_dir",
+      "Parent directory for fetcher cache directories\n"
+      "(one subdirectory per slave).",
+      "/tmp/mesos/fetch");
+
   add(&Flags::work_dir,
       "work_dir",
       "Directory path to place framework work directories\n", "/tmp/mesos");

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index e84efc1..84dbb8a 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -44,6 +44,8 @@ public:
   std::string isolation;
   std::string default_role;
   Option<std::string> attributes;
+  Bytes fetcher_cache_size;
+  std::string fetcher_cache_dir;
   std::string work_dir;
   std::string launcher_dir;
   std::string hadoop_home; // TODO(benh): Make an Option.

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index fdaaea4..271cb03 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -835,6 +835,14 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
       LOG(INFO) << "Registered with master " << master.get()
                 << "; given slave ID " << slaveId;
 
+      // TODO(bernd-mesos): Make this an instance method call, see comment
+      // in "fetcher.hpp"".
+      Try<Nothing> recovered = Fetcher::recover(slaveId, flags);
+      if (recovered.isError()) {
+          LOG(FATAL) << "Could not initialize fetcher cache: "
+                     << recovered.error();
+      }
+
       state = RUNNING;
 
       statusUpdateManager->resume(); // Resume status updates.
@@ -3758,7 +3766,6 @@ void Slave::_checkDiskUsage(const Future<double>& usage)
 }
 
 
-
 Future<Nothing> Slave::recover(const Result<state::State>& state)
 {
   if (state.isError()) {
@@ -3830,6 +3837,13 @@ Future<Nothing> Slave::recover(const Result<state::State>& state)
       metrics.recovery_errors += slaveState.get().errors;
     }
 
+    // TODO(bernd-mesos): Make this an instance method call, see comment
+    // in "fetcher.hpp"".
+    Try<Nothing> recovered = Fetcher::recover(slaveState.get().id, flags);
+    if (recovered.isError()) {
+      return Failure(recovered.error());
+    }
+
     // Recover the frameworks.
     foreachvalue (const FrameworkState& frameworkState,
                   slaveState.get().frameworks) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index 8d3e605..3a983c6 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -379,24 +379,28 @@ public:
       const Shared<Docker>& docker)
     : DockerContainerizerProcess(flags, fetcher, docker)
   {
-    EXPECT_CALL(*this, fetch(_))
+    EXPECT_CALL(*this, fetch(_, _))
       .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_fetch));
 
     EXPECT_CALL(*this, pull(_))
       .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_pull));
   }
 
-  MOCK_METHOD1(
+  MOCK_METHOD2(
       fetch,
-      process::Future<Nothing>(const ContainerID& containerId));
+      process::Future<Nothing>(
+          const ContainerID& containerId,
+          const SlaveID& slaveId));
 
   MOCK_METHOD1(
       pull,
       process::Future<Nothing>(const ContainerID& containerId));
 
-  process::Future<Nothing> _fetch(const ContainerID& containerId)
+  process::Future<Nothing> _fetch(
+      const ContainerID& containerId,
+      const SlaveID& slaveId)
   {
-    return DockerContainerizerProcess::fetch(containerId);
+    return DockerContainerizerProcess::fetch(containerId, slaveId);
   }
 
   process::Future<Nothing> _pull(const ContainerID& containerId)
@@ -2381,7 +2385,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
   Future<Nothing> fetch;
 
   // We want to pause the fetch call to simulate a long fetch time.
-  EXPECT_CALL(*process, fetch(_))
+  EXPECT_CALL(*process, fetch(_, _))
     .WillOnce(DoAll(FutureSatisfy(&fetch),
                     Return(promise.future())));
 
@@ -2486,7 +2490,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
       (Owned<DockerContainerizerProcess>(process)));
 
   Future<Nothing> fetch;
-  EXPECT_CALL(*process, fetch(_))
+  EXPECT_CALL(*process, fetch(_, _))
     .WillOnce(DoAll(FutureSatisfy(&fetch),
                     Return(Nothing())));
 
@@ -2754,7 +2758,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_FetchFailure)
                     Invoke(&dockerContainerizer,
                            &MockDockerContainerizer::_launch)));
 
-  EXPECT_CALL(*process, fetch(_))
+  EXPECT_CALL(*process, fetch(_, _))
     .WillOnce(Return(Failure("some error from fetch")));
 
   vector<TaskInfo> tasks;


[4/4] mesos git commit: Final comments from benh on Fetcher Cache.

Posted by be...@apache.org.
Final comments from benh on Fetcher Cache.

See https://reviews.apache.org/r/30774.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7aede4ad
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7aede4ad
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7aede4ad

Branch: refs/heads/master
Commit: 7aede4ad40b500e6a9dc5ead4a718d415bf5d889
Parents: edd35b0
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat May 23 18:13:51 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Jun 1 06:45:29 2015 -0700

----------------------------------------------------------------------
 src/slave/containerizer/fetcher.cpp | 241 +++++++++++++++----------------
 src/slave/containerizer/fetcher.hpp |  28 +---
 src/tests/fetcher_cache_tests.cpp   |  40 +++--
 src/tests/mesos.cpp                 |   4 +-
 src/tests/mesos.hpp                 |   6 +-
 5 files changed, 146 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index c519bff..d4f127a 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -376,27 +376,20 @@ Future<Nothing> FetcherProcess::fetch(
 
       newEntry->reference();
 
-      entries[uri] = async(&fetchSize, uri.value(), flags.frameworks_home)
-        .then(defer(self(),
-                    &FetcherProcess::reserveCacheSpace,
-                    lambda::_1,
-                    newEntry));
-    }
-  }
-
-  list<Future<shared_ptr<Cache::Entry>>> futures;
-
-  // Get out all of the futures we need to wait for so we can wait on them
-  // together via 'await'.
-  foreachvalue (const Option<Future<shared_ptr<Cache::Entry>>>& entry,
-                entries) {
-    if (entry.isSome()) {
-      futures.push_back(entry.get());
+      entries[uri] =
+        async([=]() {
+          return fetchSize(uri.value(), flags.frameworks_home);
+        })
+        .then(defer(self(), [=](const Try<Bytes>& requestedSpace) {
+          return reserveCacheSpace(requestedSpace, newEntry);
+        }));
     }
   }
 
-  return _fetch(futures,
-                entries,
+  // NOTE: We explicitly call the continuation '_fetch' even though it
+  // looks like we could easily inline it here because we want to be
+  // able to mock the function for testing! Don't remove this!
+  return _fetch(entries,
                 containerId,
                 sandboxDirectory,
                 cacheDirectory,
@@ -406,7 +399,6 @@ Future<Nothing> FetcherProcess::fetch(
 
 
 Future<Nothing> FetcherProcess::_fetch(
-    const list<Future<shared_ptr<Cache::Entry>>> futures,
     const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>&
       entries,
     const ContainerID& containerId,
@@ -415,57 +407,61 @@ Future<Nothing> FetcherProcess::_fetch(
     const Option<string>& user,
     const Flags& flags)
 {
-  return await(futures)
-    .then(defer(self(),
-                &FetcherProcess::__fetch,
-                entries))
-    .then(defer(self(),
-                &FetcherProcess::___fetch,
-                lambda::_1,
-                containerId,
-                sandboxDirectory,
-                cacheDirectory,
-                user,
-                flags));
-}
-
-
-// For each URI, if there is a cache entry and waiting for it was
-// successful, extract it and add it to the resulting map. Otherwise
-// we'll assume we are not using or cannot use the cache for this URI.
-Future<hashmap<CommandInfo::URI,
-               Option<shared_ptr<FetcherProcess::Cache::Entry>>>>
-FetcherProcess::__fetch(
-    const hashmap<CommandInfo::URI,
-                  Option<Future<shared_ptr<Cache::Entry>>>>& entries)
-{
-  hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>> result;
+  // Get out all of the futures we need to wait for so we can wait on
+  // them together via 'await'.
+  list<Future<shared_ptr<Cache::Entry>>> futures;
 
-  foreachpair (const CommandInfo::URI& uri,
-               const Option<Future<shared_ptr<Cache::Entry>>>& entry,
-               entries) {
+  foreachvalue (const Option<Future<shared_ptr<Cache::Entry>>>& entry,
+                entries) {
     if (entry.isSome()) {
-      if (entry.get().isReady()) {
-        result[uri] = entry.get().get();
-      } else {
-        LOG(WARNING) << "Reverting to fetching directly into the sandbox for '"
-                     << uri.value()
-                     << "', due to failure to fetch through the cache, "
-                     << "with error: " << entry.get().failure();
-
-        result[uri] = None();
-      }
-    } else {
-      // No entry means bypassing the cache.
-      result[uri] = None();
+      futures.push_back(entry.get());
     }
   }
 
-  return result;
+  return await(futures)
+    .then(defer(self(), [=]() {
+      // For each URI, if there is a potential cache entry and waiting
+      // for its associated future was successful, extract the entry
+      // from the future and store it in 'result'. Otherwise we assume
+      // we are not using or cannot use the cache for this URI.
+      hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>> result;
+
+      foreachpair (const CommandInfo::URI& uri,
+                   const Option<Future<shared_ptr<Cache::Entry>>>& entry,
+                   entries) {
+        if (entry.isSome()) {
+          if (entry.get().isReady()) {
+            result[uri] = entry.get().get();
+          } else {
+            LOG(WARNING)
+              << "Reverting to fetching directly into the sandbox for '"
+              << uri.value()
+              << "', due to failure to fetch through the cache, "
+              << "with error: " << entry.get().failure();
+
+            result[uri] = None();
+          }
+        } else {
+          // No entry means bypassing the cache.
+          result[uri] = None();
+        }
+      }
+
+      // NOTE: While we could inline '__fetch' we've explicitly kept
+      // it as a separate function to minimize complexity. Like with
+      // '_fetch', this also enables this phase of the fetcher cache
+      // to easily be mocked for testing!
+      return __fetch(result,
+                     containerId,
+                     sandboxDirectory,
+                     cacheDirectory,
+                     user,
+                     flags);
+    }));
 }
 
 
-Future<Nothing> FetcherProcess::___fetch(
+Future<Nothing> FetcherProcess::__fetch(
     const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries,
     const ContainerID& containerId,
     const string& sandboxDirectory,
@@ -515,61 +511,50 @@ Future<Nothing> FetcherProcess::___fetch(
   }
 
   return run(containerId, info, flags)
-    .repair(defer(self(), &FetcherProcess::__runFail, lambda::_1, entries))
-    .then(defer(self(), &FetcherProcess::__runSucceed, entries));
-}
-
-
-Future<Nothing> FetcherProcess::__runFail(
-    const Future<Nothing>& future,
-    const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries)
-{
-  LOG(ERROR) << "Failed to run mesos-fetcher: " << future.failure();
-
-  foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
-    if (entry.isSome()) {
-      entry.get()->unreference();
-
-      if (entry.get()->completion().isPending()) {
-        // Unsuccessfully (or partially) downloaded! Remove from the cache.
-        entry.get()->fail();
-        cache.remove(entry.get()); // Might delete partial download.
+    .repair(defer(self(), [=](const Future<Nothing>& future) {
+      LOG(ERROR) << "Failed to run mesos-fetcher: " << future.failure();
+
+      foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
+        if (entry.isSome()) {
+          entry.get()->unreference();
+
+          if (entry.get()->completion().isPending()) {
+            // Unsuccessfully (or partially) downloaded! Remove from the cache.
+            entry.get()->fail();
+            cache.remove(entry.get()); // Might delete partial download.
+          }
+        }
       }
-    }
-  }
-
-  return future; // Always propagate the failure!
-}
-
 
-Future<Nothing> FetcherProcess::__runSucceed(
-    const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries)
-{
-  foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
-    if (entry.isSome()) {
-      entry.get()->unreference();
-
-      if (entry.get()->completion().isPending()) {
-        // Successfully downloaded and cached!
-
-        Try<Nothing> adjust = cache.adjust(entry.get());
-        if (adjust.isSome()) {
-          entry.get()->complete();
-        } else {
-          LOG(WARNING) << "Failed to adjust the cache size for entry '"
-                       << entry.get()->key << "' with error: "
-                       << adjust.error();
-
-          // Successfully fetched, but not reusable from the cache,
-          // because we are deleting the entry now.
-          entry.get()->fail();
-          cache.remove(entry.get());
+      return future; // Always propagate the failure!
+    }))
+    .then(defer(self(), [=]() {
+      foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
+        if (entry.isSome()) {
+          entry.get()->unreference();
+
+          if (entry.get()->completion().isPending()) {
+            // Successfully downloaded and cached!
+
+            Try<Nothing> adjust = cache.adjust(entry.get());
+            if (adjust.isSome()) {
+              entry.get()->complete();
+            } else {
+              LOG(WARNING) << "Failed to adjust the cache size for entry '"
+                           << entry.get()->key << "' with error: "
+                           << adjust.error();
+
+              // Successfully fetched, but not reusable from the
+              // cache, because we are deleting the entry now.
+              entry.get()->fail();
+              cache.remove(entry.get());
+            }
+          }
         }
       }
-    }
-  }
 
-  return Nothing();
+      return Nothing();
+    }));
 }
 
 
@@ -647,12 +632,10 @@ Bytes FetcherProcess::availableCacheSpace()
 
 Future<shared_ptr<FetcherProcess::Cache::Entry>>
 FetcherProcess::reserveCacheSpace(
-    const Future<Try<Bytes>>& requestedSpace,
+    const Try<Bytes>& requestedSpace,
     const shared_ptr<FetcherProcess::Cache::Entry>& entry)
 {
-  CHECK_READY(requestedSpace);
-
-  if (requestedSpace.get().isError()) {
+  if (requestedSpace.isError()) {
     // Let anyone waiting on this future know that we've
     // failed to download and they should bypass the cache
     // (any new requests will try again).
@@ -661,10 +644,10 @@ FetcherProcess::reserveCacheSpace(
 
     return Failure("Could not determine size of cache file for '" +
                    entry->key + "' with error: " +
-                   requestedSpace.get().error());
+                   requestedSpace.error());
   }
 
-  Try<Nothing> reservation = cache.reserve(requestedSpace.get().get());
+  Try<Nothing> reservation = cache.reserve(requestedSpace.get());
 
   if (reservation.isError()) {
     // Let anyone waiting on this future know that we've
@@ -679,12 +662,12 @@ FetcherProcess::reserveCacheSpace(
 
   VLOG(1) << "Claiming fetcher cache space for: " << entry->key;
 
-  cache.claimSpace(requestedSpace.get().get());
+  cache.claimSpace(requestedSpace.get());
 
-  // NOTE: We must set the entry size only when are also claiming the
-  // space! Other functions rely on this dependency (see
+  // NOTE: We must set the entry size only when we are also claiming
+  // the space! Other functions rely on this dependency (see
   // Cache::remove()).
-  entry->size = requestedSpace.get().get();
+  entry->size = requestedSpace.get();
 
   return entry;
 }
@@ -735,6 +718,9 @@ Future<Nothing> FetcherProcess::run(
                << (realpath.isError() ? realpath.error()
                                       : "No such file or directory");
 
+    os::close(out.get());
+    os::close(err.get());
+
     return Failure("Could not fetch URIs: failed to find mesos-fetcher");
   }
 
@@ -761,12 +747,15 @@ Future<Nothing> FetcherProcess::run(
       environment);
 
   if (fetcherSubprocess.isError()) {
+    os::close(out.get());
+    os::close(err.get());
     return Failure("Failed to execute mesos-fetcher: " +
                    fetcherSubprocess.error());
   }
 
-  // Remember this PID in case we need to kill the subprocess. See kill().
-  // This value gets reset in __run().
+  // Remember this PID in case we need to kill the subprocess. See
+  // FetcherProcess::kill(). This value gets removed after we wait on
+  // the subprocess.
   subprocessPids[containerId] = fetcherSubprocess.get().pid();
 
   return fetcherSubprocess.get().status()
@@ -784,7 +773,7 @@ Future<Nothing> FetcherProcess::run(
 
       return Nothing();
     }))
-    .onAny(defer(self(), [=](const Future<Nothing>& result) {
+    .onAny(defer(self(), [=](const Future<Nothing>&) {
       // Clear the subprocess PID remembered from running mesos-fetcher.
       subprocessPids.erase(containerId);
 
@@ -1058,7 +1047,7 @@ void FetcherProcess::Cache::setSpace(const Bytes& bytes)
 {
   if (space > 0) {
     // Dynamic cache size changes not supported.
-    CHECK(space == bytes);
+    CHECK_EQ(space, bytes);
   } else {
     space = bytes;
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 3b63711..16553ff 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -284,8 +284,6 @@ public:
 
   // Public and virtual for mock testing.
   virtual process::Future<Nothing> _fetch(
-      const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
-        futures,
       const hashmap<
           CommandInfo::URI,
           Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
@@ -311,14 +309,7 @@ public:
   Bytes availableCacheSpace();
 
 private:
-  process::Future<hashmap<
-      CommandInfo::URI,
-      Option<std::shared_ptr<Cache::Entry>>>>
-  __fetch(const hashmap<
-      CommandInfo::URI,
-      Option<process::Future<std::shared_ptr<Cache::Entry>>>>& entries);
-
-  process::Future<Nothing> ___fetch(
+  process::Future<Nothing> __fetch(
       const hashmap<CommandInfo::URI,
       Option<std::shared_ptr<Cache::Entry>>>& entries,
       const ContainerID& containerId,
@@ -327,26 +318,11 @@ private:
       const Option<std::string>& user,
       const Flags& flags);
 
-  process::Future<Nothing> _run(
-      const Option<int>& status,
-      const ContainerID& containerId);
-
-  void __run(const ContainerID& containerId, const int out, const int err);
-
-  process::Future<Nothing> __runFail(
-      const process::Future<Nothing>& future,
-      const hashmap<CommandInfo::URI,
-                    Option<std::shared_ptr<Cache::Entry>>>& entries);
-
-  process::Future<Nothing> __runSucceed(
-      const hashmap<CommandInfo::URI,
-                    Option<std::shared_ptr<Cache::Entry>>>& entries);
-
   // Calls Cache::reserve() and returns a ready entry future if successful,
   // else Failure. Claims the space and assigns the entry's size to this
   // amount if and only if successful.
   process::Future<std::shared_ptr<Cache::Entry>> reserveCacheSpace(
-      const process::Future<Try<Bytes>>& requestedSpace,
+      const Try<Bytes>& requestedSpace,
       const std::shared_ptr<Cache::Entry>& entry);
 
   Cache cache;

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/tests/fetcher_cache_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_cache_tests.cpp b/src/tests/fetcher_cache_tests.cpp
index 99777f8..bd53fc1 100644
--- a/src/tests/fetcher_cache_tests.cpp
+++ b/src/tests/fetcher_cache_tests.cpp
@@ -113,7 +113,10 @@ static const string ARCHIVED_COMMAND_SCRIPT =
 class FetcherCacheTest : public MesosTest
 {
 public:
-  struct Task {
+  // A helper struct that captures useful information for each of the
+  // tasks that we have launched to help test expectations.
+  struct Task
+  {
     Path runDirectory;
     Queue<TaskStatus> statusQueue;
   };
@@ -386,7 +389,7 @@ FetcherCacheTest::Task FetcherCacheTest::launchTask(
       offer.framework_id(),
       executorId));
 
-  return Task {path, taskStatusQueue};
+  return Task{path, taskStatusQueue};
 }
 
 
@@ -439,7 +442,7 @@ vector<FetcherCacheTest::Task> FetcherCacheTest::launchTasks(
   // When _fetch() is called, notify us by satisfying a promise that
   // a task has passed the code stretch in which it competes for cache
   // entries.
-  EXPECT_CALL(*fetcherProcess, _fetch(_, _, _, _, _, _, _))
+  EXPECT_CALL(*fetcherProcess, _fetch(_, _, _, _, _, _))
     .WillRepeatedly(
         DoAll(SatisfyOne(&fetchContentionWaypoints),
               Invoke(fetcherProcess, &MockFetcherProcess::unmocked__fetch)));
@@ -708,10 +711,17 @@ TEST_F(FetcherCacheTest, LocalCachedExtract)
 class FetcherCacheHttpTest : public FetcherCacheTest
 {
 public:
-  // A minimal HTTP server (not intended as an actor) just reusing what
-  // is already implemented somewhere to serve some HTTP requests for
-  // file downloads. Plus counting how many requests are made. Plus the
-  // ability to pause answering requests, stalling them.
+  // A minimal HTTP server (NOTE: not written as an actor, but this is
+  // deprecated, see below) just reusing what is already implemented
+  // somewhere to serve some HTTP requests for file downloads. Plus
+  // counting how many requests are made. Plus the ability to pause
+  // answering requests, stalling them.
+  //
+  // TODO(bernd-mesos): This class follows a dangerous style of mixing
+  // actors and non-actors, DO NOT REPLICATE. Ultimately we want to
+  // replace this with a generic HTTP server that can be used by other
+  // tests as well and enables things like pausing requests,
+  // manipulating requests, mocking, etc.
   class HttpServer : public Process<HttpServer>
   {
   public:
@@ -722,15 +732,11 @@ public:
     {
       provide(COMMAND_NAME, test->commandPath);
       provide(ARCHIVE_NAME, test->archivePath);
-
-      spawn(this);
     }
 
     string url()
     {
-      return "http://127.0.0.1:" +
-             stringify(self().address.port) +
-             "/" + self().id + "/";
+      return "http://" + stringify(self().address) + "/" + self().id + "/";
     }
 
     // Stalls the execution of HTTP requests inside visit().
@@ -746,6 +752,14 @@ public:
 
     virtual void visit(const HttpEvent& event)
     {
+      // TODO(bernd-mesos): Don't use locks here because we'll
+      // actually block libprocess threads which could cause a
+      // deadlock if we have a test with too many requests that we
+      // don't have enough threads to run other actors! Instead,
+      // consider asynchronously deferring the actual execution of
+      // this function via a Queue. This is currently non-trivial
+      // because we can't copy an HttpEvent so we're _forced_ to block
+      // the thread synchronously.
       std::lock_guard<std::mutex> lock(mutex);
 
       countRequests++;
@@ -776,12 +790,12 @@ public:
     std::mutex mutex;
   };
 
-
   virtual void SetUp()
   {
     FetcherCacheTest::SetUp();
 
     httpServer = new HttpServer(this);
+    spawn(httpServer);
   }
 
   virtual void TearDown()

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index d7a3c06..830d362 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -450,7 +450,7 @@ void MockSlave::unmocked___recover(const Future<Nothing>& future)
 MockFetcherProcess::MockFetcherProcess()
 {
   // Set up default behaviors, calling the original methods.
-  EXPECT_CALL(*this, _fetch(_, _, _, _, _, _, _)).
+  EXPECT_CALL(*this, _fetch(_, _, _, _, _, _)).
     WillRepeatedly(
         Invoke(this, &MockFetcherProcess::unmocked__fetch));
   EXPECT_CALL(*this, run(_, _, _)).
@@ -459,7 +459,6 @@ MockFetcherProcess::MockFetcherProcess()
 
 
 process::Future<Nothing> MockFetcherProcess::unmocked__fetch(
-  const list<Future<shared_ptr<Cache::Entry>>> futures,
   const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>&
     entries,
   const ContainerID& containerId,
@@ -469,7 +468,6 @@ process::Future<Nothing> MockFetcherProcess::unmocked__fetch(
   const slave::Flags& flags)
 {
   return slave::FetcherProcess::_fetch(
-      futures,
       entries,
       containerId,
       sandboxDirectory,

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index a1c6ae4..4fcf0b7 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -810,9 +810,7 @@ public:
 
   virtual ~MockFetcherProcess() {}
 
-  MOCK_METHOD7(_fetch, process::Future<Nothing>(
-      const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
-        futures,
+  MOCK_METHOD6(_fetch, process::Future<Nothing>(
       const hashmap<
           CommandInfo::URI,
           Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
@@ -824,8 +822,6 @@ public:
       const slave::Flags& flags));
 
   process::Future<Nothing> unmocked__fetch(
-      const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
-        futures,
       const hashmap<
           CommandInfo::URI,
           Option<process::Future<std::shared_ptr<Cache::Entry>>>>&