Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/01 15:46:02 UTC

[4/4] mesos git commit: Final comments from benh on Fetcher Cache.

Final comments from benh on Fetcher Cache.

See https://reviews.apache.org/r/30774.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7aede4ad
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7aede4ad
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7aede4ad

Branch: refs/heads/master
Commit: 7aede4ad40b500e6a9dc5ead4a718d415bf5d889
Parents: edd35b0
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat May 23 18:13:51 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Jun 1 06:45:29 2015 -0700

----------------------------------------------------------------------
 src/slave/containerizer/fetcher.cpp | 241 +++++++++++++++----------------
 src/slave/containerizer/fetcher.hpp |  28 +---
 src/tests/fetcher_cache_tests.cpp   |  40 +++--
 src/tests/mesos.cpp                 |   4 +-
 src/tests/mesos.hpp                 |   6 +-
 5 files changed, 146 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index c519bff..d4f127a 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -376,27 +376,20 @@ Future<Nothing> FetcherProcess::fetch(
 
       newEntry->reference();
 
-      entries[uri] = async(&fetchSize, uri.value(), flags.frameworks_home)
-        .then(defer(self(),
-                    &FetcherProcess::reserveCacheSpace,
-                    lambda::_1,
-                    newEntry));
-    }
-  }
-
-  list<Future<shared_ptr<Cache::Entry>>> futures;
-
-  // Get out all of the futures we need to wait for so we can wait on them
-  // together via 'await'.
-  foreachvalue (const Option<Future<shared_ptr<Cache::Entry>>>& entry,
-                entries) {
-    if (entry.isSome()) {
-      futures.push_back(entry.get());
+      entries[uri] =
+        async([=]() {
+          return fetchSize(uri.value(), flags.frameworks_home);
+        })
+        .then(defer(self(), [=](const Try<Bytes>& requestedSpace) {
+          return reserveCacheSpace(requestedSpace, newEntry);
+        }));
     }
   }
 
-  return _fetch(futures,
-                entries,
+  // NOTE: We explicitly call the continuation '_fetch' even though it
+  // looks like we could easily inline it here because we want to be
+  // able to mock the function for testing! Don't remove this!
+  return _fetch(entries,
                 containerId,
                 sandboxDirectory,
                 cacheDirectory,
@@ -406,7 +399,6 @@ Future<Nothing> FetcherProcess::fetch(
 
 
 Future<Nothing> FetcherProcess::_fetch(
-    const list<Future<shared_ptr<Cache::Entry>>> futures,
     const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>&
       entries,
     const ContainerID& containerId,
@@ -415,57 +407,61 @@ Future<Nothing> FetcherProcess::_fetch(
     const Option<string>& user,
     const Flags& flags)
 {
-  return await(futures)
-    .then(defer(self(),
-                &FetcherProcess::__fetch,
-                entries))
-    .then(defer(self(),
-                &FetcherProcess::___fetch,
-                lambda::_1,
-                containerId,
-                sandboxDirectory,
-                cacheDirectory,
-                user,
-                flags));
-}
-
-
-// For each URI, if there is a cache entry and waiting for it was
-// successful, extract it and add it to the resulting map. Otherwise
-// we'll assume we are not using or cannot use the cache for this URI.
-Future<hashmap<CommandInfo::URI,
-               Option<shared_ptr<FetcherProcess::Cache::Entry>>>>
-FetcherProcess::__fetch(
-    const hashmap<CommandInfo::URI,
-                  Option<Future<shared_ptr<Cache::Entry>>>>& entries)
-{
-  hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>> result;
+  // Get out all of the futures we need to wait for so we can wait on
+  // them together via 'await'.
+  list<Future<shared_ptr<Cache::Entry>>> futures;
 
-  foreachpair (const CommandInfo::URI& uri,
-               const Option<Future<shared_ptr<Cache::Entry>>>& entry,
-               entries) {
+  foreachvalue (const Option<Future<shared_ptr<Cache::Entry>>>& entry,
+                entries) {
     if (entry.isSome()) {
-      if (entry.get().isReady()) {
-        result[uri] = entry.get().get();
-      } else {
-        LOG(WARNING) << "Reverting to fetching directly into the sandbox for '"
-                     << uri.value()
-                     << "', due to failure to fetch through the cache, "
-                     << "with error: " << entry.get().failure();
-
-        result[uri] = None();
-      }
-    } else {
-      // No entry means bypassing the cache.
-      result[uri] = None();
+      futures.push_back(entry.get());
     }
   }
 
-  return result;
+  return await(futures)
+    .then(defer(self(), [=]() {
+      // For each URI, if there is a potential cache entry and waiting
+      // for its associated future was successful, extract the entry
+      // from the future and store it in 'result'. Otherwise we assume
+      // we are not using or cannot use the cache for this URI.
+      hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>> result;
+
+      foreachpair (const CommandInfo::URI& uri,
+                   const Option<Future<shared_ptr<Cache::Entry>>>& entry,
+                   entries) {
+        if (entry.isSome()) {
+          if (entry.get().isReady()) {
+            result[uri] = entry.get().get();
+          } else {
+            LOG(WARNING)
+              << "Reverting to fetching directly into the sandbox for '"
+              << uri.value()
+              << "', due to failure to fetch through the cache, "
+              << "with error: " << entry.get().failure();
+
+            result[uri] = None();
+          }
+        } else {
+          // No entry means bypassing the cache.
+          result[uri] = None();
+        }
+      }
+
+      // NOTE: While we could inline '__fetch' we've explicitly kept
+      // it as a separate function to minimize complexity. Like with
+      // '_fetch', this also enables this phase of the fetcher cache
+      // to easily be mocked for testing!
+      return __fetch(result,
+                     containerId,
+                     sandboxDirectory,
+                     cacheDirectory,
+                     user,
+                     flags);
+    }));
 }
 
 
-Future<Nothing> FetcherProcess::___fetch(
+Future<Nothing> FetcherProcess::__fetch(
     const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries,
     const ContainerID& containerId,
     const string& sandboxDirectory,
@@ -515,61 +511,50 @@ Future<Nothing> FetcherProcess::___fetch(
   }
 
   return run(containerId, info, flags)
-    .repair(defer(self(), &FetcherProcess::__runFail, lambda::_1, entries))
-    .then(defer(self(), &FetcherProcess::__runSucceed, entries));
-}
-
-
-Future<Nothing> FetcherProcess::__runFail(
-    const Future<Nothing>& future,
-    const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries)
-{
-  LOG(ERROR) << "Failed to run mesos-fetcher: " << future.failure();
-
-  foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
-    if (entry.isSome()) {
-      entry.get()->unreference();
-
-      if (entry.get()->completion().isPending()) {
-        // Unsuccessfully (or partially) downloaded! Remove from the cache.
-        entry.get()->fail();
-        cache.remove(entry.get()); // Might delete partial download.
+    .repair(defer(self(), [=](const Future<Nothing>& future) {
+      LOG(ERROR) << "Failed to run mesos-fetcher: " << future.failure();
+
+      foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
+        if (entry.isSome()) {
+          entry.get()->unreference();
+
+          if (entry.get()->completion().isPending()) {
+            // Unsuccessfully (or partially) downloaded! Remove from the cache.
+            entry.get()->fail();
+            cache.remove(entry.get()); // Might delete partial download.
+          }
+        }
       }
-    }
-  }
-
-  return future; // Always propagate the failure!
-}
-
 
-Future<Nothing> FetcherProcess::__runSucceed(
-    const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries)
-{
-  foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
-    if (entry.isSome()) {
-      entry.get()->unreference();
-
-      if (entry.get()->completion().isPending()) {
-        // Successfully downloaded and cached!
-
-        Try<Nothing> adjust = cache.adjust(entry.get());
-        if (adjust.isSome()) {
-          entry.get()->complete();
-        } else {
-          LOG(WARNING) << "Failed to adjust the cache size for entry '"
-                       << entry.get()->key << "' with error: "
-                       << adjust.error();
-
-          // Successfully fetched, but not reusable from the cache,
-          // because we are deleting the entry now.
-          entry.get()->fail();
-          cache.remove(entry.get());
+      return future; // Always propagate the failure!
+    }))
+    .then(defer(self(), [=]() {
+      foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
+        if (entry.isSome()) {
+          entry.get()->unreference();
+
+          if (entry.get()->completion().isPending()) {
+            // Successfully downloaded and cached!
+
+            Try<Nothing> adjust = cache.adjust(entry.get());
+            if (adjust.isSome()) {
+              entry.get()->complete();
+            } else {
+              LOG(WARNING) << "Failed to adjust the cache size for entry '"
+                           << entry.get()->key << "' with error: "
+                           << adjust.error();
+
+              // Successfully fetched, but not reusable from the
+              // cache, because we are deleting the entry now.
+              entry.get()->fail();
+              cache.remove(entry.get());
+            }
+          }
         }
       }
-    }
-  }
 
-  return Nothing();
+      return Nothing();
+    }));
 }
 
 
@@ -647,12 +632,10 @@ Bytes FetcherProcess::availableCacheSpace()
 
 Future<shared_ptr<FetcherProcess::Cache::Entry>>
 FetcherProcess::reserveCacheSpace(
-    const Future<Try<Bytes>>& requestedSpace,
+    const Try<Bytes>& requestedSpace,
     const shared_ptr<FetcherProcess::Cache::Entry>& entry)
 {
-  CHECK_READY(requestedSpace);
-
-  if (requestedSpace.get().isError()) {
+  if (requestedSpace.isError()) {
     // Let anyone waiting on this future know that we've
     // failed to download and they should bypass the cache
     // (any new requests will try again).
@@ -661,10 +644,10 @@ FetcherProcess::reserveCacheSpace(
 
     return Failure("Could not determine size of cache file for '" +
                    entry->key + "' with error: " +
-                   requestedSpace.get().error());
+                   requestedSpace.error());
   }
 
-  Try<Nothing> reservation = cache.reserve(requestedSpace.get().get());
+  Try<Nothing> reservation = cache.reserve(requestedSpace.get());
 
   if (reservation.isError()) {
     // Let anyone waiting on this future know that we've
@@ -679,12 +662,12 @@ FetcherProcess::reserveCacheSpace(
 
   VLOG(1) << "Claiming fetcher cache space for: " << entry->key;
 
-  cache.claimSpace(requestedSpace.get().get());
+  cache.claimSpace(requestedSpace.get());
 
-  // NOTE: We must set the entry size only when are also claiming the
-  // space! Other functions rely on this dependency (see
+  // NOTE: We must set the entry size only when we are also claiming
+  // the space! Other functions rely on this dependency (see
   // Cache::remove()).
-  entry->size = requestedSpace.get().get();
+  entry->size = requestedSpace.get();
 
   return entry;
 }
@@ -735,6 +718,9 @@ Future<Nothing> FetcherProcess::run(
                << (realpath.isError() ? realpath.error()
                                       : "No such file or directory");
 
+    os::close(out.get());
+    os::close(err.get());
+
     return Failure("Could not fetch URIs: failed to find mesos-fetcher");
   }
 
@@ -761,12 +747,15 @@ Future<Nothing> FetcherProcess::run(
       environment);
 
   if (fetcherSubprocess.isError()) {
+    os::close(out.get());
+    os::close(err.get());
     return Failure("Failed to execute mesos-fetcher: " +
                    fetcherSubprocess.error());
   }
 
-  // Remember this PID in case we need to kill the subprocess. See kill().
-  // This value gets reset in __run().
+  // Remember this PID in case we need to kill the subprocess. See
+  // FetcherProcess::kill(). This value gets removed after we wait on
+  // the subprocess.
   subprocessPids[containerId] = fetcherSubprocess.get().pid();
 
   return fetcherSubprocess.get().status()
@@ -784,7 +773,7 @@ Future<Nothing> FetcherProcess::run(
 
       return Nothing();
     }))
-    .onAny(defer(self(), [=](const Future<Nothing>& result) {
+    .onAny(defer(self(), [=](const Future<Nothing>&) {
       // Clear the subprocess PID remembered from running mesos-fetcher.
       subprocessPids.erase(containerId);
 
@@ -1058,7 +1047,7 @@ void FetcherProcess::Cache::setSpace(const Bytes& bytes)
 {
   if (space > 0) {
     // Dynamic cache size changes not supported.
-    CHECK(space == bytes);
+    CHECK_EQ(space, bytes);
   } else {
     space = bytes;
   }
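
The dominant pattern in the fetcher.cpp changes above is replacing chained continuation member functions (bound with 'lambda::_1') with lambdas wrapped in 'defer(self(), ...)' so the continuation body sits next to the code that schedules it while still running on the same actor. The following is a minimal sketch of that pattern only, not code from this commit; 'ExampleProcess', 'computeSize', and 'reserve' are hypothetical names standing in for FetcherProcess, fetchSize(), and reserveCacheSpace().

#include <process/async.hpp>
#include <process/defer.hpp>
#include <process/future.hpp>
#include <process/process.hpp>

#include <stout/bytes.hpp>
#include <stout/nothing.hpp>
#include <stout/try.hpp>

using process::Future;
using process::defer;

class ExampleProcess : public process::Process<ExampleProcess>
{
public:
  Future<Nothing> fetch()
  {
    // The blocking work runs off-actor via 'async'; the continuation is
    // deferred back onto this process so member state stays actor-safe.
    return process::async([]() -> Try<Bytes> { return computeSize(); })
      .then(defer(self(), [=](const Try<Bytes>& size) -> Future<Nothing> {
        if (size.isError()) {
          return process::Failure(
              "Could not determine size: " + size.error());
        }
        return reserve(size.get());
      }));
  }

private:
  static Try<Bytes> computeSize()
  {
    // Stand-in for e.g. fetchSize(uri, frameworksHome).
    return Bytes(1024);
  }

  Future<Nothing> reserve(const Bytes& size)
  {
    // Stand-in for e.g. FetcherProcess::reserveCacheSpace().
    return Nothing();
  }
};

Compared to the old '&FetcherProcess::reserveCacheSpace, lambda::_1, newEntry' style, the lambda also lets the continuation take a plain 'const Try<Bytes>&' instead of a 'Future<Try<Bytes>>', which is exactly the signature simplification made to reserveCacheSpace() above.
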

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 3b63711..16553ff 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -284,8 +284,6 @@ public:
 
   // Public and virtual for mock testing.
   virtual process::Future<Nothing> _fetch(
-      const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
-        futures,
       const hashmap<
           CommandInfo::URI,
           Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
@@ -311,14 +309,7 @@ public:
   Bytes availableCacheSpace();
 
 private:
-  process::Future<hashmap<
-      CommandInfo::URI,
-      Option<std::shared_ptr<Cache::Entry>>>>
-  __fetch(const hashmap<
-      CommandInfo::URI,
-      Option<process::Future<std::shared_ptr<Cache::Entry>>>>& entries);
-
-  process::Future<Nothing> ___fetch(
+  process::Future<Nothing> __fetch(
       const hashmap<CommandInfo::URI,
       Option<std::shared_ptr<Cache::Entry>>>& entries,
       const ContainerID& containerId,
@@ -327,26 +318,11 @@ private:
       const Option<std::string>& user,
       const Flags& flags);
 
-  process::Future<Nothing> _run(
-      const Option<int>& status,
-      const ContainerID& containerId);
-
-  void __run(const ContainerID& containerId, const int out, const int err);
-
-  process::Future<Nothing> __runFail(
-      const process::Future<Nothing>& future,
-      const hashmap<CommandInfo::URI,
-                    Option<std::shared_ptr<Cache::Entry>>>& entries);
-
-  process::Future<Nothing> __runSucceed(
-      const hashmap<CommandInfo::URI,
-                    Option<std::shared_ptr<Cache::Entry>>>& entries);
-
   // Calls Cache::reserve() and returns a ready entry future if successful,
   // else Failure. Claims the space and assigns the entry's size to this
   // amount if and only if successful.
   process::Future<std::shared_ptr<Cache::Entry>> reserveCacheSpace(
-      const process::Future<Try<Bytes>>& requestedSpace,
+      const Try<Bytes>& requestedSpace,
       const std::shared_ptr<Cache::Entry>& entry);
 
   Cache cache;

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/tests/fetcher_cache_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_cache_tests.cpp b/src/tests/fetcher_cache_tests.cpp
index 99777f8..bd53fc1 100644
--- a/src/tests/fetcher_cache_tests.cpp
+++ b/src/tests/fetcher_cache_tests.cpp
@@ -113,7 +113,10 @@ static const string ARCHIVED_COMMAND_SCRIPT =
 class FetcherCacheTest : public MesosTest
 {
 public:
-  struct Task {
+  // A helper struct that captures useful information for each of the
+  // tasks that we have launched to help test expectations.
+  struct Task
+  {
     Path runDirectory;
     Queue<TaskStatus> statusQueue;
   };
@@ -386,7 +389,7 @@ FetcherCacheTest::Task FetcherCacheTest::launchTask(
       offer.framework_id(),
       executorId));
 
-  return Task {path, taskStatusQueue};
+  return Task{path, taskStatusQueue};
 }
 
 
@@ -439,7 +442,7 @@ vector<FetcherCacheTest::Task> FetcherCacheTest::launchTasks(
   // When _fetch() is called, notify us by satisfying a promise that
   // a task has passed the code stretch in which it competes for cache
   // entries.
-  EXPECT_CALL(*fetcherProcess, _fetch(_, _, _, _, _, _, _))
+  EXPECT_CALL(*fetcherProcess, _fetch(_, _, _, _, _, _))
     .WillRepeatedly(
         DoAll(SatisfyOne(&fetchContentionWaypoints),
               Invoke(fetcherProcess, &MockFetcherProcess::unmocked__fetch)));
@@ -708,10 +711,17 @@ TEST_F(FetcherCacheTest, LocalCachedExtract)
 class FetcherCacheHttpTest : public FetcherCacheTest
 {
 public:
-  // A minimal HTTP server (not intended as an actor) just reusing what
-  // is already implemented somewhere to serve some HTTP requests for
-  // file downloads. Plus counting how many requests are made. Plus the
-  // ability to pause answering requests, stalling them.
+  // A minimal HTTP server (NOTE: not written as an actor, but this is
+  // deprecated, see below) just reusing what is already implemented
+  // somewhere to serve some HTTP requests for file downloads. Plus
+  // counting how many requests are made. Plus the ability to pause
+  // answering requests, stalling them.
+  //
+  // TODO(bernd-mesos): This class follows a dangerous style of mixing
+  // actors and non-actors, DO NOT REPLICATE. Ultimately we want to
+  // replace this with a generic HTTP server that can be used by other
+  // tests as well and enables things like pausing requests,
+  // manipulating requests, mocking, etc.
   class HttpServer : public Process<HttpServer>
   {
   public:
@@ -722,15 +732,11 @@ public:
     {
       provide(COMMAND_NAME, test->commandPath);
       provide(ARCHIVE_NAME, test->archivePath);
-
-      spawn(this);
     }
 
     string url()
     {
-      return "http://127.0.0.1:" +
-             stringify(self().address.port) +
-             "/" + self().id + "/";
+      return "http://" + stringify(self().address) + "/" + self().id + "/";
     }
 
     // Stalls the execution of HTTP requests inside visit().
@@ -746,6 +752,14 @@ public:
 
     virtual void visit(const HttpEvent& event)
     {
+      // TODO(bernd-mesos): Don't use locks here because we'll
+      // actually block libprocess threads which could cause a
+      // deadlock if we have a test with too many requests that we
+      // don't have enough threads to run other actors! Instead,
+      // consider asynchronously deferring the actual execution of
+      // this function via a Queue. This is currently non-trivial
+      // because we can't copy an HttpEvent so we're _forced_ to block
+      // the thread synchronously.
       std::lock_guard<std::mutex> lock(mutex);
 
       countRequests++;
@@ -776,12 +790,12 @@ public:
     std::mutex mutex;
   };
 
-
   virtual void SetUp()
   {
     FetcherCacheTest::SetUp();
 
     httpServer = new HttpServer(this);
+    spawn(httpServer);
   }
 
   virtual void TearDown()
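
The HttpServer change above moves 'spawn(this)' out of the constructor and into the test's SetUp(), following the usual libprocess convention that the owner, not the constructor, manages an actor's lifecycle. A minimal sketch of that convention, with hypothetical names ('EchoProcess', 'EchoTest') and not taken from this commit:

#include <gtest/gtest.h>

#include <process/process.hpp>

class EchoProcess : public process::Process<EchoProcess> {};

class EchoTest : public ::testing::Test
{
protected:
  virtual void SetUp()
  {
    echo = new EchoProcess();
    process::spawn(echo); // The owner spawns, not the constructor.
  }

  virtual void TearDown()
  {
    process::terminate(echo);
    process::wait(echo);
    delete echo;
  }

  EchoProcess* echo;
};

TEST_F(EchoTest, Lifecycle)
{
  // Nothing to assert here; the fixture itself exercises the
  // spawn/terminate/wait lifecycle around each test.
  EXPECT_NE(static_cast<EchoProcess*>(nullptr), echo);
}
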

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index d7a3c06..830d362 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -450,7 +450,7 @@ void MockSlave::unmocked___recover(const Future<Nothing>& future)
 MockFetcherProcess::MockFetcherProcess()
 {
   // Set up default behaviors, calling the original methods.
-  EXPECT_CALL(*this, _fetch(_, _, _, _, _, _, _)).
+  EXPECT_CALL(*this, _fetch(_, _, _, _, _, _)).
     WillRepeatedly(
         Invoke(this, &MockFetcherProcess::unmocked__fetch));
   EXPECT_CALL(*this, run(_, _, _)).
@@ -459,7 +459,6 @@ MockFetcherProcess::MockFetcherProcess()
 
 
 process::Future<Nothing> MockFetcherProcess::unmocked__fetch(
-  const list<Future<shared_ptr<Cache::Entry>>> futures,
   const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>&
     entries,
   const ContainerID& containerId,
@@ -469,7 +468,6 @@ process::Future<Nothing> MockFetcherProcess::unmocked__fetch(
   const slave::Flags& flags)
 {
   return slave::FetcherProcess::_fetch(
-      futures,
       entries,
       containerId,
       sandboxDirectory,

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index a1c6ae4..4fcf0b7 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -810,9 +810,7 @@ public:
 
   virtual ~MockFetcherProcess() {}
 
-  MOCK_METHOD7(_fetch, process::Future<Nothing>(
-      const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
-        futures,
+  MOCK_METHOD6(_fetch, process::Future<Nothing>(
       const hashmap<
           CommandInfo::URI,
           Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
@@ -824,8 +822,6 @@ public:
       const slave::Flags& flags));
 
   process::Future<Nothing> unmocked__fetch(
-      const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
-        futures,
       const hashmap<
           CommandInfo::URI,
           Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
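
The mesos.cpp and mesos.hpp changes above keep '_fetch' as a separate public virtual method (now MOCK_METHOD6 instead of MOCK_METHOD7) precisely so tests can intercept it while delegating to the real implementation by default. A minimal sketch of that mock-with-default-delegation pattern, using hypothetical names ('Worker', '_step') rather than the actual FetcherProcess API:

#include <gmock/gmock.h>

class Worker
{
public:
  virtual ~Worker() {}

  // Public and virtual purely so tests can mock this phase.
  virtual int _step(int input) { return input + 1; }
};

class MockWorker : public Worker
{
public:
  MockWorker()
  {
    // Default behavior: call through to the real implementation,
    // mirroring how MockFetcherProcess wires _fetch to unmocked__fetch.
    EXPECT_CALL(*this, _step(testing::_))
      .WillRepeatedly(testing::Invoke(this, &MockWorker::unmocked__step));
  }

  MOCK_METHOD1(_step, int(int input));

  int unmocked__step(int input) { return Worker::_step(input); }
};

A test can then override the default with its own EXPECT_CALL (for example to add a synchronization point, as fetcher_cache_tests.cpp does with SatisfyOne) and still invoke the unmocked method so the real code path is exercised.
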