You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2017/05/26 01:40:57 UTC

[06/16] mesos git commit: Changed the fetcher cache directory.

Changed the fetcher cache directory.

The fetcher cache directory was historically located (by default)
in `/tmp/mesos/fetch`.  The agent flag `--fetcher_cache_dir` could
be used to change this value.

The fetcher would create a subdirectory underneath `/tmp/mesos/fetch`
for each `SlaveID`.  This was done because multiple agents can run on
the same node.  If all the agents use the same default fetcher cache
directory, they will collide and cause unpredictable results.
As a result, the `SlaveID` needed to be passed into the fetcher
after the agent recovers and/or registers with the master, because
that is when the `SlaveID` is determined.

This changes the fetcher to use the configured cache directory
directly (by default, `/tmp/mesos/fetch`).  The `SlaveID`
subdirectory has been removed.

This change, while technically a breaking change, is safe because of
how the fetcher uses this directory.  Upon starting up, the fetcher
"recovers" by clearing this directory.  Although the subdirectory
has been removed, the fetcher still clears the fetcher cache
on startup.

This change will only cause breakages if multiple agents are run
with the same `--fetcher_cache_dir`.  In this case, each agent
will delete the fetcher caches of all the other agents.

---

With the removal of the `SlaveID` parameter from the fetcher's methods,
it is no longer necessary to pass in the `SlaveID` or agent Flags
at agent recovery time.  Instead, the flags can be passed in during
the fetcher's construction.

Similarly, the fetcher's "recovery" (clearing the fetcher cache)
can be done immediately upon construction, which simplifies the
code slightly.

Review: https://reviews.apache.org/r/58900


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/145896bc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/145896bc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/145896bc

Branch: refs/heads/master
Commit: 145896bcf0dc0f2d53dd6836a99a0397e8ac13ae
Parents: 51da8c4
Author: Joseph Wu <jo...@apache.org>
Authored: Mon Apr 10 18:24:15 2017 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Thu May 25 18:37:06 2017 -0700

----------------------------------------------------------------------
 src/local/local.cpp                 |  6 ++-
 src/slave/containerizer/fetcher.cpp | 87 ++++++++------------------------
 src/slave/containerizer/fetcher.hpp | 37 +++++---------
 src/slave/flags.cpp                 | 17 +++----
 src/slave/main.cpp                  |  2 +-
 src/slave/slave.cpp                 | 15 ------
 6 files changed, 48 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/145896bc/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 3f4150b..ad35723 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -372,6 +372,10 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
     propagatedFlags["runtime_dir"] =
       path::join(flags.runtime_dir, "agents", stringify(i));
 
+    // Use a different fetcher cache directory for each agent.
+    propagatedFlags["fetcher_cache_dir"] =
+      path::join(os::temp(), "mesos", "fetch", "agents", stringify(i));
+
     slave::Flags slaveFlags;
     Try<flags::Warnings> load = slaveFlags.load(
         propagatedFlags,
@@ -407,7 +411,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
 
     garbageCollectors->push_back(new GarbageCollector());
     statusUpdateManagers->push_back(new StatusUpdateManager(slaveFlags));
-    fetchers->push_back(new Fetcher());
+    fetchers->push_back(new Fetcher(slaveFlags));
 
     Try<ResourceEstimator*> resourceEstimator =
       ResourceEstimator::create(slaveFlags.resource_estimator);

http://git-wip-us.apache.org/repos/asf/mesos/blob/145896bc/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index a910fea..770cad3 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -36,8 +36,6 @@
 
 #include "hdfs/hdfs.hpp"
 
-#include "slave/slave.hpp"
-
 #include "slave/containerizer/fetcher.hpp"
 
 using std::list;
@@ -68,8 +66,15 @@ static const string FILE_URI_LOCALHOST = "file://localhost";
 static const string CACHE_FILE_NAME_PREFIX = "c";
 
 
-Fetcher::Fetcher() : process(new FetcherProcess())
+Fetcher::Fetcher(const Flags& flags) : process(new FetcherProcess(flags))
 {
+  if (os::exists(flags.fetcher_cache_dir)) {
+    Try<Nothing> rmdir = os::rmdir(flags.fetcher_cache_dir, true);
+    CHECK_SOME(rmdir)
+      << "Could not delete fetcher cache directory '"
+      << flags.fetcher_cache_dir << "': " + rmdir.error();
+  }
+
   spawn(process.get());
 }
 
@@ -88,34 +93,6 @@ Fetcher::~Fetcher()
 }
 
 
-Try<Nothing> Fetcher::recover(const SlaveID& slaveId, const Flags& flags)
-{
-  // Good enough for now, simple, least-effort recovery.
-  VLOG(1) << "Clearing fetcher cache";
-
-  string cacheDirectory = paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
-  Result<string> path = os::realpath(cacheDirectory);
-  if (path.isError()) {
-    LOG(ERROR) << "Malformed fetcher cache directory path '" << cacheDirectory
-               << "', error: " + path.error();
-
-    return Error(path.error());
-  }
-
-  if (path.isSome() && os::exists(path.get())) {
-    Try<Nothing> rmdir = os::rmdir(path.get(), true);
-    if (rmdir.isError()) {
-      LOG(ERROR) << "Could not delete fetcher cache directory '"
-                 << cacheDirectory << "', error: " + rmdir.error();
-
-      return rmdir;
-    }
-  }
-
-  return Nothing();
-}
-
-
 Try<string> Fetcher::basename(const string& uri)
 {
   // TODO(bernd-mesos): full URI parsing, then move this to stout.
@@ -258,18 +235,14 @@ Future<Nothing> Fetcher::fetch(
     const ContainerID& containerId,
     const CommandInfo& commandInfo,
     const string& sandboxDirectory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const Flags& flags)
+    const Option<string>& user)
 {
   return dispatch(process.get(),
                   &FetcherProcess::fetch,
                   containerId,
                   commandInfo,
                   sandboxDirectory,
-                  user,
-                  slaveId,
-                  flags);
+                  user);
 }
 
 
@@ -346,9 +319,7 @@ Future<Nothing> FetcherProcess::fetch(
     const ContainerID& containerId,
     const CommandInfo& commandInfo,
     const string& sandboxDirectory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const Flags& flags)
+    const Option<string>& user)
 {
   VLOG(1) << "Starting to fetch URIs for container: " << containerId
           << ", directory: " << sandboxDirectory;
@@ -368,7 +339,7 @@ Future<Nothing> FetcherProcess::fetch(
     commandUser = commandInfo.user();
   }
 
-  string cacheDirectory = paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
+  string cacheDirectory = flags.fetcher_cache_dir;
   if (commandUser.isSome()) {
     // Segregating per-user cache directories.
     cacheDirectory = path::join(cacheDirectory, commandUser.get());
@@ -436,8 +407,7 @@ Future<Nothing> FetcherProcess::fetch(
                 containerId,
                 sandboxDirectory,
                 cacheDirectory,
-                commandUser,
-                flags);
+                commandUser);
 }
 
 
@@ -447,8 +417,7 @@ Future<Nothing> FetcherProcess::_fetch(
     const ContainerID& containerId,
     const string& sandboxDirectory,
     const string& cacheDirectory,
-    const Option<string>& user,
-    const Flags& flags)
+    const Option<string>& user)
 {
   // Get out all of the futures we need to wait for so we can wait on
   // them together via 'await'.
@@ -498,8 +467,7 @@ Future<Nothing> FetcherProcess::_fetch(
                      containerId,
                      sandboxDirectory,
                      cacheDirectory,
-                     user,
-                     flags);
+                     user);
     }));
 }
 
@@ -509,8 +477,7 @@ Future<Nothing> FetcherProcess::__fetch(
     const ContainerID& containerId,
     const string& sandboxDirectory,
     const string& cacheDirectory,
-    const Option<string>& user,
-    const Flags& flags)
+    const Option<string>& user)
 {
   // Now construct the FetcherInfo based on which URIs we're using
   // the cache for and which ones we are bypassing the cache.
@@ -553,7 +520,7 @@ Future<Nothing> FetcherProcess::__fetch(
     info.set_frameworks_home(flags.frameworks_home);
   }
 
-  return run(containerId, sandboxDirectory, user, info, flags)
+  return run(containerId, sandboxDirectory, user, info)
     .repair(defer(self(), [=](const Future<Nothing>& future) {
       LOG(ERROR) << "Failed to run mesos-fetcher: " << future.failure();
 
@@ -630,29 +597,20 @@ static off_t delta(
 
 
 // For testing only.
-// TODO(bernd-mesos): After refactoring slave/containerizer,fetcher so
-// that flags and slave ID get injected, replace this with two functions
-// one of which returns a list of cache file paths, the other the number
-// of entries in the cache table.
-Try<list<Path>> FetcherProcess::cacheFiles(
-    const SlaveID& slaveId,
-    const Flags& flags)
+Try<list<Path>> FetcherProcess::cacheFiles()
 {
   list<Path> result;
 
-  const string cacheDirectory =
-    slave::paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
-
-  if (!os::exists(cacheDirectory)) {
+  if (!os::exists(flags.fetcher_cache_dir)) {
     return result;
   }
 
   const Try<list<string>> find =
-    os::find(cacheDirectory, CACHE_FILE_NAME_PREFIX);
+    os::find(flags.fetcher_cache_dir, CACHE_FILE_NAME_PREFIX);
 
   if (find.isError()) {
     return Error("Could not access cache directory '" +
-                 cacheDirectory + "' with error: " + find.error());
+                 flags.fetcher_cache_dir + "' with error: " + find.error());
   }
 
   transform(find.get().begin(),
@@ -724,8 +682,7 @@ Future<Nothing> FetcherProcess::run(
     const ContainerID& containerId,
     const string& sandboxDirectory,
     const Option<string>& user,
-    const FetcherInfo& info,
-    const Flags& flags)
+    const FetcherInfo& info)
 {
   // Before we fetch let's make sure we create 'stdout' and 'stderr'
   // files into which we can redirect the output of the mesos-fetcher

http://git-wip-us.apache.org/repos/asf/mesos/blob/145896bc/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 9e3018d..efeadbf 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -25,8 +25,8 @@
 
 #include <mesos/fetcher/fetcher.hpp>
 
-#include <process/id.hpp>
 #include <process/future.hpp>
+#include <process/id.hpp>
 #include <process/process.hpp>
 #include <process/subprocess.hpp>
 
@@ -79,19 +79,13 @@ public:
 
   static bool isNetUri(const std::string& uri);
 
-  Fetcher();
+  Fetcher(const Flags& flags);
 
   // This is only public for tests.
   Fetcher(const process::Owned<FetcherProcess>& process);
 
   virtual ~Fetcher();
 
-  // TODO(bernd-mesos): Inject these parameters at Fetcher creation time.
-  // Then also inject the fetcher into the slave at creation time. Then
-  // it will be possible to make this an instance method instead of a
-  // static one for the slave to call during startup or recovery.
-  static Try<Nothing> recover(const SlaveID& slaveId, const Flags& flags);
-
   // Download the URIs specified in the command info and place the
   // resulting files into the given sandbox directory. Chmod said files
   // to the user if given. Send stdout and stderr output to files
@@ -101,9 +95,7 @@ public:
       const ContainerID& containerId,
       const CommandInfo& commandInfo,
       const std::string& sandboxDirectory,
-      const Option<std::string>& user,
-      const SlaveID& slaveId,
-      const Flags& flags);
+      const Option<std::string>& user);
 
   // Best effort to kill the fetcher subprocess associated with the
   // indicated container. Do nothing if no such subprocess exists.
@@ -117,7 +109,9 @@ private:
 class FetcherProcess : public process::Process<FetcherProcess>
 {
 public:
-  FetcherProcess() : ProcessBase(process::ID::generate("fetcher")) {}
+  FetcherProcess(const Flags& _flags)
+    : ProcessBase(process::ID::generate("fetcher")),
+      flags(_flags) {}
 
   virtual ~FetcherProcess();
 
@@ -125,9 +119,7 @@ public:
       const ContainerID& containerId,
       const CommandInfo& commandInfo,
       const std::string& sandboxDirectory,
-      const Option<std::string>& user,
-      const SlaveID& slaveId,
-      const Flags& flags);
+      const Option<std::string>& user);
 
   // Runs the mesos-fetcher, creating a "stdout" and "stderr" file
   // in the given directory, using these for trace output.
@@ -135,8 +127,7 @@ public:
       const ContainerID& containerId,
       const std::string& sandboxDirectory,
       const Option<std::string>& user,
-      const mesos::fetcher::FetcherInfo& info,
-      const Flags& flags);
+      const mesos::fetcher::FetcherInfo& info);
 
   // Best effort attempt to kill the external mesos-fetcher process
   // running on behalf of the given container ID, if any.
@@ -297,14 +288,11 @@ public:
       const ContainerID& containerId,
       const std::string& sandboxDirectory,
       const std::string& cacheDirectory,
-      const Option<std::string>& user,
-      const Flags& flags);
+      const Option<std::string>& user);
 
   // Returns a list of cache files on disk for the given slave
   // (for all users combined). For testing.
-  // TODO(bernd-mesos): Remove the parameters after slave/containerizer
-  // refactoring for injection of these.
-  Try<std::list<Path>> cacheFiles(const SlaveID& slaveId, const Flags& flags);
+  Try<std::list<Path>> cacheFiles();
 
   // Returns the number of cache entries for the given slave (for all
   // users combined). For testing.
@@ -321,8 +309,7 @@ private:
       const ContainerID& containerId,
       const std::string& sandboxDirectory,
       const std::string& cacheDirectory,
-      const Option<std::string>& user,
-      const Flags& flags);
+      const Option<std::string>& user);
 
   // Calls Cache::reserve() and returns a ready entry future if successful,
   // else Failure. Claims the space and assigns the entry's size to this
@@ -331,6 +318,8 @@ private:
       const Try<Bytes>& requestedSpace,
       const std::shared_ptr<Cache::Entry>& entry);
 
+  Flags flags;
+
   Cache cache;
 
   hashmap<ContainerID, pid_t> subprocessPids;

http://git-wip-us.apache.org/repos/asf/mesos/blob/145896bc/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index d0dc9c8..0efe67e 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -186,18 +186,15 @@ mesos::internal::slave::Flags::Flags()
       "Size of the fetcher cache in Bytes.",
       DEFAULT_FETCHER_CACHE_SIZE);
 
-  // By default the fetcher cache directory is held inside the work
-  // directory, so everything can be deleted or archived in one swoop,
-  // in particular during testing. However, a typical production
-  // scenario is to use a separate cache volume. First, it is not meant
-  // to be backed up. Second, you want to avoid that sandbox directories
-  // and the cache directory can interfere with each other in
-  // unpredictable ways by occupying shared space. So it is recommended
-  // to set the cache directory explicitly.
   add(&Flags::fetcher_cache_dir,
       "fetcher_cache_dir",
-      "Parent directory for fetcher cache directories\n"
-      "(one subdirectory per agent).",
+      "Directory for the fetcher cache. The agent will clear this directory\n"
+      "on startup. It is recommended to set this value to a separate volume\n"
+      "for several reasons:\n"
+      "  * The cache directories are transient and not meant to be\n"
+      "    backed up. Upon restarting the agent, the cache is always empty.\n"
+      "  * The cache and container sandboxes can potentially interfere with\n"
+      "    each other when occupying a shared space (i.e. disk contention).",
       path::join(os::temp(), "mesos", "fetch"));
 
   add(&Flags::work_dir,

http://git-wip-us.apache.org/repos/asf/mesos/blob/145896bc/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index a363ae6..80a957b 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -445,7 +445,7 @@ int main(int argc, char** argv)
   }
 #endif // __linux__
 
-  Fetcher* fetcher = new Fetcher();
+  Fetcher* fetcher = new Fetcher(flags);
 
   // Initialize SecretResolver.
   Try<SecretResolver*> secretResolver =

http://git-wip-us.apache.org/repos/asf/mesos/blob/145896bc/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b7e731b..0f21cf8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1125,14 +1125,6 @@ void Slave::registered(
       LOG(INFO) << "Registered with master " << master.get()
                 << "; given agent ID " << slaveId;
 
-      // TODO(bernd-mesos): Make this an instance method call, see comment
-      // in "fetcher.hpp"".
-      Try<Nothing> recovered = Fetcher::recover(slaveId, flags);
-      if (recovered.isError()) {
-        LOG(FATAL) << "Could not initialize fetcher cache: "
-                   << recovered.error();
-      }
-
       state = RUNNING;
 
       // Cancel the pending registration timer to avoid spurious attempts
@@ -5863,13 +5855,6 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
       metrics.recovery_errors += slaveState->errors;
     }
 
-    // TODO(bernd-mesos): Make this an instance method call, see comment
-    // in "fetcher.hpp"".
-    Try<Nothing> recovered = Fetcher::recover(slaveState->id, flags);
-    if (recovered.isError()) {
-      return Failure(recovered.error());
-    }
-
     // Recover the frameworks.
     foreachvalue (const FrameworkState& frameworkState,
                   slaveState->frameworks) {