You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2016/02/27 01:02:54 UTC

mesos git commit: Added Appc fetcher support to store.

Repository: mesos
Updated Branches:
  refs/heads/master 8b47febe4 -> 620cb2d2f


Added Appc fetcher support to store.

This change allows the store to fetch an image using the Appc image
fetcher when the image is not found in the cache. It also recursively
fetches the image's dependencies.

Review: https://reviews.apache.org/r/43855/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/620cb2d2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/620cb2d2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/620cb2d2

Branch: refs/heads/master
Commit: 620cb2d2f7b6e7f282975284394fb4dbb9591338
Parents: 8b47feb
Author: Jojy Varghese <jo...@mesosphere.io>
Authored: Fri Feb 26 15:34:35 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Feb 26 15:46:38 2016 -0800

----------------------------------------------------------------------
 .../mesos/provisioner/appc/store.cpp            | 236 ++++++++++++++++---
 src/tests/fetcher_cache_tests.cpp               |   9 +-
 2 files changed, 204 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/620cb2d2/src/slave/containerizer/mesos/provisioner/appc/store.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/appc/store.cpp b/src/slave/containerizer/mesos/provisioner/appc/store.cpp
index 4b38291..955647d 100644
--- a/src/slave/containerizer/mesos/provisioner/appc/store.cpp
+++ b/src/slave/containerizer/mesos/provisioner/appc/store.cpp
@@ -18,6 +18,7 @@
 
 #include <glog/logging.h>
 
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
 
@@ -29,6 +30,7 @@
 #include <mesos/appc/spec.hpp>
 
 #include "slave/containerizer/mesos/provisioner/appc/cache.hpp"
+#include "slave/containerizer/mesos/provisioner/appc/fetcher.hpp"
 #include "slave/containerizer/mesos/provisioner/appc/paths.hpp"
 #include "slave/containerizer/mesos/provisioner/appc/store.hpp"
 
@@ -50,7 +52,10 @@ namespace appc {
 class StoreProcess : public Process<StoreProcess>
 {
 public:
-  StoreProcess(const string& rootDir, Owned<Cache> cache);
+  StoreProcess(
+      const string& rootDir,
+      Owned<Cache> cache,
+      Owned<Fetcher> fetcher);
 
   ~StoreProcess() {}
 
@@ -59,11 +64,20 @@ public:
   Future<ImageInfo> get(const Image& image);
 
 private:
+  Future<vector<string>> fetchImage(const Image::Appc& appc);
+
+  Future<vector<string>> fetchDependencies(const string& imageId);
+
+  Future<string> _fetchImage(const Image::Appc& appc);
+
+  Future<vector<string>> __fetchImage(const string& imageId);
+
   // Absolute path to the root directory of the store as defined by
   // --appc_store_dir.
   const string rootDir;
 
   Owned<Cache> cache;
+  Owned<Fetcher> fetcher;
 };
 
 
@@ -78,13 +92,9 @@ Try<Owned<slave::Store>> Store::create(const Flags& flags)
   // from it are canonical too.
   Result<string> rootDir = os::realpath(flags.appc_store_dir);
   if (!rootDir.isSome()) {
-    // The above mkdir call recursively creates the store directory
-    // if necessary so it cannot be None here.
-    CHECK_ERROR(rootDir);
-
     return Error(
         "Failed to get the realpath of the store root directory: " +
-        rootDir.error());
+        (rootDir.isError() ? rootDir.error() : "not found"));
   }
 
   Try<Owned<Cache>> cache = Cache::create(Path(rootDir.get()));
@@ -92,8 +102,28 @@ Try<Owned<slave::Store>> Store::create(const Flags& flags)
     return Error("Failed to create image cache: " + cache.error());
   }
 
-  return Owned<slave::Store>(new Store(
-      Owned<StoreProcess>(new StoreProcess(rootDir.get(), cache.get()))));
+  Try<Nothing> recover = cache.get()->recover();
+  if (recover.isError()) {
+    return Error("Failed to load image cache: " + recover.error());
+  }
+
+  // TODO(jojy): Uri fetcher has 'shared' semantics for the
+  // provisioner. It's a shared pointer which needs to be injected
+  // from top level into the store (instead of being created here).
+  Try<Owned<uri::Fetcher>> uriFetcher = uri::fetcher::create();
+  if (uriFetcher.isError()) {
+    return Error("Failed to create uri fetcher: " + uriFetcher.error());
+  }
+
+  Try<Owned<Fetcher>> fetcher = Fetcher::create(flags, uriFetcher->share());
+  if (fetcher.isError()) {
+    return Error("Failed to create image fetcher: " + fetcher.error());
+  }
+
+  return Owned<slave::Store>(new Store(Owned<StoreProcess>(new StoreProcess(
+      rootDir.get(),
+      cache.get(),
+      fetcher.get()))));
 }
 
 
@@ -123,9 +153,13 @@ Future<ImageInfo> Store::get(const Image& image)
 }
 
 
-StoreProcess::StoreProcess(const string& _rootDir, Owned<Cache> _cache)
+StoreProcess::StoreProcess(
+    const string& _rootDir,
+    Owned<Cache> _cache,
+    Owned<Fetcher> _fetcher)
   : rootDir(_rootDir),
-    cache(_cache) {}
+    cache(_cache),
+    fetcher(_fetcher) {}
 
 
 Future<Nothing> StoreProcess::recover()
@@ -147,39 +181,175 @@ Future<ImageInfo> StoreProcess::get(const Image& image)
 
   const Image::Appc& appc = image.appc();
 
-  Option<string> imageId = None();
+  const Path stagingDir(paths::getStagingDir(rootDir));
+
+  Try<Nothing> staging = os::mkdir(stagingDir);
+  if (staging.isError()) {
+    return Failure("Failed to create staging directory: " + staging.error());
+  }
+
+  return fetchImage(appc)
+    .then(defer(self(), [=](const vector<string>& imageIds) -> ImageInfo {
+      vector<string> rootfses;
+
+      // TODO(jojy): Print a warning if there are duplicated image ids
+      // in the list. The semantics is weird when there are duplicated
+      // image ids in the list. Appc spec does not discuss this
+      // situation.
+      foreach (const string& imageId, imageIds) {
+        rootfses.emplace_back(paths::getImageRootfsPath(rootDir, imageId));
+      }
+
+      return ImageInfo{rootfses, None()};
+    }));
+}
 
-  // If the image specifies an id, use that. If not, then find the image in the
-  // cache by its name and labels. Note that if store has the image, it has to
-  // be in the cache. It is possible that an image could be found in cache but
-  // not in the store due to eviction of the image from the store in between
-  // cache restoration and now.
 
-  if (appc.has_id()) {
-    imageId = appc.id();
-  } else {
-    imageId = cache->find(appc);
+// Fetches the image into the 'staging' directory, and recursively
+// fetches the image's dependencies in a depth first order.
+Future<vector<string>> StoreProcess::fetchImage(const Image::Appc& appc)
+{
+  Option<string> imageId = appc.has_id() ? appc.id() : cache->find(appc);
+  if (imageId.isSome()) {
+    if (os::exists(paths::getImagePath(rootDir, imageId.get()))) {
+      VLOG(1) << "Image '" << appc.name() << "' is found in cache with "
+              << "image id '" << imageId.get() << "'";
+
+      return __fetchImage(imageId.get());
+    }
   }
 
-  if (imageId.isNone()) {
-    return Failure("Failed to find image '" + appc.name() + "' in cache");
+  return _fetchImage(appc)
+    .then(defer(self(), &Self::__fetchImage, lambda::_1));
+}
+
+
+Future<string> StoreProcess::_fetchImage(const Image::Appc& appc)
+{
+  VLOG(1) << "Fetching image '" << appc.name() << "'";
+
+  Try<string> _tmpFetchDir = os::mkdtemp(
+      path::join(paths::getStagingDir(rootDir), "XXXXXX"));
+
+  if (_tmpFetchDir.isError()) {
+    return Failure(
+        "Failed to create temporary fetch directory for image '" +
+        appc.name() + "': " + _tmpFetchDir.error());
   }
 
-  // Now validate the image path for the image. This will also check for the
-  // existence of the directory.
-  Option<Error> error =
-    spec::validateLayout(paths::getImagePath(rootDir, imageId.get()));
+  const string tmpFetchDir = _tmpFetchDir.get();
+
+  return fetcher->fetch(appc, Path(tmpFetchDir))
+    .then(defer(self(), [=]() -> Future<string> {
+      Try<list<string>> imageIds = os::ls(tmpFetchDir);
+      if (imageIds.isError()) {
+        return Failure(
+            "Failed to list images under '" + tmpFetchDir +
+            "': " + imageIds.error());
+      }
+
+      if (imageIds->size() != 1) {
+        return Failure(
+            "Unexpected number of images under '" + tmpFetchDir +
+            "': " + stringify(imageIds->size()));
+      }
+
+      const string& imageId = imageIds->front();
+      const string source = path::join(tmpFetchDir, imageId);
+      const string target = paths::getImagePath(rootDir, imageId);
+
+      if (os::exists(target)) {
+        LOG(WARNING) << "Image id '" << imageId
+                     << "' already exists in the store";
+      } else {
+        Try<Nothing> rename = os::rename(source, target);
+        if (rename.isError()) {
+          return Failure(
+              "Failed to rename directory '" + source +
+              "' to '" + target + "': " + rename.error());
+        }
+      }
+
+      Try<Nothing> addCache = cache->add(imageId);
+      if (addCache.isError()) {
+        return Failure(
+            "Failed to add image '" + appc.name() + "' with image id '" +
+            imageId + "' to the cache: " + addCache.error());
+      }
+
+      Try<Nothing> rmdir = os::rmdir(tmpFetchDir);
+      if (rmdir.isError()) {
+        return Failure(
+            "Failed to remove temporary fetch directory '" +
+            tmpFetchDir + "' for image '" + appc.name() + "'");
+      }
+
+      return imageId;
+    }));
+}
+
+
+Future<vector<string>> StoreProcess::__fetchImage(const string& imageId)
+{
+  return fetchDependencies(imageId)
+    .then([imageId](vector<string> imageIds) -> vector<string> {
+      imageIds.emplace_back(imageId);
+
+      return imageIds;
+    });
+}
+
+
+Future<vector<string>> StoreProcess::fetchDependencies(const string& imageId)
+{
+  const string imagePath = paths::getImagePath(rootDir, imageId);
 
-  if (error.isSome()) {
+  Try<spec::ImageManifest> manifest = spec::getManifest(imagePath);
+  if (manifest.isError()) {
     return Failure(
-        "Failed to validate directory for image '" + appc.name() + "': " +
-        error.get().message);
+        "Failed to get dependencies for image id '" + imageId +
+        "': " + manifest.error());
+  }
+
+  vector<Image::Appc> dependencies;
+  foreach (const spec::ImageManifest::Dependency& dependency,
+           manifest->dependencies()) {
+    Image::Appc appc;
+    appc.set_name(dependency.imagename());
+    appc.set_id(dependency.imageid());
+
+    // TODO(jojy): Make Image::Appc use appc::spec::Label instead of
+    // mesos::Label so that we can avoid this loop here.
+    foreach (const spec::ImageManifest::Label& label, dependency.labels()) {
+      mesos::Label appcLabel;
+      appcLabel.set_key(label.name());
+      appcLabel.set_value(label.value());
+
+      appc.mutable_labels()->add_labels()->CopyFrom(appcLabel);
+    }
+
+    dependencies.emplace_back(appc);
   }
 
-  return ImageInfo{
-      vector<string>({paths::getImageRootfsPath(rootDir, imageId.get())}),
-      None()
-  };
+  if (dependencies.size() == 0) {
+    return vector<string>();
+  }
+
+  // Do a depth first search.
+  list<Future<vector<string>>> futures;
+  foreach (const Image::Appc& appc, dependencies) {
+    futures.emplace_back(fetchImage(appc));
+  }
+
+  return collect(futures)
+    .then(defer(self(), [=](const list<vector<string>>& imageIdsList) {
+      vector<string> result;
+      foreach (const vector<string>& imageIds, imageIdsList) {
+        result.insert(result.end(), imageIds.begin(), imageIds.end());
+      }
+
+      return result;
+    }));
 }
 
 } // namespace appc {

http://git-wip-us.apache.org/repos/asf/mesos/blob/620cb2d2/src/tests/fetcher_cache_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_cache_tests.cpp b/src/tests/fetcher_cache_tests.cpp
index e10b3f7..f9c48f5 100644
--- a/src/tests/fetcher_cache_tests.cpp
+++ b/src/tests/fetcher_cache_tests.cpp
@@ -185,14 +185,7 @@ void FetcherCacheTest::SetUp()
   setupCommandFileAsset();
   setupArchiveAsset();
 
-  master::Flags masterFlags = CreateMasterFlags();
-  // In some fetcher cache tests, we want to find out if concurrent execution of
-  // launching tasks works and settling the clock would serialize task
-  // execution, thus we prefer to reduce `allocation_interval` to speed up test
-  // cases here.
-  masterFlags.allocation_interval = Milliseconds(200);
-
-  Try<PID<Master>> master = StartMaster(masterFlags);
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   fetcherProcess = new MockFetcherProcess();